Skip to content

Commit

Permalink
added ability to clean up orphaned jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
danielplohmann committed Nov 17, 2023
1 parent 8866fa9 commit e21f8a2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
1 change: 1 addition & 0 deletions mcrit/Worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
class Worker(QueueRemoteCallee):
def __init__(self, queue=None, config=None, storage: Optional["StorageInterface"] = None, profiling=False):
self._worker_id = f"Worker-{uuid.uuid4()}"
LOGGER.info(f"Starting as worker: {self._worker_id}")
if config is None:
config = McritConfig()

Expand Down
27 changes: 25 additions & 2 deletions mcrit/libs/mongoqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def registerWorker(self):
{"$push": {"workers": self.consumer_id}},
upsert=True
)
self.release_orphaned_jobs()

def unregisterWorker(self):
if self.queue_counters is not None:
Expand Down Expand Up @@ -471,13 +472,35 @@ def clean(self):
# delete jobs
self._getCollection().delete_many(job_query)

def release_all_jobs(self):
def release_all_jobs(self, consumer_id=None):
# release all jobs associated with our consumer id if they are started, locked, but not finished.
self._getCollection().update_many(
filter={"locked_by": self.consumer_id, "started_at": {"$ne": None}, "finished_at": {"$eq": None}},
filter={"locked_by": consumer_id if consumer_id else self.consumer_id, "started_at": {"$ne": None}, "finished_at": {"$eq": None}},
update={"$set": {"locked_by": None, "locked_at": None}, "$inc": {"attempts_left": -1}}
)

def release_orphaned_jobs(self):
# release all jobs associated with non- or no longer existing worker_ids, if they are started, locked, but not finished.
all_worker_ids = set([wid for wid in self._getCollection().distinct("locked_by") if wid])
active_workers = self.queue_counters.find_one({"name": "workers"}, {"workers": 1, "_id": 0})
orphan_ids = []
if active_workers:
active_worker_ids = set(active_workers["workers"])
orphan_ids = all_worker_ids.difference(active_worker_ids)
else:
orphan_ids = all_worker_ids

orphaned_jobs = []
for orphan_id in orphan_ids:
for job in self._getCollection().find(filter={"locked_by": orphan_id , "started_at": {"$ne": None}, "finished_at": {"$eq": None}}):
orphaned_jobs.append(job)

for orphan_consumer_id in orphan_ids:
self._getCollection().update_many(
filter={"locked_by": orphan_consumer_id , "started_at": {"$ne": None}, "finished_at": {"$eq": None}},
update={"$set": {"locked_by": None, "locked_at": None}, "$inc": {"attempts_left": -1}}
)

def terminate_all_jobs(self):
pass

Expand Down

0 comments on commit e21f8a2

Please sign in to comment.