Skip to content

Commit

Permalink
Combining organisation schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbruinsslot committed Nov 13, 2024
1 parent cc0bec4 commit 57d040f
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 318 deletions.
55 changes: 0 additions & 55 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ def run(self) -> None:
# Start schedulers
self.start_schedulers()

# Start monitors
self.start_monitors()

# Start metrics collecting
if self.ctx.config.collect_metrics:
self.start_collectors()
Expand Down Expand Up @@ -108,14 +105,6 @@ def start_collectors(self) -> None:
name="App-metrics_collector", target=self._collect_metrics, stop_event=self.stop_event, interval=10
).start()

def start_monitors(self) -> None:
thread.ThreadRunner(
name="App-monitor_organisations",
target=self._monitor_organisations,
stop_event=self.stop_event,
interval=self.ctx.config.monitor_organisations_interval,
).start()

def start_server(self) -> None:
self.server = server.Server(self.ctx, self.schedulers)
thread.ThreadRunner(name="App-server", target=self.server.run, stop_event=self.stop_event, loop=False).start()
Expand Down Expand Up @@ -169,47 +158,3 @@ def _collect_metrics(self) -> None:
status_counts = self.ctx.datastores.task_store.get_status_counts(s.id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(scheduler_id=s.id, status=status).set(count)

# TODO: exception handling
def _monitor_organisations(self) -> None:
"""We make a difference between the organisation id's that are used
by the schedulers, and the organisation id's that are in the
Katalogus service. We will add/remove schedulers based on the
difference between these two sets.
"""
current_schedulers, _ = self.ctx.datastores.scheduler_store.get_schedulers()

scheduler_orgs: set[str] = {s.organisation for s in current_schedulers}
katalogus_orgs: set[str] = {org.id for org in self.ctx.services.katalogus.get_organisations()}

removals = scheduler_orgs.difference(katalogus_orgs)
self.logger.debug("Organisations to remove: %s", len(removals), removals=sorted(removals))

for org_id in removals:
organisation_schedulers = self.ctx.datastores.scheduler_store.get_schedulers(organisation=org_id)
if not organisation_schedulers:
self.logger.error("Failed to get scheduler", org_id=org_id)
continue

for scheduler in organisation_schedulers:
with self.lock:
self.schedulers[scheduler.id].stop()
self.ctx.datastores.scheduler_store.delete_scheduler(scheduler.id)

additions = katalogus_orgs.difference(scheduler_orgs)
self.logger.debug("Organisations to add: %s", len(additions), additions=sorted(additions))

for org_id in additions:
created_schedulers = create_schedulers_for_organisation(self.ctx, org_id)
if not schedulers:
self.logger.error("Failed to create schedulers", org_id=org_id)
continue

for scheduler in created_schedulers:
with self.lock:
self.schedulers[scheduler.scheduler_id] = scheduler
scheduler.run()

if additions:
# TODO: optimize this
self.ctx.services.katalogus.flush_caches()
1 change: 1 addition & 0 deletions mula/scheduler/models/ooi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ class ScanProfileMutation(BaseModel):
operation: MutationOperationType
primary_key: str
value: OOI | None
organisation: str
1 change: 1 addition & 0 deletions mula/scheduler/models/raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ class RawData(BaseModel):
mime_types: list[dict[str, str]]
secure_hash: str | None = None
hash_retrieval_link: str | None = None
organisation: str
1 change: 1 addition & 0 deletions mula/scheduler/models/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Schedule(BaseModel):

id: uuid.UUID = Field(default_factory=uuid.uuid4)
scheduler_id: str
organisation: str
hash: str | None = Field(None, max_length=32)
data: dict | None = None
enabled: bool = True
Expand Down
7 changes: 5 additions & 2 deletions mula/scheduler/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ class Task(BaseModel):
model_config = ConfigDict(from_attributes=True, use_enum_values=True)

id: uuid.UUID = Field(default_factory=uuid.uuid4)
scheduler_id: str | None = None
scheduler_id: str
schedule_id: uuid.UUID | None = None
organisation: str
priority: int | None = 0
status: TaskStatus = TaskStatus.PENDING
type: str | None = None
Expand All @@ -64,13 +65,15 @@ class TaskDB(Base):
id = Column(GUID, primary_key=True)
scheduler_id = Column(String, nullable=False)
schedule_id = Column(GUID, ForeignKey("schedules.id", ondelete="SET NULL"), nullable=True)
schedule = relationship("ScheduleDB", back_populates="tasks")
organisation = Column(String, nullable=False)
type = Column(String, nullable=False)
hash = Column(String(32), index=True)
priority = Column(Integer)
data = Column(JSONB, nullable=False)
status = Column(Enum(TaskStatus), nullable=False, default=TaskStatus.PENDING)

schedule = relationship("ScheduleDB", back_populates="tasks")

created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())
modified_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())

Expand Down
Loading

0 comments on commit 57d040f

Please sign in to comment.