From 95798663e22c93d7a276b068ac07f5e0604e8ae1 Mon Sep 17 00:00:00 2001 From: Daniel Morell Date: Fri, 18 Nov 2022 21:01:15 -0600 Subject: [PATCH 1/4] Issue #412 Added thread_pool handler --- rollbar/__init__.py | 29 ++++++++++++++++++++++++++--- rollbar/lib/thread_pool.py | 31 +++++++++++++++++++++++++++++++ rollbar/test/test_rollbar.py | 26 ++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 rollbar/lib/thread_pool.py diff --git a/rollbar/__init__.py b/rollbar/__init__.py index 2ec18d2c..521e521b 100644 --- a/rollbar/__init__.py +++ b/rollbar/__init__.py @@ -23,7 +23,7 @@ from rollbar.lib import events, filters, dict_merge, parse_qs, text, transport, urljoin, iteritems, defaultJSONEncode -__version__ = '0.16.3' +__version__ = '0.16.4beta' __log_name__ = 'rollbar' log = logging.getLogger(__log_name__) @@ -124,7 +124,7 @@ def wrap(*args, **kwargs): from twisted.internet.ssl import CertificateOptions from twisted.internet import task, defer, ssl, reactor from zope.interface import implementer - + @implementer(IPolicyForHTTPS) class VerifyHTTPS(object): def __init__(self): @@ -275,7 +275,12 @@ def _get_fastapi_request(): 'root': None, # root path to your code 'branch': None, # git branch name 'code_version': None, - 'handler': 'default', # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted' or 'httpx' + # 'blocking', 'thread' (default), 'async', 'agent', 'tornado', 'gae', 'twisted', 'httpx' or 'thread_pool' + # 'async' requires Python 3.4 or higher. + # 'httpx' requires Python 3.7 or higher. + # 'thread_pool' requires Python 3.2 or higher. + 'handler': 'default', + 'thread_pool_workers': None, 'endpoint': DEFAULT_ENDPOINT, 'timeout': DEFAULT_TIMEOUT, 'agent.log_file': 'log.rollbar', @@ -383,6 +388,9 @@ def init(access_token, environment='production', scrub_fields=None, url_fields=N if SETTINGS.get('handler') == 'agent': agent_log = _create_agent_log() + elif SETTINGS.get('handler') == 'thread_pool': + from rollbar.lib.thread_pool import init_pool + init_pool(SETTINGS.get('thread_pool_workers', None)) if not SETTINGS['locals']['safelisted_types'] and SETTINGS['locals']['whitelisted_types']: warnings.warn('whitelisted_types deprecated use safelisted_types instead', DeprecationWarning) @@ -523,6 +531,7 @@ def send_payload(payload, access_token): - 'gae': calls _send_payload_appengine() (which makes a blocking call to Google App Engine) - 'twisted': calls _send_payload_twisted() (which makes an async HTTP request using Twisted and Treq) - 'httpx': calls _send_payload_httpx() (which makes an async HTTP request using HTTPX) + - 'thread_pool': uses a pool of worker threads to make HTTP requests off the main thread. Returns immediately. """ payload = events.on_payload(payload) if payload is False: @@ -569,6 +578,8 @@ def send_payload(payload, access_token): _send_payload_async(payload_str, access_token) elif handler == 'thread': _send_payload_thread(payload_str, access_token) + elif handler == 'thread_pool': + _send_payload_thread_pool(payload_str, access_token) else: # default to 'thread' _send_payload_thread(payload_str, access_token) @@ -1510,6 +1521,18 @@ def _send_payload_thread(payload_str, access_token): thread.start() +def _send_payload_pool(payload_str, access_token): + try: + _post_api('item/', payload_str, access_token=access_token) + except Exception as e: + log.exception('Exception while posting item %r', e) + + +def _send_payload_thread_pool(payload_str, access_token): + from rollbar.lib.thread_pool import submit + submit(_send_payload_pool, payload_str, access_token) + + def _send_payload_appengine(payload_str, access_token): try: _post_api_appengine('item/', payload_str, access_token=access_token) diff --git a/rollbar/lib/thread_pool.py b/rollbar/lib/thread_pool.py new file mode 100644 index 00000000..6d3408da --- /dev/null +++ b/rollbar/lib/thread_pool.py @@ -0,0 +1,31 @@ +import logging +from concurrent.futures import ThreadPoolExecutor + +_pool = None # type: ThreadPoolExecutor|None + +log = logging.getLogger(__name__) + + +def init_pool(max_workers): + """ + Creates the thread pool with the max workers. + + :type max_workers: int + """ + global _pool + _pool = ThreadPoolExecutor(max_workers) + + +def submit(worker, payload_str, access_token): + """ + Submit a new task to the thread pool. + + :type worker: function + :type payload_str: str + :type access_token: str + """ + global _pool + if _pool is None: + log.warning('pyrollbar: Thead pool not initialized. Please ensure init_pool() is called prior to submit().') + return + _pool.submit(worker, payload_str, access_token) diff --git a/rollbar/test/test_rollbar.py b/rollbar/test/test_rollbar.py index ed797883..03164c2f 100644 --- a/rollbar/test/test_rollbar.py +++ b/rollbar/test/test_rollbar.py @@ -1035,6 +1035,32 @@ def _raise(): send_payload_httpx.assert_called_once() + @unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+') + @mock.patch('rollbar._send_payload_thread_pool') + def test_thread_pool_handler(self, send_payload_thread_pool): + def _raise(): + try: + raise Exception('foo') + except: + rollbar.report_exc_info() + rollbar.SETTINGS['handler'] = 'thread_pool' + _raise() + + send_payload_thread_pool.assert_called_once() + + @unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+') + def test_thread_pool_submit(self): + from rollbar.lib.thread_pool import init_pool, submit + init_pool(1) + ran = {'nope': True} # dict used so it is not shadowed in run + + def run(payload_str, access_token): + ran['nope'] = False + + submit(run, 'foo', 'bar') + self.assertFalse(ran['nope']) + + @mock.patch('rollbar.send_payload') def test_args_constructor(self, send_payload): From c81236bd101ecbdabb9680e2d2318c4754bf22e4 Mon Sep 17 00:00:00 2001 From: Daniel Morell Date: Mon, 21 Nov 2022 20:41:56 -0600 Subject: [PATCH 2/4] Attempt to fix CI --- .github/workflows/ci.yml | 44 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c40ec05b..fd042284 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -205,6 +205,28 @@ jobs: - name: Install dependencies run: pip install setuptools==39.2.0 --force-reinstall + - name: Install Python 2 dependencies + if: ${{ contains(matrix.python-version, '2.7') }} + # certifi dropped support for Python 2 in 2020.4.5.2 but only started + # using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with + # Python 2 support. + run: pip install certifi==2021.10.8 requests==2.27.1 incremental==21.3.0 + + - name: Install Python 3.4 dependencies + if: ${{ contains(matrix.python-version, '3.4') }} + # certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18 + run: pip install certifi==2021.10.8 "typing-extensions<4" incremental==21.3.0 + + - name: Install Python 3.5 dependencies + if: ${{ contains(matrix.python-version, '3.5') }} + # typing-extensions dropped support for Python 3.5 in version 4 + run: pip install "typing-extensions<4" + + - name: Install Python 3.6 dependencies + if: ${{ contains(matrix.python-version, '3.6') }} + # typing-extensions dropped support for Python 3.6 in version 4.2 + run: pip install "typing-extensions<4.2" + - name: Set the framework run: echo ${{ matrix.framework }} >> $GITHUB_ENV @@ -232,27 +254,5 @@ jobs: if: ${{ contains(matrix.framework, 'FASTAPI_VERSION') }} run: pip install fastapi==$FASTAPI_VERSION - - name: Install Python 2 dependencies - if: ${{ contains(matrix.python-version, '2.7') }} - # certifi dropped support for Python 2 in 2020.4.5.2 but only started - # using Python 3 syntax in 2022.5.18. 2021.10.8 is the last release with - # Python 2 support. - run: pip install certifi==2021.10.8 requests==2.27.1 - - - name: Install Python 3.4 dependencies - if: ${{ contains(matrix.python-version, '3.4') }} - # certifi uses the 'typing' from Python 3.5 module starting in 2022.5.18 - run: pip install certifi==2021.10.8 "typing-extensions<4" - - - name: Install Python 3.5 dependencies - if: ${{ contains(matrix.python-version, '3.5') }} - # typing-extensions dropped support for Python 3.5 in version 4 - run: pip install "typing-extensions<4" - - - name: Install Python 3.6 dependencies - if: ${{ contains(matrix.python-version, '3.6') }} - # typing-extensions dropped support for Python 3.6 in version 4.2 - run: pip install "typing-extensions<4.2" - - name: Run tests run: python setup.py test From f4043fea9e6004e7ef8c46f70f8a7b591487a03e Mon Sep 17 00:00:00 2001 From: Daniel Morell Date: Mon, 21 Nov 2022 20:48:12 -0600 Subject: [PATCH 3/4] Increase minimum version of python needed for test --- rollbar/test/test_rollbar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollbar/test/test_rollbar.py b/rollbar/test/test_rollbar.py index 03164c2f..8cc2be29 100644 --- a/rollbar/test/test_rollbar.py +++ b/rollbar/test/test_rollbar.py @@ -1035,7 +1035,7 @@ def _raise(): send_payload_httpx.assert_called_once() - @unittest.skipUnless(sys.version_info >= (3, 2), 'concurrent.futures support requires Python3.2+') + @unittest.skipUnless(sys.version_info >= (3, 6), 'assert_called_once support requires Python3.6+') @mock.patch('rollbar._send_payload_thread_pool') def test_thread_pool_handler(self, send_payload_thread_pool): def _raise(): From e14af00922ade40beac9cf578ad42f113eac5370 Mon Sep 17 00:00:00 2001 From: Daniel Morell Date: Fri, 25 Nov 2022 15:34:42 -0600 Subject: [PATCH 4/4] Added default number of max workers for Python < 3.5 --- rollbar/lib/thread_pool.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rollbar/lib/thread_pool.py b/rollbar/lib/thread_pool.py index 6d3408da..8bd7162c 100644 --- a/rollbar/lib/thread_pool.py +++ b/rollbar/lib/thread_pool.py @@ -1,4 +1,6 @@ import logging +import os +import sys from concurrent.futures import ThreadPoolExecutor _pool = None # type: ThreadPoolExecutor|None @@ -10,8 +12,13 @@ def init_pool(max_workers): """ Creates the thread pool with the max workers. - :type max_workers: int + :type max_workers: int|None + :param max_workers: If max_workers is None it will use the logic from the standard library to calculate the number + of threads. However, we ported the logic from Python 3.5 to earlier versions. """ + if max_workers is None and sys.version_info < (3, 5): + max_workers = (os.cpu_count() or 1) * 5 + global _pool _pool = ThreadPoolExecutor(max_workers)