From de02287e8215397be2513f995b915319e7e655d0 Mon Sep 17 00:00:00 2001 From: Andy Lu Date: Wed, 2 Aug 2023 10:11:42 -0400 Subject: [PATCH] TDL-23384: Add retry for chunked encoding errors (#61) * add retry for chunked encoding error * Add exponential backoff to `request_export` * Remove backoff because this is a generator Throwing an exception in the generator seems to just raise a `StopIteration` * Add retry logic to the caller of `client.request_export` * Add a test for retrying on `ChunkedEncodingError`s * Bump to `v1.5.1`, update changelog --------- Co-authored-by: Leslie VanDeMark --- CHANGELOG.md | 3 ++ setup.py | 2 +- tap_mixpanel/client.py | 4 +-- tap_mixpanel/streams.py | 9 ++++++ tests/unittests/test_error_handling.py | 39 ++++++++++++++++++++++++++ 5 files changed, 54 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fe1ef4..101d9d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.5.1 + * Add retry logic for `ChunkedEncodingError`s [#61](https://github.com/singer-io/tap-mixpanel/pull/61) + ## 1.5.0 * Adds `export_events` as optional param to filter the data for export stream based on event names [#56](https://github.com/singer-io/tap-mixpanel/pull/56) diff --git a/setup.py b/setup.py index 57abc3e..f6c4dee 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='tap-mixpanel', - version='1.5.0', + version='1.5.1', description='Singer.io tap for extracting data from the mixpanel API', author='jeff.huth@bytecode.io', classifiers=['Programming Language :: Python :: 3 :: Only'], diff --git a/tap_mixpanel/client.py b/tap_mixpanel/client.py index 544f3d4..4e49a5d 100644 --- a/tap_mixpanel/client.py +++ b/tap_mixpanel/client.py @@ -4,7 +4,7 @@ import jsonlines import requests import singer -from requests.exceptions import ConnectionError, Timeout +from requests.exceptions import ChunkedEncodingError, ConnectionError, Timeout from requests.models import 
ProtocolError from singer import metrics @@ -204,7 +204,7 @@ def check_access(self): @backoff.on_exception( backoff.expo, - (Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError), + (Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError, ChunkedEncodingError), max_tries=BACKOFF_MAX_TRIES_REQUEST, factor=3, logger=LOGGER, diff --git a/tap_mixpanel/streams.py b/tap_mixpanel/streams.py index d8c00b9..da89c9e 100644 --- a/tap_mixpanel/streams.py +++ b/tap_mixpanel/streams.py @@ -7,6 +7,8 @@ import urllib import pytz +import requests +import backoff import singer from singer import Transformer, metadata, metrics, utils from singer.utils import strptime_to_utc @@ -695,6 +697,13 @@ class Export(MixPanel): replication_method = "INCREMENTAL" params = {} + + @backoff.on_exception( + backoff.expo, + (requests.exceptions.ChunkedEncodingError,), + max_tries=5, + factor=2, + ) def get_and_transform_records( self, querystring, diff --git a/tests/unittests/test_error_handling.py b/tests/unittests/test_error_handling.py index 7ba6abc..0b973c8 100644 --- a/tests/unittests/test_error_handling.py +++ b/tests/unittests/test_error_handling.py @@ -1,10 +1,12 @@ import unittest import requests +import jsonlines from unittest import mock from parameterized import parameterized from tap_mixpanel import client +from tap_mixpanel import streams # Mock response REQUEST_TIMEOUT = 300 @@ -249,3 +251,40 @@ def test_check_access_handle_timeout_error(self, mock_request, mock_time): # Verify that requests.Session.request is called 5 times self.assertEqual(mock_request.call_count, 5) + + @mock.patch("jsonlines.jsonlines.Reader.iter", side_effect=requests.exceptions.ChunkedEncodingError) + def test_ChunkedEncodingError(self, mock_jsonlines, mock_time): + """ + Check whether `Export.get_and_transform_records` backs off properly, making 5 attempts in total, when `jsonlines.Reader.iter` raises a `ChunkedEncodingError`. 
+ """ + mock_client = client.MixpanelClient(api_secret="mock_api_secret", api_domain="mock_api_domain", request_timeout=REQUEST_TIMEOUT) + mock_client._MixpanelClient__verified = True + + fake_response = MockResponse(500) + fake_response.iter_lines = lambda : [] + mock_client.perform_request = lambda *args, **kwargs: fake_response + + stream = streams.Export(mock_client) + + with self.assertRaises(requests.exceptions.ChunkedEncodingError) as error: + stream.get_and_transform_records( + querystring={}, + project_timezone=None, + max_bookmark_value=None, + state=None, + config=None, + catalog=None, + selected_streams=None, + last_datetime=None, + endpoint_total=None, + limit=None, + total_records=None, + parent_total=None, + record_count=None, + page=None, + offset=None, + parent_record=None, + date_total=None, + ) + + self.assertEqual(mock_jsonlines.call_count, 5)