-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TMP development to show how things could work with concurrent cursor #228
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -476,6 +476,7 @@ def __init__( | |
disable_cache: bool = False, | ||
disable_resumable_full_refresh: bool = False, | ||
message_repository: Optional[MessageRepository] = None, | ||
state_manager: Optional[ConnectorStateManager] = None | ||
): | ||
self._init_mappings() | ||
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice | ||
|
@@ -487,6 +488,7 @@ def __init__( | |
self._message_repository = message_repository or InMemoryMessageRepository( | ||
self._evaluate_log_level(emit_connector_builder_messages) | ||
) | ||
self._state_manager = state_manager | ||
|
||
def _init_mappings(self) -> None: | ||
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = { | ||
|
@@ -880,13 +882,11 @@ def create_concurrency_level( | |
|
||
def create_concurrent_cursor_from_datetime_based_cursor( | ||
self, | ||
state_manager: ConnectorStateManager, | ||
model_type: Type[BaseModel], | ||
component_definition: ComponentDefinition, | ||
stream_name: str, | ||
stream_namespace: Optional[str], | ||
config: Config, | ||
stream_state: MutableMapping[str, Any], | ||
**kwargs: Any, | ||
) -> ConcurrentCursor: | ||
component_type = component_definition.get("type") | ||
|
@@ -1021,9 +1021,9 @@ def create_concurrent_cursor_from_datetime_based_cursor( | |
return ConcurrentCursor( | ||
stream_name=stream_name, | ||
stream_namespace=stream_namespace, | ||
stream_state=stream_state, | ||
stream_state=self._state_manager.get_stream_state(stream_name, stream_namespace), | ||
message_repository=self._message_repository, | ||
connector_state_manager=state_manager, | ||
connector_state_manager=self._state_manager, | ||
connector_state_converter=connector_state_converter, | ||
cursor_field=cursor_field, | ||
slice_boundary_fields=slice_boundary_fields, | ||
|
@@ -1476,6 +1476,17 @@ def _merge_stream_slicers( | |
stream_cursor=cursor_component, | ||
) | ||
elif model.incremental_sync: | ||
if model.retriever.type == "AsyncRetriever": | ||
if model.incremental_sync.type != "DatetimeBasedCursor": | ||
# TODO explain why it isn't supported | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO: I'm not exactly sure why other types of cursors wouldn't be supported but I was only doing this for source-amazon-ads so I wanted to be more restrictive than not. Note that Global/PerPartition cursors were not updated which we will need for source-amazon-ads |
||
raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet") | ||
return self.create_concurrent_cursor_from_datetime_based_cursor( | ||
model_type=DatetimeBasedCursorModel, | ||
component_definition=model.incremental_sync.__dict__, | ||
stream_name=model.name, | ||
stream_namespace=None, | ||
config=config or {}, | ||
) | ||
return ( | ||
self._create_component_from_model(model=model.incremental_sync, config=config) | ||
if model.incremental_sync | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ | |
|
||
import orjson | ||
|
||
from airbyte_cdk.utils.slice_hasher import SliceHasher | ||
|
||
# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2": | ||
# "hello"}] returns "hello" | ||
FieldPointer = List[str] | ||
|
@@ -151,7 +153,7 @@ def __json_serializable__(self) -> Any: | |
return self._stream_slice | ||
|
||
def __hash__(self) -> int: | ||
return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS)) | ||
return SliceHasher.hash("dummy_name", self._stream_slice) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This had to be updated to support TODO: I'm not sure why we require the name of the stream for the slice hasher. |
||
|
||
def __bool__(self) -> bool: | ||
return bool(self._stream_slice) or bool(self._extra_fields) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -322,6 +322,7 @@ | |
"http_method": "GET", | ||
}, | ||
}, | ||
"incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To ensure that async_retriever with incremental syncs are also concurrent |
||
"schema_loader": { | ||
"type": "InlineSchemaLoader", | ||
"schema": { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My guess is that this makes sense here given that we want to instantiate cursors with the state eventually to avoid a
set_initial_state
method call