Skip to content

Commit

Permalink
Dev better kafka producer (#14)
Browse files Browse the repository at this point in the history
* trying out external progress ULT for kafka producer

* slight fix in kafka progress loop

* fixed flushing of batches

* working on de-batching the KafkaProducer

* fixed producer

* fixed producer (again)

* re-enabled batch-based producer

* fixed hang in batch producer

* small change to consumer
  • Loading branch information
mdorier authored Oct 26, 2024
1 parent 328a470 commit c237a87
Show file tree
Hide file tree
Showing 18 changed files with 538 additions and 115 deletions.
14 changes: 14 additions & 0 deletions include/mofka/ArgsUtil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ decltype(auto) GetArgOrDefault(Expected&& exp, T1&& arg1, Ts&&... args) {
}
}

template<typename Expected>
decltype(auto) GetArgOrDefaultExactType(Expected&& exp) {
return std::forward<Expected>(exp);
}

template<typename Expected, typename T1, typename ... Ts>
decltype(auto) GetArgOrDefaultExactType(Expected&& exp, T1&& arg1, Ts&&... args) {
if constexpr (std::is_same_v<std::decay_t<Expected>, std::decay_t<T1>>) {
return std::forward<T1>(arg1);
} else {
return GetArgOrDefault(exp, std::forward<Ts>(args)...);
}
}

template<typename Expected>
decltype(auto) GetArg() {
static_assert(std::is_same_v<void,Expected>, "Could not find mandatory argument of Expected type");
Expand Down
27 changes: 19 additions & 8 deletions include/mofka/TopicHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ class TopicHandleInterface {
* @param batch_size Batch size.
* @param thread_pool Thread pool.
* @param ordering Whether to enforce strict ordering.
* @param options Extra options.
*
* @return Producer instance.
*/
virtual Producer makeProducer(std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
Ordering ordering) const = 0;
Ordering ordering,
Metadata options) const = 0;

/**
* @brief Create a Consumer object from the full
Expand All @@ -90,6 +92,7 @@ class TopicHandleInterface {
* @param data_broker Data broker.
* @param data_selector Data selector.
* @param targets Indices of the partitions to consumer from.
* @param options Extra options.
*
* @return Consumer instance.
*/
Expand All @@ -98,7 +101,8 @@ class TopicHandleInterface {
ThreadPool thread_pool,
DataBroker data_broker,
DataSelector data_selector,
const std::vector<size_t>& targets) const = 0;
const std::vector<size_t>& targets,
Metadata options) const = 0;

};

Expand Down Expand Up @@ -166,7 +170,8 @@ class TopicHandle {
GetArgOrDefault(std::string_view{""}, std::forward<Options>(opts)...),
GetArgOrDefault(BatchSize::Adaptive(), std::forward<Options>(opts)...),
GetArgOrDefault(ThreadPool{}, std::forward<Options>(opts)...),
GetArgOrDefault(Ordering::Strict, std::forward<Options>(opts)...));
GetArgOrDefault(Ordering::Strict, std::forward<Options>(opts)...),
GetArgOrDefaultExactType(Metadata{}, std::forward<Options>(opts)...));
}

/**
Expand All @@ -185,7 +190,8 @@ class TopicHandle {
GetArgOrDefault(ThreadPool{}, std::forward<Options>(opts)...),
GetArgOrDefault(DataBroker{}, std::forward<Options>(opts)...),
GetArgOrDefault(DataSelector{}, std::forward<Options>(opts)...),
GetArgOrDefault(std::vector<size_t>(), std::forward<Options>(opts)...));
GetArgOrDefault(std::vector<size_t>(), std::forward<Options>(opts)...),
GetArgOrDefault(Metadata{}, std::forward<Options>(opts)...));
}

/**
Expand Down Expand Up @@ -250,8 +256,10 @@ class TopicHandle {
Producer makeProducer(std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
Ordering ordering) const {
return self->makeProducer(name, batch_size, std::move(thread_pool), ordering);
Ordering ordering,
Metadata options) const {
return self->makeProducer(
name, batch_size, std::move(thread_pool), ordering, std::move(options));
}

/**
Expand All @@ -272,8 +280,11 @@ class TopicHandle {
ThreadPool thread_pool,
DataBroker data_broker,
DataSelector data_selector,
const std::vector<size_t>& targets) const {
return self->makeConsumer(name, batch_size, thread_pool, data_broker, data_selector, targets);
const std::vector<size_t>& targets,
Metadata options) const {
return self->makeConsumer(
name, batch_size, std::move(thread_pool), data_broker,
data_selector, targets, std::move(options));
}

};
Expand Down
1 change: 1 addition & 0 deletions src/ActiveProducerBatchQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ActiveProducerBatchQueue {
{
std::unique_lock<thallium::mutex> guard{m_mutex};
m_request_flush = true;
m_cv.notify_one();
m_cv.wait(guard, [this]() { return m_batch_queue.empty(); });
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ set (kafka-src-files
KafkaDriver.cpp
KafkaTopicHandle.cpp
KafkaProducer.cpp
KafkaBatchProducer.cpp
KafkaProducerBatch.cpp
KafkaConsumer.cpp
KafkaEvent.cpp)

Expand Down
81 changes: 81 additions & 0 deletions src/KafkaBatchProducer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* (C) 2023 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "mofka/Producer.hpp"
#include "mofka/Result.hpp"
#include "mofka/Exception.hpp"
#include "mofka/TopicHandle.hpp"
#include "mofka/Future.hpp"

#include "Promise.hpp"
#include "KafkaBatchProducer.hpp"
#include "ActiveProducerBatchQueue.hpp"
#include "PimplUtil.hpp"
#include <limits>

#include <thallium/serialization/stl/string.hpp>
#include <thallium/serialization/stl/pair.hpp>

namespace mofka {

KafkaBatchProducer::KafkaBatchProducer(
std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
Ordering ordering,
std::shared_ptr<KafkaTopicHandle> topic,
std::shared_ptr<rd_kafka_t> kprod,
std::shared_ptr<rd_kafka_topic_t> ktopic)
: BatchProducer(name, batch_size, std::move(thread_pool), ordering, TopicHandle(topic))
, m_topic(std::move(topic))
, m_kafka_producer(std::move(kprod))
, m_kafka_topic(std::move(ktopic))
{
start();
}

std::shared_ptr<ProducerBatchInterface> KafkaBatchProducer::newBatchForPartition(size_t index) const {
if(index >= m_topic->m_partitions.size()) {
throw Exception{"Invalid index returned by partition selector"};
}
return std::make_shared<KafkaProducerBatch>(
this, m_kafka_producer, m_kafka_topic, index);
}

void KafkaBatchProducer::flush() {
BatchProducer::flush();
}

void KafkaBatchProducer::start() {
m_should_stop = false;
auto run = [this](){
while(!m_should_stop) {
{
std::unique_lock<tl::mutex> pending_guard{m_num_pending_batches_mtx};
m_num_pending_batches_cv.wait(pending_guard,
[this](){ return m_should_stop || m_num_pending_batches > 0; });
}
while(rd_kafka_poll(m_kafka_producer.get(), 0) != 0) {
if(m_should_stop) break;
tl::thread::yield();
}
if(m_should_stop) break;
tl::thread::yield();
}
m_poll_ult_stopped.set_value();
};
m_thread_pool.pushWork(std::move(run));
}

KafkaBatchProducer::~KafkaBatchProducer() {
flush();
if(!m_should_stop) {
m_should_stop = true;
m_num_pending_batches_cv.notify_all();
m_poll_ult_stopped.wait();
}
}

}
64 changes: 64 additions & 0 deletions src/KafkaBatchProducer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* (C) 2024 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef MOFKA_KAFKA_BATCH_PRODUCER_IMPL_H
#define MOFKA_KAFKA_BATCH_PRODUCER_IMPL_H

#include "KafkaTopicHandle.hpp"
#include "KafkaPartitionInfo.hpp"
#include "KafkaProducerBatch.hpp"
#include "BatchProducer.hpp"

#include "mofka/TopicHandle.hpp"
#include "mofka/Producer.hpp"
#include "mofka/UUID.hpp"
#include "mofka/Ordering.hpp"

#include <thallium.hpp>
#include <string_view>
#include <queue>

#include <librdkafka/rdkafka.h>

namespace mofka {

namespace tl = thallium;

class KafkaTopicHandle;

class KafkaBatchProducer : public BatchProducer {

public:

std::shared_ptr<KafkaTopicHandle> m_topic;
std::shared_ptr<rd_kafka_t> m_kafka_producer;
std::shared_ptr<rd_kafka_topic_t> m_kafka_topic;
std::atomic<bool> m_should_stop = true;
tl::eventual<void> m_poll_ult_stopped;

mutable size_t m_num_pending_batches = 0;
mutable tl::mutex m_num_pending_batches_mtx;
mutable tl::condition_variable m_num_pending_batches_cv;

KafkaBatchProducer(std::string_view name,
BatchSize batch_size,
ThreadPool thread_pool,
Ordering ordering,
std::shared_ptr<KafkaTopicHandle> topic,
std::shared_ptr<rd_kafka_t> kprod,
std::shared_ptr<rd_kafka_topic_t> ktopic);

~KafkaBatchProducer();

void flush() override;

std::shared_ptr<ProducerBatchInterface> newBatchForPartition(size_t index) const override;

void start();
};

}

#endif
2 changes: 1 addition & 1 deletion src/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void KafkaConsumer::subscribe() {
m_should_stop = false;
auto run = [this](){
while(!m_should_stop) {
int timeout = m_thread_pool.size() > 1 ? 0 : 100;
int timeout = 0; // m_thread_pool.size() > 1 ? 0 : 100;
rd_kafka_message_t* msg = rd_kafka_consumer_poll(
m_kafka_consumer.get(), timeout);
if(!msg) {
Expand Down
Loading

0 comments on commit c237a87

Please sign in to comment.