Skip to content

Commit

Permalink
Initial date windowing logic for activities stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanu73 committed Feb 20, 2024
1 parent 69ba884 commit 7801007
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
2 changes: 2 additions & 0 deletions tap_mambu/tap_generators/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self, stream_name, client, config, state, sub_type):
self.config = config
self.state = state
self.sub_type = sub_type
self.date_windowing = False
self.date_window_size = 3

# Define parameters inside init
self.params = dict()
Expand Down
26 changes: 26 additions & 0 deletions tap_mambu/tap_generators/multithreaded_bookmark_generator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
import time
import backoff

from copy import deepcopy
from singer import get_logger
from datetime import datetime, timedelta

from .multithreaded_offset_generator import MultithreadedOffsetGenerator
from ..helpers import transform_json, convert
Expand Down Expand Up @@ -51,6 +53,30 @@ def queue_batches(self):
deepcopy(self.params)))
return futures

@backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
def _all_fetch_batch_steps(self):
    """Queue, collect and preprocess one round of batches.

    With ``self.date_windowing`` enabled, the ``from``/``to`` range found in
    ``self.params`` (both formatted as ``YYYY-MM-DD``) is walked in steps of
    ``self.date_window_size`` days and ``queue_batches`` is called once per
    window, with ``self.endpoint_intermediary_bookmark_offset`` reset to 0
    before each call. With windowing disabled, ``queue_batches`` is called
    once for the params as they are.

    Returns:
        bool: ``False`` when ``collect_batches`` produced an empty buffer or
        signalled stop, ``True`` otherwise.

    Raises:
        ValueError: if windowing is enabled with a non-positive
            ``date_window_size`` (the window loop would never advance), or
            if ``from``/``to`` are not ``YYYY-MM-DD`` strings.
    """
    if not self.date_windowing:
        # Windowing is off by default; without this branch nothing would be
        # queued and the method would always report "no more data".
        futures = self.queue_batches()
        final_buffer, stop_iteration = self.collect_batches(futures)
    else:
        if self.date_window_size <= 0:
            raise ValueError(
                "date_window_size must be a positive number of days, got "
                "{!r}".format(self.date_window_size))

        date_format = '%Y-%m-%d'
        full_from = self.params["from"]
        full_to = self.params["to"]
        try:
            window_start = datetime.strptime(full_from, date_format).date()
            range_end = datetime.strptime(full_to, date_format).date()
            step = timedelta(days=self.date_window_size)

            # Build the list of (from, to) windows; the last one is clipped
            # to the end of the requested range.
            windows = []
            window_end = window_start + step
            while window_end < range_end:
                windows.append((window_start, window_end))
                window_start = window_end
                window_end = window_start + step
            windows.append((window_start, range_end))

            futures = []
            for lower, upper in windows:
                # Each window is requested from its first page.
                self.endpoint_intermediary_bookmark_offset = 0
                self.params["from"] = lower.strftime(date_format)
                self.params["to"] = upper.strftime(date_format)
                futures += self.queue_batches()

            final_buffer, stop_iteration = self.collect_batches(futures)
        except Exception:
            # The backoff decorator re-invokes this method on RuntimeError.
            # Put the full range back so the retry covers every window
            # again instead of only the last one.
            self.params["from"] = full_from
            self.params["to"] = full_to
            raise

    self.preprocess_batches(final_buffer)
    return bool(final_buffer) and not stop_iteration

def collect_batches(self, futures):
# wait for responses, and check them for errors
final_buffer = set()
Expand Down

0 comments on commit 7801007

Please sign in to comment.