fix(concurrent-partition-cursor): Fix cursor comparison error #298
📝 Walkthrough
The pull request introduces modifications to the
Changes
Sequence Diagram
sequenceDiagram
    participant Factory as ModelToComponentFactory
    participant Cursor as ConcurrentPerPartitionCursor
    participant StateConverter as CustomFormatConcurrentStreamStateConverter
    Factory->>StateConverter: Create with datetime_format
    Factory->>Cursor: Initialize with state_converter
    Cursor->>Cursor: Extract cursor value using converter
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
78-86: Consider adding parameter documentation.
The new connector_state_converter parameter could benefit from docstring documentation explaining its purpose and requirements. What do you think about adding it to the class or constructor docstring?
378-381: Consider adding method documentation.
This new method could benefit from docstring documentation explaining:
- Purpose of the method
- Parameter and return type descriptions
- Any assumptions about the state format
What do you think about adding it?
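A minimal sketch of the kind of docstrings the two nitpicks above ask for. The class, method, and converter names below are hypothetical stand-ins chosen for illustration; they are not the actual code in concurrent_partition_cursor.py, and the state shape is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Protocol


class StateConverter(Protocol):
    """Hypothetical stand-in for the converter interface used by the cursor."""

    def parse_value(self, value: Any) -> datetime: ...


class IsoConverter:
    """Illustrative converter that only understands one ISO 8601 layout."""

    def parse_value(self, value: Any) -> datetime:
        parsed = datetime.strptime(str(value), "%Y-%m-%dT%H:%M:%SZ")
        return parsed.replace(tzinfo=timezone.utc)


class ExampleCursor:
    """Illustrative cursor (not the CDK class).

    Args:
        cursor_field_key: Key under which the cursor value is stored in state.
        connector_state_converter: Object used to parse raw cursor values from
            stream state into comparable datetimes. It must accept every
            format the state may contain.
    """

    def __init__(self, cursor_field_key: str, connector_state_converter: StateConverter) -> None:
        self._cursor_field_key = cursor_field_key
        self._connector_state_converter = connector_state_converter

    def extract_cursor_value(self, state: Mapping[str, Any]) -> datetime:
        """Return the parsed cursor value stored in ``state``.

        Args:
            state: Mapping expected to hold the raw cursor value under
                ``cursor_field_key`` (assumed flat, for this sketch).

        Returns:
            The cursor value as parsed by the state converter.

        Raises:
            KeyError: If the cursor field is missing from ``state``.
        """
        return self._connector_state_converter.parse_value(state[self._cursor_field_key])
```

Documenting the accepted state shape next to the method keeps the assumption visible to whoever later changes the state format.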
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)
68-68: Consider adding a comment explaining the %ms format.
The addition of "%ms" to cursor_datetime_formats suggests support for millisecond timestamps, but its purpose might not be immediately clear to other developers. Would you consider adding a comment to explain this format and when it's used? wdyt?
- "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
+ "cursor_datetime_formats": [
+     "%Y-%m-%dT%H:%M:%SZ",  # ISO 8601 UTC
+     "%Y-%m-%dT%H:%M:%S%z",  # ISO 8601 with timezone
+     "%ms",  # Unix timestamp in milliseconds
+ ],
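To make the role of "%ms" concrete: it is not a directive that Python's datetime.strptime understands, so a parser has to treat it as a special token. The sketch below is an illustration only, assuming "%ms" means milliseconds since the Unix epoch as the comment above suggests; parse_with_formats is a hypothetical helper, not the CDK's parser.

```python
from datetime import datetime, timedelta, timezone
from typing import Sequence, Union

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_with_formats(value: Union[str, int], formats: Sequence[str]) -> datetime:
    """Try each format in order and return the first successful parse."""
    for fmt in formats:
        try:
            if fmt == "%ms":
                # Custom token: interpret the value as epoch milliseconds.
                return UNIX_EPOCH + timedelta(milliseconds=int(value))
            parsed = datetime.strptime(str(value), fmt)
            # Treat naive results (for example a literal "Z" suffix) as UTC.
            return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"No format in {list(formats)} matched {value!r}")


FORMATS = ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"]

from_iso = parse_with_formats("2024-01-10T00:00:00Z", FORMATS)
from_ms = parse_with_formats(1704844800000, FORMATS)
print(from_iso == from_ms)  # both inputs denote the same instant
```

Because the formats are tried in order, an ISO string never reaches the "%ms" branch, and a millisecond integer falls through the ISO formats first.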
402-402: Consider using a more descriptive constant name.
The constant VOTE_300_CREATED_AT_TIMESTAMP represents a Unix timestamp in milliseconds. Would you consider making this more explicit in the name? wdyt?
-VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # Latest vote in partition 30
+VOTE_300_CREATED_AT_MS_TIMESTAMP = 1704844800000  # Latest vote in partition 30 (2024-01-10T00:00:00Z)
985-993: Consider using a helper function for repeated test data.
I notice the same vote data structure is repeated in multiple test cases. Would you consider extracting this into a helper function to improve maintainability? wdyt?
def create_vote_data(comment_id, created_at):
    """Create a vote test data structure."""
    return {
        "votes": [
            {
                "id": 300,
                "comment_id": comment_id,
                "created_at": created_at,
            }
        ]
    }
Also applies to: 1924-1932, 1364-1372
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)
25-27: LGTM! Clean import addition.
The new import for AbstractStreamStateConverter is well-organized and follows the existing import structure.
152-154: LGTM! Improved cursor value comparison.
The cursor comparison now correctly uses the state converter to parse values, which should fix the comparison error mentioned in the PR title.
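A small sketch of why parsing before comparing matters. The conversation does not spell out the exact failure, so this assumes one plausible case: the state holds an ISO 8601 string while a record carries an epoch-millisecond integer. parse_cursor is a hypothetical stand-in for the state converter's parsing step.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_cursor(value):
    """Hypothetical parser: integers are epoch milliseconds, strings are ISO 8601 UTC."""
    if isinstance(value, int):
        return UNIX_EPOCH + timedelta(milliseconds=value)
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


state_value = "2024-01-02T00:00:01Z"  # cursor as stored in state (assumed)
record_value = 1704844800000          # cursor as it appears on a record (assumed)

try:
    raw_comparison = record_value > state_value
except TypeError as error:
    # Python refuses to order an int against a str.
    raw_comparison = None
    raw_error = str(error)

# After parsing, both sides are timezone-aware datetimes and compare cleanly.
parsed_comparison = parse_cursor(record_value) > parse_cursor(state_value)
```

Comparing the parsed values also avoids the subtler case where two strings in different layouts compare lexicographically and give a wrong answer without raising.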
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1205-1211: LGTM! Clean datetime handling implementation.
The datetime format and cursor granularity handling is well-implemented, with proper parsing of the cursor granularity using parse_duration.
1213-1219: Verify sequential state assumption.
The code assumes that ConcurrentPerPartitionCursor only works with sequential state. While this is currently true, should we make this more flexible for future use cases? What are your thoughts on making this configurable?
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
Line range hint 600-641: Verify timestamp conversion consistency.
The test expects the timestamp to be converted to a string representation. Let's ensure this behavior is consistent with the actual implementation.
✅ Verification successful
Timestamp conversion is being handled consistently ✅
The test is correctly converting the Unix timestamp (VOTE_300_CREATED_AT_TIMESTAMP) to a string representation using str(), which aligns with how other timestamp values are handled in the test. All expected records consistently use string format for timestamps.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if the timestamp conversion is handled consistently across the codebase
# Search for timestamp handling in the source code
rg -A 5 "def _extract_cursor_value_from_state|def _convert_datetime"
Length of output: 757
Script:
#!/bin/bash
# Search for connector state converter and timestamp handling
rg -A 5 "class ConnectorStateConverter"
# Search for timestamp-related patterns in the test file
rg -B 2 -A 2 "VOTE_.*_CREATED_AT" unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
# Search for timestamp handling in stream implementations
rg -A 5 "def parse_value|def format_value"
Length of output: 24979
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)
402-402: Add documentation for timestamp values
Would it be helpful to add comments explaining how these timestamp values were calculated? For example:
-VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # Latest vote in partition 30
+VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # 2024-01-10T00:00:00Z in milliseconds
-INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
-INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000
+INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000  # 2024-01-02T00:00:01Z in milliseconds
+INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000  # 2024-01-03T00:00:02Z in milliseconds
This would make it easier to maintain and update the test data. wdyt?
Also applies to: 409-411
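One way to double-check the suggested comments is to render each constant back to an ISO 8601 string. This is a quick verification sketch; ms_to_iso is a hypothetical helper and is not part of the test file.

```python
from datetime import datetime, timezone


def ms_to_iso(timestamp_ms: int) -> str:
    """Render epoch milliseconds as an ISO 8601 UTC string (second precision)."""
    moment = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000
INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000

for name, value in [
    ("VOTE_300_CREATED_AT_TIMESTAMP", VOTE_300_CREATED_AT_TIMESTAMP),
    ("INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP", INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP),
    ("INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP", INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP),
]:
    print(f"{name} = {value}  # {ms_to_iso(value)} in milliseconds")
```

The three values render as 2024-01-10T00:00:00Z, 2024-01-02T00:00:01Z, and 2024-01-03T00:00:02Z, which matches the comments proposed in the diff.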
📜 Review details
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
🔇 Additional comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)
987-995: LGTM! Comprehensive test coverage for timestamp handling
The test cases thoroughly cover the timestamp format changes across different scenarios, including:
- Normal operation with timestamp format
- Error handling
- State management
Also applies to: 1366-1374, 1926-1934
68-68: Verify the millisecond format specification
The "%ms" format was added to support millisecond timestamps. Should we add a test case to verify that other timestamp formats (e.g., microseconds) are properly handled as well? wdyt?
✅ Verification successful
The millisecond format is correctly implemented
The "%ms" format is intentionally added to handle millisecond timestamps (e.g., 1704844800000) returned by the API, while maintaining ISO format (e.g., "2024-01-10T00:00:00Z") in the state cursors. The test cases demonstrate successful conversion between these formats. No additional test cases are needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if there are any other timestamp formats used in the codebase
rg -g '*.py' -A 5 'datetime_formats.*%[mu]s'
Length of output: 925
Script:
#!/bin/bash
# Search for datetime parser implementation
rg -g '*.py' -B 5 -A 10 'def parse.*datetime'
Length of output: 11538
Script:
#!/bin/bash
# Search for test cases using the %ms format and their results
rg -g '*.py' -B 5 -A 5 'VOTE_300_CREATED_AT' unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
Length of output: 7738
LGTM! Just a couple of comments that are not blocking
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
343-351: Consider adding error handling for parsing failures?
The state parsing looks good, but what happens if parse_value fails? Would it make sense to add some error handling here to provide more context about the failing value? wdyt?
 if self.cursor_field.cursor_field_key in stream_state:
     global_state_value = stream_state[self.cursor_field.cursor_field_key]
-    final_format_global_state_value = self._connector_state_converter.output_format(
-        self._connector_state_converter.parse_value(global_state_value)
-    )
+    try:
+        parsed_value = self._connector_state_converter.parse_value(global_state_value)
+        final_format_global_state_value = self._connector_state_converter.output_format(parsed_value)
+    except Exception as e:
+        logger.error(f"Failed to parse cursor value: {global_state_value}")
+        raise ValueError(f"Failed to parse cursor value: {global_state_value}") from e
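The suggestion above can be tried in isolation with a standalone sketch. StrictIsoConverter and normalize_cursor_value are hypothetical names invented for this example; only the parse_value and output_format method names come from the review text, and their real signatures in the CDK may differ.

```python
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class StrictIsoConverter:
    """Hypothetical converter that accepts a single ISO 8601 layout."""

    _FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def parse_value(self, value):
        return datetime.strptime(str(value), self._FORMAT).replace(tzinfo=timezone.utc)

    def output_format(self, value):
        return value.strftime(self._FORMAT)


def normalize_cursor_value(converter, raw_value):
    """Parse and re-format a cursor value, adding context if parsing fails."""
    try:
        return converter.output_format(converter.parse_value(raw_value))
    except ValueError as error:
        logger.error("Failed to parse cursor value: %r", raw_value)
        # "from error" keeps the original exception available as __cause__.
        raise ValueError(f"Failed to parse cursor value: {raw_value!r}") from error
```

This sketch catches ValueError rather than a bare Exception so that unrelated failures are not relabeled as parsing errors; whether that narrower catch fits the real converter is a judgment call for the maintainers.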
📜 Review details
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)
25-27: LGTM! Clean import addition.
The new import for AbstractStreamStateConverter is well-organized with the existing imports.
78-86: LGTM! Clean constructor update.
The addition of connector_state_converter parameter follows good dependency injection practices and is properly typed.
335-354: Verify datetime parsing consistency across the codebase.
The implementation uses connector_state_converter for parsing and formatting cursor values, which aligns with ConcurrentCursor's approach. However, let's verify this consistency across the codebase.
✅ Verification successful
Implementation is consistent with concurrent streams pattern
The use of connector_state_converter for parsing and formatting cursor values aligns with the established pattern in concurrent stream implementations. The codebase shows clear separation of concerns where different datetime parsing approaches are used consistently within their respective domains.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check how datetime parsing is handled across the codebase
# Look for other implementations using either connector_state_converter or ab_datetime_parse
echo "Searching for connector_state_converter usage:"
rg -A 3 "connector_state_converter.*parse_value"
echo -e "\nSearching for ab_datetime_parse usage:"
rg -A 3 "ab_datetime_parse"
echo -e "\nSearching for other datetime parsing implementations:"
ast-grep --pattern 'datetime.strptime($_, $_)'
Length of output: 37954
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1221-1227: Consider extracting common state converter creation logic?
This logic appears to be duplicated with create_concurrent_cursor_from_datetime_based_cursor. Would it make sense to extract this into a helper method to maintain consistency and reduce duplication? Something like _create_datetime_state_converter? wdyt?
+    def _create_datetime_state_converter(
+        self,
+        datetime_format: str,
+        cursor_datetime_formats: List[str],
+        cursor_granularity: Optional[datetime.timedelta],
+    ) -> DateTimeStreamStateConverter:
+        return CustomFormatConcurrentStreamStateConverter(
+            datetime_format=datetime_format,
+            input_datetime_formats=cursor_datetime_formats,
+            is_sequential_state=True,
+            cursor_granularity=cursor_granularity,
+        )

     def create_concurrent_cursor_from_perpartition_cursor(
         self,
         ...
     ) -> ConcurrentPerPartitionCursor:
         ...
-        connector_state_converter = CustomFormatConcurrentStreamStateConverter(
-            datetime_format=datetime_format,
-            input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
-            is_sequential_state=True,
-            cursor_granularity=cursor_granularity,
-        )
+        connector_state_converter = self._create_datetime_state_converter(
+            datetime_format=datetime_format,
+            cursor_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
+            cursor_granularity=cursor_granularity,
+        )
📜 Review details
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1213-1219: LGTM! Clean initialization of datetime format and cursor granularity.
The code clearly handles both the datetime format and cursor granularity parsing with proper null checks.
1252-1252: LGTM! Good addition of the state converter parameter.
The connector_state_converter parameter is correctly passed to the ConcurrentPerPartitionCursor constructor.