Skip to content

Commit

Permalink
Tools API: Topology request returns extended info
Browse files Browse the repository at this point in the history
* For each activated and stopped task an extended information `STopologyResponseData` is returned.
  • Loading branch information
AndreyLebedev authored and AnarManafov committed Oct 20, 2021
1 parent 356a6ce commit c6fbd3d
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 7 deletions.
1 change: 1 addition & 0 deletions ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Added: DDS agent monitors available disk space and if the (configurable) thresho
Added: CSession::userDefaultsGetValueForKey - returns a configuration value for a given configuration key.
Added: new `SSlotInfoRequest` returning a list of details of all active slots (GH-374).
Added: task path to `OnTaskDone` reply.
Added: `Topology` request returns extended info of each activated or stopped task via new `STopologyResponseData` data class.

### dds-topology
Added: new std::istream based APIs.
Expand Down
54 changes: 52 additions & 2 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ void CConnectionManager::broadcastUpdateTopologyAndWait(weakChannelInfo_t::conta
m_updateTopoCondition.wait();
}

void CConnectionManager::activateTasks(const CScheduler& _scheduler, CAgentChannel::weakConnectionPtr_t _channel)
void CConnectionManager::activateTasks(const dds::tools_api::STopologyRequestData& _topologyInfo,
const CScheduler& _scheduler,
CAgentChannel::weakConnectionPtr_t _channel)
{
const CScheduler::ScheduleVector_t& schedule = _scheduler.getSchedule();

Expand Down Expand Up @@ -415,6 +417,25 @@ void CConnectionManager::activateTasks(const CScheduler& _scheduler, CAgentChann

slot.m_taskID = sch.m_taskID;
slot.m_state = EAgentState::executing;

try
{
// Notify Tools API befor activating the tasks
STopologyResponseData info;
info.m_requestID = _topologyInfo.m_requestID;
info.m_activated = true;
info.m_agentID = inf.m_id;
info.m_slotID = slot.m_id;
info.m_taskID = sch.m_taskID;
info.m_path = m_topo.getRuntimeTaskById(sch.m_taskID).m_taskPath;
info.m_host = inf.m_remoteHostInfo.m_host;
info.m_wrkDir = inf.m_remoteHostInfo.m_DDSPath;
sendCustomCommandResponse(_channel, info.toJSON());
}
catch (exception& _e)
{
LOG(error) << "Failed to notify Tools API about activated task (" << sch.m_taskID << "): " << _e.what();
}
}

broadcastUpdateTopologyAndWait<cmdACTIVATE_USER_TASK>(
Expand Down Expand Up @@ -1166,6 +1187,35 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
// ptr->dequeueMsg<cmdUPDATE_KEY>();
// }

// Notify Tools API before stopping tasks
for (auto taskID : removedTasks)
{
try
{
auto agent{ m_taskIDToAgentChannelMap[taskID] };
if (agent.m_channel.expired())
continue;
auto ptr{ agent.m_channel.lock() };
SAgentInfo& inf{ ptr->getAgentInfo() };

STopologyResponseData info;
info.m_requestID = _topologyInfo.m_requestID;
info.m_activated = false;
info.m_agentID = inf.m_id;
info.m_slotID = 0; // TODO: we don't set slot ID for the moment.
// Setting it will require locking and looping over the container of slots.
info.m_taskID = taskID;
info.m_path = m_topo.getRuntimeTaskById(taskID).m_taskPath;
info.m_host = inf.m_remoteHostInfo.m_host;
info.m_wrkDir = inf.m_remoteHostInfo.m_DDSPath;
sendCustomCommandResponse(_channel, info.toJSON());
}
catch (exception& _e)
{
LOG(error) << "Failed to notify Tools API about stopped task (" << taskID << "): " << _e.what();
}
}

broadcastUpdateTopologyAndWait<cmdSTOP_USER_TASK>(agents, _channel, "Stopping removed tasks...");
}
//
Expand Down Expand Up @@ -1233,7 +1283,7 @@ void CConnectionManager::updateTopology(const dds::tools_api::STopologyRequestDa
m_taskIDToAgentChannelMap[sch.m_taskID] = sch.m_weakChannelInfo;
}

activateTasks(scheduler, _channel);
activateTasks(_topologyInfo, scheduler, _channel);
}

// Send shutdown to UI channel at the end
Expand Down
4 changes: 3 additions & 1 deletion dds-commander/src/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ namespace dds
weakChannelInfo_t _agent,
const std::vector<typename protocol_api::SCommandAttachmentImpl<_cmd>::ptr_t>& _attachments);

void activateTasks(const CScheduler& _scheduler, CAgentChannel::weakConnectionPtr_t _channel);
void activateTasks(const dds::tools_api::STopologyRequestData& _topologyInfo,
const CScheduler& _scheduler,
CAgentChannel::weakConnectionPtr_t _channel);
void _createWnPkg(bool _needInlineBashScript, bool _lightweightPkg, uint32_t _nSlots) const;
void processToolsAPIRequests(const protocol_api::SCustomCmdCmd& _cmd,
CAgentChannel::weakConnectionPtr_t _channel);
Expand Down
6 changes: 5 additions & 1 deletion dds-tools-lib/src/Tools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,11 @@ void CSession::notify(std::istream& _stream)
}
else if (it->second.type() == typeid(STopologyRequest::ptr_t))
{
processRequest<STopologyRequest>(it->second, child, nullptr);
processRequest<STopologyRequest>(
it->second,
child,
[&child](STopologyRequest::ptr_t _request)
{ _request->execResponseCallback(STopologyResponseData(child.second)); });
}
else if (it->second.type() == typeid(SGetLogRequest::ptr_t))
{
Expand Down
67 changes: 65 additions & 2 deletions dds-tools-lib/src/ToolsProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,68 @@ namespace dds
} // namespace tools_api
} // namespace dds

///////////////////////////////////
// STopologyResponseData
///////////////////////////////////

// this declaration is important to help older compilers to eat this static constexpr
constexpr const char* STopologyResponseData::_protocolTag;

STopologyResponseData::STopologyResponseData()
{
}

STopologyResponseData::STopologyResponseData(const boost::property_tree::ptree& _pt)
{
fromPT(_pt);
}

void STopologyResponseData::_toPT(boost::property_tree::ptree& _pt) const
{
_pt.put<bool>("activated", m_activated);
_pt.put<uint64_t>("agentID", m_agentID);
_pt.put<uint64_t>("slotID", m_slotID);
_pt.put<uint64_t>("taskID", m_taskID);
_pt.put<string>("path", m_path);
_pt.put<string>("host", m_host);
_pt.put<string>("wrkDir", m_wrkDir);
}

void STopologyResponseData::_fromPT(const boost::property_tree::ptree& _pt)
{
m_activated = _pt.get<bool>("activated", true);
m_agentID = _pt.get<uint64_t>("agentID", 0);
m_slotID = _pt.get<uint64_t>("slotID", 0);
m_taskID = _pt.get<uint64_t>("taskID", 0);
m_path = _pt.get<string>("path", "");
m_host = _pt.get<string>("host", "");
m_wrkDir = _pt.get<string>("wrkDir", "");
}

bool STopologyResponseData::operator==(const STopologyResponseData& _val) const
{
return (SBaseData::operator==(_val) && m_activated == _val.m_activated && m_agentID == _val.m_agentID &&
m_slotID == _val.m_slotID && m_taskID == _val.m_taskID && m_path == _val.m_path && m_host == _val.m_host &&
m_wrkDir == _val.m_wrkDir);
}

// We need to put function implementation in the same "dds::tools_api" namespace as a friend function declaration.
// Such declaration "std::ostream& dds::tools_api::operator<<(std::ostream& _os, const ***& _data)" doesn't help.
// In order to silent GCC warning "*** has not been declared within 'dds::tools_api'"
namespace dds
{
namespace tools_api
{
std::ostream& operator<<(std::ostream& _os, const STopologyResponseData& _data)
{
return _os << _data.defaultToString() << "; activated: " << _data.m_activated
<< "; agentID: " << _data.m_agentID << "; slotID: " << _data.m_slotID
<< "; taskID: " << _data.m_taskID << "; path: " << quoted(_data.m_path)
<< "; host: " << _data.m_host << "; wrkDir: " << quoted(_data.m_wrkDir);
}
} // namespace tools_api
} // namespace dds

///////////////////////////////////
// SCommanderInfoResponseData
///////////////////////////////////
Expand Down Expand Up @@ -441,7 +503,8 @@ namespace dds
{
return _os << _data.defaultToString() << "; index: " << _data.m_index << "; agentID: " << _data.m_agentID
<< "; slotID: " << _data.m_slotID << "; taskID: " << _data.m_taskID
<< "; state: " << _data.m_state << "; host: " << _data.m_host << "; wrkDir: " << _data.m_wrkDir;
<< "; state: " << _data.m_state << "; host: " << _data.m_host
<< "; wrkDir: " << quoted(_data.m_wrkDir);
}
} // namespace tools_api
} // namespace dds
Expand Down Expand Up @@ -552,7 +615,7 @@ namespace dds
{
return _os << _data.defaultToString() << "; taskID: " << _data.m_taskID
<< "; exitCode: " << _data.m_exitCode << "; signal: " << _data.m_signal
<< "; host: " << _data.m_host << "; wrkDir: " << _data.m_wrkDir
<< "; host: " << _data.m_host << "; wrkDir: " << quoted(_data.m_wrkDir)
<< "; taskPath: " << _data.m_taskPath;
}
} // namespace tools_api
Expand Down
30 changes: 29 additions & 1 deletion dds-tools-lib/src/ToolsProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,36 @@ namespace dds
friend std::ostream& operator<<(std::ostream& _os, const STopologyRequestData& _data);
};

/// \brief Structure holds information of topology response - activated and stopped tasks.
struct STopologyResponseData : SBaseResponseData<STopologyResponseData>
{
STopologyResponseData();
STopologyResponseData(const boost::property_tree::ptree& _pt);

bool m_activated{ true }; ///< True if task was activated, otherwise it's stopped
uint64_t m_agentID{ 0 }; ///< Agent ID
uint64_t m_slotID{ 0 }; ///< Slot ID
uint64_t m_taskID{ 0 }; ///< Task ID, 0 if not assigned
std::string m_path; ///< Path in the topology
std::string m_host; ///< Hostname
std::string m_wrkDir; ///< Wrk directory

private:
friend SBaseData<STopologyResponseData>;
friend SBaseResponseData<STopologyResponseData>;
void _fromPT(const boost::property_tree::ptree& _pt);
void _toPT(boost::property_tree::ptree& _pt) const;
static constexpr const char* _protocolTag = "topology";

public:
/// \brief Equality operator.
bool operator==(const STopologyResponseData& _val) const;
/// \brief Ostream operator.
friend std::ostream& operator<<(std::ostream& _os, const STopologyResponseData& _data);
};

/// \brief Request class of topology.
using STopologyRequest = SBaseRequestImpl<STopologyRequestData, SEmptyResponseData>;
using STopologyRequest = SBaseRequestImpl<STopologyRequestData, STopologyResponseData>;

/// \brief Structure holds information of a getlog request.
DDS_TOOLS_DECLARE_DATA_CLASS(SBaseRequestData, SGetLogRequestData, "getlog")
Expand Down
3 changes: 3 additions & 0 deletions dds-topology/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ int main(int argc, char* argv[])
return;
});

requestPtr->setResponseCallback([](const STopologyResponseData& _info)
{ LOG(info) << "Topology response: " << _info; });

requestPtr->setDoneCallback([&session]() { session.unblockCurrentThread(); });

session.sendRequest<STopologyRequest>(requestPtr);
Expand Down

0 comments on commit c6fbd3d

Please sign in to comment.