Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: pair session output: AJ + Brian Lai #212

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
- `airbyte_cdk.sources.file_based`
- `airbyte_cdk.sources.streams`

## Key Classes for Declarative Sources

Developers can implement these interfaces for custom declarative sources.

- `airbyte_cdk.sources.declarative.retrievers.Retriever`
- `airbyte_cdk.sources.declarative.SimpleRetriever` - takes a partition of a stream, and returns records.
- `airbyte_cdk.sources.declarative.Requester` - makes HTTP requests.
- `airbyte_cdk.sources.declarative.extractors.HttpSelector` - extracts data from HTTP responses.
- `airbyte_cdk.sources.streams.http.error_handlers.ErrorHandler` - handles errors from HTTP requests.

## Building Destination Connectors

To build a destination connector, you will want to refer to
Expand Down
17 changes: 7 additions & 10 deletions airbyte_cdk/sources/declarative/declarative_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.incremental import (
GlobalSubstreamCursor,
Expand All @@ -28,18 +30,13 @@


@dataclass
@deprecated("Please use `DefaultStream` instead.")
class DeclarativeStream(Stream):
"""
DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever

Attributes:
name (str): stream name
primary_key (Optional[Union[str, List[str], List[List[str]]]]): the primary key of the stream
schema_loader (SchemaLoader): The schema loader
retriever (Retriever): The retriever
config (Config): The user-provided configuration as specified by the source's spec
stream_cursor_field (Optional[Union[InterpolatedString, str]]): The cursor field
stream. Transformations are applied in the order in which they are defined.
Deprecated. Please use `DefaultStream` instead.

Currently we do use this for model-to-component factory, but this will be refactored in the
future so that this class can be removed.
"""

retriever: Retriever
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union

from isodate import Duration, duration_isoformat, parse_duration
from typing_extensions import deprecated

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
Expand All @@ -23,9 +24,12 @@
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState


@deprecated("Please use `ConcurrentCursor` instead.")
@dataclass
class DatetimeBasedCursor(DeclarativeCursor):
"""
Deprecated. Please use `ConcurrentCursor` instead.

Slices the stream over a datetime range and create a state with format {<cursor_field>: <datetime> }

Given a start time, end time, a step function, and an optional lookback window,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import time
from typing import Any, Callable, Iterable, Mapping, Optional, TypeVar, Union

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
Expand Down Expand Up @@ -65,8 +67,11 @@ def finish(self) -> int:
raise RuntimeError("Global substream cursor timer not started")


@deprecated("Please use `ConcurrentCursor` instead.")
class GlobalSubstreamCursor(DeclarativeCursor):
"""
Deprecated. Please use `ConcurrentCursor` instead.

The GlobalSubstreamCursor is designed to track the state of substreams using a single global cursor.
This class is beneficial for streams with many partitions, as it allows the state to be managed globally
instead of per partition, simplifying state management and reducing the size of state messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from collections import OrderedDict
from typing import Any, Callable, Iterable, Mapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import (
Expand All @@ -24,8 +26,11 @@ def create(self) -> DeclarativeCursor:
return self._create_function()


@deprecated("Please use `ConcurrentCursor` instead.")
class PerPartitionCursor(DeclarativeCursor):
"""
Deprecated. Please use `ConcurrentCursor` instead.

Manages state per partition when a stream has many partitions, to prevent data loss or duplication.

**Partition Limitation and Limit Reached Logic**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#
from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
Expand All @@ -17,8 +19,11 @@
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


@deprecated("Please use `ConcurrentCursor` instead.")
class PerPartitionWithGlobalCursor(DeclarativeCursor):
"""
Deprecated. Please use `ConcurrentCursor` instead.

Manages state for streams with multiple partitions, with an optional fallback to a global cursor when specific conditions are met.

This cursor handles partitioned streams by maintaining individual state per partition using `PerPartitionCursor`. If the number of partitions exceeds a defined limit, it switches to a global cursor (`GlobalSubstreamCursor`) to manage state more efficiently.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, Callable, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ PyYAML = "^6.0.1"
rapidfuzz = "^3.10.1"
requests = "*"
requests_cache = "*"
typing_extensions = "*"
wcmatch = "10.0"
# Extras depedencies
avro = { version = "~1.11.2", optional = true }
Expand Down Expand Up @@ -87,7 +88,7 @@ freezegun = "*"
mypy = "*"
asyncio = "3.4.3"
ruff = "^0.7.2"
pdoc = "^15.0.0"
pdoc = "^15.0.1"
poethepoet = "^0.24.2"
pyproject-flake8 = "^6.1.0"
pytest = "^7"
Expand Down
Loading