Skip to content

Commit

Permalink
fixed NoMoreEvents issues multiple times when using Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
mdorier committed Jan 2, 2025
1 parent e96c114 commit 6c54379
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
5 changes: 3 additions & 2 deletions benchmark/mofka-vs-kafka/Benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,8 +715,9 @@ static void consume(Driver driver, const std::string& consumer_name, const std::
t_start = std::chrono::high_resolution_clock::now();
}
if (event.id() == mofka::NoMoreEvents) {
num_partitions -= 1;
continue;
//num_partitions -= 1;
break;
//continue;
}
++num_events;
if (acknowledge_every && num_events % acknowledge_every.value() == 0) {
Expand Down
11 changes: 7 additions & 4 deletions src/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,18 @@ void KafkaConsumer::handleReceivedMessage(rd_kafka_message_t* msg) {
size_t value_size;
if (headers &&
rd_kafka_header_get(headers, 0, "NoMoreEvents", &value, &value_size) == RD_KAFKA_RESP_ERR_NO_ERROR) {
auto ult = [promise=std::move(promise)]() mutable {
promise.setValue(Event{std::make_shared<KafkaEvent>()});
};
auto completed = ++m_completed_partitions;
// FIXME If partitions are completed there is no reason to continue running the polling thread
//if(completed == m_partitions.size()) {
//m_should_stop = true;
//}
m_thread_pool.pushWork(std::move(ult), std::numeric_limits<uint64_t>::max()-1);
if(completed == m_partitions.size()) {
//m_should_stop = true;
auto ult = [promise=std::move(promise)]() mutable {
promise.setValue(Event{std::make_shared<KafkaEvent>()});
};
m_thread_pool.pushWork(std::move(ult), std::numeric_limits<uint64_t>::max()-1);
}
rd_kafka_message_destroy(msg);
return;
}
Expand Down

0 comments on commit 6c54379

Please sign in to comment.