diff --git a/docs/userguides/development.md b/docs/userguides/development.md index a62514d2..5970d782 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -59,6 +59,19 @@ Inside of `handle_token_transfer_events` you can define any logic that you want Again, you can return any serializable data structure from this function and that will be stored in the results database as a trackable metric for the execution of this handler. Any errors you raise during this function will get captured by the client, and recorded as a failure to handle this `transfer` event log. +## Cron Tasks + +You may also want to run some tasks according to a schedule, either for efficiency reasons or because the task is not related to any chain-driven events. +You can do that with the `@app.cron` task decorator. + +```python +@app.cron("0 * * * *") +def every_hour(): + ... +``` + +For more information, see [the linux handbook section on the crontab syntax](https://linuxhandbook.com/crontab/#understanding-crontab-syntax) or the [crontab.guru](https://crontab.guru/) generator. 
+ ## Startup and Shutdown ### Worker Events diff --git a/example.py b/example.py index 23039a7b..93d87f63 100644 --- a/example.py +++ b/example.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Annotated from ape import chain @@ -38,6 +39,13 @@ def worker_startup(state: TaskiqState): # NOTE: You need the type hint here # raise Exception # NOTE: Any exception raised on worker startup aborts immediately +# You can run cron jobs in your apps (functions that execute at a regular time period) +# NOTE: Great for things like regular DB cleanups or producing metrics at regular intervals +@app.cron("*/2 * * * *") +def every_two_minutes(current_time_utc: datetime): + return {"crontime": current_time_utc} + + # This is how we trigger off of new blocks @app.on_(chain.blocks) # NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI` diff --git a/silverback/application.py b/silverback/application.py index 2691a98a..c2bc0127 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ -13,7 +13,7 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings -from .types import SilverbackID, TaskType +from .types import CronSchedule, SilverbackID, TaskType class SystemConfig(BaseModel): @@ -32,6 +32,9 @@ class TaskData(BaseModel): # NOTE: Any other items here must have a default value + def __hash__(self): + return hash(self.name) # NOTE: Name should be unique, okay for hashing + class SilverbackApp(ManagerAccessMixin): """ @@ -146,6 +149,7 @@ def broker_task_decorator( self, task_type: TaskType, container: BlockContainer | ContractEvent | None = None, + cron_schedule: CronSchedule | None = None, ) -> Callable[[Callable], AsyncTaskiqDecoratedTask]: """ Dynamically create a new broker task that handles tasks of ``task_type``. 
@@ -194,6 +198,9 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: labels["contract_address"] = contract_address labels["event_signature"] = container.abi.signature + if task_type is TaskType.CRON_JOB: + labels["cron"] = str(cron_schedule) + self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels)) return self.broker.register_task( @@ -309,3 +316,13 @@ def on_( # TODO: Support account transaction polling # TODO: Support mempool polling raise InvalidContainerTypeError(container) + + def cron(self, cron_schedule: str) -> Callable: + """ + Create task to run on a schedule. + Args: + cron_schedule (str): A cron-like schedule string (5 space-separated fields). + """ + return self.broker_task_decorator( + TaskType.CRON_JOB, cron_schedule=CronSchedule(cron=cron_schedule) + ) diff --git a/silverback/runner.py b/silverback/runner.py index 3856b0e6..436b5558 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -1,4 +1,5 @@ import asyncio +import traceback from abc import ABC, abstractmethod from ape import chain @@ -16,7 +17,7 @@ from .recorder import BaseRecorder, TaskResult from .state import AppDatastore, AppState from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import TaskType +from .types import CRON_CHECK_SECONDS, CronSchedule, TaskType, utc_now from .utils import ( async_wrap_iter, hexbytes_dict, @@ -109,6 +110,38 @@ async def _event_task(self, task_data: TaskData): handle an event handler task for the given contract event """ + async def _cron_task(self, task_data: list[TaskData]): + cron_jobs: dict[TaskData, CronSchedule] = dict() + + for td in task_data: + if cron_schedule := td.labels.get("cron"): + cron_jobs[td] = CronSchedule(cron=cron_schedule) + + else: + # NOTE: Shouldn't happen but just in case + logger.warning(f"TaskData missing `cron` label: {td}") + + if cron_jobs_str := "\n- ".join(map(str, cron_jobs.values())): + logger.info(f"Cron Jobs:\n- {cron_jobs_str}") + + while True: + current_time = utc_now() 
+ if current_time.second < CRON_CHECK_SECONDS: + # Print out current time every minute + logger.info(f"Current Time: {current_time}") + # NOTE: In the absence of any cron jobs, we still print out the current time + + if task_data_to_kiq := [ + td for td, cron in cron_jobs.items() if cron.is_ready(current_time) + ]: + tasks = await asyncio.gather( + *((self._create_task_kicker(td).kiq(current_time) for td in task_data_to_kiq)) + ) + await asyncio.gather(*(self._handle_task(task) for task in tasks)) + + # Check crons multiple times a minute + await asyncio.sleep(CRON_CHECK_SECONDS) + async def run(self): """ Run the task broker client for the assembled ``SilverbackApp`` application. @@ -138,7 +171,7 @@ async def run(self): raise StartupFailure("Worker SDK version too old, please rebuild") if not ( - system_tasks := set(TaskType(task_name) for task_name in result.return_value.task_types) + system_tasks := set(TaskType(task_type) for task_type in result.return_value.task_types) ): raise StartupFailure("No system tasks detected, startup failure") # NOTE: Guaranteed to be at least one because of `TaskType.SYSTEM_CONFIG` @@ -152,6 +185,10 @@ async def run(self): # `if TaskType. 
not in system_tasks: raise StartupFailure(...)` # or handle accordingly by having default logic if it is not available + # NOTE: In case we want to add new task types, we can detect feature support + supported_user_tasks = set( + [TaskType(task_type) for task_type in result.return_value.task_types] + ) # Initialize recorder (if available) and fetch state if app has been run previously if self.recorder: await self.recorder.init(app_id=self.app.identifier) @@ -204,16 +241,31 @@ async def run(self): if event_log_taskdata_results.is_err: raise StartupFailure(event_log_taskdata_results.error) + if TaskType.CRON_JOB in supported_user_tasks: + cron_job_taskdata_results = await run_taskiq_task_wait_result( + self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.CRON_JOB + ) + if cron_job_taskdata_results.is_err: + raise StartupFailure(cron_job_taskdata_results.error) + + else: # Not supported for `TaskType.SYSTEM_USER_TASKDATA` + # NOTE: This is just so that `.return_value` is a proper attribute for next line + cron_job_taskdata_results = type( + "MockAsyncTaskiqResult", (object,), dict(return_value=[]) + )() + if ( len(new_block_taskdata_results.return_value) == len(event_log_taskdata_results.return_value) - == 0 # Both are empty + == len(cron_job_taskdata_results.return_value) + == 0 # All are empty ): raise NoTasksAvailableError() # NOTE: Any propagated failure in here should be handled such that shutdown tasks also run # TODO: `asyncio.TaskGroup` added in Python 3.11 listener_tasks = ( + asyncio.create_task(self._cron_task(cron_job_taskdata_results.return_value)), *( asyncio.create_task(self._block_task(task_def)) for task_def in new_block_taskdata_results.return_value @@ -232,9 +284,13 @@ async def run(self): tasks_with_errors, tasks_running = await asyncio.wait( listener_tasks, return_when=asyncio.FIRST_EXCEPTION ) - if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors): + if runtime_errors := "\n\n".join( 
"".join(traceback.format_exception(task.exception())) for task in tasks_with_errors + ): # NOTE: In case we are somehow not displaying the error correctly with task status - logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}") + logger.warning(f"Runtime error(s) detected, shutting down\n{runtime_errors}") # Cancel any still running - (task.cancel() for task in tasks_running) + # NOTE: Needs a real loop, a bare generator expression is never iterated + for task in tasks_running: + task.cancel() diff --git a/silverback/types.py b/silverback/types.py index e1584386..9ee46142 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -4,7 +4,7 @@ from typing import Literal from ape.logging import get_logger -from pydantic import BaseModel, Field, RootModel, ValidationError, model_validator +from pydantic import BaseModel, Field, RootModel, ValidationError, model_serializer, model_validator from pydantic.functional_serializers import PlainSerializer from typing_extensions import Annotated @@ -21,6 +21,7 @@ class TaskType(str, Enum): STARTUP = "user:startup" NEW_BLOCK = "user:new-block" EVENT_LOG = "user:event-log" + CRON_JOB = "user:cron-job" SHUTDOWN = "user:shutdown" def __str__(self) -> str: @@ -48,6 +49,94 @@ def utc_now() -> datetime: ] +CRON_CHECK_SECONDS = 5 + + +class CronSchedule(BaseModel): + """Five-field cron schedule, parsed from a space-separated string.""" + + minute: str + hour: str + day_month: str + month: str + day_week: str + + def __init__(self, cron: str = "", **field_values): + if cron: + field_values = dict(zip(self.model_fields, cron.split(" "))) + + super().__init__(**field_values) + + @model_serializer + def create_cron_string(self) -> str: + return " ".join(map(lambda f: getattr(self, f), self.model_fields)) + + def __str__(self) -> str: + return self.create_cron_string() + + def _check_value(self, val: str, current: int, minimum: int = 0) -> bool: + """ + Check if ``current`` matches the cron field expression ``val``. + + Args: + val (str): ``*``, ``n``, ``a,b`` or ``a-b``, where ``*`` and ``a-b`` may have a ``/step``. + current (int): Value of this field for the time being checked. + minimum (int): First valid value of the field, used as the origin for ``*/step``. + """ + if "/" in val: + val, step_str = val.split("/") + step = int(step_str) + + else: + step = 1 + + if "-" in val: + start, stop = map(int, val.split("-")) + matches = list(range(start, stop + 1, step)) + + elif "," in val: + matches = list(map(int, val.split(","))) + + elif val == 
"*": + # NOTE: Steps start at the field's first value (e.g. minute `*/5` -> 0, 5, 10, ...) + return (current - minimum) % step == 0 + + else: + matches = [int(val)] + + return current in matches + + def is_ready(self, current_time: datetime) -> bool: + """Return whether the schedule fires at ``current_time`` (seconds must be near :00).""" + # Intersection/union "bug": https://crontab.guru/cron-bug.html + if self.day_month.startswith("*") or self.day_week.startswith("*"): + # Intersection (all must match) + return all( + [ + abs(current_time.second) + < CRON_CHECK_SECONDS, # NOTE: Ensure close to :00 seconds + self._check_value(self.minute, current_time.minute), + self._check_value(self.hour, current_time.hour), + self._check_value(self.day_month, current_time.day, minimum=1), + self._check_value(self.month, current_time.month, minimum=1), + self._check_value(self.day_week, current_time.weekday() + 1), + ] + ) + else: # Union: only one of day/wk and day/mth must match + return all( + [ + abs(current_time.second) + < CRON_CHECK_SECONDS, # NOTE: Ensure close to :00 seconds + self._check_value(self.minute, current_time.minute), + self._check_value(self.hour, current_time.hour), + self._check_value(self.month, current_time.month, minimum=1), + ] + ) and ( + self._check_value(self.day_month, current_time.day, minimum=1) + or self._check_value(self.day_week, current_time.weekday() + 1) + ) + + class _BaseDatapoint(BaseModel): type: str # discriminator diff --git a/tests/test_types.py b/tests/test_types.py index c87e3120..eed11f5b 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,8 +1,28 @@ +from datetime import datetime, timedelta from decimal import Decimal import pytest -from silverback.types import Datapoints +from silverback.types import CRON_CHECK_SECONDS, CronSchedule, Datapoints + + +@pytest.mark.parametrize( + "cron_schedule,current_time_str", + [ + ("5 0 * 8 *", "2024-08-01 00:05"), + ("0 22 * * 1-5", "2024-06-03 22:00"), + ("23 0-20/2 * * *", "2024-06-03 20:23"), + ("0 0,12 1 */2 *", "2024-07-01 00:00"), + ("0 4 8-14 * *", "2024-06-08 04:00"), + ("0 0 1,15 * 3", "2024-06-05 00:00"), + ], +) +def test_cron_is_ready(cron_schedule, current_time_str): + current_time = 
datetime.fromisoformat(current_time_str) + cron = CronSchedule(cron=cron_schedule) + assert cron.is_ready(current_time) + current_time += timedelta(seconds=CRON_CHECK_SECONDS) + assert not cron.is_ready(current_time) @pytest.mark.parametrize(