Skip to content

Commit

Permalink
fixed bug when using multiple partitions separated into multiple cons…
Browse files Browse the repository at this point in the history
…umers in Kafka
  • Loading branch information
mdorier committed Dec 30, 2024
1 parent 2f007d1 commit eecbe6e
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
5 changes: 4 additions & 1 deletion benchmark/mofka-vs-kafka/Benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ static void produce(int argc, char** argv) {
if(rank == 0) {
createKafkaTopic(driver, topicArg.getValue(), partitionsArgs.getValue());
}
MPI_Barrier(MPI_COMM_WORLD);
produce(driver,
topicArg.getValue(),
eventsArg.getValue(),
Expand All @@ -650,6 +651,7 @@ static void produce(int argc, char** argv) {
topicArg.getValue(),
partitionsArgs.getValue(), 1);
}
MPI_Barrier(MPI_COMM_WORLD);
rdkafka_produce_messages(
bootstrapArg.getValue(),
topicArg.getValue(),
Expand Down Expand Up @@ -828,7 +830,8 @@ static void consume(int argc, char** argv) {
int main(int argc, char** argv) {
spdlog::set_level(spdlog::level::info);

MPI_Init(&argc, &argv);
int provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);

int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void KafkaConsumer::handleReceivedMessage(rd_kafka_message_t* msg) {
auto event = Event{
std::make_shared<KafkaEvent>(
static_cast<EventID>(msg->offset),
m_partitions[msg->partition],
m_topic->m_partitions[msg->partition],
std::move(metadata), std::move(data),
shared_from_this())};
// set the promise
Expand Down

0 comments on commit eecbe6e

Please sign in to comment.