-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsave.txt
37 lines (30 loc) · 1.51 KB
/
save.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
consumedMessages := 0
// create a suitable sarama.Client
sconfig := sarama.NewConfig()
sconfig.Version = consumer.MinVersion // consumer requires at least 0.9
sconfig.Consumer.Return.Errors = true // needed if asynchronous ErrOffsetOutOfRange handling is desired (it's a good idea)
sclient, _ := sarama.NewClient([]string{"165.227.168.39:9093"}, nil)
// from that, create a consumer.Config with some fancy options
config := consumer.NewConfig()
config.Partitioner = stable.New(false) // use a stable (but inconsistent) partitioner
config.StartingOffset, config.OffsetOutOfRange = offsets.NoOlderThan(time.Second * 30) // always start and restart no more than 30 seconds in the past (NOTE: requires kafka 0.10 brokers to work properly)
// and finally a consumer Client
client, _ := consumer.NewClient("forwarder", config, sclient)
defer client.Close() // not strictly necessary, since we don't exit, but this is example code and someone might C&V it and exit
// consume and print errors
go func() {
for err := range client.Errors() {
fmt.Println(err)
}
}()
// consume a topic
topicConsumer, _ := client.Consume("gateway.success.report")
defer topicConsumer.AsyncClose() // same comment as for client.Close() above
// process messages
for msg := range topicConsumer.Messages() {
fmt.Println("processing message", string(msg.Value))
topicConsumer.Done(msg) // required
consumedMessages++
println(consumedMessages)
println("============================")
}