From 99f1a36efe17a9afdce3d50849e7db012d7c8371 Mon Sep 17 00:00:00 2001
From: Rustam Aliyev
Date: Sat, 25 Nov 2023 23:25:02 +0000
Subject: [PATCH] Add max_batch_age and max_batch_size optional config params

---
 README.md           |  4 +++-
 meltano.yml         |  4 ++++
 sample-config.json  |  4 +++-
 target_s3/sinks.py  | 11 +++++++++--
 target_s3/target.py | 18 ++++++++++++++++++
 5 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 98abef5..97cf0ae 100644
--- a/README.md
+++ b/README.md
@@ -39,7 +39,9 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
     "append_date_to_filename": true|false,
     "append_date_to_filename_grain": "microsecond",
     "flattening_enabled": true|false,
-    "flattening_max_depth": int
+    "flattening_max_depth": int,
+    "max_batch_age": int,
+    "max_batch_size": int
 }
 ```
 `format.format_parquet.validate` [`Boolean`, default: `False`] - this flag determines whether the data types of incoming data elements should be validated. When set `True`, a schema is created from the first record and all subsequent records that don't match that data type are cast.
diff --git a/meltano.yml b/meltano.yml
index 7b78dd3..36ed2e9 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -58,4 +58,8 @@ plugins:
     - name: flatten_records
       kind: boolean
       value: false
+    - name: max_batch_age
+      value: 5
+    - name: max_batch_size
+      value: 10000
 
diff --git a/sample-config.json b/sample-config.json
index 193ed42..65b05b4 100644
--- a/sample-config.json
+++ b/sample-config.json
@@ -28,5 +28,7 @@
     "append_date_to_filename": true,
     "append_date_to_filename_grain": "microsecond",
     "flattening_enabled": false,
-    "flattening_max_depth": 1
+    "flattening_max_depth": 1,
+    "max_batch_age": 5,
+    "max_batch_size": 10000
 }
\ No newline at end of file
diff --git a/target_s3/sinks.py b/target_s3/sinks.py
index 2fd6e9f..c9f2acc 100644
--- a/target_s3/sinks.py
+++ b/target_s3/sinks.py
@@ -18,8 +18,6 @@
 class s3Sink(BatchSink):
     """s3 target sink class."""
 
-    MAX_SIZE = 10000  # Max records to write in one batch
-
     def __init__(
         self,
         target: any,
@@ -39,6 +37,15 @@ def __init__(
         else:
             raise Exception("No file type supplied.")
 
+    @property
+    def max_size(self) -> int:
+        """Get maximum batch size.
+
+        Returns:
+            Maximum batch size
+        """
+        return self.config.get("max_batch_size", 10000)
+
     def process_batch(self, context: dict) -> None:
         """Write out any prepped records and return once fully written."""
         # add stream name to context
diff --git a/target_s3/target.py b/target_s3/target.py
index 58b3ae8..dc19623 100644
--- a/target_s3/target.py
+++ b/target_s3/target.py
@@ -169,10 +169,28 @@ class Targets3(Target):
             allowed_values=DATE_GRAIN.keys(),
             default="day",
         ),
+        th.Property(
+            "max_batch_age",
+            th.NumberType,
+            description="Maximum time in minutes between state messages when records are streamed in.",
+            required=False,
+            default=5.0,
+        ),
+        th.Property(
+            "max_batch_size",
+            th.IntegerType,
+            description="Maximum size of batches when records are streamed in.",
+            required=False,
+            default=10000,
+        ),
     ).to_dict()
 
     default_sink_class = s3Sink
 
+    @property
+    def _MAX_RECORD_AGE_IN_MINUTES(self) -> float:  # type: ignore
+        return float(self.config.get("max_batch_age", 5.0))
+
     def deserialize_json(self, line: str) -> dict:
         """Override base target's method to overcome Decimal cast,
         only applied when generating parquet schema from tap schema.
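
For context on how the new settings take effect: as of the SDK versions this patch targets, a sink is drained when its record count reaches `Sink.max_size`, and the target additionally drains all sinks once buffered records are older than `Target._MAX_RECORD_AGE_IN_MINUTES`, the two values the patch makes configurable. The sketch below is a simplified, self-contained model of that interaction, not SDK code; `ToySink` and every name in it are hypothetical.

```python
# Standalone sketch (not singer-sdk source): a toy model of how
# max_batch_size and max_batch_age are intended to interact.
import time
from typing import Optional


class ToySink:
    def __init__(self, max_batch_size: int = 10000, max_batch_age: float = 5.0):
        self.max_size = max_batch_size        # mirrors s3Sink.max_size
        self.max_age_minutes = max_batch_age  # mirrors _MAX_RECORD_AGE_IN_MINUTES
        self.records: list = []
        self.oldest_ts: Optional[float] = None

    @property
    def is_full(self) -> bool:
        """True once the batch holds max_batch_size records."""
        return len(self.records) >= self.max_size

    @property
    def is_too_old(self) -> bool:
        """True once the oldest buffered record exceeds max_batch_age minutes."""
        if self.oldest_ts is None:
            return False
        return (time.monotonic() - self.oldest_ts) / 60.0 >= self.max_age_minutes

    def add(self, record: dict) -> None:
        """Buffer one record, then drain if either threshold is hit."""
        if self.oldest_ts is None:
            self.oldest_ts = time.monotonic()
        self.records.append(record)
        if self.is_full or self.is_too_old:
            self.drain()

    def drain(self) -> None:
        """Stand-in for writing the batch out to S3 and emitting state."""
        print(f"writing batch of {len(self.records)} records to S3")
        self.records.clear()
        self.oldest_ts = None


sink = ToySink(max_batch_size=3, max_batch_age=5.0)
for i in range(7):
    sink.add({"id": i})  # drains after records 2 and 5; record 6 stays buffered
```

In practice, lowering `max_batch_size` trades smaller, more frequent S3 objects (and state checkpoints) for higher per-object overhead, while `max_batch_age` bounds how long a slow stream can hold records, and therefore state, in memory.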