diff --git a/include/mofka/ConsumerHandle.hpp b/include/mofka/ConsumerHandle.hpp index 48ffbfc..acde6c6 100644 --- a/include/mofka/ConsumerHandle.hpp +++ b/include/mofka/ConsumerHandle.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -72,13 +73,16 @@ class ConsumerHandle { * @param metadata Bulk wrapping the metadata. * @param data_desc_sizes Bulk wrapping data descriptor sizes (count*size_t). * @param data_desc Bulk wrapping data descriptors. + * + * @returns a Future representing the operation. The BulkRef objects passed + * to the function need to remain valid until the future has completed. + */ - void feed(size_t count, - EventID firstID, - const BulkRef& metadata_sizes, - const BulkRef& metadata, - const BulkRef& data_desc_sizes, - const BulkRef& data_desc); + Future feed(size_t count, + EventID firstID, + const BulkRef& metadata_sizes, + const BulkRef& metadata, + const BulkRef& data_desc_sizes, + const BulkRef& data_desc); /** * @brief Check if we should stop feeding the ConsumerHandle. diff --git a/include/mofka/Future.hpp b/include/mofka/Future.hpp index 47a08d0..5fee68f 100644 --- a/include/mofka/Future.hpp +++ b/include/mofka/Future.hpp @@ -55,6 +55,13 @@ class Future { */ ~Future() = default; + /** + * @brief Check the validity of the Future. + */ + operator bool() const { + return static_cast<bool>(m_wait) || static_cast<bool>(m_completed); + } + /** + * @brief Wait for the request to complete. 
*/ diff --git a/src/ConsumerHandle.cpp b/src/ConsumerHandle.cpp index 4ee0546..9b04014 100644 --- a/src/ConsumerHandle.cpp +++ b/src/ConsumerHandle.cpp @@ -5,6 +5,7 @@ */ #include "ConsumerHandleImpl.hpp" #include "PimplUtil.hpp" +#include "Promise.hpp" #include #include @@ -23,7 +24,7 @@ bool ConsumerHandle::shouldStop() const { return self->m_should_stop; } -void ConsumerHandle::feed( +Future ConsumerHandle::feed( size_t count, EventID firstID, const BulkRef &metadata_sizes, @@ -32,7 +33,7 @@ const BulkRef &data_desc) { try { - self->m_send_batch.on(self->m_consumer_endpoint)( + auto request = self->m_send_batch.on(self->m_consumer_endpoint).async( self->m_consumer_ptr, self->m_partition_index, count, @@ -41,8 +42,20 @@ metadata, data_desc_sizes, data_desc); + auto req_ptr = std::make_shared<thallium::async_response>(std::move(request)); + return Future{ + [req_ptr]() { + req_ptr->wait(); + }, + [req_ptr]() { + return req_ptr->received(); + } + }; } catch(const std::exception& ex) { - spdlog::warn("Exception thrown while sending batch to consumer: {}", ex.what()); + // NOTE(review): capture an exception_ptr rather than the exception object; + // capturing ex by value would slice it to std::exception and rethrow the + // sliced copy instead of the original exception type. + return Future{ + [eptr = std::current_exception()](){ std::rethrow_exception(eptr); }, + [](){ return true; } + }; } } diff --git a/src/YokanEventStore.hpp b/src/YokanEventStore.hpp index a0b0019..9be8807 100644 --- a/src/YokanEventStore.hpp +++ b/src/YokanEventStore.hpp @@ -144,56 +144,80 @@ auto c = batchSize.value; - // buffers to hold the metadata and descriptors - std::vector<yk_id_t> ids(c); - std::vector<size_t> metadata_sizes(c); - std::vector<char> metadata_buffer(c * 1024 * 8); - - // note: because we are using docLoad for descriptors, we need - // the sizes and documents to be contiguous even if the number - // of items requested varies. 
- std::vector<char> descriptors_sizes_and_data(c*(sizeof(size_t)+1024)); - - // TODO: change 1024*8 to an actually good estimation of metadata size - // TODO: properly add Adaptive support - - // expose these buffers as bulk handles - auto local_metadata_bulk = m_engine.expose( - {{metadata_sizes.data(), c*sizeof(metadata_sizes[0])}, - {ids.data(), c*sizeof(ids[0])}, - {metadata_buffer.data(), metadata_buffer.size()*sizeof(metadata_buffer[0])}}, - thallium::bulk_mode::read_write); - auto local_descriptors_bulk = m_engine.expose( - {{descriptors_sizes_and_data.data(), descriptors_sizes_and_data.size()}}, - thallium::bulk_mode::read_write); - - // create the BulkRef objects - auto self_addr = static_cast<std::string>(m_engine.self()); - auto metadata_sizes_bulk_ref = BulkRef{ - local_metadata_bulk, - 0, - c*sizeof(size_t), - self_addr - }; - auto metadata_bulk_ref = BulkRef{ - local_metadata_bulk, - c*(sizeof(size_t) + sizeof(yk_id_t)), - metadata_buffer.size(), - self_addr - }; - auto descriptors_sizes_bulk_ref = BulkRef{ - local_descriptors_bulk, - 0, - c*sizeof(size_t), - self_addr - }; - auto descriptors_bulk_ref = BulkRef{ - local_descriptors_bulk, - c*sizeof(size_t), - descriptors_sizes_and_data.size() - c*sizeof(size_t), - self_addr + struct BufferSet { + + std::vector<yk_id_t> ids; + std::vector<size_t> metadata_sizes; + std::vector<char> metadata_buffer; + + // buffers to hold the metadata and descriptors + // note: because we are using docLoad for descriptors, we need + // the sizes and documents to be contiguous even if the number + // of items requested varies. 
+ std::vector<char> descriptors_sizes_and_data; + + thallium::bulk local_metadata_bulk; + thallium::bulk local_descriptors_bulk; + + std::string self_addr; + + BulkRef metadata_sizes_bulk_ref; + BulkRef metadata_bulk_ref; + BulkRef descriptors_sizes_bulk_ref; + BulkRef descriptors_bulk_ref; + + BufferSet(thallium::engine engine, size_t count) + : ids(count) + , metadata_sizes(count) + , metadata_buffer(count * 1024 * 8) + , descriptors_sizes_and_data(count * (sizeof(size_t) + 1024)) + , self_addr(engine.self()) + { + + // expose these buffers as bulk handles + local_metadata_bulk = engine.expose( + {{metadata_sizes.data(), count*sizeof(metadata_sizes[0])}, + {ids.data(), count*sizeof(ids[0])}, + {metadata_buffer.data(), metadata_buffer.size()*sizeof(metadata_buffer[0])}}, + thallium::bulk_mode::read_write); + local_descriptors_bulk = engine.expose( + {{descriptors_sizes_and_data.data(), + descriptors_sizes_and_data.size()}}, + thallium::bulk_mode::read_write); + // create bulk refs + metadata_sizes_bulk_ref = BulkRef{ + local_metadata_bulk, + 0, + count*sizeof(size_t), + self_addr + }; + metadata_bulk_ref = BulkRef{ + local_metadata_bulk, + count*(sizeof(size_t) + sizeof(yk_id_t)), + metadata_buffer.size(), + self_addr + }; + descriptors_sizes_bulk_ref = BulkRef{ + local_descriptors_bulk, + 0, + count*sizeof(size_t), + self_addr + }; + descriptors_bulk_ref = BulkRef{ + local_descriptors_bulk, + count*sizeof(size_t), + descriptors_sizes_and_data.size() - count*sizeof(size_t), + self_addr + }; + + } + }; + auto b1 = std::make_shared<BufferSet>(m_engine, c); + auto b2 = std::make_shared<BufferSet>(m_engine, c); + Future lastFeed; + while(!consumerHandle.shouldStop()) { bool should_stop = false; @@ -211,52 +235,61 @@ if(num_available_events == 0) { // m_is_marked_complete must be true // feed consumer 0 events with first_id = NoMoreEvents to indicate // that there are no more events to consume from this partition - consumerHandle.feed( + if(lastFeed) lastFeed.wait(); + 
lastFeed = consumerHandle.feed( 0, NoMoreEvents, BulkRef{}, BulkRef{}, BulkRef{}, BulkRef{}); break; } // list metadata documents m_metadata_coll.listBulk( - firstID+1, 0, local_metadata_bulk.get_bulk(), - 0, metadata_buffer.size(), true, batchSize.value); + firstID+1, 0, b1->local_metadata_bulk.get_bulk(), + 0, b1->metadata_buffer.size(), true, batchSize.value); // check how many we actually pulled - auto it = std::find_if(metadata_sizes.begin(), - metadata_sizes.end(), + auto it = std::find_if(b1->metadata_sizes.begin(), + b1->metadata_sizes.end(), [](auto size) { return size > YOKAN_LAST_VALID_SIZE; }); - size_t num_events = it - metadata_sizes.begin(); - metadata_bulk_ref.size = std::accumulate(metadata_sizes.begin(), it, (size_t)0); - metadata_sizes_bulk_ref.size = num_events*sizeof(size_t); + size_t num_events = it - b1->metadata_sizes.begin(); + b1->metadata_bulk_ref.size = std::accumulate(b1->metadata_sizes.begin(), it, (size_t)0); + b1->metadata_sizes_bulk_ref.size = num_events*sizeof(size_t); // load the corresponding descriptors m_descriptors_coll.loadBulk( - num_events, ids.data(), local_descriptors_bulk.get_bulk(), - 0, local_descriptors_bulk.size(), true); - auto descriptors_sizes = reinterpret_cast<size_t*>(descriptors_sizes_and_data.data()); - descriptors_sizes_bulk_ref.size = num_events*sizeof(size_t); + num_events, b1->ids.data(), b1->local_descriptors_bulk.get_bulk(), + 0, b1->local_descriptors_bulk.size(), true); + auto descriptors_sizes = reinterpret_cast<size_t*>(b1->descriptors_sizes_and_data.data()); + b1->descriptors_sizes_bulk_ref.size = num_events*sizeof(size_t); for(size_t i = 0; i < num_events; ++i) { if(descriptors_sizes[i] > YOKAN_LAST_VALID_SIZE) descriptors_sizes[i] = 0; } - descriptors_bulk_ref.offset = descriptors_sizes_bulk_ref.size; - descriptors_bulk_ref.size = std::accumulate( + b1->descriptors_bulk_ref.offset = b1->descriptors_sizes_bulk_ref.size; + b1->descriptors_bulk_ref.size = std::accumulate( descriptors_sizes, descriptors_sizes + 
num_events, (size_t)0); + if(lastFeed) + lastFeed.wait(); + // feed the consumer handle - consumerHandle.feed( + lastFeed = consumerHandle.feed( num_events, firstID, - metadata_sizes_bulk_ref, - metadata_bulk_ref, - descriptors_sizes_bulk_ref, - descriptors_bulk_ref); + b1->metadata_sizes_bulk_ref, + b1->metadata_bulk_ref, + b1->descriptors_sizes_bulk_ref, + b1->descriptors_bulk_ref); firstID += num_events; + + std::swap(b1, b2); } + if(lastFeed) + lastFeed.wait(); + return result; }