
implements flush.AsyncMessageSink #4

Merged: 8 commits from flush-wrapper into uw-labs:master on Jul 20, 2020
Conversation

@kaperys (Collaborator) commented Jul 15, 2020

Implements a message flushing wrapper which blocks until all messages produced have been acked by the user.

An example use case:

You have an application running a job which iterates a MongoDB collection and produces messages to Kafka. For each message produced, you mark the message status as produced in MongoDB. If the job were to crash mid-production, there's potential that your job could mark a message as produced but the message never reaches Kafka - this could lead to missing data. One solution could be to only mark the message as produced when the ack is received. The flushing wrapper makes this easier to implement by applying a callback method to each ack received and blocking until the callback returns.

@povilasv previously approved these changes Jul 15, 2020

@povilasv (Contributor) commented:

@SpeedyCoder / @heedson could you take a look please? :)

@kaperys kaperys requested a review from SpeedyCoder July 16, 2020 10:49
@povilasv previously approved these changes Jul 16, 2020
@SpeedyCoder SpeedyCoder requested a review from mjgarton July 16, 2020 12:27
@kaperys kaperys requested a review from SpeedyCoder July 16, 2020 13:02
@SpeedyCoder (Contributor) left a comment


LGTM. I would maybe hide the run function and just call it internally in a goroutine, to make this more straightforward to use, but then you would have to do something with the error returned from there.

In the synchronous sink we expose it when calling close, which is a bit weird. Maybe a separate Err method would be nicer, but I guess this approach is fine as well.

// occurs or the constructor context is done. If an AckFunc is configured, Run will execute it for
// each ack received. If an error is returned, the user should cancel the constructor context to
// prevent Flush from blocking.
func (ams *AsyncMessageSink) Run() error {


Instead of exposing Run, why not run automatically on construction (in another goroutine) and exit that goroutine on Close (and possibly also on error)?

Contributor:

It complicates error handling a bit: for example, which error do we return on Close if sarama gives an error on close, or if the other goroutine produces an error?

It also gives users the option to choose when to start the engine. For example, we have a thing which starts producing/consuming on a GRPC API call, so we can do Run() on the API call, etc.

@mjgarton commented Jul 20, 2020

I agree it would be slightly more complex to implement, including the error handling you mention. I'm wondering, though, whether it would be simpler overall from the user's perspective.

It also avoids thinking about:

  • Can Run be called again after it exits?
  • What if Close is called without ever calling Run?
  • etc.

I'm curious, what is the problem in your GRPC thing if it started running earlier and kept running until Close?

@povilasv (Contributor) commented Jul 20, 2020

The problem is that we need to register the consumer only when the API is called, not on startup, because everything is dynamic. We could make it part of the constructor, but blocking in the constructor seems wrong, so it would have to be spun off in its own goroutine, and then more issues come to life.

For example, what if the user doesn't call Close() and sarama craps out? Close should return errors for the closing part, not the running part of the code.

The way we do this right now is:

sink := flush.NewAsyncSink()
go func() {
	err := sink.Run()
	if err != nil {
		log.Panic(err)
	}
}()

and doing this seems wrong:

sink := flush.NewAsyncSink()
go func() {
	err := sink.Close()
	if err != nil {
		log.Panic(err)
	}
}()

@povilasv (Contributor) commented Jul 20, 2020

The way sarama got around this problem is a sink.Errors() <-chan *ProducerError method on the sink. We could do that, but it adds a bit more complexity and really no benefit; the current approach allows us to keep the same behaviour:

sink := flush.NewAsyncSink()
go func() {
	err := <-sink.Errors()
	if err != nil {
		log.Panic(err)
	}
}()

Ref: https://godoc.org/github.com/Shopify/sarama#AsyncProducer

Contributor:

Additionally, the substrate behaviour is similar to our implementation: first you call a constructor, then you run PublishMessages(ctx context.Context, acks chan<- Message, messages <-chan Message) error, which in our case is named Run() instead of PublishMessages. So substrate users are already familiar with the pattern.

Ref: https://github.com/uw-labs/substrate/blob/master/substrate.go#L37

@jakekeeys jakekeeys merged commit c8b4189 into uw-labs:master Jul 20, 2020
@kaperys kaperys deleted the flush-wrapper branch July 20, 2020 12:38