diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index 418d7f12c..aca7a21aa 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc +import asyncio import copy import json import sys @@ -368,7 +369,7 @@ def _process_record_message(self, message_dict: dict) -> None: sink.stream_name, sink.current_size, ) - self.drain_one(sink) + asyncio.run(self.drain_one(sink)) self._handle_max_record_age() @@ -497,7 +498,7 @@ def drain_all(self, *, is_endofpipe: bool = False) -> None: self._reset_max_record_age() @t.final - def drain_one(self, sink: Sink) -> None: + async def drain_one(self, sink: Sink) -> None: """Drain a specific sink. This method is internal to the SDK and should not need to be overridden. @@ -519,7 +520,7 @@ def _drain_all(self, sink_list: list[Sink], parallelism: int) -> None: return def _drain_sink(sink: Sink) -> None: - self.drain_one(sink) + asyncio.run(self.drain_one(sink)) with parallel_config(backend="threading", n_jobs=parallelism): Parallel()(delayed(_drain_sink)(sink=sink) for sink in sink_list)