Skip to content

Commit

Permalink
Auto stop the worker after a set duration
Browse files Browse the repository at this point in the history
  • Loading branch information
Hipska committed Aug 30, 2024
1 parent 54fa2e7 commit 7325be0
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
11 changes: 10 additions & 1 deletion src/SnmpDiscoveryCollector.class.inc.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class SnmpDiscoveryCollector extends Collector
protected string $sQueue;
/** @var AMQPMessage The message containing the result from the worker */
protected AMQPMessage $oResponseMessage;
/** @var int The timestamp when the worker needs to quit */
protected int $iTimeout;

/**
* @inheritDoc
Expand Down Expand Up @@ -479,14 +481,17 @@ protected function PopulateMessageQueue(): void

/**
* Start the worker by listening to the correct queue.
* @param int $iDuration Time in seconds until the worker stops
* @return void
* @throws ErrorException
*/
public function StartWorker(): void
public function StartWorker(int $iDuration): void
{
$sConsumerTag = $this->oChannel->basic_consume($this->sQueue, callback: [$this, 'ProcessRequest']);
Utils::Log(LOG_DEBUG, sprintf('AMQP consumer tag: %s.', $sConsumerTag));

$this->iTimeout = time() + $iDuration;

// Start consuming
$this->oChannel->consume();
}
Expand Down Expand Up @@ -520,6 +525,10 @@ public function ProcessRequest(AMQPMessage $oRequest): void
Utils::Log(LOG_DEBUG, sprintf('Replying to %s', $oRequest->get('reply_to')));
$this->oChannel->basic_publish($oResponse, routing_key: $oRequest->get('reply_to'));
$oRequest->ack();

// Stop worker when duration is passed
if (time() >= $this->iTimeout)
$this->oChannel->getConnection()->close();
}

/**
Expand Down
3 changes: 2 additions & 1 deletion worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

try {
Utils::InitConsoleLogLevel();
$iDuration = filter_var(Utils::ReadParameter('max_duration', 14400), FILTER_VALIDATE_INT);

if (!Orchestrator::CheckRequirements()) {
exit(1);
Expand All @@ -23,7 +24,7 @@
$oCollector = new SnmpDiscoveryCollector();
$oCollector->InitMessageQueue();

$oCollector->StartWorker();
$oCollector->StartWorker($iDuration);

} catch (Exception $e) {
Utils::Log(LOG_ERR, $e->getMessage());
Expand Down

0 comments on commit 7325be0

Please sign in to comment.