diff --git a/synapseclient/api/entity_services.py b/synapseclient/api/entity_services.py index 7d64025bf..d2f2f01cd 100644 --- a/synapseclient/api/entity_services.py +++ b/synapseclient/api/entity_services.py @@ -2,11 +2,15 @@ """ +import asyncio import json from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union from async_lru import alru_cache +from synapseclient.core.exceptions import SynapseHTTPError +from synapseclient.core.retry import RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES + if TYPE_CHECKING: from synapseclient import Synapse @@ -36,9 +40,45 @@ async def post_entity( params = {} if generated_by: params["generatedBy"] = generated_by - return await client.rest_post_async( - uri="/entity", body=json.dumps(request), params=params - ) + + try: + return await client.rest_post_async( + uri="/entity", + body=json.dumps(request), + params=params, + retry_policy={ + "retry_exceptions": RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES + }, + ) + except SynapseHTTPError: + if "name" in request and "parentId" in request: + loop = asyncio.get_event_loop() + entity_id = await loop.run_in_executor( + None, + lambda: Synapse.get_client(synapse_client=synapse_client).findEntityId( + name=request["name"], + parent=request["parentId"], + ), + ) + if not entity_id: + raise + + client.logger.warning( + "This is a temporary exception to recieving an HTTP 409 conflict error - Retrieving the state of the object in Synapse. If an error is not printed after this, assume the operation was successful (SYNSD-1233)." + ) + existing_instance = await get_entity( + entity_id, synapse_client=synapse_client + ) + # Loop over the request params and if any of them do not match the existing instance, raise the error + for key, value in request.items(): + if key not in existing_instance or existing_instance[key] != value: + client.logger.error( + f"Failed temporary HTTP 409 logic check because {key} not in instance in Synapse or value does not match: [Existing: {existing_instance[key]}, Expected: {value}]." + ) + raise + return existing_instance + else: + raise async def put_entity( diff --git a/synapseclient/client.py b/synapseclient/client.py index ce25aa5ab..d11a5cdd1 100644 --- a/synapseclient/client.py +++ b/synapseclient/client.py @@ -286,6 +286,7 @@ def __init__( requests_session_storage: httpx.Client = None, asyncio_event_loop: asyncio.AbstractEventLoop = None, cache_client: bool = True, + max_concurrent_file_transfers: int = None, ) -> "Synapse": """ Initialize Synapse object @@ -315,11 +316,14 @@ def __init__( When working in a multi-user environment it is recommended to set this to False, or use `Synapse.allow_client_caching(False)`. + max_concurrent_file_transfers: The maximum number of concurrent file + transfers that can be executed at the same time. Raises: ValueError: Warn for non-boolean debug value. """ self._requests_session = requests_session or requests.Session() + self._max_concurrent_file_transfers = max_concurrent_file_transfers # `requests_session_async_synapse` and the thread pools are being stored in # a dict based on the current running event loop. This is to ensure that the @@ -498,7 +502,11 @@ def _get_parallel_file_transfer_semaphore( return self._parallel_file_transfer_semaphore[asyncio_event_loop] self._parallel_file_transfer_semaphore.update( - {asyncio_event_loop: asyncio.Semaphore(max(self.max_threads * 2, 1))} + { + asyncio_event_loop: asyncio.Semaphore( + self._max_concurrent_file_transfers or max(self.max_threads * 2, 1) + ) + } ) return self._parallel_file_transfer_semaphore[asyncio_event_loop] @@ -6299,6 +6307,7 @@ async def _rest_call_async( Returns: JSON encoding of response """ + self.logger.debug(f"Sending {method} request to {uri}") uri, headers = self._build_uri_and_headers( uri, endpoint=endpoint, headers=headers ) diff --git a/synapseclient/core/retry.py b/synapseclient/core/retry.py index 5851df5bc..87654159f 100644 --- a/synapseclient/core/retry.py +++ b/synapseclient/core/retry.py @@ -73,6 +73,21 @@ "SSLZeroReturnError", ] +RETRYABLE_CONNECTION_EXCEPTIONS_NO_READ_ISSUES = [ + "ChunkedEncodingError", + "ConnectionError", + "ConnectionResetError", + "Timeout", + "timeout", + # HTTPX Specific connection exceptions: + "RemoteProtocolError", + "TimeoutException", + "ConnectError", + "ConnectTimeout", + # SSL Specific exceptions: + "SSLZeroReturnError", +] + DEBUG_EXCEPTION = "calling %s resulted in an Exception" diff --git a/synapseclient/synapsePythonClient b/synapseclient/synapsePythonClient index 815dea16d..d57ed71db 100644 --- a/synapseclient/synapsePythonClient +++ b/synapseclient/synapsePythonClient @@ -1,6 +1,6 @@ { "client": "synapsePythonClient", - "latestVersion": "4.5.0", + "latestVersion": "4.5.999", "blacklist": [ "0.0.0", "0.4.1", diff --git a/tests/integration/synapseclient/models/async/test_file_async.py b/tests/integration/synapseclient/models/async/test_file_async.py index 29bcb2f43..0ead015bf 100644 --- a/tests/integration/synapseclient/models/async/test_file_async.py +++ b/tests/integration/synapseclient/models/async/test_file_async.py @@ -4,9 +4,12 @@ import os import uuid from typing import Callable -from unittest.mock import patch +from unittest.mock import AsyncMock, patch +import httpcore import pytest +from httpcore._backends.anyio import AnyIOStream +from pytest_mock import MockerFixture from synapseclient import Synapse from synapseclient.core import utils @@ -82,6 +85,82 @@ async def test_store_in_project(self, project_model: Project, file: File) -> Non assert file.file_handle.key is not None assert file.file_handle.external_url is None + async def test_store_in_project_with_read_error( + self, + project_model: Project, + file: File, + mocker: MockerFixture, + ) -> None: + # GIVEN a file + file.name = str(uuid.uuid4()) + + # AND spy/mocks to simulate a read error + spy_tls_stream = mocker.spy(httpcore._backends.anyio.AnyIOStream, "start_tls") + logger_warning_spy = mocker.spy(self.syn.logger, "warning") + original_read = httpcore._backends.anyio.AnyIOStream.read + call_count = 0 + + async def new_read(*args, **kwargs) -> bytes: + nonlocal call_count + async_network_stream: httpcore.AsyncNetworkStream = ( + spy_tls_stream.spy_return + ) + call_count = call_count + 1 + # This is very brittle, however, I do not see data to determine what the + # read is. In this case the 7th call is going to be the HTTP POST to create + # the entity. + if call_count == 7: + return b"" + return await original_read(async_network_stream, *args, **kwargs) + + # WHEN I store the file + with patch( + "httpcore._backends.anyio.AnyIOStream.read", + new_callable=AsyncMock, + side_effect=new_read, + ) as mock_read: + mock_read.return_value = b"" + file_copy_object = await file.store_async( + parent=project_model, synapse_client=self.syn + ) + self.schedule_for_cleanup(file.id) + + # THEN I expect the file to be stored + assert file.id is not None + assert file_copy_object.id is not None + assert file_copy_object == file + assert file.parent_id == project_model.id + assert file.content_type == CONTENT_TYPE + assert file.version_comment == VERSION_COMMENT + assert file.version_label is not None + assert file.version_number == 1 + assert file.created_by is not None + assert file.created_on is not None + assert file.modified_by is not None + assert file.modified_on is not None + assert file.data_file_handle_id is not None + assert file.file_handle is not None + assert file.file_handle.id is not None + assert file.file_handle.etag is not None + assert file.file_handle.created_by is not None + assert file.file_handle.created_on is not None + assert file.file_handle.modified_on is not None + assert file.file_handle.concrete_type is not None + assert file.file_handle.content_type is not None + assert file.file_handle.content_md5 is not None + assert file.file_handle.file_name is not None + assert file.file_handle.storage_location_id is not None + assert file.file_handle.content_size is not None + assert file.file_handle.status is not None + assert file.file_handle.bucket_name is not None + assert file.file_handle.key is not None + assert file.file_handle.external_url is None + + # AND the temporary warning is present: + logger_warning_spy.assert_called_once_with( + "This is a temporary exception to recieving an HTTP 409 conflict error - Retrieving the state of the object in Synapse. If an error is not printed after this, assume the operation was successful (SYNSD-1233)." + ) + async def test_activity_store_then_delete( self, project_model: Project, file: File ) -> None: