diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fe1ef4..101d9d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.5.1 + * Add retry logic for `ChunkedEncodingError`s [#61](https://github.com/singer-io/tap-mixpanel/pull/61) + ## 1.5.0 * Adds `export_events` as optional param to filter the data for export stream based on event names [#56](https://github.com/singer-io/tap-mixpanel/pull/56) diff --git a/setup.py b/setup.py index 57abc3e..f6c4dee 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages setup(name='tap-mixpanel', - version='1.5.0', + version='1.5.1', description='Singer.io tap for extracting data from the mixpanel API', author='jeff.huth@bytecode.io', classifiers=['Programming Language :: Python :: 3 :: Only'], diff --git a/tap_mixpanel/client.py b/tap_mixpanel/client.py index e2e6bfd..c0bf54e 100644 --- a/tap_mixpanel/client.py +++ b/tap_mixpanel/client.py @@ -4,7 +4,7 @@ import jsonlines import requests import singer -from requests.exceptions import ConnectionError, Timeout +from requests.exceptions import ChunkedEncodingError, ConnectionError, Timeout from requests.models import ProtocolError from singer import metrics @@ -230,7 +230,7 @@ def check_access(self): @backoff.on_exception( backoff.expo, - (Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError), + (Server5xxError, Server429Error, ReadTimeoutError, ConnectionError, Timeout, ProtocolError, ChunkedEncodingError), max_tries=BACKOFF_MAX_TRIES_REQUEST, factor=3, logger=LOGGER, diff --git a/tap_mixpanel/streams.py b/tap_mixpanel/streams.py index d8c00b9..da89c9e 100644 --- a/tap_mixpanel/streams.py +++ b/tap_mixpanel/streams.py @@ -7,6 +7,8 @@ import urllib import pytz +import requests +import backoff import singer from singer import Transformer, metadata, metrics, utils from singer.utils import strptime_to_utc @@ -695,6 +697,13 @@ class Export(MixPanel): replication_method = "INCREMENTAL" params = {} + + @backoff.on_exception( + backoff.expo, + (requests.exceptions.ChunkedEncodingError,), + max_tries=5, + factor=2, + ) def get_and_transform_records( self, querystring, diff --git a/tests/unittests/test_error_handling.py b/tests/unittests/test_error_handling.py index 357414a..1e7aecb 100644 --- a/tests/unittests/test_error_handling.py +++ b/tests/unittests/test_error_handling.py @@ -1,10 +1,12 @@ import unittest import requests +import jsonlines from unittest import mock from parameterized import parameterized from tap_mixpanel import client +from tap_mixpanel import streams # Mock response REQUEST_TIMEOUT = 300 @@ -271,3 +273,40 @@ def test_check_access_handle_timeout_error(self, mock_request, mock_time): # Verify that requests.Session.request is called 5 times self.assertEqual(mock_request.call_count, 5) + + @mock.patch("jsonlines.jsonlines.Reader.iter", side_effect=requests.exceptions.ChunkedEncodingError) + def test_ChunkedEncodingError(self, mock_jsonlines, mock_time): + """ + Check whether the request backoffs properly for `check_access` method for 5 times in case of Timeout error. + """ + mock_client = client.MixpanelClient(api_secret="mock_api_secret", api_domain="mock_api_domain", request_timeout=REQUEST_TIMEOUT) + mock_client._MixpanelClient__verified = True + + fake_response = MockResponse(500) + fake_response.iter_lines = lambda : [] + mock_client.perform_request = lambda *args, **kwargs: fake_response + + stream = streams.Export(mock_client) + + with self.assertRaises(requests.exceptions.ChunkedEncodingError) as error: + stream.get_and_transform_records( + querystring={}, + project_timezone=None, + max_bookmark_value=None, + state=None, + config=None, + catalog=None, + selected_streams=None, + last_datetime=None, + endpoint_total=None, + limit=None, + total_records=None, + parent_total=None, + record_count=None, + page=None, + offset=None, + parent_record=None, + date_total=None, + ) + + self.assertEqual(mock_jsonlines.call_count, 5)