From 5410957c73866c74e4d78828cc2592a64b31809c Mon Sep 17 00:00:00 2001 From: reugn Date: Sat, 19 Aug 2023 12:17:04 +0300 Subject: [PATCH] implement Redis stream connector --- examples/redis/.gitignore | 3 +- examples/redis/docker-compose.yml | 2 +- examples/redis/stream/main.go | 68 ++++++++++++ redis/doc.go | 2 +- redis/redis_stream.go | 177 ++++++++++++++++++++++++++++++ 5 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 examples/redis/stream/main.go create mode 100644 redis/redis_stream.go diff --git a/examples/redis/.gitignore b/examples/redis/.gitignore index 74b362f..6cb07cf 100644 --- a/examples/redis/.gitignore +++ b/examples/redis/.gitignore @@ -1 +1,2 @@ -redis \ No newline at end of file +stream/stream +pubsub/pubsub \ No newline at end of file diff --git a/examples/redis/docker-compose.yml b/examples/redis/docker-compose.yml index 09722ef..1554914 100644 --- a/examples/redis/docker-compose.yml +++ b/examples/redis/docker-compose.yml @@ -3,6 +3,6 @@ version: '3' services: redis: image: redis - container_name: pubsub + container_name: redis-streams ports: - 6379:6379 \ No newline at end of file diff --git a/examples/redis/stream/main.go b/examples/redis/stream/main.go new file mode 100644 index 0000000..fe8b3e0 --- /dev/null +++ b/examples/redis/stream/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + "strings" + "time" + + rs "github.com/reugn/go-streams/redis" + + "github.com/redis/go-redis/v9" + "github.com/reugn/go-streams/flow" +) + +// XADD stream1 * key1 a key2 b key3 c +// XLEN stream2 +// XREAD COUNT 1 BLOCK 100 STREAMS stream2 0 +func main() { + ctx, cancelFunc := context.WithCancel(context.Background()) + + timer := time.NewTimer(time.Minute * 30) + go func() { + <-timer.C + cancelFunc() + }() + + config := &redis.Options{ + Addr: "localhost:6379", // use default Addr + Password: "", // no password set + DB: 0, // use default DB + } + + redisClient := redis.NewClient(config) + + readGroupArgs := &redis.XReadGroupArgs{ + Group: "group1", + Consumer: "consumer1", + Streams: []string{"stream1", ">"}, + } + // groupCreateArgs := &rs.XGroupCreateArgs{ + // Stream: "stream1", + // Group: "group1", + // StartID: "$", + // MkStream: true, + // } + source, err := rs.NewStreamSource(ctx, redisClient, readGroupArgs, nil) + if err != nil { + log.Fatal(err) + } + + toUpperMapFlow := flow.NewMap(toUpper, 1) + sink := rs.NewStreamSink(ctx, redisClient, "stream2") + + source. + Via(toUpperMapFlow). + To(sink) +} + +var toUpper = func(msg *redis.XMessage) *redis.XMessage { + fmt.Printf("Got: %v\n", msg.Values) + values := make(map[string]interface{}, len(msg.Values)) + for key, element := range msg.Values { + values[key] = strings.ToUpper(fmt.Sprintf("%v", element)) + } + msg.Values = values + return msg +} diff --git a/redis/doc.go b/redis/doc.go index c18587a..595ae90 100644 --- a/redis/doc.go +++ b/redis/doc.go @@ -1,2 +1,2 @@ -// Package redis implements the Redis Pub/Sub connector. +// Package redis implements the Redis streaming connectors. package redis diff --git a/redis/redis_stream.go b/redis/redis_stream.go new file mode 100644 index 0000000..4cb9640 --- /dev/null +++ b/redis/redis_stream.go @@ -0,0 +1,177 @@ +package redis + +import ( + "context" + "log" + + "github.com/redis/go-redis/v9" + "github.com/reugn/go-streams" + "github.com/reugn/go-streams/flow" +) + +// StreamSource represents a Redis stream source connector. +// +// A Redis stream is a data structure that acts like an append-only log but +// also implements several operations to overcome some of the limits of a typical +// append-only log. These include random access in O(1) time and complex +// consumption strategies, such as consumer groups. +type StreamSource struct { + ctx context.Context + redisClient *redis.Client + readGroupArgs *redis.XReadGroupArgs + groupCreateArgs *XGroupCreateArgs + out chan interface{} +} + +// XGroupCreateArgs represents the arguments for creating a consumer group. +// +// Use the special StartID "$" to fetch only the new elements arriving in the stream. +// If instead you want the consumer to fetch the whole stream history, +// use zero ("0") as the starting ID for the consumer group. +type XGroupCreateArgs struct { + Stream string + Group string + StartID string + MkStream bool // set to true to create an empty stream automatically +} + +// NewStreamSource returns a new StreamSource instance. +// Pass in nil for the groupCreateArgs parameter if the consumer group already exists. +func NewStreamSource(ctx context.Context, redisClient *redis.Client, + readGroupArgs *redis.XReadGroupArgs, groupCreateArgs *XGroupCreateArgs) (*StreamSource, error) { + if groupCreateArgs != nil { + // Create a new consumer group uniquely identified by for the stream stored at . + // By default, the XGROUP CREATE command expects that the target stream exists, + // and returns an error when it doesn't. + var err error + if groupCreateArgs.MkStream { + err = redisClient.XGroupCreateMkStream( + ctx, + groupCreateArgs.Stream, + groupCreateArgs.Group, + groupCreateArgs.StartID).Err() + } else { + err = redisClient.XGroupCreate( + ctx, + groupCreateArgs.Stream, + groupCreateArgs.Group, + groupCreateArgs.StartID).Err() + } + if err != nil { + return nil, err + } + } + + source := &StreamSource{ + ctx: ctx, + redisClient: redisClient, + readGroupArgs: readGroupArgs, + groupCreateArgs: groupCreateArgs, + out: make(chan interface{}), + } + + go source.init() + return source, nil +} + +// init starts the main loop +func (rs *StreamSource) init() { +loop: + for { + select { + case <-rs.ctx.Done(): + break loop + + default: + // The XREADGROUP command is a special version of the XREAD command with + // support for consumer groups. + entries, err := rs.redisClient.XReadGroup(rs.ctx, rs.readGroupArgs).Result() + if err != nil { + log.Printf("Error in redisClient.XReadGroup: %s", err) + } + + for _, e := range entries { + for _, msg := range e.Messages { + rs.out <- &msg + } + } + } + } + + log.Printf("Closing Redis stream consumer") + close(rs.out) + rs.redisClient.Close() +} + +// Via streams data through the given flow +func (rs *StreamSource) Via(_flow streams.Flow) streams.Flow { + flow.DoStream(rs, _flow) + return _flow +} + +// Out returns an output channel for sending data +func (rs *StreamSource) Out() <-chan interface{} { + return rs.out +} + +// StreamSink represents a Redis stream sink connector. +type StreamSink struct { + ctx context.Context + redisClient *redis.Client + stream string + in chan interface{} +} + +// NewStreamSink returns a new StreamSink instance. +// +// The incoming messages will be streamed to the given target stream using the +// provided redis.Client. +func NewStreamSink(ctx context.Context, redisClient *redis.Client, stream string) *StreamSink { + sink := &StreamSink{ + ctx: ctx, + redisClient: redisClient, + stream: stream, + in: make(chan interface{}), + } + + go sink.init() + return sink +} + +// init starts the main loop +func (rs *StreamSink) init() { + for msg := range rs.in { + switch m := msg.(type) { + case *redis.XMessage: + rs.xAdd(&redis.XAddArgs{ + Stream: rs.stream, // use the target stream name + Values: m.Values, + }) + case map[string]interface{}: + rs.xAdd(&redis.XAddArgs{ + Stream: rs.stream, + Values: m, + }) + default: + log.Printf("Unsupported message type %v", m) + } + } + + log.Printf("Closing Redis stream producer") + rs.redisClient.Close() +} + +// xAdd appends the message to the target stream +func (rs *StreamSink) xAdd(args *redis.XAddArgs) { + // Streams are an append-only data structure. The fundamental write + // command, called XADD, appends a new entry to the specified stream. + err := rs.redisClient.XAdd(rs.ctx, args).Err() + if err != nil { + log.Printf("Error in redisClient.XAdd: %s", err) + } +} + +// In returns an input channel for receiving data +func (rs *StreamSink) In() chan<- interface{} { + return rs.in +}