-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
45 lines (42 loc) · 1.16 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package datalog
import (
"bufio"
"encoding/json"
"log"
"os"
"sync"
"time"
"github.com/kinluek/go-chat/backend/messagehub"
)
// Listener creates messagehub listener that can be used to log the events to a file for persistence.
// A buffered writer is used to speed up writes, the writer is flushed on the flushInterval.
// A WaitGroup can be used to wait on to make sure the store is completely flushed before shutting down.
func Listener(logFile *os.File, flushInterval time.Duration, buffSize int, wg *sync.WaitGroup) chan<- messagehub.Event {
listener := make(chan messagehub.Event, buffSize)
go func() {
ticker := time.NewTicker(flushInterval)
writer := bufio.NewWriter(logFile)
encoder := json.NewEncoder(writer)
defer func() {
ticker.Stop()
writer.Flush()
wg.Done()
}()
for {
select {
case event, ok := <-listener:
if !ok {
return
}
if err := encoder.Encode(event); err != nil {
log.Printf("[ERROR]: failed to write event - %#v - %v", event, err)
}
case <-ticker.C:
if err := writer.Flush(); err != nil {
log.Printf("[ERROR]: failed to flush event logs - %#v", err)
}
}
}
}()
return listener
}