From 28b50f399d29fda21db94b39be94306c61f87dab Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:06:27 +0000 Subject: [PATCH 01/18] define the pre_write struct, containing possible configuration important for pre write actions --- cpp/skyweaver/PipelineConfig.hpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/cpp/skyweaver/PipelineConfig.hpp b/cpp/skyweaver/PipelineConfig.hpp index 1656b0c..3902ea1 100644 --- a/cpp/skyweaver/PipelineConfig.hpp +++ b/cpp/skyweaver/PipelineConfig.hpp @@ -9,6 +9,19 @@ namespace skyweaver { + struct WaitConfig + { + bool is_enabled; + int iterations; + int sleep_time; + std::size_t min_free_space; + }; + + struct PreWriteConfig + { + WaitConfig wait; + }; + /** * @brief Class for wrapping the skyweaver pipeline configuration. From 3b5d2d64d41631ec284630e31d13e92ae77a3fba Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:07:11 +0000 Subject: [PATCH 02/18] create pre write config infrastructure --- cpp/skyweaver/PipelineConfig.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cpp/skyweaver/PipelineConfig.hpp b/cpp/skyweaver/PipelineConfig.hpp index 3902ea1..d582ff0 100644 --- a/cpp/skyweaver/PipelineConfig.hpp +++ b/cpp/skyweaver/PipelineConfig.hpp @@ -166,6 +166,11 @@ class PipelineConfig */ DedispersionPlan const& ddplan() const; + /** + * @brief configures wait for filesystem space + */ + void configure_wait(std::string argument); + /** * @brief Enable/disable incoherent dedispersion based fscrunch after * beamforming @@ -222,6 +227,11 @@ class PipelineConfig return SKYWEAVER_CB_NSAMPLES_PER_BLOCK; } + PreWriteConfig pre_write_config() const + { + return _pre_write_config; + } + /** * @brief Return the total number of samples to read from file in each gulp. * @@ -363,6 +373,7 @@ class PipelineConfig float _output_level; DedispersionPlan _ddplan; mutable std::vector _channel_frequencies; + PreWriteConfig _pre_write_config; }; } // namespace skyweaver From 5f694348188a3620b5e6b04f8c639d6ca1c34223 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:07:42 +0000 Subject: [PATCH 03/18] needed to compute bytes from units like G, M or k --- cpp/skyweaver/PipelineConfig.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/skyweaver/PipelineConfig.hpp b/cpp/skyweaver/PipelineConfig.hpp index d582ff0..ef1920b 100644 --- a/cpp/skyweaver/PipelineConfig.hpp +++ b/cpp/skyweaver/PipelineConfig.hpp @@ -346,6 +346,7 @@ class PipelineConfig } private: + std::size_t convertMemorySize(const std::string& str) const; void calculate_channel_frequencies() const; void update_power_offsets_and_scalings(); From 76785467ea488973d2b83379b0c4f748c1897cc8 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:08:27 +0000 Subject: [PATCH 04/18] add the pre write config to the MultiWriterConfig struct --- cpp/skyweaver/MultiFileWriter.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/skyweaver/MultiFileWriter.cuh b/cpp/skyweaver/MultiFileWriter.cuh index ceeb2d8..77b41e1 100644 --- a/cpp/skyweaver/MultiFileWriter.cuh +++ b/cpp/skyweaver/MultiFileWriter.cuh @@ -24,7 +24,7 @@ struct MultiFileWriterConfig{ std::string output_dir; std::string prefix; std::string extension; - + PreWriteConfig pre_write; MultiFileWriterConfig() : header_size(4096), max_file_size(2147483647), stokes_mode("I"), output_dir("test"), prefix("test"), extension("default"){}; MultiFileWriterConfig(std::size_t header_size, std::size_t max_file_size, std::string stokes_mode, std::string output_dir, std::string prefix, std::string extension) : header_size(header_size), max_file_size(max_file_size), stokes_mode(stokes_mode), output_dir(output_dir), prefix(prefix), extension(extension){}; From bd50c5a3575c4f9febdbb3db8b25cf928bb55c8f Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:08:58 +0000 Subject: [PATCH 05/18] define a type for a pre write callback function --- cpp/skyweaver/MultiFileWriter.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/skyweaver/MultiFileWriter.cuh b/cpp/skyweaver/MultiFileWriter.cuh index 77b41e1..cf8e863 100644 --- a/cpp/skyweaver/MultiFileWriter.cuh +++ b/cpp/skyweaver/MultiFileWriter.cuh @@ -51,13 +51,13 @@ template class MultiFileWriter { public: - using CreateStreamCallBackType = std::function(MultiFileWriterConfig const&, ObservationHeader const&, VectorType const&, std::size_t)>; public: + using PreWriteCallback = std::function; /** * @brief Construct a new Multi File Writer object * From 168de0a58495a22845fb715d2f8707d3fe515ef7 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:09:33 +0000 Subject: [PATCH 06/18] overload the constructors for the MultiFileWriter class --- cpp/skyweaver/MultiFileWriter.cuh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/skyweaver/MultiFileWriter.cuh b/cpp/skyweaver/MultiFileWriter.cuh index cf8e863..21aa212 100644 --- a/cpp/skyweaver/MultiFileWriter.cuh +++ b/cpp/skyweaver/MultiFileWriter.cuh @@ -67,7 +67,9 @@ public: */ // MultiFileWriter(PipelineConfig const& config, std::string tag = ""); MultiFileWriter(PipelineConfig const& config, std::string tag, CreateStreamCallBackType create_stream_callback); + MultiFileWriter(PipelineConfig const& config, std::string tag, CreateStreamCallBackType create_stream_callback, PreWriteCallback pre_write_callback); MultiFileWriter(MultiFileWriterConfig config, std::string tag, CreateStreamCallBackType create_stream_callback); + MultiFileWriter(MultiFileWriterConfig config, std::string tag, CreateStreamCallBackType create_stream_callback, PreWriteCallback pre_write_callback); MultiFileWriter(MultiFileWriter const&) = delete; /** From 065b8f539283beef10524b445e8bae86a1fb9bbd Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:09:51 +0000 Subject: [PATCH 07/18] define a private call back function --- cpp/skyweaver/MultiFileWriter.cuh | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/skyweaver/MultiFileWriter.cuh b/cpp/skyweaver/MultiFileWriter.cuh index 21aa212..643647c 100644 --- a/cpp/skyweaver/MultiFileWriter.cuh +++ b/cpp/skyweaver/MultiFileWriter.cuh @@ -118,6 +118,7 @@ public: std::string get_extension(VectorType const& stream_data); CreateStreamCallBackType _create_stream_callback; MultiFileWriterConfig _config; + PreWriteCallback _pre_write_callback; std::string _tag; ObservationHeader _header; std::map> _file_streams; From 00e86a3306a06ef7a305f9c7e30c172e758205d4 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:11:19 +0000 Subject: [PATCH 08/18] define the overladed constructors of the MultiFileWriter class --- cpp/skyweaver/detail/MultiFileWriter.cu | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cpp/skyweaver/detail/MultiFileWriter.cu b/cpp/skyweaver/detail/MultiFileWriter.cu index 0fdead7..9667ec7 100644 --- a/cpp/skyweaver/detail/MultiFileWriter.cu +++ b/cpp/skyweaver/detail/MultiFileWriter.cu @@ -50,6 +50,24 @@ MultiFileWriter::MultiFileWriter(PipelineConfig const& config, writer_config.max_file_size = config.max_output_filesize(); writer_config.stokes_mode = config.stokes_mode(); _config = writer_config; + _pre_write_callback = nullptr; +} + + template +MultiFileWriter::MultiFileWriter(PipelineConfig const& config, + std::string tag, + CreateStreamCallBackType create_stream_callback, + PreWriteCallback pre_write_callback) + : _tag(tag), _create_stream_callback(create_stream_callback), _pre_write_callback(pre_write_callback) +{ + MultiFileWriterConfig writer_config; + writer_config.header_size = config.dada_header_size(); + writer_config.max_file_size = config.max_output_filesize(); + writer_config.stokes_mode = config.stokes_mode(); + writer_config.output_dir = config.output_dir(); + writer_config.pre_write = config.pre_write_config(); + _config = writer_config; + _config.pre_write = writer_config.pre_write; } template @@ -57,11 +75,22 @@ MultiFileWriter::MultiFileWriter(MultiFileWriterConfig config, std::string tag, CreateStreamCallBackType create_stream_callback) : _config(config), _tag(tag), _create_stream_callback(create_stream_callback) +{ + _pre_write_callback = nullptr; +} + +template +MultiFileWriter::MultiFileWriter(MultiFileWriterConfig config, + std::string tag, + CreateStreamCallBackType create_stream_callback, + PreWriteCallback pre_write_callback) + : _config(config), _tag(tag), _create_stream_callback(create_stream_callback), _pre_write_callback(pre_write_callback) { } + template MultiFileWriter::~MultiFileWriter(){}; From 1d2566880f98dc0a29bd8cef929d2dbae0acc73e Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:11:53 +0000 Subject: [PATCH 09/18] implement the logic for the pre write callback --- cpp/skyweaver/detail/MultiFileWriter.cu | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/skyweaver/detail/MultiFileWriter.cu b/cpp/skyweaver/detail/MultiFileWriter.cu index 9667ec7..b28d991 100644 --- a/cpp/skyweaver/detail/MultiFileWriter.cu +++ b/cpp/skyweaver/detail/MultiFileWriter.cu @@ -178,6 +178,11 @@ template bool MultiFileWriter::operator()(VectorType const& stream_data, std::size_t stream_idx) { + std::size_t const data_size = stream_data.size() * sizeof(typename VectorType::value_type); + if (_pre_write_callback != nullptr && _config.pre_write.wait.is_enabled) + { + _pre_write_callback(data_size, _config); + } if(!has_stream(stream_idx)) { create_stream(stream_data, stream_idx); } @@ -192,8 +197,7 @@ bool MultiFileWriter::operator()(VectorType const& stream_data, _file_streams.at(stream_idx) ->write(reinterpret_cast( thrust::raw_pointer_cast(stream_data.data())), - stream_data.size() * - sizeof(typename VectorType::value_type)); + data_size); } return false; } From 044a939285a14ae38d07280a9317302eceb9bf4f Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:12:58 +0000 Subject: [PATCH 10/18] method to read the wait arguments --- cpp/skyweaver/src/PipelineConfig.cpp | 58 ++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/cpp/skyweaver/src/PipelineConfig.cpp b/cpp/skyweaver/src/PipelineConfig.cpp index c2cdf3b..b5121e9 100644 --- a/cpp/skyweaver/src/PipelineConfig.cpp +++ b/cpp/skyweaver/src/PipelineConfig.cpp @@ -157,6 +157,64 @@ DedispersionPlan const& PipelineConfig::ddplan() const return _ddplan; } +std::size_t PipelineConfig::convertMemorySize(const std::string& str) const { + std::size_t lastCharPos = str.find_last_not_of("0123456789"); + std::string numberPart = str.substr(0, lastCharPos); + std::string unitPart = str.substr(lastCharPos); + + std::size_t number = std::stoull(numberPart); + + if (unitPart.empty()) + return number; + else if (unitPart == "K" || unitPart == "k") + return number * 1024; + else if (unitPart == "M" || unitPart == "m") + return number * 1024 * 1024; + else if (unitPart == "G" || unitPart == "g") + return number * 1024 * 1024 * 1024; + else + throw std::runtime_error("Invalid memory unit!"); +} + +void PipelineConfig::configure_wait(std::string argument) +{ + std::vector tokens; + std::string token; + std::istringstream tokenStream(argument); + int indx = 0; + _pre_write_config.wait.is_enabled = true; + while (std::getline(tokenStream, token, ':')) { + if(indx == 0) + { + errno = 0; + _pre_write_config.wait.iterations = std::stoi(token); + if (errno == ERANGE) { + throw std::runtime_error("Wait iteration number out of range!"); + } + if (_pre_write_config.wait.iterations < 0) _pre_write_config.wait.iterations = 0; + } else if(indx == 1) { + errno = 0; + _pre_write_config.wait.sleep_time = std::stoi(token); + if (errno == ERANGE) { + throw std::runtime_error("Sleep time out of range!"); + } + if (_pre_write_config.wait.sleep_time < 1) _pre_write_config.wait.sleep_time = 1; + } else if(indx == 2) { + if (!token.empty() && std::all_of(token.begin(), token.end(), ::isdigit)) + { + _pre_write_config.wait.min_free_space = std::stoull(token); + } else { + try { + _pre_write_config.wait.min_free_space = convertMemorySize(token); + } catch (std::runtime_error& e) { + std::cout << "Memory conversion error: " << e.what() << std::endl; + throw; + } + } + } + indx++; + } +} void PipelineConfig::enable_incoherent_dedispersion(bool enable) From a8a966b8387435b047715b7ffd224a6c339882b6 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:14:09 +0000 Subject: [PATCH 11/18] the pre write call back function --- cpp/skyweaver/src/skyweaver_cli.cu | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index e168f63..d6d1763 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -239,6 +240,34 @@ void run_pipeline(Pipeline& pipeline, stopwatch.show_all_timings(); } +auto pre_write_callback = [] (std::size_t const size, skyweaver::MultiFileWriterConfig const& config) +{ + if (!config.pre_write.wait.is_enabled) return; + std::filesystem::space_info space = std::filesystem::space(config.output_dir); + size_t limit = std::min(config.pre_write.wait.min_free_space, size); + if(space.available >= limit) + return; + + BOOST_LOG_TRIVIAL(info) + << space.available + << " bytes available space is not enough. Need at least " + << limit + << " bytes in " << config.output_dir << "."; + BOOST_LOG_TRIVIAL(warning) << "Start pausing."; + int max_iterations = (config.pre_write.wait.iterations == 0) ? INT_MAX : config.pre_write.wait.iterations; + for (int i = 0; i < max_iterations; i++) + { + sleep(config.pre_write.wait.sleep_time); + space = std::filesystem::space(config.output_dir); + if (space.available >= limit) + { + BOOST_LOG_TRIVIAL(info) << "Space has been freed up. Will proceed."; + return; + } + } + throw std::runtime_error("Space for writing hasn't been freed up in time."); +}; + template void setup_pipeline(skyweaver::PipelineConfig& config) { From f7766a615077ce950c813491a8e6a7d9820e1ebd Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:14:40 +0000 Subject: [PATCH 12/18] use the pre write call back function --- cpp/skyweaver/src/skyweaver_cli.cu | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index d6d1763..24c2f9e 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -289,7 +289,8 @@ void setup_pipeline(skyweaver::PipelineConfig& config) typename IBWriterType::CreateStreamCallBackType create_stream_callback_ib = skyweaver::detail::create_dada_file_stream< skyweaver::BTFPowersH>; - IBWriterType ib_handler(config, "ib", create_stream_callback_ib); + + IBWriterType ib_handler(config, "ib", create_stream_callback_ib, pre_write_callback); using StatsWriterType = skyweaver::MultiFileWriter>; @@ -297,8 +298,8 @@ void setup_pipeline(skyweaver::PipelineConfig& config) create_stream_callback_stats = skyweaver::detail::create_dada_file_stream< skyweaver::FPAStatsD>; - StatsWriterType stats_handler(config, "stats", create_stream_callback_stats); + StatsWriterType stats_handler(config, "stats", create_stream_callback_stats, pre_write_callback); if constexpr(enable_incoherent_dedispersion) { using CBWriterType = skyweaver::MultiFileWriter>; @@ -306,7 +307,7 @@ void setup_pipeline(skyweaver::PipelineConfig& config) create_stream_callback_cb = skyweaver::detail::create_dada_file_stream>; skyweaver::MultiFileWriter> - cb_file_writer(config, "cb", create_stream_callback_cb); + cb_file_writer(config, "cb", create_stream_callback_cb, pre_write_callback); skyweaver::IncoherentDedispersionPipeline @@ -325,7 +326,7 @@ void setup_pipeline(skyweaver::PipelineConfig& config) typename CBWriterType::CreateStreamCallBackType create_stream_callback_cb = skyweaver::detail::create_dada_file_stream>; - CBWriterType cb_file_writer(config, "cb", create_stream_callback_cb); + CBWriterType cb_file_writer(config, "cb", create_stream_callback_cb, pre_write_callback); skyweaver::BeamformerPipeline Date: Fri, 20 Sep 2024 13:15:02 +0000 Subject: [PATCH 13/18] read possible wait arguments --- cpp/skyweaver/src/skyweaver_cli.cu | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index 24c2f9e..d1b9842 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -497,7 +497,14 @@ int main(int argc, char** argv) [](std::size_t nthreads) { omp_set_num_threads(nthreads); }), "The number of threads to use for incoherent dedispersion") - // Logging options + ("wait-for-space", + po::value() + ->notifier( + [&config](std::string key) { config.configure_wait(key); }), + "Wait for enough disk space for the output. " + "::") + + // Logging options ("log-level", po::value()->default_value("info")->notifier( [](std::string level) { skyweaver::set_log_level(level); }), From 4059a6345d247672b247ab872b76bc83a4360917 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:21:37 +0000 Subject: [PATCH 14/18] That shouldn't be removed --- cpp/skyweaver/MultiFileWriter.cuh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/skyweaver/MultiFileWriter.cuh b/cpp/skyweaver/MultiFileWriter.cuh index 643647c..d9e0a01 100644 --- a/cpp/skyweaver/MultiFileWriter.cuh +++ b/cpp/skyweaver/MultiFileWriter.cuh @@ -24,6 +24,7 @@ struct MultiFileWriterConfig{ std::string output_dir; std::string prefix; std::string extension; + std::string output_basename; PreWriteConfig pre_write; MultiFileWriterConfig() : header_size(4096), max_file_size(2147483647), stokes_mode("I"), output_dir("test"), prefix("test"), extension("default"){}; @@ -131,4 +132,4 @@ public: #include "skyweaver/detail/MultiFileWriter.cu" #include "skyweaver/detail/file_writer_callbacks.cpp" -#endif // SKYWEAVER_MULTIFILEWRITER_CUH \ No newline at end of file +#endif // SKYWEAVER_MULTIFILEWRITER_CUH From 86e5e4d5ee85924f64ea11254796aab3b75b5509 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Fri, 20 Sep 2024 13:30:05 +0000 Subject: [PATCH 15/18] set default values --- cpp/skyweaver/src/PipelineConfig.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/skyweaver/src/PipelineConfig.cpp b/cpp/skyweaver/src/PipelineConfig.cpp index b5121e9..a7446a5 100644 --- a/cpp/skyweaver/src/PipelineConfig.cpp +++ b/cpp/skyweaver/src/PipelineConfig.cpp @@ -15,7 +15,7 @@ PipelineConfig::PipelineConfig() _bw(13375000.0), _channel_frequencies_stale(true), _gulp_length_samps(4096), _start_time(0.0f), _duration(std::numeric_limits::infinity()), _total_nchans(4096), - _stokes_mode("I"), _output_level(24.0f) + _stokes_mode("I"), _output_level(24.0f), _pre_write_config({{false, 0, 0, 0}}) { } From 620a52357f40b5c9f2f3d532922e5ef5f97acbda Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Mon, 23 Sep 2024 06:54:56 +0000 Subject: [PATCH 16/18] PreWriting could be used more broadly. So is_enabled shouldn't belong to the wait config --- cpp/skyweaver/PipelineConfig.hpp | 4 ++-- cpp/skyweaver/detail/MultiFileWriter.cu | 2 +- cpp/skyweaver/src/PipelineConfig.cpp | 4 ++-- cpp/skyweaver/src/skyweaver_cli.cu | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/skyweaver/PipelineConfig.hpp b/cpp/skyweaver/PipelineConfig.hpp index ef1920b..e54dca1 100644 --- a/cpp/skyweaver/PipelineConfig.hpp +++ b/cpp/skyweaver/PipelineConfig.hpp @@ -11,7 +11,6 @@ namespace skyweaver { struct WaitConfig { - bool is_enabled; int iterations; int sleep_time; std::size_t min_free_space; @@ -19,7 +18,8 @@ namespace skyweaver struct PreWriteConfig { - WaitConfig wait; + bool is_enabled; + WaitConfig wait; }; diff --git a/cpp/skyweaver/detail/MultiFileWriter.cu b/cpp/skyweaver/detail/MultiFileWriter.cu index b28d991..b963a56 100644 --- a/cpp/skyweaver/detail/MultiFileWriter.cu +++ b/cpp/skyweaver/detail/MultiFileWriter.cu @@ -179,7 +179,7 @@ bool MultiFileWriter::operator()(VectorType const& stream_data, std::size_t stream_idx) { std::size_t const data_size = stream_data.size() * sizeof(typename VectorType::value_type); - if (_pre_write_callback != nullptr && _config.pre_write.wait.is_enabled) + if (_pre_write_callback != nullptr && _config.pre_write.is_enabled) { _pre_write_callback(data_size, _config); } diff --git a/cpp/skyweaver/src/PipelineConfig.cpp b/cpp/skyweaver/src/PipelineConfig.cpp index a7446a5..9a4b3eb 100644 --- a/cpp/skyweaver/src/PipelineConfig.cpp +++ b/cpp/skyweaver/src/PipelineConfig.cpp @@ -15,7 +15,7 @@ PipelineConfig::PipelineConfig() _bw(13375000.0), _channel_frequencies_stale(true), _gulp_length_samps(4096), _start_time(0.0f), _duration(std::numeric_limits::infinity()), _total_nchans(4096), - _stokes_mode("I"), _output_level(24.0f), _pre_write_config({{false, 0, 0, 0}}) + _stokes_mode("I"), _output_level(24.0f), _pre_write_config({0, {false, 0, 0}}) { } @@ -182,7 +182,7 @@ void PipelineConfig::configure_wait(std::string argument) std::string token; std::istringstream tokenStream(argument); int indx = 0; - _pre_write_config.wait.is_enabled = true; + _pre_write_config.is_enabled = true; while (std::getline(tokenStream, token, ':')) { if(indx == 0) { diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index d1b9842..04aea39 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -242,7 +242,7 @@ void run_pipeline(Pipeline& pipeline, auto pre_write_callback = [] (std::size_t const size, skyweaver::MultiFileWriterConfig const& config) { - if (!config.pre_write.wait.is_enabled) return; + if (!config.pre_write.is_enabled) return; std::filesystem::space_info space = std::filesystem::space(config.output_dir); size_t limit = std::min(config.pre_write.wait.min_free_space, size); if(space.available >= limit) From c89902c1726bb48171ce13fa5e57b832877e5d85 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Wed, 25 Sep 2024 07:46:53 +0000 Subject: [PATCH 17/18] introduce unique_ptr --- cpp/skyweaver/src/skyweaver_cli.cu | 46 +++++++++++++++++++----------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index 04aea39..165d272 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -293,45 +293,59 @@ void setup_pipeline(skyweaver::PipelineConfig& config) IBWriterType ib_handler(config, "ib", create_stream_callback_ib, pre_write_callback); using StatsWriterType = - skyweaver::MultiFileWriter>; + skyweaver::MultiFileWriter>; + std::unique_ptr stats_handler; + typename StatsWriterType::CreateStreamCallBackType create_stream_callback_stats = skyweaver::detail::create_dada_file_stream< skyweaver::FPAStatsD>; - - StatsWriterType stats_handler(config, "stats", create_stream_callback_stats, pre_write_callback); - if constexpr(enable_incoherent_dedispersion) { using CBWriterType = skyweaver::MultiFileWriter>; typename CBWriterType::CreateStreamCallBackType create_stream_callback_cb = - skyweaver::detail::create_dada_file_stream>; - skyweaver::MultiFileWriter> - cb_file_writer(config, "cb", create_stream_callback_cb, pre_write_callback); + skyweaver::detail::create_dada_file_stream>; + std::unique_ptr cb_file_writer; + if (config.pre_write_config().is_enabled) + { + stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats, pre_write_callback)); + cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb, pre_write_callback)); + }else{ + stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats)); + cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb)); + } + skyweaver::IncoherentDedispersionPipeline - incoherent_dispersion_pipeline(config, cb_file_writer); + decltype(* cb_file_writer.get())> + incoherent_dispersion_pipeline(config, * cb_file_writer.get()); skyweaver::BeamformerPipeline pipeline(config, incoherent_dispersion_pipeline, ib_handler, - stats_handler); + * stats_handler.get()); run_pipeline(pipeline, config, file_reader, header); } else { using CBWriterType = skyweaver::MultiFileWriter>; + std::unique_ptr cb_file_writer; typename CBWriterType::CreateStreamCallBackType create_stream_callback_cb = skyweaver::detail::create_dada_file_stream>; - CBWriterType cb_file_writer(config, "cb", create_stream_callback_cb, pre_write_callback); - skyweaver::BeamformerPipeline - pipeline(config, cb_file_writer, ib_handler, stats_handler); + pipeline(config, * cb_file_writer.get(), ib_handler, * stats_handler.get()); run_pipeline(pipeline, config, file_reader, header); } } @@ -645,4 +659,4 @@ int main(int argc, char** argv) return ERROR_UNHANDLED_EXCEPTION; } return SUCCESS; -} \ No newline at end of file +} From 96adc08af53abd694e1082d40320c2f7f40f2471 Mon Sep 17 00:00:00 2001 From: Henning Fehrmann Date: Wed, 25 Sep 2024 08:38:03 +0000 Subject: [PATCH 18/18] also IBWriter is a unique_ptr now --- cpp/skyweaver/src/skyweaver_cli.cu | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cpp/skyweaver/src/skyweaver_cli.cu b/cpp/skyweaver/src/skyweaver_cli.cu index 165d272..45e9ad3 100644 --- a/cpp/skyweaver/src/skyweaver_cli.cu +++ b/cpp/skyweaver/src/skyweaver_cli.cu @@ -290,7 +290,7 @@ void setup_pipeline(skyweaver::PipelineConfig& config) skyweaver::detail::create_dada_file_stream< skyweaver::BTFPowersH>; - IBWriterType ib_handler(config, "ib", create_stream_callback_ib, pre_write_callback); + std::unique_ptr ib_handler; using StatsWriterType = skyweaver::MultiFileWriter>; @@ -308,9 +308,11 @@ void setup_pipeline(skyweaver::PipelineConfig& config) std::unique_ptr cb_file_writer; if (config.pre_write_config().is_enabled) { + ib_handler.reset(new IBWriterType(config, "ib", create_stream_callback_ib, pre_write_callback)); stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats, pre_write_callback)); cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb, pre_write_callback)); }else{ + ib_handler.reset(new IBWriterType(config, "ib", create_stream_callback_ib)); stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats)); cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb)); } @@ -320,12 +322,12 @@ void setup_pipeline(skyweaver::PipelineConfig& config) decltype(* cb_file_writer.get())> incoherent_dispersion_pipeline(config, * cb_file_writer.get()); skyweaver::BeamformerPipeline pipeline(config, incoherent_dispersion_pipeline, - ib_handler, + * ib_handler.get(), * stats_handler.get()); run_pipeline(pipeline, config, file_reader, header); } else { @@ -336,16 +338,20 @@ void setup_pipeline(skyweaver::PipelineConfig& config) skyweaver::detail::create_dada_file_stream>; if (config.pre_write_config().is_enabled) { + ib_handler.reset(new IBWriterType(config, "ib", create_stream_callback_ib, pre_write_callback)); cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb, pre_write_callback)); + stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats, pre_write_callback)); }else{ + ib_handler.reset(new IBWriterType(config, "ib", create_stream_callback_ib)); cb_file_writer.reset(new CBWriterType(config, "cb", create_stream_callback_cb)); + stats_handler.reset(new StatsWriterType(config, "stats", create_stream_callback_stats)); } skyweaver::BeamformerPipeline - pipeline(config, * cb_file_writer.get(), ib_handler, * stats_handler.get()); + pipeline(config, * cb_file_writer.get(), * ib_handler.get(), * stats_handler.get()); run_pipeline(pipeline, config, file_reader, header); } }