implements flush.AsyncMessageSink #4
@SpeedyCoder / @heedson could you take a look please? :)
…management and greater unit test coverage
LGTM. I would maybe hide the Run function and just call it internally in a goroutine, to make this more straightforward to use, but then you would have to do something with the error it returns.
In the synchronous sink we expose it when calling Close, which is a bit weird. Maybe a separate Err method would be nicer, but I guess this approach is fine as well.
// occurs or the constructor context is done. If an AckFunc is configured, Run will execute it for
// each ack recieved. If an error is returned, the user should cancel the constructor context to
// prevent `Flush` from blocking.
func (ams *AsyncMessageSink) Run() error {
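A minimal sketch of the calling pattern this doc comment describes. It uses a hypothetical stand-in type (fakeSink, newFakeSink, runAndFlush and errBroker are all invented for illustration), not the real AsyncMessageSink. Run executes in its own goroutine, and if it returns an error the caller cancels the constructor context so that a pending Flush unblocks:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errBroker is a made-up error used to simulate a failing Run.
var errBroker = errors.New("broker unavailable")

// fakeSink is a hypothetical stand-in for the sink; it is not the real type.
type fakeSink struct {
	ctx    context.Context
	runErr error // error that Run will return, for demonstration only
}

func newFakeSink(ctx context.Context, runErr error) *fakeSink {
	return &fakeSink{ctx: ctx, runErr: runErr}
}

// Run returns the configured error immediately, or blocks until the
// constructor context is done.
func (s *fakeSink) Run() error {
	if s.runErr != nil {
		return s.runErr
	}
	<-s.ctx.Done()
	return nil
}

// Flush blocks until the constructor context is done. A real sink would
// also return once all outstanding messages are acked; that is omitted here.
func (s *fakeSink) Flush() error {
	<-s.ctx.Done()
	return s.ctx.Err()
}

// runAndFlush starts Run in a goroutine, cancels the context if Run fails,
// and returns whatever Flush reports.
func runAndFlush(runErr error) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sink := newFakeSink(ctx, runErr)
	go func() {
		if err := sink.Run(); err != nil {
			cancel() // unblock Flush, as the doc comment asks the user to do
		}
	}()
	return sink.Flush()
}

func main() {
	fmt.Println(runAndFlush(errBroker)) // prints "context canceled"
}
```

Without the cancel call in the goroutine, Flush in this sketch would block forever once Run has failed, which is the situation the comment warns about.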
Instead of exposing Run, why not run automatically on construction (in another goroutine) and exit that goroutine on Close (and possibly also on error)?
It complicates error handling a bit. For example, which error do we return on Close if sarama gives an error on close, or another goroutine produces an error?
It also gives users the option to choose when to start the engine. For example, we have a service which starts producing / consuming on a gRPC API call, so we can call Run() on the API call, etc.
I agree it would be slightly more complex to implement, including the error handling you mention. I'm wondering though whether it would overall be simpler from the user perspective.
It also avoids thinking about:
- Can Run be called again after it exits?
- What if Close is called without ever calling Run?
etc.
I'm curious, what is the problem in your gRPC thing if it started running earlier and kept running until Close?
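The alternative discussed above (start on construction, stop on Close) could look roughly like the sketch below. Everything in it is hypothetical: autoSink, newAutoSink, the loop parameter and errLoop are invented for illustration and are not part of this PR. Note that Close ends up returning the error of the run loop, which is exactly the trade-off being debated:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errLoop is a made-up error used to simulate a failing run loop.
var errLoop = errors.New("loop failed")

// autoSink is a hypothetical sink that starts its own run loop.
type autoSink struct {
	stop     chan struct{} // closed by Close to ask the loop to exit
	done     chan struct{} // closed once the loop has exited
	stopOnce sync.Once     // makes Close safe to call more than once
	runErr   error         // written before done is closed, read after
}

// newAutoSink starts loop in a goroutine. loop stands in for the real
// run logic and is expected to return once stop is closed.
func newAutoSink(loop func(stop <-chan struct{}) error) *autoSink {
	s := &autoSink{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		s.runErr = loop(s.stop)
	}()
	return s
}

// Close stops the loop, waits for it to exit and returns its error.
func (s *autoSink) Close() error {
	s.stopOnce.Do(func() { close(s.stop) })
	<-s.done
	return s.runErr
}

func main() {
	failing := newAutoSink(func(stop <-chan struct{}) error { return errLoop })
	fmt.Println(failing.Close()) // prints "loop failed"

	idle := newAutoSink(func(stop <-chan struct{}) error {
		<-stop // run until Close is called
		return nil
	})
	fmt.Println(idle.Close()) // prints "<nil>"
}
```

With this shape the two questions in the list do not arise, since there is no public Run to call twice or to forget. The cost is that a caller who never calls Close never sees the loop error.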
The problem is that we need to register the consumer only when the API is called, not on startup, because everything is dynamic. We could make it part of the constructor, but blocking in the constructor seems wrong, so it would have to be spun off in its own goroutine, and then more issues come to life.
For example, what if the user doesn't call Close() and sarama craps out? Close should return errors for the closing part, not the running part of the code.
The way we do this right now is:
sink := flush.NewAsyncSink()
go func() {
    // Run blocks until it fails or the constructor context is done.
    err := sink.Run()
    if err != nil {
        log.Panic(err)
    }
}()
and doing this seems wrong:
sink := flush.NewAsyncSink()
go func() {
    // Close would have to report run-time errors as well.
    err := sink.Close()
    if err != nil {
        log.Panic(err)
    }
}()
The way sarama got around this problem is an Errors() <-chan *ProducerError method on the producer. We could add the same to the sink, but it adds a bit more complexity for really no benefit, since the resulting usage stays the same:
sink := flush.NewAsyncSink()
go func() {
    // Wait for the first error reported by the sink.
    err := <-sink.Errors()
    if err != nil {
        log.Panic(err)
    }
}()
Ref: https://godoc.org/github.com/Shopify/sarama#AsyncProducer
Additionally, substrate's behaviour is similar to our implementation: first you call a constructor, then you run PublishMessages(ctx context.Context, acks chan<- Message, messages <-chan Message) error, which in our case is named Run() instead of PublishMessages. So substrate users are already familiar with the pattern.
Ref: https://github.com/uw-labs/substrate/blob/master/substrate.go#L37
Implements a message flushing wrapper which blocks until all messages produced have been acked by the user.
An example use case:
You have an application running a job which iterates over a MongoDB collection and produces messages to Kafka. For each message produced, you mark the message status as produced in MongoDB. If the job were to crash mid-production, it could mark a message as produced even though the message never reaches Kafka, which could lead to missing data. One solution is to mark the message as produced only when the ack is received. The flushing wrapper makes this easier to implement by applying a callback to each ack received and blocking until the callback returns.
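The use case above can be sketched as follows. This is an illustrative sketch under stated assumptions, not the implementation in this PR: message, flusher, handleAcks and runJob are hypothetical names, the broker is simulated with channels, and a map stands in for the MongoDB status field.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical message carrying the ID of its source document.
type message struct{ id string }

// flusher is a hypothetical stand-in for the flushing wrapper.
type flusher struct {
	out     chan<- message // towards the simulated broker
	pending sync.WaitGroup // messages produced but not yet acked
	ackFunc func(message)  // callback applied to every ack
}

// Produce registers the message as pending and hands it to the broker.
func (f *flusher) Produce(m message) {
	f.pending.Add(1)
	f.out <- m
}

// handleAcks applies ackFunc to each ack, then marks the message as done.
func (f *flusher) handleAcks(acks <-chan message) {
	for m := range acks {
		f.ackFunc(m)
		f.pending.Done()
	}
}

// Flush blocks until every produced message has been acked and its
// callback has returned.
func (f *flusher) Flush() { f.pending.Wait() }

// runJob produces one message per ID and returns the IDs marked as produced.
func runJob(ids []string) map[string]bool {
	toBroker := make(chan message)
	acks := make(chan message)

	// Simulated broker: acks every message it receives.
	go func() {
		defer close(acks)
		for m := range toBroker {
			acks <- m
		}
	}()

	produced := make(map[string]bool) // stands in for the MongoDB status
	f := &flusher{out: toBroker, ackFunc: func(m message) {
		produced[m.id] = true // only mark as produced once acked
	}}
	go f.handleAcks(acks)

	for _, id := range ids {
		f.Produce(message{id: id})
	}
	f.Flush() // after this, every status update has been applied
	close(toBroker)
	return produced
}

func main() {
	fmt.Println(len(runJob([]string{"a", "b", "c"}))) // prints 3
}
```

Because the status is written only inside the ack callback, a crash between producing and acking leaves the message unmarked, so a later run of the job can produce it again instead of losing it.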