Skip to content

Commit

Permalink
Changes:
Browse files Browse the repository at this point in the history
1) Removed endpoint params from activities generator.
2) Overrode modify_request_params method for Loan Transactions generator.
3) Removed the method _all_fetch_batch_steps from multithreaded bookmark generator.
4) Modified the implementation for _all_fetch_batch_steps method in multithreaded offset generator to include date windowing.
5) Added new method modify_request_params in multithreaded offset generator.
  • Loading branch information
shantanu73 committed Mar 6, 2024
1 parent e3851a8 commit 16d3fcb
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 40 deletions.
4 changes: 0 additions & 4 deletions tap_mambu/tap_generators/activities_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ def _init_endpoint_config(self):
self.endpoint_path = "activities"
self.endpoint_api_method = "GET"
self.endpoint_api_version = "v1"

self.endpoint_params["from"] = datetime_to_utc_str(str_to_localized_datetime(
get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10]
self.endpoint_params["to"] = datetime_to_utc_str(utc_now())[:10]
self.endpoint_bookmark_field = "timestamp"

@staticmethod
Expand Down
20 changes: 15 additions & 5 deletions tap_mambu/tap_generators/loan_transactions_generator.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
from .multithreaded_bookmark_generator import MultithreadedBookmarkGenerator
from ..helpers import get_bookmark
from ..helpers.datetime_utils import datetime_to_utc_str, str_to_localized_datetime
from ..helpers.datetime_utils import datetime_to_utc_str
from datetime import datetime


class LoanTransactionsGenerator(MultithreadedBookmarkGenerator):
def __init__(self, stream_name, client, config, state, sub_type):
super(LoanTransactionsGenerator, self).__init__(stream_name, client, config, state, sub_type)
self.date_windowing = True

def _init_endpoint_config(self):
super(LoanTransactionsGenerator, self)._init_endpoint_config()
self.endpoint_path = "loans/transactions:search"
self.endpoint_bookmark_field = "creationDate"
self.endpoint_sorting_criteria["field"] = "id"
self.endpoint_filter_criteria = [

def modify_request_params(self, start, end):
self.endpoint_body['filterCriteria'] = [
{
"field": "creationDate",
"operator": "AFTER",
"value": datetime_to_utc_str(str_to_localized_datetime(
get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))
"value": datetime.strftime(start, '%Y-%m-%dT00:00:00.000000Z')
},
{
"field": "creationDate",
"operator": "BEFORE",
"value": datetime.strftime(end, '%Y-%m-%dT23:59:59.000000Z')
}
]

Expand Down
28 changes: 0 additions & 28 deletions tap_mambu/tap_generators/multithreaded_bookmark_generator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import datetime
import time
import backoff

from copy import deepcopy
from singer import get_logger
from datetime import datetime, timedelta

from .multithreaded_offset_generator import MultithreadedOffsetGenerator
from ..helpers import transform_json, convert
Expand Down Expand Up @@ -53,32 +51,6 @@ def queue_batches(self):
deepcopy(self.params)))
return futures

@backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
def _all_fetch_batch_steps(self):
if self.date_windowing:
start = datetime.strptime(self.params["from"], '%Y-%m-%d').date()
end = datetime.strptime(self.params["to"], '%Y-%m-%d').date()
temp = start + timedelta(days=self.date_window_size)
stop_iteration = True
while temp < end:
if stop_iteration:
self.offset = 0
self.static_params["from"] = datetime.strftime(start, '%Y-%m-%d')
self.static_params["to"] = datetime.strftime(temp, '%Y-%m-%d')
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
if not final_buffer or stop_iteration:
start = temp
temp = start + timedelta(days=self.date_window_size)
self.offset = 0
self.static_params["from"] = datetime.strftime(start, '%Y-%m-%d')
self.static_params["to"] = datetime.strftime(end, '%Y-%m-%d')
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
if not final_buffer or stop_iteration:
return False
return True

def collect_batches(self, futures):
# wait for responses, and check them for errors
final_buffer = set()
Expand Down
30 changes: 27 additions & 3 deletions tap_mambu/tap_generators/multithreaded_offset_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import backoff
from singer import get_logger
from datetime import datetime, timedelta

from .generator import TapGenerator
from ..helpers import transform_json
from ..helpers import transform_json, get_bookmark
from ..helpers.datetime_utils import str_to_localized_datetime, datetime_to_utc_str, utc_now
from ..helpers.multithreaded_requests import MultithreadedRequestsPool
from ..helpers.perf_metrics import PerformanceMetrics

Expand Down Expand Up @@ -120,13 +122,35 @@ def preprocess_batches(self, final_buffer):

@backoff.on_exception(backoff.expo, RuntimeError, max_tries=5)
def _all_fetch_batch_steps(self):
futures = self.queue_batches()
final_buffer, stop_iteration = self.collect_batches(futures)
if self.date_windowing:
start_datetime = datetime_to_utc_str(str_to_localized_datetime(
get_bookmark(self.state, self.stream_name, self.sub_type, self.start_date)))[:10]
end_datetime = datetime_to_utc_str(utc_now())[:10]
start = datetime.strptime(start_datetime, '%Y-%m-%d').date()
end = datetime.strptime(end_datetime, '%Y-%m-%d').date()
temp = start + timedelta(days=self.date_window_size)
stop_iteration = True
while temp < end:
if stop_iteration:
self.offset = 0
self.modify_request_params(start, temp)
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
if not final_buffer or stop_iteration:
start = temp
temp = start + timedelta(days=self.date_window_size)
self.offset = 0
self.modify_request_params(start, end)
final_buffer, stop_iteration = self.collect_batches(self.queue_batches())
self.preprocess_batches(final_buffer)
if not final_buffer or stop_iteration:
return False
return True

def modify_request_params(self, start, end):
self.static_params["from"] = datetime.strftime(start, '%Y-%m-%d')
self.static_params["to"] = datetime.strftime(end, '%Y-%m-%d')

def error_check_and_fix(self, final_buffer: set, temp_buffer: set, futures: list):
try:
final_buffer = self.check_and_get_set_reunion(final_buffer, temp_buffer, self.artificial_limit)
Expand Down

0 comments on commit 16d3fcb

Please sign in to comment.