Skip to content

Commit

Permalink
DataHandlingModel with IDT
Browse files Browse the repository at this point in the history
  • Loading branch information
denizergonul committed Oct 11, 2024
1 parent 0c34e48 commit 0b630cf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 26 deletions.
10 changes: 8 additions & 2 deletions include/datahandlinglibs/models/DataHandlingModel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ using dunedaq::datahandlinglibs::logging::TLVL_WORK_STEPS;
namespace dunedaq {
namespace datahandlinglibs {

template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType>
template<class ReadoutType, class RequestHandlerType, class LatencyBufferType, class RawDataProcessorType, class InputDataType = ReadoutType>
class DataHandlingModel : public DataHandlingConcept
{
public:
Expand All @@ -68,6 +68,7 @@ class DataHandlingModel : public DataHandlingConcept
using RHT = RequestHandlerType;
using LBT = LatencyBufferType;
using RPT = RawDataProcessorType;
using IDT = InputDataType;

// Using timestamp typenames
using timestamp_t = std::uint64_t; // NOLINT(build/unsigned)
Expand Down Expand Up @@ -139,6 +140,11 @@ class DataHandlingModel : public DataHandlingConcept
// Dispatch data request
void dispatch_requests(dfmessages::DataRequest& data_request);

// Transform input data type to readout
RDT& transform_payload(IDT& payload) const
{
return reinterpret_cast<RDT&>(payload);
}

// Operational monitoring
virtual void generate_opmon_data() override;
Expand Down Expand Up @@ -170,7 +176,7 @@ class DataHandlingModel : public DataHandlingConcept
// RAW RECEIVER
std::chrono::milliseconds m_raw_receiver_timeout_ms;
std::chrono::microseconds m_raw_receiver_sleep_us;
using raw_receiver_ct = iomanager::ReceiverConcept<ReadoutType>;
using raw_receiver_ct = iomanager::ReceiverConcept<InputDataType>;
std::shared_ptr<raw_receiver_ct> m_raw_data_receiver;
std::string m_raw_data_receiver_connection_name;

Expand Down
49 changes: 25 additions & 24 deletions include/datahandlinglibs/models/detail/DataHandlingModel.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
namespace dunedaq {
namespace datahandlinglibs {

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::init(const appmodel::DataHandlerModule* mcfg)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::init(const appmodel::DataHandlerModule* mcfg)
{
// Setup request queues
//setup_request_queues(mcfg);
Expand Down Expand Up @@ -38,7 +38,7 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::init(const appmodel::DataHandlerModule* m
}

if (!m_callback_mode) {
m_raw_data_receiver = get_iom_receiver<RDT>(m_raw_data_receiver_connection_name);
m_raw_data_receiver = get_iom_receiver<IDT>(m_raw_data_receiver_connection_name);
m_raw_receiver_timeout_ms = std::chrono::milliseconds(input->get_recv_timeout_ms());
}
}
Expand Down Expand Up @@ -92,14 +92,14 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::init(const appmodel::DataHandlerModule* m
m_request_handler_impl->conf(mcfg);
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::conf(const nlohmann::json& /*args*/)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::conf(const nlohmann::json& /*args*/)
{
// Register callbacks if operating in that mode.
if (m_callback_mode) {
// Configure and register consume callback
m_consume_callback = std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT>::consume_payload, this, std::placeholders::_1);
m_consume_callback = std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_payload, this, std::placeholders::_1);

// Register callback
auto dmcbr = DataMoveCallbackRegistry::get();
Expand All @@ -113,9 +113,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::conf(const nlohmann::json& /*args*/)
}


template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::start(const nlohmann::json& args)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::start(const nlohmann::json& args)
{
// Reset opmon variables
m_sum_payloads = 0;
Expand All @@ -134,17 +134,17 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::start(const nlohmann::json& args)
m_raw_processor_impl->start(args);
m_request_handler_impl->start(args);
if (!m_callback_mode) {
m_consumer_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT>::run_consume, this);
m_consumer_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_consume, this);
}
if (m_generate_timesync) m_timesync_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT>::run_timesync, this);
if (m_generate_timesync) m_timesync_thread.set_work(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync, this);
// Register callback to receive and dispatch data requests
m_data_request_receiver->add_callback(
std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT>::dispatch_requests, this, std::placeholders::_1));
std::bind(&DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests, this, std::placeholders::_1));
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::stop(const nlohmann::json& args)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::stop(const nlohmann::json& args)
{
TLOG_DEBUG(TLVL_WORK_STEPS) << "Stoppping threads...";

Expand All @@ -168,9 +168,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::stop(const nlohmann::json& args)
m_raw_processor_impl->reset_last_daq_time();
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::generate_opmon_data()
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::generate_opmon_data()
{
opmon::DataHandlerInfo ri;
ri.set_sum_payloads(m_sum_payloads.load());
Expand All @@ -192,9 +192,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::generate_opmon_data()
this->publish(std::move(ri));
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::run_consume()
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_consume()
{

TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread started...";
Expand All @@ -220,7 +220,8 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::run_consume()

if (opt_payload) {

RDT& payload = opt_payload.value();
IDT& idt_payload = opt_payload.value();
RDT& payload = transform_payload(idt_payload);

m_raw_processor_impl->preprocess_item(&payload);
if (m_request_handler_supports_cutoff_timestamp) {
Expand Down Expand Up @@ -287,9 +288,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::run_consume()
TLOG_DEBUG(TLVL_WORK_STEPS) << "Consumer thread joins... ";
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::consume_payload(RDT&& payload)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::consume_payload(RDT&& payload)
{
//m_rawq_timeout_count = 0;
//m_num_payloads = 0;
Expand All @@ -316,9 +317,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::consume_payload(RDT&& payload)
++m_stats_packet_count;
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::run_timesync()
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::run_timesync()
{
TLOG_DEBUG(TLVL_WORK_STEPS) << "TimeSync thread started...";
m_num_requests = 0;
Expand Down Expand Up @@ -397,9 +398,9 @@ DataHandlingModel<RDT, RHT, LBT, RPT>::run_timesync()
<< total_timestamp_count << ")";
}

template<class RDT, class RHT, class LBT, class RPT>
template<class RDT, class RHT, class LBT, class RPT, class IDT>
void
DataHandlingModel<RDT, RHT, LBT, RPT>::dispatch_requests(dfmessages::DataRequest& data_request)
DataHandlingModel<RDT, RHT, LBT, RPT, IDT>::dispatch_requests(dfmessages::DataRequest& data_request)
{
if (data_request.request_information.component != m_sourceid) {
ers::error(RequestSourceIDMismatch(ERS_HERE, m_sourceid, data_request.request_information.component));
Expand Down

0 comments on commit 0b630cf

Please sign in to comment.