-
Notifications
You must be signed in to change notification settings - Fork 1
/
handler.go
136 lines (127 loc) · 3.42 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package prometheus_backfill
import (
"context"
"fmt"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
"math"
"os"
"runtime"
"sync"
"text/tabwriter"
"time"
)
// backfillHandler consumes messages from a channel, marshals each of them
// with a bounded number of concurrent workers, and reports progress
// periodically on stdout.
//
// NOTE: NewPrometheusBackfillHandler may initialise this struct positionally,
// so the field order below must not change without updating the constructor.
type backfillHandler struct {
// ch is the input channel; listenOnChannel ranges over it until it is closed.
ch chan interface{}
// total is the expected number of messages, used as the progress denominator.
total atomic.Int64
// done counts the messages that have been marshaled so far.
done atomic.Int64
// startTime is captured when the handler is constructed and is the reference
// point for the elapsed time and rate printed by PrintStats.
startTime time.Time
// appender and blockWriter start out nil; presumably they are populated by
// setBlockWriter, which is defined elsewhere — TODO confirm.
appender storage.Appender
blockWriter *tsdb.BlockWriter
// writerLock is held around flushBlockWriter in listenOnChannel.
writerLock sync.Locker
// bstLock presumably guards bst; its users are defined elsewhere — TODO confirm.
bstLock sync.Locker
// ctx is passed to the semaphore Acquire calls in listenOnChannel.
ctx context.Context
// minTime is initialised to math.MaxInt64 by the constructor.
minTime int64
counter int64
bst *bst
// tmpWg is a temporary auxiliary WaitGroup that RunJob waits on.
// TODO: delete it or give it a proper use.
tmpWg sync.WaitGroup
// maxParallelConsumes is the weight of the semaphore bounding concurrent workers.
maxParallelConsumes int64
// The remaining settings are stored verbatim from the constructor arguments;
// the code that reads them is defined elsewhere.
blockDuration int64
maxPerAppender int64
storeThreshold int64
outputDir string
}
// NewPrometheusBackfillHandler returns a handler that reads messages from ch
// and expects total of them.
//
// The start time is captured here, at construction. Both locks are fresh
// mutexes, the context is context.Background(), and minTime starts at
// math.MaxInt64. The appender and block writer are left nil, and the
// counters and WaitGroup keep their zero values.
//
// ch is supplied by the caller (for instance a buffered channel such as
// make(chan interface{}, bufferedChanCap)).
func NewPrometheusBackfillHandler(blockDuration, maxPerAppender, storeThreshold,
	maxParallelConsumes int64, ch chan interface{}, total int64, outputDir string) *backfillHandler {
	handler := &backfillHandler{
		ch:                  ch,
		startTime:           time.Now(),
		writerLock:          &sync.Mutex{},
		bstLock:             &sync.Mutex{},
		ctx:                 context.Background(),
		minTime:             math.MaxInt64,
		bst:                 new(bst),
		maxParallelConsumes: maxParallelConsumes,
		blockDuration:       blockDuration,
		maxPerAppender:      maxPerAppender,
		storeThreshold:      storeThreshold,
		outputDir:           outputDir,
	}
	handler.total.Store(total)
	return handler
}
// RunJob runs the whole job: it resets the progress counter, starts the
// periodic status printer in the background, and then blocks while the input
// channel is consumed.
func (bh *backfillHandler) RunJob() {
Notice("main", "Start parsing database")
bh.done.Store(0)
// The status printer runs concurrently and is not waited on here; it returns
// on its own once PrintStats reports the end of the job.
go bh.statusLoop()
// Blocks until bh.ch has been closed and every message has been handled.
bh.listenOnChannel()
bh.tmpWg.Wait()
Notice("main", "End of parsing")
}
// listenOnChannel consumes bh.ch until it is closed, marshaling every message
// in its own goroutine. At most bh.maxParallelConsumes goroutines run at the
// same time. Once all of them have finished, it performs a final
// checkAndStore(true) and flushes the block writer under writerLock.
//
// If acquiring a worker slot fails (only possible when bh.ctx is cancelled or
// expires), consumption stops early. The workers already started are still
// waited for before the final flush.
func (bh *backfillHandler) listenOnChannel() {
	Notice("Listening on channel")
	bh.setBlockWriter()

	sem := semaphore.NewWeighted(bh.maxParallelConsumes)
	// workers tracks in-flight goroutines independently of bh.ctx, so the final
	// flush never races with a worker even if the context has been cancelled.
	var workers sync.WaitGroup

	for msg := range bh.ch {
		// A failed Acquire leaves the semaphore unchanged. Starting a worker
		// anyway would exceed the concurrency bound and later release a slot
		// that was never held, which panics.
		if err := sem.Acquire(bh.ctx, 1); err != nil {
			Notice("main", "Stopped consuming channel: "+err.Error())
			break
		}

		table := msg // per-iteration copy for the closure (needed before Go 1.22)
		workers.Add(1)
		go func() {
			defer workers.Done()
			defer sem.Release(1)

			bh.checkAndStore(false)
			bh.marshal(table)
			bh.done.Inc()
		}()
	}

	// Barrier: every started worker has finished before the final store/flush.
	workers.Wait()

	bh.checkAndStore(true)
	bh.writerLock.Lock()
	bh.flushBlockWriter()
	bh.writerLock.Unlock()
}
// statusLoop prints the progress table to stdout immediately and then every
// ten seconds, returning as soon as PrintStats reports the end of the job.
func (bh *backfillHandler) statusLoop() {
	var memStats runtime.MemStats
	out := tabwriter.NewWriter(os.Stdout, 1, 2, 5, ' ', tabwriter.DiscardEmptyColumns)
	for !bh.PrintStats(out, memStats) {
		time.Sleep(10 * time.Second)
	}
}
// PrintStats writes one progress report to w and flushes it. The report holds
// the done/total counters, the completion percentage, the throughput since
// bh.startTime, and a few runtime memory statistics.
//
// mem is only used as scratch space for runtime.ReadMemStats. It is passed by
// value, so the caller's copy is not modified.
//
// It returns true once the number of processed messages has reached (or
// exceeded) the expected total, i.e. when there is nothing left to report.
func (bh *backfillHandler) PrintStats(w *tabwriter.Writer, mem runtime.MemStats) (endOfJob bool) {
	done := bh.done.Load()
	total := bh.total.Load()
	now := time.Now()
	elapsed := now.Sub(bh.startTime)

	// A job with nothing to process is complete by definition; the guard also
	// keeps 0/0 from being printed as "NaN%".
	percent := 100.0
	if total > 0 {
		percent = float64(done) / float64(total) * 100
	}

	// Guard against a zero elapsed time so the rate is never NaN or +Inf.
	rate := 0.0
	if secs := elapsed.Seconds(); secs > 0 {
		rate = float64(done) / secs
	}

	fmt.Fprintln(w, "Progress\tProgress (%)\tRapidity\tStart time\tCurrent Time\tDuration")
	fmt.Fprintf(w, "%d/%d\t%.2f%%\t%.2f tables/s\t%s\t%s\t%s\n",
		done,
		total,
		percent,
		rate,
		bh.startTime.Format(time.RFC3339),
		now.Format(time.RFC3339),
		elapsed)

	runtime.ReadMemStats(&mem)
	fmt.Fprintf(w, "mem.Alloc:\t%E\n", float64(mem.Alloc))
	fmt.Fprintf(w, "mem.TotalAlloc:\t%E\n", float64(mem.TotalAlloc))
	fmt.Fprintf(w, "mem.HeapAlloc:\t%E\n", float64(mem.HeapAlloc))
	fmt.Fprintf(w, "mem.NumGC:\t%E\n", float64(mem.NumGC))
	// Status output is best effort; a failed flush must not abort the job.
	w.Flush()

	// ">=" rather than "==": if more messages arrive than were announced, the
	// status loop must still terminate instead of spinning forever.
	return done >= total
}