Skip to content

Commit

Permalink
Merge branch 'main' into feature/nrse-validation-case
Browse files Browse the repository at this point in the history
  • Loading branch information
mgovers authored Jan 16, 2024
2 parents 8deb45d + 584f047 commit f92f254
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,21 +411,20 @@ class MainModelImpl<ExtraRetrievableTypes<ExtraRetrievableType...>, ComponentLis
// execute one power flow in the current instance, no batch calculation is needed
// NOTE: if the map is not empty but the datasets inside are empty
// that will be considered as a zero batch_size
bool const all_empty = update_data.empty();
if (all_empty) {
if (update_data.empty()) {
calculation_fn(*this, result_data, 0);
return BatchParameter{};
}

// get number of batches (can't be empty, because then all_empty would have been true)
Idx const n_batch = update_data.cbegin()->second.batch_size();
// get batch size (can't be empty; see previous check)
Idx const n_scenarios = update_data.cbegin()->second.batch_size();
// assert if all component types have the same number of batches
assert(std::all_of(update_data.cbegin(), update_data.cend(),
[n_batch](auto const& x) { return x.second.batch_size() == n_batch; }));
[n_scenarios](auto const& x) { return x.second.batch_size() == n_scenarios; }));

// if the batch_size is zero, it is a special case without doing any calculations at all
// we consider in this case the batch set is independent and but not topology cachable
if (n_batch == 0) {
if (n_scenarios == 0) {
return BatchParameter{};
}

Expand All @@ -436,92 +435,153 @@ class MainModelImpl<ExtraRetrievableTypes<ExtraRetrievableType...>, ComponentLis
// missing entries are provided in the update data
}

// error messages
std::vector<std::string> exceptions(n_scenarios, "");
std::vector<CalculationInfo> infos(n_scenarios);

// lambda for sub batch calculation
auto sub_batch = sub_batch_calculation_(calculation_fn, result_data, update_data, exceptions, infos);

batch_dispatch(sub_batch, n_scenarios, threading);

handle_batch_exceptions(exceptions);
calculation_info_ = main_core::merge_calculation_info(infos);

return BatchParameter{};
}

template <typename Calculate>
requires std::invocable<std::remove_cvref_t<Calculate>, MainModelImpl&, Dataset const&, Idx>
auto sub_batch_calculation_(Calculate&& calculation_fn, Dataset const& result_data, ConstDataset const& update_data,
std::vector<std::string>& exceptions, std::vector<CalculationInfo>& infos) {
// const ref of current instance
MainModelImpl const& base_model = *this;

// cache component update order if possible
bool const is_independent = MainModelImpl::is_update_independent(update_data);

// error messages
std::vector<std::string> exceptions(n_batch, "");
std::vector<CalculationInfo> infos(n_batch);
return [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data,
is_independent](Idx start, Idx stride, Idx n_scenarios) {
assert(n_scenarios <= narrow_cast<Idx>(exceptions.size()));
assert(n_scenarios <= narrow_cast<Idx>(infos.size()));

// lambda for sub batch calculation
auto sub_batch = [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data, n_batch,
is_independent](Idx start, Idx stride) {
Timer const t_total(infos[start], 0000, "Total in thread");

auto model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
auto copy_model = [&base_model, &infos](Idx scenario_idx) {
Timer const t_copy_model(infos[scenario_idx], 1100, "Copy model");
return MainModelImpl{base_model};
}();
};
auto model = copy_model(start);

SequenceIdx scenario_sequence = is_independent ? model.get_sequence_idx_map(update_data) : SequenceIdx{};

for (Idx batch_number = start; batch_number < n_batch; batch_number += stride) {
Timer const t_total_single(infos[batch_number], 0100, "Total single calculation in thread");
// try to update model and run calculation
try {
{
Timer const t_update_model(infos[batch_number], 1200, "Update model");
if (!is_independent) {
scenario_sequence = model.get_sequence_idx_map(update_data, batch_number);
}
model.template update_component<cached_update_t>(update_data, batch_number, scenario_sequence);
}
calculation_fn(model, result_data, batch_number);
{
Timer const t_update_model(infos[batch_number], 1201, "Restore model");
model.restore_components(scenario_sequence);
if (!is_independent) {
std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); });
}
}
} catch (std::exception const& ex) {
exceptions[batch_number] = ex.what();
model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
return MainModelImpl{base_model};
}();
} catch (...) {
exceptions[batch_number] = "unknown exception";
model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
return MainModelImpl{base_model};
}();
}
auto [setup, winddown] =
scenario_update_restore(model, update_data, is_independent, scenario_sequence, infos);

auto calculate_scenario = MainModelImpl::call_with<Idx>(
[&model, &calculation_fn, &result_data, &infos](Idx scenario_idx) {
calculation_fn(model, result_data, scenario_idx);
infos[scenario_idx].merge(model.calculation_info_);
},
std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, infos),
[&model, &copy_model](Idx scenario_idx) { model = copy_model(scenario_idx); });

for (Idx scenario_idx = start; scenario_idx < n_scenarios; scenario_idx += stride) {
Timer const t_total_single(infos[scenario_idx], 0100, "Total single calculation in thread");

infos[batch_number].merge(model.calculation_info_);
calculate_scenario(scenario_idx);
}
};
}

// run sequential if
// specified threading < 0
// use hardware threads, but it is either unknown (0) or only has one thread (1)
// specified threading = 1
template <typename RunSubBatchFn>
requires std::invocable<std::remove_cvref_t<RunSubBatchFn>, Idx /*start*/, Idx /*stride*/, Idx /*n_scenarios*/>
static void batch_dispatch(RunSubBatchFn sub_batch, Idx n_scenarios, Idx threading) {
// run batches sequential or parallel
auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
// run sequential if
// specified threading < 0
// use hardware threads, but it is either unknown (0) or only has one thread (1)
// specified threading = 1
if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
// run all in sequential
sub_batch(0, 1);
sub_batch(0, 1, n_scenarios);
} else {
// create parallel threads
Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_batch);
Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_scenarios);
std::vector<std::thread> threads;
threads.reserve(n_thread);
for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) {
// compute each sub batch with stride
threads.emplace_back(sub_batch, thread_number, n_thread);
threads.emplace_back(sub_batch, thread_number, n_thread, n_scenarios);
}
for (auto& thread : threads) {
thread.join();
}
}
}

handle_batch_exceptions(exceptions);
calculation_info_ = main_core::merge_calculation_info(infos);
template <typename... Args, typename RunFn, typename SetupFn, typename WinddownFn, typename HandleExceptionFn,
typename RecoverFromBadFn>
requires std::invocable<std::remove_cvref_t<RunFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<SetupFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<WinddownFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<HandleExceptionFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<RecoverFromBadFn>, Args const&...>
static auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception,
RecoverFromBadFn recover_from_bad) {
return [setup_ = std::move(setup), run_ = std::move(run), winddown_ = std::move(winddown),
handle_exception_ = std::move(handle_exception),
recover_from_bad_ = std::move(recover_from_bad)](Args const&... args) {
try {
setup_(args...);
run_(args...);
winddown_(args...);
} catch (...) {
handle_exception_(args...);
try {
winddown_(args...);
} catch (...) {
recover_from_bad_(args...);
}
}
};
}

return BatchParameter{};
static auto scenario_update_restore(MainModelImpl& model, ConstDataset const& update_data,
bool const is_independent, SequenceIdx& scenario_sequence,
std::vector<CalculationInfo>& infos) {
return std::make_pair(
[&model, &update_data, &scenario_sequence, is_independent, &infos](Idx scenario_idx) {
Timer const t_update_model(infos[scenario_idx], 1200, "Update model");
if (!is_independent) {
scenario_sequence = model.get_sequence_idx_map(update_data, scenario_idx);
}
model.template update_component<cached_update_t>(update_data, scenario_idx, scenario_sequence);
},
[&model, &scenario_sequence, is_independent, &infos](Idx scenario_idx) {
Timer const t_update_model(infos[scenario_idx], 1201, "Restore model");
model.restore_components(scenario_sequence);
if (!is_independent) {
std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); });
}
});
}

// Lippincott pattern
static auto scenario_exception_handler(MainModelImpl& model, std::vector<std::string>& messages,
std::vector<CalculationInfo>& infos) {
return [&model, &messages, &infos](Idx scenario_idx) {
std::exception_ptr const ex_ptr = std::current_exception();
try {
std::rethrow_exception(ex_ptr);
} catch (std::exception const& ex) {
messages[scenario_idx] = ex.what();
} catch (...) {
messages[scenario_idx] = "unknown exception";
}
infos[scenario_idx].merge(model.calculation_info_);
};
}

static void handle_batch_exceptions(std::vector<std::string> const& exceptions) {
Expand Down
11 changes: 5 additions & 6 deletions tests/cpp_unit_tests/test_main_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,11 +1095,10 @@ auto incomplete_input_model(State const& state) -> MainModel {

std::vector<SourceInput> const incomplete_source_input{{6, 1, 1, nan, nan, 1e12, nan, nan},
{10, 3, 1, nan, nan, 1e12, nan, nan}};
std::vector<SymLoadGenInput> incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}};
std::vector<AsymLoadGenInput> incomplete_asym_load_input{
std::vector<SymLoadGenInput> const incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}};
std::vector<AsymLoadGenInput> const incomplete_asym_load_input{
{8, 3, 1, LoadGenType::const_y, RealValue<false>{nan}, RealValue<false>{0.0}}};

ConstDataset input_data;
main_model.add_component<Node>(state.node_input);
main_model.add_component<Line>(state.line_input);
main_model.add_component<Link>(state.link_input);
Expand All @@ -1119,7 +1118,7 @@ TEST_CASE("Test main model - incomplete input") {
using CalculationMethod::linear_current;
using CalculationMethod::newton_raphson;

State state;
State const state;
auto main_model = default_model(state);
auto test_model = incomplete_input_model(state);

Expand Down Expand Up @@ -1272,10 +1271,10 @@ TEST_CASE("Test main model - incomplete input") {
}
}

TEST_CASE("Incomplete followed by complete") {
TEST_CASE("Test main model - Incomplete followed by complete") {
using CalculationMethod::linear;

State state;
State const state;
auto main_model = default_model(state);
auto test_model = incomplete_input_model(state);

Expand Down

0 comments on commit f92f254

Please sign in to comment.