Skip to content

Commit

Permalink
SpawningWorker now times out its children
Browse files Browse the repository at this point in the history
  • Loading branch information
danielplohmann committed Apr 17, 2024
1 parent fd4e415 commit 5e34a9a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ In July 2023, we started populating a [Github repository](https://github.com/dan


## Version History
* 2024-04-17 v1.3.15: Worker type `spawningworker` will now terminate children after `QueueConfig.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT` seconds.
* 2024-04-02 v1.3.14: Experimental: Introduction of new worker type `spawningworker` - this variant will consume jobs from the queue as usual but defer the actual job execution into a separate (sub)process, which should reduce issues with locked memory allocations.
* 2024-04-02 v1.3.13: When cleaning up the queue, now also [delete all failed jobs](https://github.com/danielplohmann/mcrit/pull/70) @yankovs - THX!!
* 2024-03-06 v1.3.12: Fixed a bug where protection of recent samples from queue cleanup would lead to key errors as reported by @yankovs - THX!!
Expand Down
17 changes: 10 additions & 7 deletions mcrit/SpawningWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ def _executeJobPayload(self, job_payload, job):
# instead of execution within our own context, spawn a new process as worker for this job payload
console_handle = subprocess.Popen(["python", "-m", "mcrit", "singlejobworker", "--job_id", str(job.job_id)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# extract result_id from console_output
stdout_result, stderr_result = console_handle.communicate()
stdout_result = stdout_result.strip().decode("utf-8")
last_line = stdout_result.split("\n")[-1]
# successful output should be just the result_id in a single line
result_id = None
match = re.match("(?P<result_id>[0-9a-fA-F]{24})", last_line)
if match:
result_id = match.group("result_id")
try:
stdout_result, stderr_result = console_handle.communicate(timeout=self._queue_config.QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT)
stdout_result = stdout_result.strip().decode("utf-8")
last_line = stdout_result.split("\n")[-1]
# successful output should be just the result_id in a single line
match = re.match("(?P<result_id>[0-9a-fA-F]{24})", last_line)
if match:
result_id = match.group("result_id")
except subprocess.TimeoutExpired:
LOGGER.error(f"Job {str(job.job_id)} running as child from SpawningWorker timed out during processing.")
return result_id

def _executeJob(self, job):
Expand Down
2 changes: 1 addition & 1 deletion mcrit/config/McritConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class McritConfig(object):

# NOTE to self: always change this in setup.py as well!
VERSION = "1.3.13"
VERSION = "1.3.15"
# basic pathing info
CONFIG_FILE_PATH = str(os.path.abspath(__file__))
PROJECT_ROOT = str(os.path.abspath(os.sep.join([CONFIG_FILE_PATH, "..", ".."])))
Expand Down
2 changes: 2 additions & 0 deletions mcrit/config/QueueConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ class QueueConfig(ConfigInterface):
QUEUE_MAX_ATTEMPTS: int = 3
# QUEUE_CLEAN_INTERVAL is the time EACH WORKER waits between cleaning
QUEUE_CLEAN_INTERVAL: int = 20 * 60 # Clean every 20 minutes
# timeout in seconds for child processes spawned by SpawningWorker
QUEUE_SPAWNINGWORKER_CHILDREN_TIMEOUT: int = 60 * 60
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='mcrit',
version="1.3.14",
version="1.3.15",
description='MCRIT is a framework created for simplified application of the MinHash algorithm to code similarity.',
long_description_content_type="text/markdown",
long_description=README,
Expand Down

0 comments on commit 5e34a9a

Please sign in to comment.