Skip to content

Commit

Permalink
#16: Fix and optimize communication normalization, print warning for …
Browse files Browse the repository at this point in the history
…non existent objects
  • Loading branch information
pierrepebay committed Apr 15, 2024
1 parent 6459c47 commit 2edb768
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 24 deletions.
57 changes: 33 additions & 24 deletions src/vt-tv/api/info.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,74 +486,83 @@ struct Info {

auto phase_objects = createPhaseObjectsMapping(phase);
// Checking all communications for object A in all objects of all ranks at given phase: A <- ... and A -> ...
for (auto& [A_id, object_work] : phase_objects) { fmt::print("- Object ID: {}\n", A_id);
for (auto& [A_id, object_work] : phase_objects) {
// fmt::print("- Object ID: {}\n", A_id);
auto sent = object_work.getSent();
fmt::print(" Has {} sent communications", sent.size());
// fmt::print(" Has {} sent communications", sent.size());
auto received = object_work.getReceived();
fmt::print(" and {} received communications.\n", received.size());
// fmt::print(" and {} received communications.\n", received.size());
// Going through A -> ... communications
fmt::print(" Checking sent communications:\n");
// fmt::print(" Checking sent communications:\n");
for (auto& [B_id, bytes] : sent) {
fmt::print(" Communication sent to object {} of {} bytes:\n", B_id, bytes);
// fmt::print(" Communication sent to object {} of {} bytes:\n", B_id, bytes);
// check if B exists for the A -> B communication
if (phase_objects.find(B_id) != phase_objects.end()) {
fmt::print(" Found recipient object {} when searching for communication sent by object {} of {} bytes.\n", B_id, A_id, bytes);
// fmt::print(" Found recipient object {} when searching for communication sent by object {} of {} bytes.\n", B_id, A_id, bytes);
auto to_object_work = phase_objects.at(B_id);
auto target_received = to_object_work.getReceived();
fmt::print( "Object {} has {} received communications.\n", B_id, target_received.size());
// fmt::print( "Object {} has {} received communications.\n", B_id, target_received.size());
// Check if B has symmetric B <- A received communication
if (target_received.find(A_id) != target_received.end()) {
fmt::print( " Object {} already has received communication from object {}.\n", B_id, A_id);
// fmt::print( " Object {} already has received communication from object {}.\n", B_id, A_id);
} else {
fmt::print( " Object {} doesn't have received communication from object {}. Pushing to list of communications to add.\n", B_id, A_id);
// fmt::print( " Object {} doesn't have received communication from object {}. Pushing to list of communications to add.\n", B_id, A_id);
communications_to_add.push_back(std::make_tuple("recipient", A_id, B_id, bytes));
}
} else {
fmt::print(" /!\\ Didn't find recipient object {} when searching for communication sent by object {} of {} bytes.\n", B_id, A_id, bytes);
}
}
// Going through A <- ... communications
fmt::print(" Checking received communications:\n");
// fmt::print(" Checking received communications:\n");
for (auto& [B_id, bytes] : received) { // Going through A <- ... communications
fmt::print(" Communication received from object {} of {} bytes:\n", B_id, bytes);
// fmt::print(" Communication received from object {} of {} bytes:\n", B_id, bytes);
// check if B exists for the A <- B communication
if (phase_objects.find(B_id) != phase_objects.end()) {
fmt::print(" Found sender object {} when searching for communication received by object {} of {} bytes.\n", B_id, A_id, bytes);
// fmt::print(" Found sender object {} when searching for communication received by object {} of {} bytes.\n", B_id, A_id, bytes);
auto from_object_work = phase_objects.at(B_id);
auto target_sent = from_object_work.getSent();
fmt::print( "Object {} has {} sent communications.\n", B_id, target_sent.size());
// fmt::print( "Object {} has {} sent communications.\n", B_id, target_sent.size());
// Check if B has symmetric B -> A received communication
if (target_sent.find(A_id) != target_sent.end()) {
fmt::print( " Object {} already has sent communication to object {}.\n", B_id, A_id);
// fmt::print( " Object {} already has sent communication to object {}.\n", B_id, A_id);
} else {
fmt::print( " Object {} doesn't have sent communication to object {}. Pushing to list of communications to add.\n", B_id, A_id);
// fmt::print( " Object {} doesn't have sent communication to object {}. Pushing to list of communications to add.\n", B_id, A_id);
communications_to_add.push_back(std::make_tuple("sender", B_id, A_id, bytes));
}
} else {
fmt::print(" /!\\ Didn't find sender object {} when searching for communication received by object {} of {} bytes.\n", B_id, A_id, bytes);
// fmt::print(" /!\\ Didn't find sender object {} when searching for communication received by object {} of {} bytes.\n", B_id, A_id, bytes);
}
}
}

// loop through ranks and add communications
fmt::print("Updating communications for phase {}.\n", phase);
for (auto& [rank_id, rank] : ranks_) {
for (auto &[rank_id, rank]: ranks_) {
fmt::print(" Checking objects in rank {}.\n", rank_id);
auto& phaseWork = rank.getPhaseWork();
auto& phaseWorkAtPhase = phaseWork.at(phase);
auto& objects = phaseWorkAtPhase.getObjectWork();
for (auto& [obj_id, obj_work] : objects) {
auto &phaseWork = rank.getPhaseWork();
auto &phaseWorkAtPhase = phaseWork.at(phase);
auto &objects = phaseWorkAtPhase.getObjectWork();
for (auto &[obj_id, obj_work]: objects) {
fmt::print(" Checking if object {} needs to be updated.\n", obj_id);
fmt::print(" Communications to update:\n");
for (auto& [object_to_update, sender_id, recipient_id, bytes] : communications_to_add) {
fmt::print(" {} needs to be updated in {} -> {} communication of {} bytes.\n", object_to_update, sender_id, recipient_id, bytes);
uint64_t i = 0;
for (auto &[object_to_update, sender_id, recipient_id, bytes]: communications_to_add) {
fmt::print(" {} needs to be updated in {} -> {} communication of {} bytes.\n", object_to_update,
sender_id, recipient_id, bytes);
if (object_to_update == "sender" && sender_id == obj_id) {
fmt::print(" Sender to be updated is object on this rank. Updating.\n");
rank.addObjectSentCommunicationAtPhase(phase, obj_id, recipient_id, bytes);
communications_to_add.erase(communications_to_add.begin() + i);
} else if (object_to_update == "recipient" && recipient_id == obj_id) {
fmt::print(" Recipient to be updated is object on this rank. Updating.\n");
rank.addObjectReceivedCommunicationAtPhase(phase, obj_id, recipient_id, bytes);
rank.addObjectReceivedCommunicationAtPhase(phase, obj_id, sender_id, bytes);
communications_to_add.erase(communications_to_add.begin() + i);
}
if (communications_to_add.empty()) {
return;
}
i++;
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/vt-tv/utility/json_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ std::unique_ptr<Info> JSONReader::parse() {
auto to_it = objects.find(to_id);
if (to_it != objects.end()) {
to_it->second.addReceivedCommunications(from_id, bytes);
} else {
fmt::print("Warning: Communication {} -> {}: neither sender nor recipient was found in objects.\n", from_id, to_id);
}
}
}
Expand Down

0 comments on commit 2edb768

Please sign in to comment.