TMP development to show how things could work with concurrent cursor #228

Draft · wants to merge 1 commit into base: main

39 changes: 18 additions & 21 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -31,6 +31,7 @@
 from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
     ModelToComponentFactory,
 )
+from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
 from airbyte_cdk.sources.declarative.requesters import HttpRequester
 from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
 from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (

@@ -66,13 +67,18 @@ def __init__(
         component_factory: Optional[ModelToComponentFactory] = None,
         **kwargs: Any,
     ) -> None:
+        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
+        # no longer needs to store the original incoming state. But maybe there's an edge case?
+        self._state_manager = ConnectorStateManager(state=state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
+
         # To reduce the complexity of the concurrent framework, we are not enabling RFR with synthetic
         # cursors. We do this by no longer automatically instantiating RFR cursors when converting
         # the declarative models into runtime components. Concurrent sources will continue to checkpoint
         # incremental streams running in full refresh.
         component_factory = component_factory or ModelToComponentFactory(
             emit_connector_builder_messages=emit_connector_builder_messages,
             disable_resumable_full_refresh=True,
+            state_manager=self._state_manager,
         )

         super().__init__(

@@ -82,10 +88,6 @@
             component_factory=component_factory,
         )

-        # todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
-        # no longer needs to store the original incoming state. But maybe there's an edge case?
-        self._state = state
-
         concurrency_level_from_manifest = self._source_config.get("concurrency_level")
         if concurrency_level_from_manifest:
             concurrency_level_component = self._constructor.create_component(

@@ -175,8 +177,6 @@ def _group_streams(
         concurrent_streams: List[AbstractStream] = []
         synchronous_streams: List[Stream] = []

-        state_manager = ConnectorStateManager(state=self._state)  # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later
-
         # Combine streams and dynamic_streams. Note: both cannot be empty at the same time,
         # and this is validated during the initialization of the source.
         streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs(

@@ -216,19 +216,6 @@
             if self._is_datetime_incremental_without_partition_routing(
                 declarative_stream, incremental_sync_component_definition
             ):
-                stream_state = state_manager.get_stream_state(
-                    stream_name=declarative_stream.name, namespace=declarative_stream.namespace
-                )
-
-                cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
-                    state_manager=state_manager,
-                    model_type=DatetimeBasedCursorModel,
-                    component_definition=incremental_sync_component_definition,  # type: ignore # Not None because of the if condition above
-                    stream_name=declarative_stream.name,
-                    stream_namespace=declarative_stream.namespace,
-                    config=config or {},
-                    stream_state=stream_state,
-                )

                 retriever = declarative_stream.retriever

@@ -241,19 +228,29 @@
                 # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator
                 # still rely on a DatetimeBasedCursor that is properly initialized with state.
                 if retriever.cursor:
+                    stream_state = self._state_manager.get_stream_state(
+                        stream_name=declarative_stream.name, namespace=declarative_stream.namespace
+                    )
                     retriever.cursor.set_initial_state(stream_state=stream_state)
                     # We zero it out here, but since this is a cursor reference, the state is still properly
                     # instantiated for the other components that reference it
                     retriever.cursor = None

+                cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
+                    model_type=DatetimeBasedCursorModel,
+                    component_definition=incremental_sync_component_definition,  # type: ignore # Not None because of the if condition above
+                    stream_name=declarative_stream.name,
+                    stream_namespace=declarative_stream.namespace,
+                    config=config or {},
+                ) if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer.stream_slicer  # type: ignore # AsyncRetriever has stream_slicer
                 partition_generator = StreamSlicerPartitionGenerator(
                     DeclarativePartitionFactory(
                         declarative_stream.name,
                         declarative_stream.get_json_schema(),
                         retriever,
                         self.message_repository,
                     ),
-                    cursor,
+                    cursor if type(declarative_stream.retriever).__name__ != "AsyncRetriever" else declarative_stream.retriever.stream_slicer,  # type: ignore # AsyncRetriever has stream_slicer
                 )

                 concurrent_streams.append(

@@ -325,7 +322,7 @@ def _is_datetime_incremental_without_partition_routing(
                 declarative_stream=declarative_stream
             )
             and hasattr(declarative_stream.retriever, "stream_slicer")
-            and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+            and (isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter))
         )

     def _stream_supports_concurrent_partition_processing(

@@ -25,9 +25,6 @@
 from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
-from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
-    CheckStream as CheckStreamModel,
-)
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     DeclarativeStream as DeclarativeStreamModel,
 )

@@ -476,6 +476,7 @@ def __init__(
         disable_cache: bool = False,
         disable_resumable_full_refresh: bool = False,
         message_repository: Optional[MessageRepository] = None,
+        state_manager: Optional[ConnectorStateManager] = None
     ):
         self._init_mappings()
         self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice

@@ -487,6 +488,7 @@ def __init__(
         self._message_repository = message_repository or InMemoryMessageRepository(
             self._evaluate_log_level(emit_connector_builder_messages)
         )
+        self._state_manager = state_manager
[Contributor Author] My guess is that this makes sense here, given that we eventually want to instantiate cursors with their state and avoid a set_initial_state method call.
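To illustrate the pattern this comment points at, here is a minimal, hypothetical sketch (simplified stand-ins, not the CDK's real classes): a factory that holds a state manager can hand every cursor its stream state at construction time, so no set_initial_state() call is needed afterwards.

```python
# Hypothetical stand-ins for ConnectorStateManager / ConcurrentCursor /
# ModelToComponentFactory, illustrating construction-time state injection.
from typing import Any, Mapping, Optional


class StateManager:
    def __init__(self, state_by_stream: Mapping[str, Mapping[str, Any]]):
        self._state_by_stream = state_by_stream

    def get_stream_state(
        self, stream_name: str, namespace: Optional[str] = None
    ) -> Mapping[str, Any]:
        return self._state_by_stream.get(stream_name, {})


class Cursor:
    def __init__(self, stream_state: Mapping[str, Any]):
        # State is complete from the start; no set_initial_state() mutation later.
        self._state = dict(stream_state)


class Factory:
    def __init__(self, state_manager: StateManager):
        self._state_manager = state_manager

    def create_cursor(self, stream_name: str) -> Cursor:
        return Cursor(stream_state=self._state_manager.get_stream_state(stream_name))


factory = Factory(StateManager({"orders": {"updated_at": "2024-01-01"}}))
cursor = factory.create_cursor("orders")  # born fully initialized
```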


     def _init_mappings(self) -> None:
         self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {

@@ -880,13 +882,11 @@ def create_concurrency_level(

     def create_concurrent_cursor_from_datetime_based_cursor(
         self,
-        state_manager: ConnectorStateManager,
         model_type: Type[BaseModel],
         component_definition: ComponentDefinition,
         stream_name: str,
         stream_namespace: Optional[str],
         config: Config,
-        stream_state: MutableMapping[str, Any],
         **kwargs: Any,
     ) -> ConcurrentCursor:
         component_type = component_definition.get("type")

@@ -1021,9 +1021,9 @@ def create_concurrent_cursor_from_datetime_based_cursor(
         return ConcurrentCursor(
             stream_name=stream_name,
             stream_namespace=stream_namespace,
-            stream_state=stream_state,
+            stream_state=self._state_manager.get_stream_state(stream_name, stream_namespace),
             message_repository=self._message_repository,
-            connector_state_manager=state_manager,
+            connector_state_manager=self._state_manager,
             connector_state_converter=connector_state_converter,
             cursor_field=cursor_field,
             slice_boundary_fields=slice_boundary_fields,

@@ -1476,6 +1476,17 @@ def _merge_stream_slicers(
                 stream_cursor=cursor_component,
             )
         elif model.incremental_sync:
+            if model.retriever.type == "AsyncRetriever":
+                if model.incremental_sync.type != "DatetimeBasedCursor":
+                    # TODO explain why it isn't supported
[Contributor Author] TODO: I'm not exactly sure why other types of cursors wouldn't be supported, but I was only doing this for source-amazon-ads, so I wanted to be more restrictive than not. Note that Global/PerPartition cursors were not updated, which we will need for source-amazon-ads.
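For context, below is an illustrative, trimmed manifest fragment for the one combination this branch accepts: an AsyncRetriever-backed stream whose incremental_sync is a DatetimeBasedCursor. The stream name and field values are hypothetical, not taken from the PR.

```python
# Illustrative only: the shape this branch allows for async + incremental.
supported_async_incremental_stream = {
    "type": "DeclarativeStream",
    "name": "report_stream",  # hypothetical stream name
    "retriever": {"type": "AsyncRetriever"},  # trimmed; a real one needs job/polling config
    "incremental_sync": {
        "type": "DatetimeBasedCursor",  # the only cursor type accepted here
        "cursor_field": "updated_at",
        "datetime_format": "%Y-%m-%dT%H:%M:%SZ",
        "start_datetime": "{{ config['start_date'] }}",
    },
}

# Any other incremental_sync type on an AsyncRetriever stream now raises:
# ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet")
```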

+                    raise ValueError("AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet")
+            return self.create_concurrent_cursor_from_datetime_based_cursor(
+                model_type=DatetimeBasedCursorModel,
+                component_definition=model.incremental_sync.__dict__,
+                stream_name=model.name,
+                stream_namespace=None,
+                config=config or {},
+            )
         return (
             self._create_component_from_model(model=model.incremental_sync, config=config)
             if model.incremental_sync

@@ -75,7 +75,7 @@ def _validate_and_get_stream_slice_partition(
         """
         if not isinstance(stream_slice, StreamSlice) or "partition" not in stream_slice.partition:
             raise AirbyteTracedException(
-                message="Invalid arguments to AsyncJobRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
+                message="Invalid arguments to AsyncRetriever.read_records: stream_slice is no optional. Please contact Airbyte Support",
                 failure_type=FailureType.system_error,
             )
         return stream_slice["partition"]  # type: ignore # stream_slice["partition"] has been added as an AsyncPartition as part of stream_slices
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/types.py
@@ -8,6 +8,8 @@

 import orjson

+from airbyte_cdk.utils.slice_hasher import SliceHasher
+
 # A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
 # "hello"}] returns "hello"
 FieldPointer = List[str]

@@ -151,7 +153,7 @@ def __json_serializable__(self) -> Any:
         return self._stream_slice

     def __hash__(self) -> int:
-        return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS))
+        return SliceHasher.hash("dummy_name", self._stream_slice)
[Contributor Author] This had to be updated to support AsyncPartition. SliceHasher considers __json_serializable__ but orjson does not. I figure we should have the same slicing logic everywhere, and if we want to switch this logic to orjson, we can do it once in SliceHasher.

TODO: I'm not sure why we require the name of the stream for the slice hasher.
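A rough sketch of the failure mode being described, using a stand-in hasher rather than the CDK's SliceHasher: orjson only serializes JSON-native types, so hashing a slice that carries a rich object (such as an AsyncPartition) raises a TypeError unless something first asks the object for its __json_serializable__ representation.

```python
import hashlib
import json
from typing import Any


class AsyncPartitionLike:
    """Stand-in for a rich object stored inside a stream slice."""

    def __init__(self, job_id: str):
        self.job_id = job_id

    def __json_serializable__(self) -> Any:
        return {"job_id": self.job_id}


def hash_slice(stream_name: str, stream_slice: Any) -> int:
    # Mimics the idea behind SliceHasher: fall back to __json_serializable__
    # for objects that json/orjson cannot handle natively.
    def default(obj: Any) -> Any:
        if hasattr(obj, "__json_serializable__"):
            return obj.__json_serializable__()
        raise TypeError(f"{type(obj)} is not serializable")

    payload = json.dumps({stream_name: stream_slice}, default=default, sort_keys=True)
    return int.from_bytes(hashlib.sha256(payload.encode()).digest()[:8], "big")


slice_with_partition = {"partition": AsyncPartitionLike("job-123")}
# orjson.dumps(slice_with_partition) would raise TypeError (no default hook);
# the __json_serializable__-aware hasher succeeds:
print(hash_slice("dummy_name", slice_with_partition))
```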


     def __bool__(self) -> bool:
         return bool(self._stream_slice) or bool(self._extra_fields)

@@ -322,6 +322,7 @@
                     "http_method": "GET",
                 },
             },
+            "incremental_sync": {"$ref": "#/definitions/incremental_cursor"},
[Contributor Author] To ensure that async_retriever streams with incremental sync are also treated as concurrent.
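The "$ref" here is a standard intra-document JSON reference into the manifest's definitions block. A small sketch of how such a reference resolves; the incremental_cursor definition shown is hypothetical:

```python
# Hypothetical sketch of resolving "#/definitions/incremental_cursor":
# the fragment after "#/" is a key path into the same document.
from typing import Any, Mapping


def resolve_ref(document: Mapping[str, Any], ref: str) -> Any:
    assert ref.startswith("#/"), "only intra-document refs in this sketch"
    node: Any = document
    for key in ref[2:].split("/"):
        node = node[key]
    return node


manifest = {
    "definitions": {
        "incremental_cursor": {
            "type": "DatetimeBasedCursor",
            "cursor_field": "updated_at",  # illustrative field
        }
    },
    "streams": [{"incremental_sync": {"$ref": "#/definitions/incremental_cursor"}}],
}

ref = manifest["streams"][0]["incremental_sync"]["$ref"]
print(resolve_ref(manifest, ref))  # -> the DatetimeBasedCursor definition
```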

"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
Expand Down
Loading