diff --git a/power_grid_model_c/power_grid_model/include/power_grid_model/main_model.hpp b/power_grid_model_c/power_grid_model/include/power_grid_model/main_model.hpp index 5d213a901..c11660037 100644 --- a/power_grid_model_c/power_grid_model/include/power_grid_model/main_model.hpp +++ b/power_grid_model_c/power_grid_model/include/power_grid_model/main_model.hpp @@ -411,21 +411,20 @@ class MainModelImpl, ComponentLis // execute one power flow in the current instance, no batch calculation is needed // NOTE: if the map is not empty but the datasets inside are empty // that will be considered as a zero batch_size - bool const all_empty = update_data.empty(); - if (all_empty) { + if (update_data.empty()) { calculation_fn(*this, result_data, 0); return BatchParameter{}; } - // get number of batches (can't be empty, because then all_empty would have been true) - Idx const n_batch = update_data.cbegin()->second.batch_size(); + // get batch size (can't be empty; see previous check) + Idx const n_scenarios = update_data.cbegin()->second.batch_size(); // assert if all component types have the same number of batches assert(std::all_of(update_data.cbegin(), update_data.cend(), - [n_batch](auto const& x) { return x.second.batch_size() == n_batch; })); + [n_scenarios](auto const& x) { return x.second.batch_size() == n_scenarios; })); // if the batch_size is zero, it is a special case without doing any calculations at all // we consider in this case the batch set is independent and but not topology cachable - if (n_batch == 0) { + if (n_scenarios == 0) { return BatchParameter{}; } @@ -436,92 +435,153 @@ class MainModelImpl, ComponentLis // missing entries are provided in the update data } + // error messages + std::vector exceptions(n_scenarios, ""); + std::vector infos(n_scenarios); + + // lambda for sub batch calculation + auto sub_batch = sub_batch_calculation_(calculation_fn, result_data, update_data, exceptions, infos); + + batch_dispatch(sub_batch, n_scenarios, threading); + + handle_batch_exceptions(exceptions); + calculation_info_ = main_core::merge_calculation_info(infos); + + return BatchParameter{}; + } + + template + requires std::invocable, MainModelImpl&, Dataset const&, Idx> + auto sub_batch_calculation_(Calculate&& calculation_fn, Dataset const& result_data, ConstDataset const& update_data, + std::vector& exceptions, std::vector& infos) { // const ref of current instance MainModelImpl const& base_model = *this; // cache component update order if possible bool const is_independent = MainModelImpl::is_update_independent(update_data); - // error messages - std::vector exceptions(n_batch, ""); - std::vector infos(n_batch); + return [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data, + is_independent](Idx start, Idx stride, Idx n_scenarios) { + assert(n_scenarios <= narrow_cast(exceptions.size())); + assert(n_scenarios <= narrow_cast(infos.size())); - // lambda for sub batch calculation - auto sub_batch = [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data, n_batch, - is_independent](Idx start, Idx stride) { Timer const t_total(infos[start], 0000, "Total in thread"); - auto model = [&base_model, &infos, start] { - Timer const t_copy_model(infos[start], 1100, "Copy model"); + auto copy_model = [&base_model, &infos](Idx scenario_idx) { + Timer const t_copy_model(infos[scenario_idx], 1100, "Copy model"); return MainModelImpl{base_model}; - }(); + }; + auto model = copy_model(start); SequenceIdx scenario_sequence = is_independent ? model.get_sequence_idx_map(update_data) : SequenceIdx{}; - for (Idx batch_number = start; batch_number < n_batch; batch_number += stride) { - Timer const t_total_single(infos[batch_number], 0100, "Total single calculation in thread"); - // try to update model and run calculation - try { - { - Timer const t_update_model(infos[batch_number], 1200, "Update model"); - if (!is_independent) { - scenario_sequence = model.get_sequence_idx_map(update_data, batch_number); - } - model.template update_component(update_data, batch_number, scenario_sequence); - } - calculation_fn(model, result_data, batch_number); - { - Timer const t_update_model(infos[batch_number], 1201, "Restore model"); - model.restore_components(scenario_sequence); - if (!is_independent) { - std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); }); - } - } - } catch (std::exception const& ex) { - exceptions[batch_number] = ex.what(); - model = [&base_model, &infos, start] { - Timer const t_copy_model(infos[start], 1100, "Copy model"); - return MainModelImpl{base_model}; - }(); - } catch (...) { - exceptions[batch_number] = "unknown exception"; - model = [&base_model, &infos, start] { - Timer const t_copy_model(infos[start], 1100, "Copy model"); - return MainModelImpl{base_model}; - }(); - } + auto [setup, winddown] = + scenario_update_restore(model, update_data, is_independent, scenario_sequence, infos); + + auto calculate_scenario = MainModelImpl::call_with( + [&model, &calculation_fn, &result_data, &infos](Idx scenario_idx) { + calculation_fn(model, result_data, scenario_idx); + infos[scenario_idx].merge(model.calculation_info_); + }, + std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, infos), + [&model, ©_model](Idx scenario_idx) { model = copy_model(scenario_idx); }); + + for (Idx scenario_idx = start; scenario_idx < n_scenarios; scenario_idx += stride) { + Timer const t_total_single(infos[scenario_idx], 0100, "Total single calculation in thread"); - infos[batch_number].merge(model.calculation_info_); + calculate_scenario(scenario_idx); } }; + } + // run sequential if + // specified threading < 0 + // use hardware threads, but it is either unknown (0) or only has one thread (1) + // specified threading = 1 + template + requires std::invocable, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/> + static void batch_dispatch(RunSubBatchFn sub_batch, Idx n_scenarios, Idx threading) { // run batches sequential or parallel auto const hardware_thread = static_cast(std::thread::hardware_concurrency()); - // run sequential if - // specified threading < 0 - // use hardware threads, but it is either unknown (0) or only has one thread (1) - // specified threading = 1 if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) { // run all in sequential - sub_batch(0, 1); + sub_batch(0, 1, n_scenarios); } else { // create parallel threads - Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_batch); + Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_scenarios); std::vector threads; threads.reserve(n_thread); for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) { // compute each sub batch with stride - threads.emplace_back(sub_batch, thread_number, n_thread); + threads.emplace_back(sub_batch, thread_number, n_thread, n_scenarios); } for (auto& thread : threads) { thread.join(); } } + } - handle_batch_exceptions(exceptions); - calculation_info_ = main_core::merge_calculation_info(infos); + template + requires std::invocable, Args const&...> && + std::invocable, Args const&...> && + std::invocable, Args const&...> && + std::invocable, Args const&...> && + std::invocable, Args const&...> + static auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception, + RecoverFromBadFn recover_from_bad) { + return [setup_ = std::move(setup), run_ = std::move(run), winddown_ = std::move(winddown), + handle_exception_ = std::move(handle_exception), + recover_from_bad_ = std::move(recover_from_bad)](Args const&... args) { + try { + setup_(args...); + run_(args...); + winddown_(args...); + } catch (...) { + handle_exception_(args...); + try { + winddown_(args...); + } catch (...) { + recover_from_bad_(args...); + } + } + }; + } - return BatchParameter{}; + static auto scenario_update_restore(MainModelImpl& model, ConstDataset const& update_data, + bool const is_independent, SequenceIdx& scenario_sequence, + std::vector& infos) { + return std::make_pair( + [&model, &update_data, &scenario_sequence, is_independent, &infos](Idx scenario_idx) { + Timer const t_update_model(infos[scenario_idx], 1200, "Update model"); + if (!is_independent) { + scenario_sequence = model.get_sequence_idx_map(update_data, scenario_idx); + } + model.template update_component(update_data, scenario_idx, scenario_sequence); + }, + [&model, &scenario_sequence, is_independent, &infos](Idx scenario_idx) { + Timer const t_update_model(infos[scenario_idx], 1201, "Restore model"); + model.restore_components(scenario_sequence); + if (!is_independent) { + std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); }); + } + }); + } + + // Lippincott pattern + static auto scenario_exception_handler(MainModelImpl& model, std::vector& messages, + std::vector& infos) { + return [&model, &messages, &infos](Idx scenario_idx) { + std::exception_ptr const ex_ptr = std::current_exception(); + try { + std::rethrow_exception(ex_ptr); + } catch (std::exception const& ex) { + messages[scenario_idx] = ex.what(); + } catch (...) { + messages[scenario_idx] = "unknown exception"; + } + infos[scenario_idx].merge(model.calculation_info_); + }; } static void handle_batch_exceptions(std::vector const& exceptions) { diff --git a/tests/cpp_unit_tests/test_main_model.cpp b/tests/cpp_unit_tests/test_main_model.cpp index 18f2392ad..1a464c7a8 100644 --- a/tests/cpp_unit_tests/test_main_model.cpp +++ b/tests/cpp_unit_tests/test_main_model.cpp @@ -1095,11 +1095,10 @@ auto incomplete_input_model(State const& state) -> MainModel { std::vector const incomplete_source_input{{6, 1, 1, nan, nan, 1e12, nan, nan}, {10, 3, 1, nan, nan, 1e12, nan, nan}}; - std::vector incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}}; - std::vector incomplete_asym_load_input{ + std::vector const incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}}; + std::vector const incomplete_asym_load_input{ {8, 3, 1, LoadGenType::const_y, RealValue{nan}, RealValue{0.0}}}; - ConstDataset input_data; main_model.add_component(state.node_input); main_model.add_component(state.line_input); main_model.add_component(state.link_input); @@ -1119,7 +1118,7 @@ TEST_CASE("Test main model - incomplete input") { using CalculationMethod::linear_current; using CalculationMethod::newton_raphson; - State state; + State const state; auto main_model = default_model(state); auto test_model = incomplete_input_model(state); @@ -1272,10 +1271,10 @@ TEST_CASE("Test main model - incomplete input") { } } -TEST_CASE("Incomplete followed by complete") { +TEST_CASE("Test main model - Incomplete followed by complete") { using CalculationMethod::linear; - State state; + State const state; auto main_model = default_model(state); auto test_model = incomplete_input_model(state);