From b79552567ea281ef33acad87283fcfd4dffb77c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Wed, 21 Aug 2024 20:22:07 -0600 Subject: [PATCH] feat: Interruption and termination signals in taps and targets --- samples/sample_target_csv/csv_target.py | 2 +- singer_sdk/plugin_base.py | 25 ++++++++++++++++++++++++- singer_sdk/tap_base.py | 15 +++++++++++++++ singer_sdk/target_base.py | 19 +++++++++++++++++++ 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/samples/sample_target_csv/csv_target.py b/samples/sample_target_csv/csv_target.py index d52b76f48..906c430ae 100644 --- a/samples/sample_target_csv/csv_target.py +++ b/samples/sample_target_csv/csv_target.py @@ -12,7 +12,7 @@ class SampleTargetCSV(Target): name = "target-csv" config_jsonschema = th.PropertiesList( - th.Property("target_folder", th.StringType, required=True), + th.Property("target_folder", th.StringType, default="output"), th.Property("file_naming_scheme", th.StringType), ).to_dict() default_sink_class = SampleCSVTargetSink diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index eb5b39de3..9272fba34 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -5,12 +5,13 @@ import abc import logging import os +import signal import sys import time import typing as t from importlib import metadata from pathlib import Path, PurePath -from types import MappingProxyType +from types import FrameType, MappingProxyType import click @@ -176,6 +177,10 @@ def __init__( # Initialization timestamp self.__initialized_at = int(time.time() * 1000) + # Signal handling + signal.signal(signal.SIGINT, self._handle_termination) + signal.signal(signal.SIGTERM, self._handle_termination) + def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" self._mapper = PluginMapper( @@ -402,6 +407,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]: return errors + def _handle_termination( # pragma: no cover + self, + signum: int, # noqa: ARG002 + frame: FrameType | None, # noqa: ARG002 + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + + Raises: + click.Abort: If the termination signal is received. + """ + self.logger.info("Gracefully shutting down...") + errmsg = "Received termination signal" + raise click.Abort(errmsg) + @classmethod def print_version( cls: type[PluginBase], diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index d69fa5f38..293eb5358 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -31,6 +31,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -473,6 +474,20 @@ def sync_all(self) -> None: # Command Line Execution + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signal. + + Args: + signum: Signal number. + frame: Frame. + """ + self.write_message(StateMessage(value=self.state)) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Tap], diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 22ad28176..ca551b728 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -32,6 +32,7 @@ if t.TYPE_CHECKING: from pathlib import PurePath + from types import FrameType from singer_sdk.connectors import SQLConnector from singer_sdk.mapper import PluginMapper @@ -540,6 +541,24 @@ def _write_state_message(self, state: dict) -> None: # CLI handler + def _handle_termination( # pragma: no cover + self, + signum: int, + frame: FrameType | None, + ) -> None: + """Handle termination signals. + + Args: + signum: Signal number. + frame: Frame object. + """ + self.logger.info( + "Received termination signal %d, draining all sinks...", + signum, + ) + self.drain_all(is_endofpipe=True) + super()._handle_termination(signum, frame) + @classmethod def invoke( # type: ignore[override] cls: type[Target],