Skip to content

Commit

Permalink
Initialise Pushgateway per instance of SteveJobs
Browse files Browse the repository at this point in the history
Previously, `cls.pushgateway` was used in `process_message`, possibly causing race conditions
in multithreaded Celery tasks, leading to inaccurate metric reporting.
Related to packit#2430 packit#2697
  • Loading branch information
lbarcziova committed Feb 17, 2025
1 parent 478d94b commit acd42ba
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions packit_service/worker/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ class SteveJobs:
Steve makes sure all the jobs are done with precision.
"""

pushgateway = Pushgateway()

def __init__(self, event: Optional[Event] = None) -> None:
self.event = event
self.pushgateway = Pushgateway()

@cached_property
def service_config(self) -> ServiceConfig:
Expand Down Expand Up @@ -166,16 +165,16 @@ def process_message(
default=Parser.parse_event,
)
event_object: Optional[Event] = parser(event)

cls.pushgateway.events_processed.inc()
steve = cls(event_object)
steve.pushgateway.events_processed.inc()
if event_not_handled := not event_object:
cls.pushgateway.events_not_handled.inc()
steve.pushgateway.events_not_handled.inc()
elif pre_check_failed := not event_object.pre_check():
cls.pushgateway.events_pre_check_failed.inc()
steve.pushgateway.events_pre_check_failed.inc()

result = [] if (event_not_handled or pre_check_failed) else cls(event_object).process()
result = [] if (event_not_handled or pre_check_failed) else steve.process()

cls.pushgateway.push()
steve.pushgateway.push()
return result

def process(self) -> list[TaskResults]:
Expand Down

0 comments on commit acd42ba

Please sign in to comment.