Skip to content

Commit

Permalink
GH-407: Add agent groups support
Browse files Browse the repository at this point in the history
dds-submit: Added: Users can specify a GroupName tag for each submission. This tag will be assigned to agents and can be used as a requirement in topologies. (GH-407)
dds-topology: Added: A new groupName requirement. It can be used on task and collection. (GH-407)
  • Loading branch information
AnarManafov committed Feb 6, 2022
1 parent 8109d71 commit d633e7e
Show file tree
Hide file tree
Showing 25 changed files with 169 additions and 46 deletions.
8 changes: 7 additions & 1 deletion ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

### DDS common

Added: every DDS module logs now its pid, group id and parent pid. (GH-403)
Added: every DDS module now logs its pid, group id and parent pid. (GH-403)

### dds-submit
Added: Users can specify a GroupName tag for each submission. This tag will be assigned to agents and can be used as a requirement in topologies. (GH-407)

### dds-topology
Added: A new groupName requirement. It can be used on task and collection. (GH-407)

## v3.6 (2022-01-11)

Expand Down
17 changes: 13 additions & 4 deletions bin/dds-prep-worker
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ DDS command line utility to prepare a worker package - all elements of DDS which
Usage: $(basename $0) [OPTION]
-i Re-pack user_worker_env.sh (for internal use only).
-s arg Submit time (optional).
-a arg DDS Session ID
-t arg A number of task slots
-l Create a lightweight package, without WN binaries
-a arg DDS Session ID.
-t arg A number of task slots.
-g arg Agent group name.
-l Create a lightweight package, without WN binaries.
-h Show summary of options.
Report bugs to http://dds.gsi.de
Expand All @@ -45,7 +46,8 @@ SUBMIT_TIME=""
DDS_SESSION_ID=""
LIGHTWEIGHT_PKG=""
DDS_AGENT_SLOTS=""
while getopts ":is:a:t:lh" opt; do
AGENT_GROUP_NAME=""
while getopts ":is:a:t:g:lh" opt; do
case $opt in
i)
REPACK_WRK_PKG="YES"
Expand All @@ -56,6 +58,8 @@ while getopts ":is:a:t:lh" opt; do
;;
t) DDS_AGENT_SLOTS="$OPTARG"
;;
g) AGENT_GROUP_NAME="$OPTARG"
;;
l) LIGHTWEIGHT_PKG="YES"
;;
h)
Expand Down Expand Up @@ -303,6 +307,11 @@ echo "$TOOL_NAME: setting agent task slots to $DDS_AGENT_SLOTS"
LANG=C LC_ALL=C sed -i.back "s/_DDS_AGENT_SLOTS_/$DDS_AGENT_SLOTS/g" $DDS_WRK_SCRIPT_TMP
rm -f "$DDS_WRK_SCRIPT_TMP.back"

# Replace the group name placeholder with the actual value.
# The group name is user-supplied, so escape the characters that are special
# in a sed replacement ('\', '&' and the '/' delimiter) before substituting;
# otherwise such a name would break or corrupt the substitution.
echo "$TOOL_NAME: setting agent group name to $AGENT_GROUP_NAME"
AGENT_GROUP_NAME_ESCAPED=$(printf '%s' "$AGENT_GROUP_NAME" | LANG=C LC_ALL=C sed -e 's|[\\/&]|\\&|g')
LANG=C LC_ALL=C sed -i.back "s/_AGENT_GROUP_NAME_/$AGENT_GROUP_NAME_ESCAPED/g" "$DDS_WRK_SCRIPT_TMP"
rm -f "$DDS_WRK_SCRIPT_TMP.back"

# copy worker script to the sandbox directory
cp "$DDS_WRK_SCRIPT_TMP" "$WRK_SCRIPT" || { echo "error: Can't copy the worker script to the sandbox." >&2 ; exit 1; }
chmod u+wx "$WRK_SCRIPT"
Expand Down
1 change: 1 addition & 0 deletions dds-agent/src/AgentConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ void CAgentConnectionManager::createCommanderChannel(uint64_t _protocolHeaderID)
// Create new agent and push handshake message
m_commanderChannel = CCommanderChannel::makeNew(m_context, _protocolHeaderID, m_intercomContext);
m_commanderChannel->setNumberOfSlots(m_options.m_slots);
m_commanderChannel->setGroupName(m_options.m_groupName);

// Subscribe to Shutdown command
m_commanderChannel->registerHandler<cmdSHUTDOWN>(
Expand Down
8 changes: 8 additions & 0 deletions dds-agent/src/CommanderChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ void CCommanderChannel::setNumberOfSlots(size_t _nSlots)
m_nSlots = _nSlots;
}

/// \brief Remembers the agent group name on this channel.
/// \param[in] _groupName Group name to store; the string is copied.
void CCommanderChannel::setGroupName(const std::string& _groupName)
{
    // The stored value is later placed into the host-info reply sent by this channel.
    m_groupName.assign(_groupName);
}

bool CCommanderChannel::on_cmdREPLY(SCommandAttachmentImpl<cmdREPLY>::ptr_t _attachment, SSenderInfo& /*_sender*/)
{
switch (_attachment->m_srcCommand)
Expand Down Expand Up @@ -221,6 +226,7 @@ bool CCommanderChannel::on_cmdGET_HOST_INFO(SCommandAttachmentImpl<cmdGET_HOST_I
cmd.m_DDSPath = CUserDefaults::getDDSPath();
cmd.m_agentPid = pid;
cmd.m_slots = m_nSlots;
cmd.m_groupName = m_groupName;

// get worker ID
string sWorkerId;
Expand All @@ -242,6 +248,8 @@ bool CCommanderChannel::on_cmdGET_HOST_INFO(SCommandAttachmentImpl<cmdGET_HOST_I
cmd.m_submitTime = stoll(sSubmitTime);
}

LOG(info) << cmd;

pushMsg<cmdREPLY_HOST_INFO>(cmd);
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions dds-agent/src/CommanderChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ namespace dds
public:
void stopChannel();
void setNumberOfSlots(size_t _nSlots);
void setGroupName(const std::string& _groupName);

private:
// Message Handlers
Expand Down Expand Up @@ -151,6 +152,7 @@ namespace dds
SSlotInfo::container_t m_slots;
size_t m_nSlots{ 0 };
timerPtr_t m_resourceMonitorTimer;
std::string m_groupName;
};
} // namespace agent_cmd
} // namespace dds
Expand Down
3 changes: 3 additions & 0 deletions dds-agent/src/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ namespace dds

ECommands m_Command{ cmd_start };
size_t m_slots{ 0 };
std::string m_groupName;
} SOptions_t;

// Command line parser
Expand All @@ -69,6 +70,8 @@ namespace dds
" clean: \tCleaning");
options.add_options()(
"slots,s", bpo::value<size_t>(&_options->m_slots), "Defines a number of task slots per agent.");
options.add_options()(
"group-name,g", bpo::value<std::string>(&_options->m_groupName), "Defines a group name of the agent.");

//...positional
bpo::positional_options_description pd;
Expand Down
14 changes: 12 additions & 2 deletions dds-commander/src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ void CConnectionManager::newClientCreated(CAgentChannel::connectionPtr_t _newCli
}

//=============================================================================
void CConnectionManager::_createWnPkg(bool _needInlineBashScript, bool _lightweightPkg, uint32_t _nSlots) const
void CConnectionManager::_createWnPkg(bool _needInlineBashScript,
bool _lightweightPkg,
uint32_t _nSlots,
const string& _groupName) const
{
// re-create the worker package if needed
string out;
Expand All @@ -145,6 +148,9 @@ void CConnectionManager::_createWnPkg(bool _needInlineBashScript, bool _lightwei
// Slots per agent
cmd += " -t ";
cmd += to_string(_nSlots);
// Group name
cmd += " -g ";
cmd += _groupName;

if (_lightweightPkg)
cmd += " -l ";
Expand Down Expand Up @@ -1015,7 +1021,10 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
sendToolsAPIMsg(_channel, _submitInfo.m_requestID, "Creating new worker package...", EMsgSeverity::info);

// Use a lightweight package when possible
_createWnPkg(!inlineShellScripCmds.empty(), (_submitInfo.m_rms == "localhost"), _submitInfo.m_slots);
_createWnPkg(!inlineShellScripCmds.empty(),
(_submitInfo.m_rms == "localhost"),
_submitInfo.m_slots,
_submitInfo.m_groupName);

// remember the UI channel, which requested to submit the job
m_SubmitAgents.m_channel = _channel;
Expand All @@ -1027,6 +1036,7 @@ void CConnectionManager::submitAgents(const dds::tools_api::SSubmitRequestData&
submitRequest.m_nInstances = _submitInfo.m_instances;
submitRequest.m_slots = _submitInfo.m_slots;
submitRequest.m_wrkPackagePath = CUserDefaults::instance().getWrkScriptPath();
submitRequest.m_groupName = _submitInfo.m_groupName;
m_SubmitAgents.m_strInitialSubmitRequest = submitRequest.toJSON();

string sPluginInfoMsg("RMS plug-in: ");
Expand Down
5 changes: 4 additions & 1 deletion dds-commander/src/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ namespace dds
void activateTasks(const dds::tools_api::STopologyRequestData& _topologyInfo,
const CScheduler& _scheduler,
CAgentChannel::weakConnectionPtr_t _channel);
void _createWnPkg(bool _needInlineBashScript, bool _lightweightPkg, uint32_t _nSlots) const;
void _createWnPkg(bool _needInlineBashScript,
bool _lightweightPkg,
uint32_t _nSlots,
const std::string& _groupName) const;
void processToolsAPIRequests(const protocol_api::SCustomCmdCmd& _cmd,
CAgentChannel::weakConnectionPtr_t _channel);
void submitAgents(const dds::tools_api::SSubmitRequestData& _submitInfo,
Expand Down
20 changes: 16 additions & 4 deletions dds-commander/src/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ void CScheduler::makeScheduleImpl(const CTopoCore& _topology,
continue;

const SHostInfoCmd& hostInfo = info.m_remoteHostInfo;
hostToChannelMap[make_tuple(info.m_id, hostInfo.m_host, hostInfo.m_workerId)].push_back(iChannel);
hostToChannelMap[make_tuple(info.m_id, hostInfo.m_host, hostInfo.m_workerId, hostInfo.m_groupName)].push_back(
iChannel);
}

// Collect all tasks that belong to collections
Expand Down Expand Up @@ -173,8 +174,9 @@ void CScheduler::scheduleTasks(const CTopoCore& _topology,
{
const string hostName{ std::get<1>(v.first) };
const string wnName{ std::get<2>(v.first) };
const string groupName{ std::get<3>(v.first) };
const bool requirementOk{ checkRequirements(
task->getRequirements(), hostName, wnName, task->getName(), _hostCounterMap) };
task->getRequirements(), hostName, wnName, groupName, task->getName(), _hostCounterMap) };
if (requirementOk)
{
if (!v.second.empty())
Expand Down Expand Up @@ -244,8 +246,13 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
{
const string hostName{ std::get<1>(v.first) };
const string wnName{ std::get<2>(v.first) };
const bool requirementOk{ checkRequirements(
collection->getRequirements(), hostName, wnName, collection->getName(), _hostCounterMap) };
const string groupName{ std::get<3>(v.first) };
const bool requirementOk{ checkRequirements(collection->getRequirements(),
hostName,
wnName,
groupName,
collection->getName(),
_hostCounterMap) };
if ((v.second.size() >= collectionInfo.m_collection->getNofTasks()) && requirementOk)
{
const STopoRuntimeCollection& collectionInfo{ _topology.getRuntimeCollectionById(id) };
Expand Down Expand Up @@ -290,6 +297,7 @@ void CScheduler::scheduleCollections(const CTopoCore& _topology,
bool CScheduler::checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements,
const string& _hostName,
const string& _wnName,
const string& _groupName,
const string& _elementName,
hostCounterMap_t& _hostCounterMap) const
{
Expand All @@ -314,6 +322,10 @@ bool CScheduler::checkRequirements(const topology_api::CTopoRequirement::PtrVect
result = CScheduler::hostPatternMatches(requirement->getValue(),
(type == EType::HostName) ? _hostName : _wnName);
}
else if (type == EType::GroupName)
{
result = CScheduler::hostPatternMatches(requirement->getValue(), _groupName);
}
else if (type == EType::MaxInstancesPerHost)
{
try
Expand Down
7 changes: 5 additions & 2 deletions dds-commander/src/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ namespace dds
using weakChannelInfoVector_t = std::vector<dds::protocol_api::SWeakChannelInfo<CAgentChannel>>;

private:
// Map tuple<agent ID, host name, worker id> to vector of channel indeces.
using hostToChannelMap_t = std::map<std::tuple<uint64_t, std::string, std::string>, std::vector<size_t>>;
// Map tuple<agent ID, host name, worker id, group name> to vector of channel indices.
using hostToChannelMap_t =
std::map<std::tuple<uint64_t, std::string, std::string, std::string>, std::vector<size_t>>;
// Map pair<host name, task/collection name> to counter.
using hostCounterMap_t = std::map<std::pair<std::string, std::string>, size_t>;

Expand Down Expand Up @@ -82,9 +83,11 @@ namespace dds
const topology_api::CTopoCore::IdSet_t* _addedTasks,
hostCounterMap_t& _hostCounterMap);

// TODO: host, wn and group names should be provided via a struct.
bool checkRequirements(const topology_api::CTopoRequirement::PtrVector_t& _requirements,
const std::string& _hostName,
const std::string& _wnName,
const std::string& _groupName,
const std::string& _elementName,
hostCounterMap_t& _hostCounterMap) const;

Expand Down
15 changes: 13 additions & 2 deletions dds-commander/tests/TestScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ void make_agent(boost::asio::io_context& _io_context,
const string& _hostName,
const string& _workerId,
uint64_t _protocolHeaderID,
size_t _numSlotsPerAgent)
size_t _numSlotsPerAgent,
const string& _groupName = "common")
{
CAgentChannel::connectionPtr_t agent{ CAgentChannel::makeNew(_io_context, _protocolHeaderID) };
SAgentInfo& info{ agent->getAgentInfo() };
info.m_remoteHostInfo.m_host = _hostName;
info.m_remoteHostInfo.m_workerId = _workerId;
info.m_remoteHostInfo.m_groupName = _groupName;

// Add slots to agent
for (size_t is = 0; is < _numSlotsPerAgent; ++is)
Expand Down Expand Up @@ -133,6 +135,14 @@ BOOST_AUTO_TEST_CASE(test_dds_scheduler_1)
// No Requirement
make_agent(io_context, agents, "noname_host", "noname_wn", counter++, numSlotsPerAgent);

// Requirement type "groupname" for collections
make_agent(io_context, agents, "host7_0", "", counter++, numSlotsPerAgent, "test_group");
make_agent(io_context, agents, "host7_0", "", counter++, numSlotsPerAgent, "test_group");
make_agent(io_context, agents, "host7_0", "", counter++, numSlotsPerAgent, "test_group");

// Requirement type "groupname" for tasks
make_agent(io_context, agents, "host7_0", "", counter++, numSlotsPerAgent, "test_group2");

using weak_t = CConnectionManager::weakChannelInfo_t;
weak_t::container_t weakAgents;
std::transform(
Expand All @@ -141,7 +151,8 @@ BOOST_AUTO_TEST_CASE(test_dds_scheduler_1)
});

CScheduler scheduler;
BOOST_CHECK_NO_THROW(scheduler.makeSchedule(topology, weakAgents));
scheduler.makeSchedule(topology, weakAgents);
// BOOST_CHECK_NO_THROW(scheduler.makeSchedule(topology, weakAgents));
cout << scheduler.toString();
}

Expand Down
24 changes: 24 additions & 0 deletions dds-commander/tests/topology_scheduler_test_2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
<declrequirement name="requirement5" type="wnname" value="wn5"/>

<declrequirement name="requirement6" type="wnname" value="wn6"/>

<declrequirement name="requirement7" type="groupname" value="test_group"/>

<declrequirement name="requirement8" type="groupname" value="test_group2"/>

<decltask name="TestTask1">
<exe reachable="false">test_task.exe</exe>
Expand Down Expand Up @@ -39,6 +43,13 @@
<name>requirement5</name>
</requirements>
</decltask>

<decltask name="TestTask5">
<exe reachable="false">test_task.exe</exe>
<requirements>
<name>requirement8</name>
</requirements>
</decltask>

<declcollection name="TestCollection1">
<requirements>
Expand All @@ -65,16 +76,29 @@
<name>TestTask1</name>
</tasks>
</declcollection>

<declcollection name="TestCollection4">
<requirements>
<name>requirement7</name>
</requirements>
<tasks>
<name>TestTask1</name>
<name>TestTask2</name>
<name>TestTask3</name>
</tasks>
</declcollection>

<main name="main">
<group name="group1" n="3">
<task>TestTask1</task>
<task>TestTask2</task>
<task>TestTask3</task>
<task>TestTask4</task>
<task>TestTask5</task>
<collection>TestCollection1</collection>
<collection>TestCollection2</collection>
<collection>TestCollection3</collection>
<collection>TestCollection4</collection>
</group>
</main>

Expand Down
1 change: 1 addition & 0 deletions dds-intercom-lib/src/Intercom.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ namespace dds
std::string m_cfgFilePath; ///< Path to the configuration file.
std::string m_id; ///< ID for communication with DDS commander.
std::string m_wrkPackagePath; ///< A full path of the agent worker package, which needs to be deployed.
std::string m_groupName; ///< Agent group name.
};

/// \brief Structure holds information of message notification.
Expand Down
5 changes: 4 additions & 1 deletion dds-intercom-lib/src/dds_rms_plugin_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ std::string SSubmit::toJSON()
pt.put<int>("dds.plug-in.submit.slots", m_slots);
pt.put<string>("dds.plug-in.submit.cfgFilePath", m_cfgFilePath);
pt.put<string>("dds.plug-in.submit.wrkPackagePath", m_wrkPackagePath);
pt.put<string>("dds.plug-in.submit.groupName", m_groupName);

stringstream json;
write_json(json, pt);
Expand All @@ -81,12 +82,14 @@ void SSubmit::fromPT(const boost::property_tree::ptree& _pt)
m_cfgFilePath = pt.get<string>("submit.cfgFilePath", "");
m_wrkPackagePath = pt.get<string>("submit.wrkPackagePath", "");
m_id = pt.get<string>("id");
m_groupName = pt.get<string>("submit.groupName", "");
}

bool SSubmit::operator==(const SSubmit& val) const
{
return (m_id == val.m_id) && (m_nInstances == val.m_nInstances) && (m_slots == val.m_slots) &&
(m_cfgFilePath == val.m_cfgFilePath) && (m_wrkPackagePath == val.m_wrkPackagePath);
(m_cfgFilePath == val.m_cfgFilePath) && (m_wrkPackagePath == val.m_wrkPackagePath) &&
(m_groupName == val.m_groupName);
}

///////////////////////////////////
Expand Down
Loading

0 comments on commit d633e7e

Please sign in to comment.