Plugin API specifications
Most plugin methods take instances of the JobSpec class or the WorkSpec class as input. The dictionary of job specifications given by the PanDA server is available in JobSpec.jobParams. Each worker communicates with Harvester through the access point, which is returned by WorkSpec.getAccessPoint().
Plugins can be configured per PanDA queue in $PANDA_HOME/etc/panda/panda_queueconfig.json. For example,
"BNL_HARVESTER_TEST": {
...
"preparator":{
"name":"DummyPreparator",
"module":"pandaharvester.harvesterpreparator.dummy_preparator"
where name and module define the class name and the module name of the plugin, respectively. Internally, “from [module] import [name]” is invoked. Each plugin should inherit from pandaharvester.harvestercore.plugin_base.PluginBase so that it can take arbitrary configuration parameters, as long as their parameter names are not the same as name or module. For example,
"preparator":{
"name":"XyzPreparator",
"module":"pandaharvester.harvesterpreparator.xyz_preparator",
"host":"abc.cern.ch",
"port":123,
Then “host” and “port” can be used in the plugin as instance variables, i.e. self.host and self.port.
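A plugin skeleton could then look like the following sketch, assuming the harvester package is installed; do_something is a hypothetical method shown only to illustrate parameter access:
```python
from pandaharvester.harvestercore.plugin_base import PluginBase


class XyzPreparator(PluginBase):
    # constructor; PluginBase sets every keyword argument from the queue
    # config ("host", "port", ...) as an instance variable
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    # hypothetical method illustrating access to configuration parameters
    def do_something(self):
        return f'connecting to {self.host}:{self.port}'
```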
Specifications of plugins and their methods are explained in the following sections.
def trigger_preparation(self, jobspec)
Trigger the stage-in procedure for the job, synchronously or asynchronously. Input file attributes are available through jobspec.get_input_file_attributes(), which returns a dictionary. The key of the dictionary is the LFN of the input file and the value is a dictionary of file attributes. The attribute names are fsize, guid, checksum, scope, dataset, and endpoint.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
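A minimal sketch of a synchronous implementation might look as follows; my_copy is a hypothetical transfer helper, not part of Harvester:
```python
def trigger_preparation(self, jobspec):
    # the key is the LFN and the value is a dictionary of file attributes
    in_files = jobspec.get_input_file_attributes()
    for lfn, attrs in in_files.items():
        try:
            # my_copy is a placeholder for the actual transfer tool
            my_copy(attrs['endpoint'], attrs['scope'], lfn)
        except Exception as exc:
            return False, f'failed to copy {lfn}: {exc}'
    return True, ''
```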
def resolve_input_paths(self, jobspec)
Set input file paths for the job. Get the attribute dictionary with jobspec.get_input_file_attributes(), set ['path'] for each LFN, and then write the updated dictionary back to the job using jobspec.set_input_file_paths().
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
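A minimal sketch, assuming the inputs were staged to a local directory (the base path /data/inputs is made up for illustration):
```python
import os


def resolve_input_paths(self, jobspec):
    in_files = jobspec.get_input_file_attributes()
    for lfn in in_files:
        # the base directory is an assumption for illustration
        in_files[lfn]['path'] = os.path.join('/data/inputs', lfn)
    # write the updated attributes back to the job specification
    jobspec.set_input_file_paths(in_files)
    return True, ''
```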
def check_status(self, jobspec)
Check the status of the stage-in procedure. If stage-in is done synchronously in trigger_preparation, this method should always return True.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False for failure, None if still on-going |
error dialog | string | Error dialog if any |
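A minimal sketch for asynchronous stage-in; my_transfer_state is a hypothetical helper that reports the transfer state for the job:
```python
def check_status(self, jobspec):
    # my_transfer_state is a placeholder for a real status query
    state = my_transfer_state(jobspec)
    if state == 'done':
        return True, ''
    if state == 'failed':
        return False, 'stage-in failed'
    return None, ''  # still on-going
```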
def trigger_stage_out(self, jobspec)
Trigger the stage-out procedure for the job. Output files are available through jobspec.outFiles, which gives a list of FileSpecs. FileSpec.status should be checked to skip 'finished' or 'failed' files. If this method returns False, the stager invokes it again after triggerInterval seconds, which is defined in the stager section of panda_harvester.cfg. This means that if stage-out permanently fails for a file, this method should set fileSpec.status to 'failed' for that file and return True to give up further reattempts. fileSpec.attemptNr is incremented every time the method is invoked, so that plugins can give up based on a large attemptNr.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
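A minimal sketch; my_upload is a hypothetical transfer helper, and the retry limit of 3 is an arbitrary choice for illustration:
```python
def trigger_stage_out(self, jobspec):
    for fileSpec in jobspec.outFiles:
        # skip files which are already done
        if fileSpec.status in ('finished', 'failed'):
            continue
        if fileSpec.attemptNr >= 3:  # the retry limit is an assumption
            # give up on this file so that it is not reattempted
            fileSpec.status = 'failed'
            continue
        try:
            my_upload(fileSpec.path)  # placeholder for the actual transfer
        except Exception as exc:
            # returning False makes the stager retry after triggerInterval
            return False, f'stage-out failed: {exc}'
    return True, ''
```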
def check_status(self, jobspec)
Check the status of the stage-out procedure. If stage-out is done synchronously in trigger_stage_out, this method should always return True. Output files are available through jobspec.outFiles, which gives a list of FileSpecs. FileSpec.status needs to be set to 'finished' if stage-out was successful for a file, or to 'failed' if it failed. If this method returns False, the stager invokes it again after checkInterval seconds.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
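A minimal sketch for asynchronous stage-out; my_transfer_done is a hypothetical helper that reports per-file completion:
```python
def check_status(self, jobspec):
    all_done = True
    for fileSpec in jobspec.outFiles:
        if fileSpec.status in ('finished', 'failed'):
            continue
        # my_transfer_done is a placeholder returning (done, succeeded)
        done, succeeded = my_transfer_done(fileSpec)
        if not done:
            all_done = False  # re-checked after checkInterval seconds
        else:
            fileSpec.status = 'finished' if succeeded else 'failed'
    return all_done, ''
```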
def zip_output(self, jobspec)
Zip output files. This method loops over jobspec.outFiles to make a zip file for each outFileSpec. The files to be zipped into it are listed in outFileSpec.associatedFiles, which is a list of FileSpecs (toZipFileSpec); the path of each source file is available in toZipFileSpec.path. Once a zip file is made, its path and size need to be set to outFileSpec.path and outFileSpec.fsize.
Args:
name | type | description |
---|---|---|
jobspec | JobSpec | Job specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
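A minimal sketch using the standard zipfile module; the zip directory and the use of the LFN as the archive name are assumptions for illustration:
```python
import os
import zipfile


def zip_output(self, jobspec):
    for fileSpec in jobspec.outFiles:
        # the zip directory and naming scheme are assumptions
        zip_path = os.path.join('/data/zips', fileSpec.lfn)
        with zipfile.ZipFile(zip_path, 'w') as zip_file:
            for assFileSpec in fileSpec.associatedFiles:
                zip_file.write(assFileSpec.path,
                               arcname=os.path.basename(assFileSpec.path))
        # record the path and size of the new zip file
        fileSpec.path = zip_path
        fileSpec.fsize = os.path.getsize(zip_path)
    return True, ''
```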
def check_workers(self, workspec_list)
Check the status of workers. This method takes a list of WorkSpecs as an input argument and returns a list of worker statuses. The Nth element in the returned list corresponds to the status of the Nth WorkSpec in the given list. A worker's status is one of WorkSpec.ST_finished, WorkSpec.ST_failed, WorkSpec.ST_cancelled, WorkSpec.ST_running, and WorkSpec.ST_submitted.
Args:
name | type | description |
---|---|---|
workspec_list | [WorkSpec,] | A list of WorkSpec instances |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
status list | [string,] | A list of worker statuses |
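A minimal sketch; my_batch_status is a hypothetical batch-system query, and the WorkSpec import path is assumed to follow the same pattern as PluginBase above:
```python
# import path is an assumption based on the package layout shown earlier
from pandaharvester.harvestercore.work_spec import WorkSpec


def check_workers(self, workspec_list):
    # mapping from hypothetical batch states to worker statuses
    status_map = {'done': WorkSpec.ST_finished,
                  'failed': WorkSpec.ST_failed,
                  'running': WorkSpec.ST_running}
    status_list = []
    for workSpec in workspec_list:
        # my_batch_status is a placeholder for a real batch-system query
        batch_state = my_batch_status(workSpec.batchID)
        status_list.append(status_map.get(batch_state, WorkSpec.ST_submitted))
    return True, status_list
```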
def submit_workers(self, workspec_list)
Submit workers to a scheduling system such as a batch system or a computing element. This method takes a list of WorkSpecs as an input argument and returns a list of tuples. Each tuple is composed of a return code and a dialog message. The Nth tuple in the returned list corresponds to the submission status and dialog message for the Nth worker in the given WorkSpec list. A unique identifier is set to WorkSpec.batchID when submission is successful, so that the worker can be identified in the scheduling system. It can also be useful to set other attributes such as queueName (batch queue name), computingElement (the CE's host name), and nodeID (identifier of the node where the worker is running).
Args:
name | type | description |
---|---|---|
workspec_list | [WorkSpec,] | A list of WorkSpec instances |
Return:
name | type | description |
---|---|---|
status and dialog message list | [(bool, string),] | A list of tuples. Each tuple is composed of submission status (True for success, False otherwise) and dialog message |
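A minimal sketch; my_submit is a hypothetical helper that submits one worker and returns its unique batch ID:
```python
def submit_workers(self, workspec_list):
    ret_list = []
    for workSpec in workspec_list:
        try:
            # my_submit is a placeholder returning a unique batch ID
            workSpec.batchID = my_submit(workSpec)
            ret_list.append((True, ''))
        except Exception as exc:
            ret_list.append((False, f'submission failed: {exc}'))
    return ret_list
```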
def kill_worker(self, workspec)
Kill a worker in a scheduling system such as a batch system or a computing element. This method takes a WorkSpec as an input argument and returns a tuple of a return code and a dialog message.
Args:
name | type | description |
---|---|---|
workspec | WorkSpec | Work specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
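A minimal sketch; my_kill is a hypothetical helper that removes the job identified by batchID from the scheduling system:
```python
def kill_worker(self, workspec):
    try:
        # my_kill is a placeholder for the actual kill command
        my_kill(workspec.batchID)
    except Exception as exc:
        return False, f'kill failed: {exc}'
    return True, ''
```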
def sweep_worker(self, workspec)
Perform cleanup procedures for a worker, such as deletion of the work directory. This method takes a WorkSpec as an input argument and returns a tuple of a return code and a dialog message.
Args:
name | type | description |
---|---|---|
workspec | WorkSpec | Work specifications |
Return:
name | type | description |
---|---|---|
return code | bool | True for success, False otherwise |
error dialog | string | Error dialog if any |
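A minimal sketch that deletes the worker's directory, using the access-point accessor mentioned in the overview:
```python
import shutil


def sweep_worker(self, workspec):
    try:
        # remove the worker's work directory, i.e. its access point
        shutil.rmtree(workspec.getAccessPoint())
    except Exception as exc:
        return False, f'cleanup failed: {exc}'
    return True, ''
```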