Adding a Kafka producer for sending events to the Event Bus #20895
base: master
New file: the example Sidekiq job that wraps the producer.

```ruby
# frozen_string_literal: true

require 'kafka/avro_producer'

module Kafka
  class ExampleJob
    include Sidekiq::Job

    # Errors that might occur during the job execution are usually not retryable,
    # though we might want to experiment with this in practice
    sidekiq_options retry: false

    def perform(topic, payload)
      Kafka::AvroProducer.new.produce(topic, payload)
    end
  end
end
```
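Because the job simply delegates to `Kafka::AvroProducer#produce`, the delegation pattern can be exercised without Sidekiq or a live Kafka broker. A minimal sketch in plain Ruby, where `FakeProducer` and the `example_topic` name are hypothetical stand-ins (not part of this PR):

```ruby
# FakeProducer is a hypothetical stand-in for the WaterDrop producer.
# It records every message instead of dispatching it to Kafka.
class FakeProducer
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Mirrors WaterDrop's keyword-style produce_sync interface
  def produce_sync(topic:, payload:)
    @messages << { topic: topic, payload: payload }
  end
end

producer = FakeProducer.new
producer.produce_sync(topic: 'example_topic', payload: 'encoded-bytes')

puts producer.messages.length        # => 1
puts producer.messages.first[:topic] # => example_topic
```

Injecting a test double through the `producer:` keyword argument of `AvroProducer#initialize` is exactly what that constructor's signature allows for.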
New file: the Avro producer that validates, encodes, and frames payloads before handing them to WaterDrop.

```ruby
# frozen_string_literal: true

require 'avro'
require 'kafka/producer_manager'

module Kafka
  class AvroProducer
    attr_reader :producer

    def initialize(producer: nil)
      @producer = producer || Kafka::ProducerManager.instance.producer
    end

    def produce(topic, payload, schema_version: 1)
      schema = get_schema(topic, schema_version)
      encoded_payload = encode_payload(schema, payload)
      producer.produce_sync(topic:, payload: encoded_payload)
    rescue => e
      # https://karafka.io/docs/WaterDrop-Error-Handling/
      # Errors are rescued and re-raised to demonstrate the types of errors that can occur
      log_error(e, topic)
      raise
    end

    private

    def get_schema(topic, schema_version)
      schema_path = Rails.root.join('lib', 'kafka', 'schemas', "#{topic}-value-#{schema_version}.avsc")
      Avro::Schema.parse(File.read(schema_path))
    end

    def encode_payload(schema, payload)
      validate_payload!(schema, payload)

      datum_writer = Avro::IO::DatumWriter.new(schema)
      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new(buffer)
      datum_writer.write(payload, encoder)
      avro_payload = buffer.string

      # Add magic byte and schema ID to the payload
      magic_byte = [0].pack('C')
      # NOTE: This is a placeholder schema ID. In a real-world scenario, this should be fetched
      # from a schema registry. ID = 5 is the Event Bus schema ID for the test schema; replace
      # it with the actual schema ID when running locally.
      schema_id_bytes = [5].pack('N') # should be the schema ID
      magic_byte + schema_id_bytes + avro_payload
    end

    def validate_payload!(schema, payload)
      Avro::SchemaValidator.validate!(schema, payload)
    end

    def log_error(error, topic)
      case error
      when Avro::SchemaValidator::ValidationError
        Rails.logger.error "Schema validation error: #{error}"
      when WaterDrop::Errors::MessageInvalidError
        Rails.logger.error "Message is invalid: #{error}"
      when WaterDrop::Errors::ProduceError
        Rails.logger.error 'Producer error. See the logs for more information. ' \
                           'This dispatch will not reach Kafka'
      else
        Rails.logger.error 'An unexpected error occurred while producing a message to ' \
                           "#{topic}. Please check the logs for more information. " \
                           'This dispatch will not reach Kafka'
      end
    end
  end
end
```

**Review thread** (on the placeholder schema ID):

> **Reviewer:** In a real-world scenario, would both the schema and the ID be fetched from the schema registry?
>
> **Author:** Yes! I had to swap out the avro_turf gem we initially used for this purpose; that gem did connect to a schema registry and performed these steps automatically. I have a follow-up ticket to add that functionality back with custom code.
>
> **Reviewer:** Perfect! It looks like I don't have permission to mark this as resolved, but it is resolved!

**Review thread** (on the rescue/re-raise pattern in `log_error`):

> **Reviewer:** I know this is in draft, so please excuse me if you weren't expecting comments yet. I'm curious about the pattern of catching the exception, sending it to the error log, and then re-raising it. Why not just let the exception occur? To me, the exception class names are just as descriptive as the message that you prepend to the exception. I do see the utility of the else clause.
>
> **Author:** @dellerbie thanks for taking a look! The PR is indeed still being worked on, but this is a valid question. I was mainly rescuing and re-raising the errors to demonstrate the types of errors that can occur and to show that we can add specific log messages and custom error handling as appropriate. This is an experimental feature, and one challenge is to ensure that we have proper instrumentation. I will likely remove this in the near future once we've had some trial runs.
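The magic-byte prefix built in `encode_payload` follows the Confluent wire format: one zero byte, then the schema ID as a 4-byte big-endian unsigned integer, then the Avro-encoded body. A self-contained sketch of that framing and its round-trip, with a placeholder string standing in for real Avro bytes:

```ruby
# Frame a payload in the Confluent wire format:
# magic byte (0) + 4-byte big-endian schema ID + encoded body.
def frame(schema_id, body)
  [0].pack('C') + [schema_id].pack('N') + body
end

# Split a framed message back into its three parts.
def unframe(message)
  magic     = message.byteslice(0, 1).unpack1('C')
  schema_id = message.byteslice(1, 4).unpack1('N')
  body      = message.byteslice(5..)
  [magic, schema_id, body]
end

framed = frame(5, 'avro-bytes')
magic, schema_id, body = unframe(framed)
puts magic      # => 0
puts schema_id  # => 5
puts body       # => avro-bytes
```

The 5-byte header is why a schema-registry-aware consumer can locate the writer schema before decoding: it reads the ID from the header and fetches the matching schema definition.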
New file: the OAuth token refresher used to authenticate to MSK.

```ruby
# frozen_string_literal: true

require 'aws_msk_iam_sasl_signer'

module Kafka
  class OauthTokenRefresher
    # Refresh OAuth tokens when required by the WaterDrop connection lifecycle
    def on_oauthbearer_token_refresh(event)
      signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: Settings.kafka_producer.region)
      token = signer.generate_auth_token_from_role_arn(
        role_arn: Settings.kafka_producer.role_arn
      )

      if token
        event[:bearer].oauthbearer_set_token(
          token: token.token,
          lifetime_ms: token.expiration_time_ms,
          principal_name: 'kafka-cluster'
        )
      else
        event[:bearer].oauthbearer_set_token_failure(
          token.failure_reason
        )
      end
    end
  end
end
```
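The success/failure branching of the refresh callback can be exercised without AWS by stubbing the bearer handle. In this sketch, `FakeBearer` and `FakeToken` are hypothetical stand-ins (the real bearer comes from librdkafka via `event[:bearer]`, and the real token from the MSK signer):

```ruby
# Hypothetical stand-in for the librdkafka bearer handle in event[:bearer].
class FakeBearer
  attr_reader :token, :failure

  def oauthbearer_set_token(token:, lifetime_ms:, principal_name:)
    @token = { token: token, lifetime_ms: lifetime_ms, principal_name: principal_name }
  end

  def oauthbearer_set_token_failure(reason)
    @failure = reason
  end
end

# Hypothetical stand-in for the signer's token object.
FakeToken = Struct.new(:token, :expiration_time_ms)

bearer = FakeBearer.new
token = FakeToken.new('signed-jwt', 900_000)

# Mirrors the success branch of on_oauthbearer_token_refresh
if token
  bearer.oauthbearer_set_token(
    token: token.token,
    lifetime_ms: token.expiration_time_ms,
    principal_name: 'kafka-cluster'
  )
else
  bearer.oauthbearer_set_token_failure('no token generated')
end

puts bearer.token[:lifetime_ms] # => 900000
```

One thing a stub like this surfaces quickly: the failure branch only runs when no token was generated, so whatever it reports must not depend on reading fields off the missing token.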
New file: the singleton manager that configures the WaterDrop producer and its instrumentation.

```ruby
# frozen_string_literal: true

require 'singleton'
require 'waterdrop'
require 'kafka/oauth_token_refresher'

module Kafka
  class ProducerManager
    include Singleton

    attr_reader :producer

    def initialize
      setup_producer if Flipper.enabled?(:kafka_producer)
    end

    private

    def setup_producer
      @producer = WaterDrop::Producer.new do |config|
        config.deliver = true
        config.kafka = {
          'bootstrap.servers': Settings.kafka_producer.broker_urls.join(','),
          'request.required.acks': 1,
          'message.timeout.ms': 100
        }
        config.logger = Rails.logger
        config.client_class = if Rails.env.test?
                                WaterDrop::Clients::Buffered
                              else
                                WaterDrop::Clients::Rdkafka
                              end

        # Authentication to MSK via IAM OauthBearer token
        # Once we're ready to test the connection to the Event Bus, this should be uncommented
        config.oauth.token_provider_listener = Kafka::OauthTokenRefresher.new unless Rails.env.test?
      end
      setup_instrumentation
    end

    def setup_instrumentation
      producer.monitor.subscribe('error.occurred') do |event|
        producer_id = event[:producer_id]
        case event[:type]
        when 'librdkafka.dispatch_error'
          Rails.logger.error(
            "WaterDrop [#{producer_id}]: Message with label: #{event[:delivery_report].label} failed to be delivered"
          )
        else
          Rails.logger.error "WaterDrop [#{producer_id}]: #{event[:type]} occurred"
        end
      end

      producer.monitor.subscribe('message.acknowledged') do |event|
        producer_id = event[:producer_id]
        offset = event[:offset]

        Rails.logger.info "WaterDrop [#{producer_id}] delivered message with offset: #{offset}"
      end
    end
  end
end
```
New file: the Avro schema for the test record (`lib/kafka/schemas`).

```json
{
  "type": "record",
  "name": "TestRecord",
  "namespace": "gov.va.eventbus.test.data",
  "fields": [
    {
      "name": "data",
      "type": { "type": "map", "values": "string" },
      "default": {}
    }
  ]
}
```
**Review thread** (on schema versioning):

> **Reviewer:** Should the calls to this method include the schema_version? They all use the default, but I remember this coming up during the overview.
>
> **Author:** Yes, schemas can evolve, in which case the schema version would change. Schema evolution is quite a complex topic, actually.
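The `schema_version` parameter maps directly onto the filename convention used by `get_schema` (`<topic>-value-<version>.avsc`), so evolving a schema means adding a new `.avsc` file alongside the old one and bumping the version at the call site. A sketch of just the filename resolution, using a hypothetical topic name:

```ruby
# Build the schema filename the producer looks up for a given topic and version.
# The "<topic>-value-<version>.avsc" convention mirrors AvroProducer#get_schema.
def schema_filename(topic, schema_version: 1)
  "#{topic}-value-#{schema_version}.avsc"
end

puts schema_filename('test')                    # => test-value-1.avsc
puts schema_filename('test', schema_version: 2) # => test-value-2.avsc
```

Keeping every historical version on disk is what lets in-flight jobs that were enqueued against an older schema still encode successfully after a newer version ships.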