Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options for librdkafka partitioning compatibility #455

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
}
}

// ProducerBuilderWithHashPartitionerOptions creates a Kafka producer using the
// Sarama library and allows configuring its hash partitioner. If both
// sarama.WithCustomHashFunction and goka.WithHasher are used to set the hasher,
// the former takes precedence.
func ProducerBuilderWithHashPartitionerOptions(opts ...sarama.HashPartitionerOption) ProducerBuilder {
	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
		cfg := globalConfig
		cfg.ClientID = clientID

		// Seed the option list with the goka-configured hasher (either
		// goka.DefaultHasher or one installed via goka.WithHasher). Caller
		// options are appended afterwards, so a sarama.WithCustomHashFunction
		// passed in opts overrides this default.
		partitionerOpts := make([]sarama.HashPartitionerOption, 0, len(opts)+1)
		partitionerOpts = append(partitionerOpts, sarama.WithCustomHashFunction(hasher))
		partitionerOpts = append(partitionerOpts, opts...)

		cfg.Producer.Partitioner = sarama.NewCustomPartitioner(partitionerOpts...)
		return NewProducer(brokers, &cfg)
	}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
Expand Down
32 changes: 25 additions & 7 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,17 +401,26 @@ func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption {
// view options
///////////////////////////////////////////////////////////////////////////////

// viewPartitionerCompat selects the partitioning compatibility mode a view
// uses when mapping a key's hash digest to a partition number.
type viewPartitionerCompat int

const (
	// librdkafkaCompat interprets the bytes of the key digest as an unsigned
	// integer to match librdkafka's partitioning behavior. Declared as
	// iota + 1 so that the zero value of viewPartitionerCompat means
	// "no compatibility mode selected".
	librdkafkaCompat viewPartitionerCompat = iota + 1
)

// ViewOption defines a configuration option to be used when creating a view.
type ViewOption func(*voptions, Table, Codec)

type voptions struct {
log logger
clientID string
tableCodec Codec
updateCallback UpdateCallback
hasher func() hash.Hash32
autoreconnect bool
backoffResetTime time.Duration
log logger
clientID string
tableCodec Codec
updateCallback UpdateCallback
hasher func() hash.Hash32
partitionerCompat viewPartitionerCompat
autoreconnect bool
backoffResetTime time.Duration

builders struct {
storage storage.Builder
Expand Down Expand Up @@ -476,6 +485,15 @@ func WithViewHasher(hasher func() hash.Hash32) ViewOption {
}
}

// WithViewHashUnsigned instructs the view's partitioner to treat the key
// digest as an unsigned integer when computing the partition. Combine this
// option with the CRC-32 hash algorithm to be compatible with librdkafka.
func WithViewHashUnsigned() ViewOption {
	return func(o *voptions, _ Table, _ Codec) {
		o.partitionerCompat = librdkafkaCompat
	}
}

// WithViewClientID defines the client ID used to identify with Kafka.
func WithViewClientID(clientID string) ViewOption {
return func(o *voptions, table Table, codec Codec) {
Expand Down
25 changes: 18 additions & 7 deletions view.go
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file were inspired by the implementation of this function.

Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ func (v *View) close() error {
}

func (v *View) hash(key string) (int32, error) {
numPartitions := len(v.partitions)
if numPartitions < 1 {
return 0, errors.New("no partitions found")
}

// create a new hasher every time. Alternative would be to store the hash in
// view and every time reset the hasher (ie, hasher.Reset()). But that would
// also require us to protect the access of the hasher with a mutex.
Expand All @@ -304,14 +309,20 @@ func (v *View) hash(key string) (int32, error) {
if err != nil {
return -1, err
}
hash := int32(hasher.Sum32())
if hash < 0 {
hash = -hash
}
if len(v.partitions) == 0 {
return 0, errors.New("no partitions found")

var partition int32
hash := hasher.Sum32()
if v.opts.partitionerCompat == librdkafkaCompat {
partition = int32(hash % uint32(numPartitions))
} else {
partition = int32(hash) % int32(numPartitions)

if partition < 0 {
partition = -partition
}
}
return hash % int32(len(v.partitions)), nil

return partition, nil
}

func (v *View) find(key string) (*PartitionTable, error) {
Expand Down