-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(low-code): Add parent state migration from global state #322
Conversation
📝 WalkthroughWalkthroughThe changes update the state migration logic within the SubstreamPartitionRouter’s Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant SubstreamPartitionRouter
participant StateInput
Caller ->> SubstreamPartitionRouter: _migrate_child_state_to_parent_state(stream_state)
Note right of SubstreamPartitionRouter: Check for key "state" and extract global state
SubstreamPartitionRouter ->> StateInput: Evaluate input state details
StateInput -->> SubstreamPartitionRouter: Return global state (ignore per-partition data)
SubstreamPartitionRouter -->> Caller: Return mapped parent state with cursor values
Possibly related PRs
Suggested reviewers
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
⏰ Context from checks skipped due to timeout of 90000ms (8)
🔇 Additional comments (1)
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2)
302-316
: Great improvements to the docstring! Consider adding validation examples?The docstring now clearly explains the state migration behavior. Would it be helpful to add examples of invalid formats that are ignored during migration, wdyt?
This method converts the child stream state—or, if present, the global state—into a format that is compatible with parent streams that use incremental synchronization. The migration occurs only for parent streams with incremental dependencies. It filters out per-partition states and retains only the global state in the form {cursor_field: cursor_value}. The method supports multiple input formats: - A simple global state, e.g.: {"updated_at": "2023-05-27T00:00:00Z"} - A state object that contains a "state" key (which is assumed to hold the global state), e.g.: {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...} In this case, the migration uses the first value from the "state" dictionary. - Any per-partition state formats or other non-simple structures are ignored during migration. + + Examples of ignored formats: + - Per-partition state: + {"states": [{"partition": {"id": 1}, "cursor": {"updated_at": "2023-05-27T00:00:00Z"}}]} + - Invalid state structure: + {"updated_at": ["2023-05-27T00:00:00Z"]}
339-345
: Consider adding validation for empty state dictionary.The code now handles global state under the "state" key, but what if the state dictionary is empty? Should we add a check for that case, wdyt?
# Ignore per-partition states or invalid formats. if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1: # If a global state is present under the key "state", use its first value. - if "state" in stream_state and isinstance(stream_state["state"], dict): + if "state" in stream_state and isinstance(stream_state["state"], dict) and stream_state["state"]: substream_state = list(stream_state["state"].values())[0] else: return {}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py
(2 hunks)unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: MyPy Check
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Ruff Lint Check
- GitHub Check: Build and Inspect Python Package
- GitHub Check: preview_docs
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Ruff Format Check
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
1453-1465
: LGTM! Great test coverage for state migration.The new test case for global state without parent state helps ensure robust state migration. The descriptive test IDs also make it easier to understand test failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I fear that this might not be 100% true in the following case:
- T0: HTTP request for parent stream. To be synced are parent_1 and parent_2
- T1: Parent stream emit parent_1
- T2: HTTP request for child stream on slice parent_1
- T3: parent_3 is also updated which would now make it available in the parent stream HTTP request
- T4: a new record is added on child stream for parent_2
- T5: finished processing parent_1 on child level
- T6: process child stream for slice parent_2
In this case, we would set the parent state as T4 and the update on parent_3 would not be picked up. That being said, the {<cursor field>: <cursor value>}
format was not safe for very similar reasons so I'm good with this as a migration.
@maxi297 I agree. Ideally, we should apply the lookback window to the parent stream as well. However, this migration is only meant for transitioning from the legacy format, so there’s no need to overcomplicate the logic since the lookback window won’t be present in that case. |
Summary by CodeRabbit