Adding a Kafka producer for sending events to the Event Bus #20895

Open — wants to merge 39 commits into base: master

39 commits
9e3f490
WIP: Adding Kafka producer code
madebydna Jan 21, 2025
28b9d85
Ensuring that event payload does not get logged.
madebydna Jan 31, 2025
86ef512
Introduced Avro encoding with schema registry
madebydna Feb 7, 2025
bee43bd
Topic/schema naming convention update
madebydna Feb 7, 2025
583f1c4
Correction to Sidekiq example job
madebydna Feb 14, 2025
7b33b8b
Added integration tests with testcontainers-ruby
madebydna Feb 17, 2025
a3162a5
Uncommenting OAuthToken code
madebydna Feb 20, 2025
a004597
Merge branch 'master' into wip-kafka-producer
madebydna Feb 21, 2025
b05fe6d
Bundle w/Sidekiq Enterprise license
madebydna Feb 21, 2025
0f3db8a
Added CODEOWNERS entries
madebydna Feb 21, 2025
8a948e9
Fixing linting errors
madebydna Feb 21, 2025
b0acb98
Fix typo in CODEOWNERS
madebydna Feb 21, 2025
ab5ecfb
Refactoring code to remove initializer
madebydna Feb 21, 2025
17ff5f4
Merge branch 'master' into wip-kafka-producer
madebydna Feb 21, 2025
f0b56a4
CODEOWNERS fix and OAuth logic in test only
madebydna Feb 21, 2025
f653e1a
Merge branch 'wip-kafka-producer' of github.com:department-of-veteran…
madebydna Feb 21, 2025
dcaa0e4
Removed testcontainers gem and specs
madebydna Feb 23, 2025
68332a1
Added VCR patch
madebydna Feb 24, 2025
ec1cd3a
Fix Rubocop issues with patch file
madebydna Feb 24, 2025
7bba2b5
Added vcr webmock patch to module + change to codeowners
madebydna Feb 24, 2025
701a304
Fix Rubocop issue
madebydna Feb 24, 2025
39ab56a
Fix path to vcr patch
madebydna Feb 24, 2025
341abdb
Skipping AvroProducer integration test altogether. Comment on monkeyp…
madebydna Feb 24, 2025
4be5f6b
Merge branch 'master' into wip-kafka-producer
madebydna Feb 25, 2025
991e23b
Replaced avro_turf gem & removed vcr monkey patch
madebydna Feb 26, 2025
4cee7f7
Update Gemfile.lock - readded sidekiq enterprise gems
madebydna Feb 26, 2025
2af71bb
Adding test.yml settings
madebydna Feb 26, 2025
aa2a60a
Merge branch 'master' into wip-kafka-producer
madebydna Feb 27, 2025
d0801b5
PR review feedback; removing IDE formatting changes
madebydna Feb 28, 2025
b44656f
Merge branch 'master' into wip-kafka-producer
madebydna Feb 28, 2025
951b5f0
Removing schema from specs; replacing schema with Event Bus one
madebydna Feb 28, 2025
cef12aa
Merge branch 'wip-kafka-producer' of github.com:department-of-veteran…
madebydna Feb 28, 2025
66de1ee
Merge branch 'master' into wip-kafka-producer
madebydna Mar 3, 2025
891fb0b
Merge branch 'master' into wip-kafka-producer
madebydna Mar 3, 2025
65676f8
Bring kafka settings in alphabetical order
madebydna Mar 3, 2025
62868b0
Add all keys to all settings files
madebydna Mar 3, 2025
24e3ea4
Merge branch 'master' into wip-kafka-producer
madebydna Mar 3, 2025
7b3cd0b
Merge branch 'master' into wip-kafka-producer
madebydna Mar 3, 2025
a40c68a
Merge branch 'master' into wip-kafka-producer
madebydna Mar 4, 2025
5 changes: 4 additions & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -607,7 +607,7 @@ app/uploaders/form1010cg @department-of-veterans-affairs/vfs-10-10 @department-o
app/uploaders/form1010cg/poa_uploader.rb @department-of-veterans-affairs/vfs-10-10 @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/uploaders/form_upload @department-of-veterans-affairs/platform-va-product-forms @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/uploaders/hca_attachment_uploader.rb @department-of-veterans-affairs/vfs-authenticated-experience-backend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/uploaders/lighthouse_document_uploader_base.rb @department-of-veterans-affairs/benefits-management-tools-be @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group @department-of-veterans-affairs/benefits-admin
app/uploaders/lighthouse_document_uploader_base.rb @department-of-veterans-affairs/benefits-management-tools-be @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group @department-of-veterans-affairs/benefits-admin
app/uploaders/lighthouse_document_uploader.rb @department-of-veterans-affairs/benefits-management-tools-be @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group @department-of-veterans-affairs/benefits-admin
app/uploaders/preneed_attachment_uploader.rb @department-of-veterans-affairs/mbs-core-team @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/uploaders/set_aws_config.rb @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
@@ -657,6 +657,7 @@ app/sidekiq/hca @department-of-veterans-affairs/vfs-10-10 @department-of-veteran
app/sidekiq/identity @department-of-veterans-affairs/octo-identity
app/sidekiq/income_limits @department-of-veterans-affairs/vfs-public-websites-frontend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/sidekiq/in_progress_form_cleaner.rb @department-of-veterans-affairs/vfs-authenticated-experience-backend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/sidekiq/kafka @department-of-veterans-affairs/ves-submission-traceability @department-of-veterans-affairs/backend-review-group
app/sidekiq/kms_key_rotation @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
app/sidekiq/lighthouse @department-of-veterans-affairs/backend-review-group
app/sidekiq/lighthouse/submit_career_counseling_job.rb @department-of-veterans-affairs/my-education-benefits @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
@@ -947,6 +948,7 @@ lib/ihub @department-of-veterans-affairs/va-api-engineers @department-of-veteran
lib/income_and_assets @department-of-veterans-affairs/pension-and-burials @department-of-veterans-affairs/backend-review-group
lib/json_marshal @department-of-veterans-affairs/vsa-healthcare-health-quest-1-backend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
lib/json_schema @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
lib/kafka @department-of-veterans-affairs/ves-submission-traceability @department-of-veterans-affairs/backend-review-group
lib/lgy @department-of-veterans-affairs/benefits-non-disability @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
lib/lgy/configuration.rb @department-of-veterans-affairs/benefits-non-disability @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
lib/lgy/service.rb @department-of-veterans-affairs/benefits-non-disability @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
@@ -1367,6 +1369,7 @@ spec/fixtures/supplemental_claims @department-of-veterans-affairs/benefits-decis
spec/fixtures/va_profile @department-of-veterans-affairs/vfs-authenticated-experience-backend @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/fixtures/vba_documents @department-of-veterans-affairs/lighthouse-banana-peels @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/fixtures/vbms @department-of-veterans-affairs/benefits-dependents-management @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
spec/lib/kafka @department-of-veterans-affairs/ves-submission-traceability @department-of-veterans-affairs/backend-review-group
spec/lib/vye @department-of-veterans-affairs/backend-review-group @department-of-veterans-affairs/govcio-vfep-codereviewers
spec/sidekiq/account_login_statistics_job_spec.rb @department-of-veterans-affairs/octo-identity
spec/sidekiq/benefits_intake_status_job_spec.rb @department-of-veterans-affairs/platform-va-product-forms @department-of-veterans-affairs/Disability-Experience @department-of-veterans-affairs/va-api-engineers @department-of-veterans-affairs/backend-review-group
3 changes: 3 additions & 0 deletions Gemfile
@@ -45,6 +45,8 @@ gem 'aasm'
gem 'activerecord-import'
gem 'activerecord-postgis-adapter'
gem 'addressable'
gem 'avro'
gem 'aws-msk-iam-sasl-signer', '~> 0.1.1'
gem 'aws-sdk-kms'
gem 'aws-sdk-s3', '~> 1'
gem 'aws-sdk-sns', '~> 1'
@@ -163,6 +165,7 @@ gem 'utf8-cleaner'
gem 'vets_json_schema', git: 'https://github.com/department-of-veterans-affairs/vets-json-schema', branch: 'master'
gem 'virtus'
gem 'warden-github'
gem 'waterdrop'
gem 'will_paginate'
gem 'with_advisory_lock'

30 changes: 26 additions & 4 deletions Gemfile.lock
@@ -138,7 +138,7 @@ GEM
gserver
sidekiq (>= 7.3.7, < 8)
sidekiq-pro (>= 7.3.4, < 8)
sidekiq-pro (7.3.5)
sidekiq-pro (7.3.6)
sidekiq (>= 7.3.7, < 8)

GEM
@@ -235,16 +235,24 @@ GEM
fiddle
ast (2.4.2)
attr_extras (7.1.0)
avro (1.12.0)
multi_json (~> 1.0)
awesome_print (1.9.2)
aws-eventstream (1.3.0)
aws-partitions (1.1048.0)
aws-sdk-core (3.218.1)
aws-msk-iam-sasl-signer (0.1.1)
aws-sdk-kafka
thor
aws-partitions (1.1039.0)
aws-sdk-core (3.216.0)
aws-eventstream (~> 1, >= 1.3.0)
aws-partitions (~> 1, >= 1.992.0)
aws-sigv4 (~> 1.9)
base64
jmespath (~> 1, >= 1.6.1)
aws-sdk-kms (1.98.0)
aws-sdk-kafka (1.89.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sigv4 (~> 1.5)
aws-sdk-kms (1.97.0)
aws-sdk-core (~> 3, >= 3.216.0)
aws-sigv4 (~> 1.5)
aws-sdk-s3 (1.180.0)
@@ -619,6 +627,13 @@ GEM
jwe (0.4.0)
jwt (2.10.1)
base64
karafka-core (2.4.8)
karafka-rdkafka (>= 0.17.6, < 0.19.0)
logger (>= 1.6.0)
karafka-rdkafka (0.18.1)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
kms_encrypted (1.6.0)
activesupport (>= 6.1)
kramdown (2.4.0)
@@ -1108,6 +1123,10 @@ GEM
addressable
faraday (>= 1.9, < 3)
nokogiri (>= 1.13.9)
waterdrop (2.8.1)
karafka-core (>= 2.4.3, < 3.0.0)
karafka-rdkafka (>= 0.17.5)
zeitwerk (~> 2.3)
web-console (4.2.1)
actionview (>= 6.0.0)
activemodel (>= 6.0.0)
@@ -1158,8 +1177,10 @@ DEPENDENCIES
appeals_api!
apps_api!
ask_va_api!
avro
avs!
awesome_print
aws-msk-iam-sasl-signer (~> 0.1.1)
aws-sdk-kms
aws-sdk-s3 (~> 1)
aws-sdk-sns (~> 1)
@@ -1349,6 +1370,7 @@ DEPENDENCIES
virtus
vye!
warden-github
waterdrop
web-console
webmock
webrick
15 changes: 15 additions & 0 deletions app/sidekiq/kafka/example_job.rb
@@ -0,0 +1,15 @@
# frozen_string_literal: true

require 'kafka/avro_producer'
module Kafka
class ExampleJob
include Sidekiq::Job
# Errors that might occur during the job execution are usually not retryable,
# though we might want to experiment with this in practice
sidekiq_options retry: false

def perform(topic, payload)
Kafka::AvroProducer.new.produce(topic, payload)
end
end
end
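The job above is a thin Sidekiq wrapper that hands its arguments straight to `Kafka::AvroProducer` with retries disabled. A minimal sketch of that delegation, using a hypothetical in-memory producer in place of the real Kafka-backed one so that neither Sidekiq nor Kafka is required:

```ruby
# Hypothetical stand-in for Kafka::AvroProducer, to illustrate how
# ExampleJob#perform passes the topic and payload straight through.
class FakeAvroProducer
  attr_reader :messages

  def initialize
    @messages = []
  end

  def produce(topic, payload)
    @messages << { topic: topic, payload: payload }
  end
end

# Mirrors ExampleJob#perform without the Sidekiq::Job mixin.
class ExampleJobSketch
  def initialize(producer)
    @producer = producer
  end

  def perform(topic, payload)
    @producer.perform_check if false # no-op guard; delegation is the whole job
    @producer.produce(topic, payload)
  end
end

producer = FakeAvroProducer.new
ExampleJobSketch.new(producer).perform('test', { 'data' => { 'key' => 'value' } })
puts producer.messages.first[:topic] # => test
```

In the real job the same call runs inside a Sidekiq worker, which is why `retry: false` matters: a schema-validation failure would fail identically on every retry.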
3 changes: 3 additions & 0 deletions config/features.yml
@@ -2039,6 +2039,9 @@ features:
virtual_agent_enable_datadog_logging:
actor_type: user
description: If enabled, allows for the use of Datadog logging for the chatbot
kafka_producer:
actor_type: cookie_id
description: Enables the Kafka producer for the VA.gov platform
show_about_yellow_ribbon_program:
actor_type: user
description: If enabled, show additional info about the yellow ribbon program
4 changes: 4 additions & 0 deletions config/initializers/sidekiq.rb
@@ -49,6 +49,10 @@
config.death_handlers << lambda do |job, ex|
Rails.logger.error "#{job['class']} #{job['jid']} died with error #{ex.message}."
end

config.on(:shutdown) do
Kafka::ProducerManager.instance.producer&.close
end
end

Sidekiq.configure_client do |config|
4 changes: 4 additions & 0 deletions config/puma.rb
@@ -12,3 +12,7 @@
SemanticLogger.reopen
ActiveRecord::Base.establish_connection
end

on_worker_shutdown do
Kafka::ProducerManager.instance.producer&.close
end
5 changes: 5 additions & 0 deletions config/settings.yml
@@ -645,6 +645,11 @@ ivc_forms:
sidekiq:
missing_form_status_job:
enabled: true
kafka_producer:
aws_region: "us-gov-west-1"
aws_role_arn: <%= ENV['KAFKA_PRODUCER__AWS_ROLE_ARN'] %>
broker_urls: <%= ENV['KAFKA_PRODUCER__BROKER_URLS'] %>
schema_registry_url: <%= ENV['KAFKA_PRODUCER__SCHEMA_REGISTRY_URL'] %>
kms_key_id: <%= ENV['kms_key_id'] %>
lgy:
api_key: <%= ENV['lgy__api_key'] %>
5 changes: 5 additions & 0 deletions config/settings/development.yml
@@ -645,6 +645,11 @@ ivc_forms:
sidekiq:
missing_form_status_job:
enabled: true
kafka_producer:
aws_region: "us-gov-west-1"
aws_role_arn: "arn:aws:iam::123456789012:role/role-name" # this is an example
broker_urls: ["localhost:19092"] # for local Kafka cluster in Docker
schema_registry_url: "http://localhost:8081" # for local Schema Registry in Docker
kms_key_id: ~
lgy:
api_key: ~
6 changes: 5 additions & 1 deletion config/settings/test.yml
@@ -1,4 +1,3 @@
---
acc_rep_management:
prefill: true
account:
@@ -640,6 +639,11 @@ ivc_forms:
sidekiq:
missing_form_status_job:
enabled: true
kafka_producer:
aws_region: "us-gov-west-1"
aws_role_arn: 'arn:aws:iam::123456789012:role/role-name' # this is an example
broker_urls: ['localhost:19092'] # for local Kafka cluster in Docker
schema_registry_url: 'http://localhost:8081' # for local Schema Registry in Docker
kms_key_id: ~
lgy:
api_key: fake_api_key
69 changes: 69 additions & 0 deletions lib/kafka/avro_producer.rb
@@ -0,0 +1,69 @@
# frozen_string_literal: true

require 'avro'
require 'kafka/producer_manager'

module Kafka
class AvroProducer
attr_reader :producer

def initialize(producer: nil)
@producer = producer || Kafka::ProducerManager.instance.producer
end

def produce(topic, payload, schema_version: 1)

[Inline review]
Reviewer: Should the calls to this method include the schema_version? They all use the default, but I remembered this coming up during the overview.
Author (madebydna): Yes, schemas can evolve, in which case the schema version would change. Schema evolution is quite a complex topic, actually.

schema = get_schema(topic, schema_version)
encoded_payload = encode_payload(schema, payload)
producer.produce_sync(topic:, payload: encoded_payload)
rescue => e
# https://karafka.io/docs/WaterDrop-Error-Handling/
# Errors are rescued and re-raised to demonstrate the types of errors that can occur
log_error(e, topic)
raise
end

private

def get_schema(topic, schema_version)
schema_path = Rails.root.join('lib', 'kafka', 'schemas', "#{topic}-value-#{schema_version}.avsc")
Avro::Schema.parse(File.read(schema_path))
end

def encode_payload(schema, payload)
validate_payload!(schema, payload)

datum_writer = Avro::IO::DatumWriter.new(schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
datum_writer.write(payload, encoder)
avro_payload = buffer.string

# Add magic byte and schema ID to the payload
magic_byte = [0].pack('C')
# NOTE: This is a placeholder schema ID. In a real-world scenario, this should be fetched from a schema registry
# ID = 5 is the Event Bus schema ID for test schema, replace this with the actual schema ID when running locally
[Inline review — comment on lines +43 to +44]
Reviewer: In a real-world scenario would both the schema and the ID be fetched from the schema registry?
Author (madebydna): Yes! I had to swap out the avro_turf gem we initially used for this purpose and that one did connect to a schema registry and performed these steps automatically. I have a followup ticket to add back that functionality with custom code.
Reviewer: Perfect! It looks like I don't have permission to mark this as resolved, but it is resolved!

schema_id_bytes = [5].pack('N') # should be schema id
magic_byte + schema_id_bytes + avro_payload
end

def validate_payload!(schema, payload)
Avro::SchemaValidator.validate!(schema, payload)
end

def log_error(error, topic)
case error
when Avro::SchemaValidator::ValidationError
Rails.logger.error "Schema validation error: #{error}"
when WaterDrop::Errors::MessageInvalidError
Rails.logger.error "Message is invalid: #{error}"
when WaterDrop::Errors::ProduceError
Rails.logger.error 'Producer error. See the logs for more information. ' \
'This dispatch will not reach Kafka'
else
Rails.logger.error 'An unexpected error occurred while producing a message to ' \
"#{topic}. Please check the logs for more information. " \
'This dispatch will not reach Kafka'
end

[Review comment]
Reviewer (Contributor): I know this is in draft so please excuse me if you weren't expecting comments yet. I'm curious about the pattern of catching the exception, sending it to the error log and then re-raising it. Why not just let the exception occur? To me, the exception class names are just as descriptive as the message that you prepend to the exception. I do see the utility of the else clause.
Author (madebydna): @dellerbie thanks for taking a look! The PR is indeed still being worked on but this is a valid question. I was mainly rescuing and re-raising the errors to demonstrate the types of errors that can occur and to show that we can add specific log messages and custom error handling as appropriate. This is generally an experimental feature and one challenge is to ensure that we have proper instrumentation. I will likely remove this in the (near) future once we've had some trial runs.

end
end
end
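The `encode_payload` method above frames the Avro bytes in the Confluent-style wire format: a single zero "magic byte", a 4-byte big-endian schema ID, then the Avro-encoded body. A stdlib-only sketch of that framing and the corresponding unframing a consumer would perform (the schema ID 5 mirrors the placeholder in the PR; a real producer would look the ID up in the schema registry):

```ruby
# Frame an Avro-encoded body with the magic byte and schema ID,
# exactly as AvroProducer#encode_payload does with pack('C') and pack('N').
def frame_payload(schema_id, avro_body)
  magic_byte = [0].pack('C')          # single unsigned byte, always 0
  id_bytes   = [schema_id].pack('N')  # 32-bit big-endian ("network order")
  magic_byte + id_bytes + avro_body
end

# Reverse the framing: read back the magic byte and schema ID,
# and return the remaining Avro body untouched.
def unframe_payload(framed)
  magic, schema_id = framed.unpack('CN')
  raise 'unknown magic byte' unless magic.zero?

  [schema_id, framed.byteslice(5, framed.bytesize - 5)]
end

framed = frame_payload(5, 'avro-bytes')
schema_id, body = unframe_payload(framed)
puts schema_id # => 5
puts body      # => avro-bytes
```

The 5-byte header is what lets a consumer pick the right schema from the registry before decoding, which is why the hard-coded ID only works against a registry where ID 5 really is this schema.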
27 changes: 27 additions & 0 deletions lib/kafka/oauth_token_refresher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

require 'aws_msk_iam_sasl_signer'

module Kafka
class OauthTokenRefresher
# Refresh OAuth tokens when required by the WaterDrop connection lifecycle
def on_oauthbearer_token_refresh(event)
# NOTE: the settings keys are aws_region / aws_role_arn (see config/settings.yml)
signer = AwsMskIamSaslSigner::MSKTokenProvider.new(region: Settings.kafka_producer.aws_region)
token = signer.generate_auth_token_from_role_arn(
role_arn: Settings.kafka_producer.aws_role_arn
)

if token
event[:bearer].oauthbearer_set_token(
token: token.token,
lifetime_ms: token.expiration_time_ms,
principal_name: 'kafka-cluster'
)
else
# token is nil in this branch, so there is no failure_reason to read
# off of it; report a static message instead
event[:bearer].oauthbearer_set_token_failure(
'Failed to generate MSK IAM auth token'
)
end
end
end
end
62 changes: 62 additions & 0 deletions lib/kafka/producer_manager.rb
@@ -0,0 +1,62 @@
# frozen_string_literal: true

require 'singleton'
require 'waterdrop'
require 'kafka/oauth_token_refresher'

module Kafka
class ProducerManager
include Singleton

attr_reader :producer

def initialize
setup_producer if Flipper.enabled?(:kafka_producer)
end

private

def setup_producer
@producer = WaterDrop::Producer.new do |config|
config.deliver = true
config.kafka = {
'bootstrap.servers': Settings.kafka_producer.broker_urls.join(','),
'request.required.acks': 1,
'message.timeout.ms': 100
}
config.logger = Rails.logger
config.client_class = if Rails.env.test?
WaterDrop::Clients::Buffered
else
WaterDrop::Clients::Rdkafka
end

# Authentication to MSK via IAM OauthBearer token
# Once we're ready to test connection to the Event Bus, this should be uncommented
config.oauth.token_provider_listener = Kafka::OauthTokenRefresher.new unless Rails.env.test?
end
setup_instrumentation
end

def setup_instrumentation
producer.monitor.subscribe('error.occurred') do |event|
producer_id = event[:producer_id]
case event[:type]
when 'librdkafka.dispatch_error'
Rails.logger.error(
"Waterdrop [#{producer_id}]: Message with label: #{event[:delivery_report].label} failed to be delivered"
)
else
Rails.logger.error "Waterdrop [#{producer_id}]: #{event[:type]} occurred"
end
end

producer.monitor.subscribe('message.acknowledged') do |event|
producer_id = event[:producer_id]
offset = event[:offset]

Rails.logger.info "WaterDrop [#{producer_id}] delivered message with offset: #{offset}"
end
end
end
end
12 changes: 12 additions & 0 deletions lib/kafka/schemas/test-value-1.avsc
@@ -0,0 +1,12 @@
{
"type": "record",
"name": "TestRecord",
"namespace": "gov.va.eventbus.test.data",
"fields": [
{
"name": "data",
"type": { "type": "map", "values": "string" },
"default": {}
}
]
}
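The test schema accepts a single `data` field holding a string-to-string map. Below is a conforming payload and a plain-Ruby approximation of the shape check (in the producer itself this is done by `Avro::SchemaValidator.validate!` against the parsed schema; the check here only covers this one record-with-a-map shape):

```ruby
require 'json'

# The test-value-1.avsc schema, inlined so the sketch is self-contained.
SCHEMA = JSON.parse(<<~AVSC)
  {
    "type": "record",
    "name": "TestRecord",
    "namespace": "gov.va.eventbus.test.data",
    "fields": [
      { "name": "data", "type": { "type": "map", "values": "string" }, "default": {} }
    ]
  }
AVSC

# Rough stand-in for Avro::SchemaValidator.validate! for this schema only:
# the payload's "data" value must be a map of strings to strings.
def conforms?(schema, payload)
  field = schema['fields'].first
  value = payload.fetch(field['name'], field['default'])
  value.is_a?(Hash) && value.all? { |k, v| k.is_a?(String) && v.is_a?(String) }
end

payload = { 'data' => { 'event' => 'submission_received', 'id' => '12345' } }
puts conforms?(SCHEMA, payload)              # => true
puts conforms?(SCHEMA, 'data' => { 'n' => 1 }) # => false
```

Non-string values in the map (as in the second call) are exactly the kind of payload that raises `Avro::SchemaValidator::ValidationError` in `AvroProducer#produce`.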