Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrent-partition-cursor): Fix cursor comparison error #298

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -22,6 +22,9 @@
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
AbstractStreamStateConverter,
)
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState

logger = logging.getLogger("airbyte")
@@ -72,13 +75,15 @@ def __init__(
stream_state: Any,
message_repository: MessageRepository,
connector_state_manager: ConnectorStateManager,
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
self._stream_namespace = stream_namespace
self._message_repository = message_repository
self._connector_state_manager = connector_state_manager
self._connector_state_converter = connector_state_converter
self._cursor_field = cursor_field

self._cursor_factory = cursor_factory
@@ -301,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
):
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
self._global_cursor = deepcopy(stream_state)
self._new_global_cursor = deepcopy(stream_state)
self._set_global_state(stream_state)

else:
self._use_global_cursor = stream_state.get("use_global_cursor", False)
@@ -319,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:

# set default state for missing partitions if it is per partition with fallback to global
if self._GLOBAL_STATE_KEY in stream_state:
self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY])
self._set_global_state(stream_state[self._GLOBAL_STATE_KEY])

# Set initial parent state
if stream_state.get("parent_state"):
@@ -329,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)

def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
"""
Initializes the global cursor state from the provided stream state.

If the cursor field key is present in the stream state, its value is parsed,
formatted, and stored as the global cursor. This ensures consistency in state
representation across partitions.
"""
if self.cursor_field.cursor_field_key in stream_state:
global_state_value = stream_state[self.cursor_field.cursor_field_key]
final_format_global_state_value = self._connector_state_converter.output_format(
self._connector_state_converter.parse_value(global_state_value)
)

fixed_global_state = {
self.cursor_field.cursor_field_key: final_format_global_state_value
}

self._global_cursor = deepcopy(fixed_global_state)
self._new_global_cursor = deepcopy(fixed_global_state)

def observe(self, record: Record) -> None:
if not self._use_global_cursor and self.limit_reached():
self._use_global_cursor = True
Original file line number Diff line number Diff line change
@@ -1210,6 +1210,22 @@ def create_concurrent_cursor_from_perpartition_cursor(
)
cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

datetime_format = datetime_based_cursor_model.datetime_format

cursor_granularity = (
parse_duration(datetime_based_cursor_model.cursor_granularity)
if datetime_based_cursor_model.cursor_granularity
else None
)

connector_state_converter: DateTimeStreamStateConverter
connector_state_converter = CustomFormatConcurrentStreamStateConverter(
datetime_format=datetime_format,
input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
cursor_granularity=cursor_granularity,
)

# Create the cursor factory
cursor_factory = ConcurrentCursorFactory(
partial(
@@ -1233,6 +1249,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state=stream_state,
message_repository=self._message_repository, # type: ignore
connector_state_manager=state_manager,
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
)

Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@
},
"cursor_incremental_sync": {
"type": "DatetimeBasedCursor",
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"],
"cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
"cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}",
"start_datetime": {"datetime": "{{ config.get('start_date')}}"},
@@ -399,13 +399,16 @@ def _run_read(
VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20
VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21
VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30
VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000 # Latest vote in partition 30

# Initial State Constants
PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition)
PARENT_POSTS_CURSOR = "2024-01-05T00:00:00Z" # Parent posts cursor (expected in state)

INITIAL_STATE_PARTITION_10_CURSOR = "2024-01-02T00:00:01Z"
INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
INITIAL_STATE_PARTITION_11_CURSOR = "2024-01-03T00:00:02Z"
INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000
INITIAL_GLOBAL_CURSOR = INITIAL_STATE_PARTITION_11_CURSOR
INITIAL_GLOBAL_CURSOR_DATE = datetime.fromisoformat(
INITIAL_STATE_PARTITION_11_CURSOR.replace("Z", "")
@@ -596,7 +599,7 @@ def _run_read(
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
@@ -637,7 +640,7 @@ def _run_read(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -662,7 +665,7 @@ def _run_read(
"id": 10,
"parent_slice": {"id": 1, "parent_slice": {}},
},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR},
"cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP},
},
{
"partition": {
@@ -672,7 +675,7 @@ def _run_read(
"cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
},
],
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR},
"state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP},
"lookback_window": 86400,
},
# Expected state
@@ -981,7 +984,15 @@ def run_incremental_parent_state_test(
# Fetch the first page of votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
# Requests with intermediate states
# Fetch votes for comment 10 of post 1
@@ -1018,7 +1029,15 @@ def run_incremental_parent_state_test(
# Fetch votes for comment 30 of post 3
(
f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
@@ -1056,7 +1075,7 @@ def run_incremental_parent_state_test(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -1344,7 +1363,15 @@ def test_incremental_parent_state(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
@@ -1382,7 +1409,7 @@ def test_incremental_parent_state(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],
@@ -1896,7 +1923,15 @@ def test_incremental_parent_state_no_records(
(
f"https://api.example.com/community/posts/3/comments/30/votes"
f"?per_page=100&start_time={LOOKBACK_DATE}",
{"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]},
{
"votes": [
{
"id": 300,
"comment_id": 30,
"created_at": VOTE_300_CREATED_AT_TIMESTAMP,
}
]
},
),
],
# Expected records
@@ -1928,7 +1963,7 @@ def test_incremental_parent_state_no_records(
{
"comment_id": 30,
"comment_updated_at": COMMENT_30_UPDATED_AT,
"created_at": VOTE_300_CREATED_AT,
"created_at": str(VOTE_300_CREATED_AT_TIMESTAMP),
"id": 300,
},
],