Skip to content

Commit

Permalink
Add options for librdkafka partitioning compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
owenniles committed Jul 15, 2024
1 parent e6a3579 commit 0f5c477
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 14 deletions.
19 changes: 19 additions & 0 deletions builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
}
}

// ProducerBuilderWithHashPartitionerOptions creates a Kafka producer using the
// Sarama library, allowing the hash partitioner to be customized via opts. If
// both sarama.WithCustomHashFunction and goka.WithHasher are used to set the
// hasher, the former takes precedence (caller-supplied opts are applied last).
func ProducerBuilderWithHashPartitionerOptions(opts ...sarama.HashPartitionerOption) ProducerBuilder {
	return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
		// Seed the option list with the builder's hasher (goka.DefaultHasher
		// unless changed via goka.WithHasher); any entry in opts may override it.
		partitionerOpts := append(
			[]sarama.HashPartitionerOption{sarama.WithCustomHashFunction(hasher)},
			opts...,
		)

		cfg := globalConfig
		cfg.ClientID = clientID
		cfg.Producer.Partitioner = sarama.NewCustomPartitioner(partitionerOpts...)
		return NewProducer(brokers, &cfg)
	}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)
Expand Down
32 changes: 25 additions & 7 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,17 +401,26 @@ func WithRebalanceCallback(cb RebalanceCallback) ProcessorOption {
// view options
///////////////////////////////////////////////////////////////////////////////

// viewPartitionerCompat selects an optional compatibility mode for the view's
// key-to-partition mapping. The zero value means the default (signed) goka
// behavior; no compatibility mode is applied.
type viewPartitionerCompat int

const (
	// Interpret the bytes of the key digest as an unsigned integer to match
	// librdkafka's partitioning behavior. Enabled via WithViewHashUnsigned;
	// iota + 1 keeps the zero value free to mean "no compatibility mode".
	librdkafkaCompat viewPartitionerCompat = iota + 1
)

// ViewOption defines a configuration option to be used when creating a view.
type ViewOption func(*voptions, Table, Codec)

type voptions struct {
log logger
clientID string
tableCodec Codec
updateCallback UpdateCallback
hasher func() hash.Hash32
autoreconnect bool
backoffResetTime time.Duration
log logger
clientID string
tableCodec Codec
updateCallback UpdateCallback
hasher func() hash.Hash32
partitionerCompat viewPartitionerCompat
autoreconnect bool
backoffResetTime time.Duration

builders struct {
storage storage.Builder
Expand Down Expand Up @@ -476,6 +485,15 @@ func WithViewHasher(hasher func() hash.Hash32) ViewOption {
}
}

// WithViewHashUnsigned makes the view interpret the key digest as an unsigned
// integer when mapping keys to partitions. Pair it with a CRC-32 hash
// algorithm to stay compatible with librdkafka's partitioning.
func WithViewHashUnsigned() ViewOption {
	return func(opts *voptions, _ Table, _ Codec) {
		opts.partitionerCompat = librdkafkaCompat
	}
}

// WithViewClientID defines the client ID used to identify with Kafka.
func WithViewClientID(clientID string) ViewOption {
return func(o *voptions, table Table, codec Codec) {
Expand Down
25 changes: 18 additions & 7 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ func (v *View) close() error {
}

// hash maps key to one of the view's partitions using the configured hasher.
func (v *View) hash(key string) (int32, error) {
	numPartitions := len(v.partitions)
	if numPartitions < 1 {
		return 0, errors.New("no partitions found")
	}

	// create a new hasher every time. Alternative would be to store the hash in
	// view and every time reset the hasher (ie, hasher.Reset()). But that would
	// also require us to protect the access of the hasher with a mutex.
	Expand All @@ -304,14 +309,20 @@ func (v *View) hash(key string) (int32, error) {
	if err != nil {
		return -1, err
	}
	// NOTE(review): the six lines below are the pre-change (removed) diff
	// lines rendered inline by the diff view; the replacement logic follows
	// after the blank line. They are not part of the post-change function.
	hash := int32(hasher.Sum32())
	if hash < 0 {
		hash = -hash
	}
	if len(v.partitions) == 0 {
		return 0, errors.New("no partitions found")

	var partition int32
	hash := hasher.Sum32()
	// librdkafka compatibility (enabled via WithViewHashUnsigned): treat the
	// 32-bit digest as unsigned before taking the modulus.
	if v.opts.partitionerCompat == librdkafkaCompat {
		partition = int32(hash % uint32(numPartitions))
	} else {
		// Default goka behavior: signed modulus, then absolute value so the
		// partition index is non-negative. This can differ from the unsigned
		// mapping above for digests with the high bit set.
		partition = int32(hash) % int32(numPartitions)

		if partition < 0 {
			partition = -partition
		}
	}
	// NOTE(review): the removed pre-change return line is rendered next by
	// the diff view; the post-change function returns partition below.
	return hash % int32(len(v.partitions)), nil

	return partition, nil
}

func (v *View) find(key string) (*PartitionTable, error) {
Expand Down

0 comments on commit 0f5c477

Please sign in to comment.