fix(concurrent-partition-cursor): Fix cursor comparison error #298

Merged (4 commits) on Jan 30, 2025

Conversation

@tolik0 tolik0 commented Jan 30, 2025

Summary by CodeRabbit

  • New Features

    • Enhanced cursor state management for datetime-based incremental synchronization.
    • Added support for millisecond-level datetime parsing.
  • Improvements

    • Improved modularity of cursor state extraction logic.
    • More precise handling of datetime formats in stream cursors.
  • Testing

    • Updated test cases to reflect new datetime timestamp handling.
    • Introduced new timestamp constant for consistent testing.
    • Adjusted expected outputs for various test cases to align with new timestamp format.

@github-actions github-actions bot added the "bug" (Something isn't working) label Jan 30, 2025

coderabbitai bot commented Jan 30, 2025

Walkthrough

The pull request introduces modifications to the ConcurrentPerPartitionCursor class in the Airbyte CDK, focusing on enhancing state management and cursor handling. Key changes include the addition of a connector_state_converter parameter in the constructor, the implementation of a new method for extracting cursor values, and updates to the cursor creation process to support flexible datetime format parsing. These modifications aim to improve the modularity and flexibility of incremental synchronization for partitioned streams.

Changes

File: airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

  • Added connector_state_converter parameter to constructor
  • Introduced _extract_cursor_value_from_state method
  • Enhanced state extraction logic with _set_global_state method

File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

  • Initialized datetime_format and cursor_granularity
  • Introduced CustomFormatConcurrentStreamStateConverter
  • Updated cursor creation process to use converter

File: unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

  • Added VOTE_300_CREATED_AT_TIMESTAMP constant
  • Updated test cases with new timestamp format
  • Added "%ms" to datetime formats in cursor_incremental_sync

Sequence Diagram

sequenceDiagram
    participant Factory as ModelToComponentFactory
    participant Cursor as ConcurrentPerPartitionCursor
    participant StateConverter as CustomFormatConcurrentStreamStateConverter

    Factory->>StateConverter: Create with datetime_format
    Factory->>Cursor: Initialize with state_converter
    Cursor->>Cursor: Extract cursor value using converter

Suggested reviewers

  • maxi297
  • aaronsteers

Hey there! 👋 I noticed a few interesting changes in this PR. Would you like me to elaborate on any specific aspect of the modifications? The new state conversion approach looks quite neat! Wdyt? 🤔

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

78-86: Consider adding parameter documentation.

The new connector_state_converter parameter could benefit from docstring documentation explaining its purpose and requirements. What do you think about adding it to the class or constructor docstring?


378-381: Consider adding method documentation.

This new method could benefit from docstring documentation explaining:

  • Purpose of the method
  • Parameter and return type descriptions
  • Any assumptions about the state format

What do you think about adding it?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (3)

68-68: Consider adding a comment explaining the %ms format.

The addition of "%ms" to cursor_datetime_formats suggests support for millisecond timestamps, but its purpose might not be immediately clear to other developers. Would you consider adding a comment to explain this format and when it's used? wdyt?

-            "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"],
+            "cursor_datetime_formats": [
+                "%Y-%m-%dT%H:%M:%SZ",  # ISO 8601 UTC
+                "%Y-%m-%dT%H:%M:%S%z",  # ISO 8601 with timezone
+                "%ms",                   # Unix timestamp in milliseconds
+            ],
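
To illustrate what a "%ms" entry in the format list means in practice, here is a minimal sketch of a parser that tries each configured format in turn. The function parse_cursor_value is hypothetical and is not the CDK's parser; the sketch only assumes that "%ms" denotes a Unix timestamp in milliseconds, as the suggested comment above states.

```python
from datetime import datetime, timezone

# Formats taken from the test config above; "%ms" is treated here as
# "Unix epoch in milliseconds" (an assumption of this sketch).
CURSOR_DATETIME_FORMATS = ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z", "%ms"]


def parse_cursor_value(value, formats=CURSOR_DATETIME_FORMATS):
    """Hypothetical helper: return an aware UTC datetime for the first matching format."""
    text = str(value)
    for fmt in formats:
        try:
            if fmt == "%ms":
                # int() raises ValueError for non-numeric text, so we move on.
                return datetime.fromtimestamp(int(text) / 1000, tz=timezone.utc)
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        # A format with a literal "Z" yields a naive datetime; assume UTC.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    raise ValueError(f"No configured format matches cursor value: {value!r}")
```

With this sketch, the ISO string and the millisecond timestamp for the same instant parse to equal datetime objects, which is what makes them comparable.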

402-402: Consider using a more descriptive constant name.

The constant VOTE_300_CREATED_AT_TIMESTAMP represents a Unix timestamp in milliseconds. Would you consider making this more explicit in the name? wdyt?

-VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # Latest vote in partition 30
+VOTE_300_CREATED_AT_MS_TIMESTAMP = 1704844800000  # Latest vote in partition 30 (2024-01-10T00:00:00Z)

985-993: Consider using a helper function for repeated test data.

I notice the same vote data structure is repeated in multiple test cases. Would you consider extracting this into a helper function to improve maintainability? wdyt?

def create_vote_data(comment_id, created_at):
    """Create a vote test data structure."""
    return {
        "votes": [
            {
                "id": 300,
                "comment_id": comment_id,
                "created_at": created_at,
            }
        ]
    }

Also applies to: 1924-1932, 1364-1372

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3af96dc and cd74ed2.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (4 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.12, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (5)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (2)

25-27: LGTM! Clean import addition.

The new import for AbstractStreamStateConverter is well-organized and follows the existing import structure.


152-154: LGTM! Improved cursor value comparison.

The cursor comparison now correctly uses the state converter to parse values, which should fix the comparison error mentioned in the PR title.
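
A minimal sketch of why parsing before comparing matters, under the assumption that the two cursor values arrive in different representations (the thread does not spell out the exact failure). The names to_datetime and is_newer are hypothetical stand-ins, not the CDK's converter methods.

```python
from datetime import datetime, timezone


def to_datetime(value):
    """Hypothetical parser: epoch milliseconds (int or digit string) or ISO 8601 with 'Z'."""
    text = str(value)
    if text.isdigit():
        return datetime.fromtimestamp(int(text) / 1000, tz=timezone.utc)
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def is_newer(candidate, current):
    """Compare two cursor values only after parsing both into datetimes."""
    return to_datetime(candidate) > to_datetime(current)


state_cursor = "2024-01-02T00:00:01Z"  # ISO string, as kept in state
record_cursor = 1704844800000          # epoch milliseconds, as found on a record

# Comparing the raw values directly raises TypeError (int vs. str) ...
try:
    record_cursor > state_cursor
    raw_comparison_failed = False
except TypeError:
    raw_comparison_failed = True

# ... while comparing the parsed values is well defined.
record_is_newer = is_newer(record_cursor, state_cursor)
```

Routing both sides through one parser keeps the comparison independent of how each value happens to be serialized.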

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1205-1211: LGTM! Clean datetime handling implementation.

The datetime format and cursor granularity handling is well-implemented, with proper parsing of the cursor granularity using parse_duration.


1213-1219: Verify sequential state assumption.

The code assumes that ConcurrentPerPartitionCursor only works with sequential state. While this is currently true, should we make this more flexible for future use cases? What are your thoughts on making this configurable?

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

Line range hint 600-641: Verify timestamp conversion consistency.

The test expects the timestamp to be converted to a string representation. Let's ensure this behavior is consistent with the actual implementation.

✅ Verification successful

Timestamp conversion is being handled consistently

The test is correctly converting the Unix timestamp (VOTE_300_CREATED_AT_TIMESTAMP) to a string representation using str(), which aligns with how other timestamp values are handled in the test. All expected records consistently use string format for timestamps.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if the timestamp conversion is handled consistently across the codebase

# Search for timestamp handling in the source code
rg -A 5 "def _extract_cursor_value_from_state|def _convert_datetime"

Length of output: 757


Script:

#!/bin/bash
# Search for connector state converter and timestamp handling
rg -A 5 "class ConnectorStateConverter" 

# Search for timestamp-related patterns in the test file
rg -B 2 -A 2 "VOTE_.*_CREATED_AT" unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

# Search for timestamp handling in stream implementations
rg -A 5 "def parse_value|def format_value" 

Length of output: 24979

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (1)

402-402: Add documentation for timestamp values

Would it be helpful to add comments explaining how these timestamp values were calculated? For example:

-VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # Latest vote in partition 30
+VOTE_300_CREATED_AT_TIMESTAMP = 1704844800000  # 2024-01-10T00:00:00Z in milliseconds
-INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000
-INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000
+INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = 1704153601000  # 2024-01-02T00:00:01Z in milliseconds
+INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = 1704240002000  # 2024-01-03T00:00:02Z in milliseconds

This would make it easier to maintain and update the test data. wdyt?
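
One way to keep such comments verifiable is to derive the constants from their ISO strings. The sketch below is illustrative only; to_epoch_ms is a hypothetical helper, not something defined in the test module.

```python
from datetime import datetime, timezone


def to_epoch_ms(iso_utc):
    """Convert an ISO 8601 UTC string such as '2024-01-10T00:00:00Z' to epoch milliseconds."""
    parsed = datetime.strptime(iso_utc, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    # The inputs have whole-second precision, so integer arithmetic is exact.
    return int(parsed.timestamp()) * 1000


VOTE_300_CREATED_AT_TIMESTAMP = to_epoch_ms("2024-01-10T00:00:00Z")
INITIAL_STATE_PARTITION_10_CURSOR_TIMESTAMP = to_epoch_ms("2024-01-02T00:00:01Z")
INITIAL_STATE_PARTITION_11_CURSOR_TIMESTAMP = to_epoch_ms("2024-01-03T00:00:02Z")
```

These calls reproduce the literal values quoted in the suggestion above, so either form could be used in the tests.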

Also applies to: 409-411

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between cd74ed2 and 25b6980.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (6 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
🔇 Additional comments (2)
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (2)

987-995: LGTM! Comprehensive test coverage for timestamp handling

The test cases thoroughly cover the timestamp format changes across different scenarios, including:

  • Normal operation with timestamp format
  • Error handling
  • State management

Also applies to: 1366-1374, 1926-1934


68-68: Verify the millisecond format specification

The "%ms" format was added to support millisecond timestamps. Should we add a test case to verify that other timestamp formats (e.g., microseconds) are properly handled as well? wdyt?

✅ Verification successful

The millisecond format is correctly implemented

The "%ms" format is intentionally added to handle millisecond timestamps (e.g., 1704844800000) returned by the API, while maintaining ISO format (e.g., "2024-01-10T00:00:00Z") in the state cursors. The test cases demonstrate successful conversion between these formats. No additional test cases are needed.
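
The conversion described here, from a millisecond timestamp on a record to an ISO string in state, can be sketched as follows. This is an illustration under the assumption that state uses the "%Y-%m-%dT%H:%M:%SZ" format from the test config; ms_to_state_value is a hypothetical name, not a CDK function.

```python
from datetime import datetime, timezone

STATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed ISO format for state cursors


def ms_to_state_value(epoch_ms):
    """Hypothetical helper: format an epoch-millisecond timestamp as an ISO state cursor."""
    parsed = datetime.fromtimestamp(int(epoch_ms) / 1000, tz=timezone.utc)
    return parsed.strftime(STATE_FORMAT)


state_value = ms_to_state_value(1704844800000)
```

Note that this state format has second precision, so any sub-second part of the input would be dropped by the formatting step.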

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are any other timestamp formats used in the codebase
rg -g '*.py' -A 5 'datetime_formats.*%[mu]s'

Length of output: 925


Script:

#!/bin/bash
# Search for datetime parser implementation
rg -g '*.py' -B 5 -A 10 'def parse.*datetime'

Length of output: 11538


Script:

#!/bin/bash
# Search for test cases using the %ms format and their results
rg -g '*.py' -B 5 -A 5 'VOTE_300_CREATED_AT' unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Length of output: 7738

@maxi297 maxi297 left a comment

LGTM! Just a couple of comments that are not blocking

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)

343-351: Consider adding error handling for parsing failures?

The state parsing looks good, but what happens if parse_value fails? Would it make sense to add some error handling here to provide more context about the failing value? wdyt?

 if self.cursor_field.cursor_field_key in stream_state:
     global_state_value = stream_state[self.cursor_field.cursor_field_key]
-    final_format_global_state_value = self._connector_state_converter.output_format(
-        self._connector_state_converter.parse_value(global_state_value)
-    )
+    try:
+        parsed_value = self._connector_state_converter.parse_value(global_state_value)
+        final_format_global_state_value = self._connector_state_converter.output_format(parsed_value)
+    except Exception as e:
+        logger.error(f"Failed to parse cursor value: {global_state_value}")
+        raise ValueError(f"Failed to parse cursor value: {global_state_value}") from e
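
A self-contained sketch of the same idea, for readers who want to try it outside the CDK. IsoStateConverter and normalize_global_state are hypothetical stand-ins; only the method names parse_value and output_format come from the suggestion above. The sketch catches ValueError rather than a bare Exception, which is a design choice of this example and not part of the suggestion.

```python
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class IsoStateConverter:
    """Hypothetical converter exposing the two methods used in the suggestion."""

    FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def parse_value(self, value):
        return datetime.strptime(str(value), self.FORMAT).replace(tzinfo=timezone.utc)

    def output_format(self, timestamp):
        return timestamp.strftime(self.FORMAT)


def normalize_global_state(stream_state, cursor_key, converter):
    """Return the global cursor in the converter's output format, or None if absent."""
    if cursor_key not in stream_state:
        return None
    raw = stream_state[cursor_key]
    try:
        return converter.output_format(converter.parse_value(raw))
    except ValueError as exc:
        # Log and re-raise with the offending key and value for easier debugging.
        logger.error("Failed to parse cursor value: %s", raw)
        raise ValueError(f"Failed to parse cursor value for {cursor_key!r}: {raw!r}") from exc
```

Chaining with "from exc" keeps the original parsing error attached to the new one, so the traceback shows both the context and the root cause.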
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 25b6980 and be80c9f.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (5 hunks)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (3)

25-27: LGTM! Clean import addition.

The new import for AbstractStreamStateConverter is well-organized with the existing imports.


78-86: LGTM! Clean constructor update.

The addition of connector_state_converter parameter follows good dependency injection practices and is properly typed.


335-354: Verify datetime parsing consistency across the codebase.

The implementation uses connector_state_converter for parsing and formatting cursor values, which aligns with ConcurrentCursor's approach. However, let's verify this consistency across the codebase.

✅ Verification successful

Implementation is consistent with concurrent streams pattern

The use of connector_state_converter for parsing and formatting cursor values aligns with the established pattern in concurrent stream implementations. The codebase shows clear separation of concerns where different datetime parsing approaches are used consistently within their respective domains.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check how datetime parsing is handled across the codebase
# Look for other implementations using either connector_state_converter or ab_datetime_parse

echo "Searching for connector_state_converter usage:"
rg -A 3 "connector_state_converter.*parse_value"

echo -e "\nSearching for ab_datetime_parse usage:"
rg -A 3 "ab_datetime_parse"

echo -e "\nSearching for other datetime parsing implementations:"
ast-grep --pattern 'datetime.strptime($_, $_)'

Length of output: 37954

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1221-1227: Consider extracting common state converter creation logic?

This logic appears to be duplicated with create_concurrent_cursor_from_datetime_based_cursor. Would it make sense to extract this into a helper method to maintain consistency and reduce duplication? Something like _create_datetime_state_converter? wdyt?

+ def _create_datetime_state_converter(
+     self,
+     datetime_format: str,
+     cursor_datetime_formats: List[str],
+     cursor_granularity: Optional[datetime.timedelta],
+ ) -> DateTimeStreamStateConverter:
+     return CustomFormatConcurrentStreamStateConverter(
+         datetime_format=datetime_format,
+         input_datetime_formats=cursor_datetime_formats,
+         is_sequential_state=True,
+         cursor_granularity=cursor_granularity,
+     )

  def create_concurrent_cursor_from_perpartition_cursor(
      self,
      ...
  ) -> ConcurrentPerPartitionCursor:
      ...
-     connector_state_converter = CustomFormatConcurrentStreamStateConverter(
-         datetime_format=datetime_format,
-         input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
-         is_sequential_state=True,
-         cursor_granularity=cursor_granularity,
-     )
+     connector_state_converter = self._create_datetime_state_converter(
+         datetime_format=datetime_format,
+         cursor_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats,
+         cursor_granularity=cursor_granularity,
+     )
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between be80c9f and 2d22a54.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)

1213-1219: LGTM! Clean initialization of datetime format and cursor granularity.

The code clearly handles both the datetime format and cursor granularity parsing with proper null checks.


1252-1252: LGTM! Good addition of the state converter parameter.

The connector_state_converter parameter is correctly passed to the ConcurrentPerPartitionCursor constructor.

@tolik0 tolik0 merged commit 33e9f5e into main on Jan 30, 2025
22 of 24 checks passed
@tolik0 tolik0 deleted the tolik0/fix-cursor-comparison-error branch on January 30, 2025, 17:42
Labels
bug Something isn't working
2 participants