Skip to content

Commit

Permalink
feat: Interruption and termination signals in taps and targets
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 15, 2024
1 parent 344a218 commit af2c00e
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 2 deletions.
11 changes: 11 additions & 0 deletions samples/sample_tap_countries/countries_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from singer_sdk import typing as th
from singer_sdk.streams.graphql import GraphQLStream

if t.TYPE_CHECKING:
from collections.abc import Iterable

SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas"


Expand Down Expand Up @@ -83,6 +86,14 @@ class CountriesStream(CountriesAPIStream):
),
).to_dict()

# FIXME: revert these changes before merging
def request_records(self, context) -> Iterable[dict]:
import time # noqa: PLC0415

time.sleep(60) # Simulate a slow stream

return super().request_records(context)


class ContinentsStream(CountriesAPIStream):
"""Continents stream from the Countries API."""
Expand Down
5 changes: 4 additions & 1 deletion samples/sample_target_csv/csv_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ class SampleTargetCSV(Target):
name = "target-csv"
config_jsonschema = th.PropertiesList(
th.Property(
"target_folder", th.StringType, required=True, title="Target Folder"
"target_folder",
th.StringType,
default="output",
title="Target Folder",
),
th.Property("file_naming_scheme", th.StringType, title="File Naming Scheme"),
).to_dict()
Expand Down
25 changes: 24 additions & 1 deletion singer_sdk/plugin_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
import abc
import logging
import os
import signal
import sys
import time
import typing as t
import warnings
from importlib import metadata
from pathlib import Path, PurePath
from types import MappingProxyType
from types import FrameType, MappingProxyType

import click

Expand Down Expand Up @@ -190,6 +191,10 @@ def __init__(
# Initialization timestamp
self.__initialized_at = int(time.time() * 1000)

# Signal handling
signal.signal(signal.SIGINT, self._handle_termination)
signal.signal(signal.SIGTERM, self._handle_termination)

def setup_mapper(self) -> None:
"""Initialize the plugin mapper for this tap."""
self._mapper = PluginMapper(
Expand Down Expand Up @@ -416,6 +421,24 @@ def _validate_config(self, *, raise_errors: bool = True) -> list[str]:

return errors

def _handle_termination( # pragma: no cover
self,
signum: int, # noqa: ARG002
frame: FrameType | None, # noqa: ARG002
) -> None:
"""Handle termination signal.
Args:
signum: Signal number.
frame: Frame.
Raises:
click.Abort: If the termination signal is received.
"""
self.logger.info("Gracefully shutting down...")
errmsg = "Received termination signal"
raise click.Abort(errmsg)

@classmethod
def print_version(
cls: type[PluginBase],
Expand Down
15 changes: 15 additions & 0 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

if t.TYPE_CHECKING:
from pathlib import PurePath
from types import FrameType

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
Expand Down Expand Up @@ -487,6 +488,20 @@ def sync_all(self) -> None:

# Command Line Execution

def _handle_termination( # pragma: no cover
self,
signum: int,
frame: FrameType | None,
) -> None:
"""Handle termination signal.
Args:
signum: Signal number.
frame: Frame.
"""
self.write_message(StateMessage(value=self.state))
super()._handle_termination(signum, frame)

@classmethod
def invoke( # type: ignore[override]
cls: type[Tap],
Expand Down
19 changes: 19 additions & 0 deletions singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

if t.TYPE_CHECKING:
from pathlib import PurePath
from types import FrameType

from singer_sdk.connectors import SQLConnector
from singer_sdk.mapper import PluginMapper
Expand Down Expand Up @@ -542,6 +543,24 @@ def _write_state_message(self, state: dict) -> None:

# CLI handler

def _handle_termination( # pragma: no cover
self,
signum: int,
frame: FrameType | None,
) -> None:
"""Handle termination signals.
Args:
signum: Signal number.
frame: Frame object.
"""
self.logger.info(
"Received termination signal %d, draining all sinks...",
signum,
)
self.drain_all(is_endofpipe=True)
super()._handle_termination(signum, frame)

@classmethod
def invoke( # type: ignore[override]
cls: type[Target],
Expand Down

0 comments on commit af2c00e

Please sign in to comment.