-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: enable handling of nested fields when injecting request_option in request body_json #201
base: main
Are you sure you want to change the base?
Changes from all commits
f2d644f
00d2036
9c98313
a524f3e
fcaf110
fd2de58
20cc5d6
7292b57
ebb1791
5051d0d
fef4a46
9525357
6638dc8
eba438b
5dc29ca
6fc897f
97cff3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,10 @@ | |
|
||
from dataclasses import InitVar, dataclass | ||
from enum import Enum | ||
from typing import Any, Mapping, Union | ||
from typing import Any, List, Literal, Mapping, MutableMapping, Optional, Union | ||
|
||
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString | ||
from airbyte_cdk.sources.types import Config | ||
|
||
|
||
class RequestOptionType(Enum): | ||
|
@@ -26,13 +27,101 @@ class RequestOption: | |
Describes an option to set on a request | ||
|
||
Attributes: | ||
field_name (str): Describes the name of the parameter to inject | ||
field_name (str): Describes the name of the parameter to inject. Mutually exclusive with field_path. | ||
field_path (list(str)): Describes the path to a nested field as a list of field names. Mutually exclusive with field_name. | ||
inject_into (RequestOptionType): Describes where in the HTTP request to inject the parameter | ||
""" | ||
|
||
field_name: Union[InterpolatedString, str] | ||
inject_into: RequestOptionType | ||
parameters: InitVar[Mapping[str, Any]] | ||
field_name: Optional[Union[InterpolatedString, str]] = None | ||
field_path: Optional[List[Union[InterpolatedString, str]]] = None | ||
|
||
def __post_init__(self, parameters: Mapping[str, Any]) -> None: | ||
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters) | ||
# Validate inputs. We should expect either field_name or field_path, but not both | ||
if self.field_name is None and self.field_path is None: | ||
raise ValueError("RequestOption requires either a field_name or field_path") | ||
|
||
if self.field_name is not None and self.field_path is not None: | ||
raise ValueError( | ||
"Only one of field_name or field_path can be provided to RequestOption" | ||
) | ||
|
||
if self.field_name is not None and not isinstance( | ||
self.field_name, (str, InterpolatedString) | ||
): | ||
raise TypeError(f"field_name expects a string, but got {type(self.field_name)}") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is overly cautious and wouldn't validate that. In Python, any parameter can be of any type but we don't validate them everywhere. We have mypy for that which should hopefully catch most of these. I think at some point, there are readability concerns that we need to balance. Was there a specific gap we wanted to address? The some comment applies to the |
||
|
||
if self.field_path is not None: | ||
if not isinstance(self.field_path, list): | ||
raise TypeError(f"field_path expects a list, but got {type(self.field_path)}") | ||
for value in self.field_path: | ||
if not isinstance(value, (str, InterpolatedString)): | ||
raise TypeError(f"field_path values must be strings, got {type(value)}") | ||
|
||
if self.field_path is not None and self.inject_into != RequestOptionType.body_json: | ||
raise ValueError( | ||
"Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types." | ||
) | ||
|
||
# Convert field_name and field_path into InterpolatedString objects if they are strings | ||
if self.field_name is not None: | ||
self.field_name = InterpolatedString.create(self.field_name, parameters=parameters) | ||
if self.field_path is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: since only field_name or field_path will be defined, we can probably use |
||
self.field_path = [ | ||
InterpolatedString.create(segment, parameters=parameters) | ||
for segment in self.field_path | ||
] | ||
|
||
@property | ||
def _is_field_path(self) -> bool: | ||
"""Returns whether this option is a field path (ie, a nested field)""" | ||
return self.field_path is not None | ||
|
||
def inject_into_request( | ||
self, | ||
target: MutableMapping[str, Any], | ||
value: Any, | ||
config: Config, | ||
) -> None: | ||
""" | ||
Inject a request option value into a target request structure using either field_name or field_path. | ||
For non-body-json injection, only top-level field names are supported. | ||
For body-json injection, both field names and nested field paths are supported. | ||
|
||
Args: | ||
target: The request structure to inject the value into | ||
value: The value to inject | ||
config: The config object to use for interpolation | ||
""" | ||
if self._is_field_path: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we simplify the logic by having in post_init:
This way, we would only have one logic to maintain and it would be the |
||
if self.inject_into != RequestOptionType.body_json: | ||
raise ValueError( | ||
"Nested field injection is only supported for body JSON injection. Please use a top-level field_name for other injection types." | ||
) | ||
|
||
assert self.field_path is not None # for type checker | ||
current = target | ||
# Convert path segments into strings, evaluating any interpolated segments | ||
# Example: ["data", "{{ config[user_type] }}", "id"] -> ["data", "admin", "id"] | ||
*path_parts, final_key = [ | ||
str( | ||
segment.eval(config=config) | ||
if isinstance(segment, InterpolatedString) | ||
else segment | ||
) | ||
for segment in self.field_path | ||
] | ||
|
||
# Build a nested dictionary structure and set the final value at the deepest level | ||
for part in path_parts: | ||
current = current.setdefault(part, {}) | ||
current[final_key] = value | ||
else: | ||
# For non-nested fields, evaluate the field name if it's an interpolated string | ||
key = ( | ||
self.field_name.eval(config=config) | ||
if isinstance(self.field_name, InterpolatedString) | ||
else self.field_name | ||
) | ||
target[str(key)] = value |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,43 +3,104 @@ | |
# | ||
|
||
|
||
from typing import Any, List, Mapping, Optional, Set, Union | ||
from typing import Any, Dict, List, Mapping, Optional, Union | ||
|
||
|
||
def _has_nested_conflict(path1: List[str], value1: Any, path2: List[str], value2: Any) -> bool: | ||
""" | ||
Check if two paths conflict with each other. | ||
e.g. ["a", "b"] conflicts with ["a", "b"] if values differ | ||
e.g. ["a"] conflicts with ["a", "b"] (can't have both a value and a nested structure) | ||
""" | ||
# If one path is a prefix of the other, they conflict | ||
shorter, longer = sorted([path1, path2], key=len) | ||
if longer[: len(shorter)] == shorter: | ||
return True | ||
|
||
# If paths are the same but values differ, they conflict | ||
if path1 == path2 and value1 != value2: | ||
return True | ||
|
||
return False | ||
|
||
|
||
def _flatten_mapping( | ||
mapping: Mapping[str, Any], prefix: Optional[List[str]] = None | ||
) -> List[tuple[List[str], Any]]: | ||
""" | ||
Convert a nested mapping into a list of (path, value) pairs to make conflict detection easier. | ||
e.g. {"a": {"b": 1}} -> [(["a", "b"], 1)] | ||
""" | ||
prefix = prefix or [] | ||
result = [] | ||
|
||
for key, value in mapping.items(): | ||
current_path = prefix + [key] | ||
if isinstance(value, Mapping): | ||
result.extend(_flatten_mapping(value, current_path)) | ||
else: | ||
result.append((current_path, value)) | ||
|
||
return result | ||
|
||
|
||
def combine_mappings( | ||
mappings: List[Optional[Union[Mapping[str, Any], str]]], | ||
) -> Union[Mapping[str, Any], str]: | ||
""" | ||
Combine multiple mappings into a single mapping. If any of the mappings are a string, return | ||
that string. Raise errors in the following cases: | ||
* If there are duplicate keys across mappings | ||
Combine multiple mappings into a single mapping, supporting nested structures. | ||
If any of the mappings are a string, return that string. Raise errors in the following cases: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would precise that it is "If there only one mapping that is a string, it will return this mapping regardless of the other mappings". That being said, I don't understand this behavior though. Maybe I'm wrong but before, I think it would fail if there was a string and a mapping, right? |
||
* If there are conflicting paths across mappings (including nested conflicts) | ||
* If there are multiple string mappings | ||
* If there are multiple mappings containing keys and one of them is a string | ||
""" | ||
all_keys: List[Set[str]] = [] | ||
for part in mappings: | ||
if part is None: | ||
continue | ||
keys = set(part.keys()) if not isinstance(part, str) else set() | ||
all_keys.append(keys) | ||
|
||
string_options = sum(isinstance(mapping, str) for mapping in mappings) | ||
# If more than one mapping is a string, raise a ValueError | ||
# Count how many string options we have, ignoring None values | ||
string_options = sum(isinstance(mapping, str) for mapping in mappings if mapping is not None) | ||
if string_options > 1: | ||
raise ValueError("Cannot combine multiple string options") | ||
|
||
if string_options == 1 and sum(len(keys) for keys in all_keys) > 0: | ||
raise ValueError("Cannot combine multiple options if one is a string") | ||
# Filter out None values and empty mappings | ||
non_empty_mappings = [ | ||
m for m in mappings if m is not None and not (isinstance(m, Mapping) and not m) | ||
] | ||
|
||
# If there is only one string option, return it | ||
if string_options == 1: | ||
if len(non_empty_mappings) > 1: | ||
raise ValueError("Cannot combine multiple options if one is a string") | ||
return next(m for m in non_empty_mappings if isinstance(m, str)) | ||
|
||
# If any mapping is a string, return it | ||
# Convert all mappings to flat (path, value) pairs for conflict detection | ||
all_paths: List[List[tuple[List[str], Any]]] = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels like the following logic is a bit complex. Would there be a way to simplify this? Right now, I see that we:
Would it be simpler to do just the last one and validate for conflicts while we do that? Something like:
If we are afraid of modifying the mappings in memory, we can create a deepcopy of it as well. |
||
for mapping in mappings: | ||
if isinstance(mapping, str): | ||
return mapping | ||
if mapping is None or not isinstance(mapping, Mapping): | ||
continue | ||
all_paths.append(_flatten_mapping(mapping)) | ||
|
||
# Check each path against all other paths for conflicts | ||
# Conflicts occur when the same path has different values or when one path is a prefix of another | ||
# e.g. {"a": 1} and {"a": {"b": 2}} conflict because "a" can't be both a value and a nested structure | ||
for i, paths1 in enumerate(all_paths): | ||
for path1, value1 in paths1: | ||
for paths2 in all_paths[i + 1 :]: | ||
for path2, value2 in paths2: | ||
if _has_nested_conflict(path1, value1, path2, value2): | ||
raise ValueError( | ||
f"Duplicate keys or nested path conflict found: {'.'.join(path1)} conflicts with {'.'.join(path2)}" | ||
) | ||
|
||
# If there are duplicate keys across mappings, raise a ValueError | ||
intersection = set().union(*all_keys) | ||
if len(intersection) < sum(len(keys) for keys in all_keys): | ||
raise ValueError(f"Duplicate keys found: {intersection}") | ||
# If no conflicts were found, merge all mappings | ||
result: Dict[str, Any] = {} | ||
for mapping in mappings: | ||
if mapping is None or not isinstance(mapping, Mapping): | ||
continue | ||
for path, value in _flatten_mapping(mapping): | ||
current = result | ||
*prefix, last = path | ||
# Create nested dictionaries for each prefix segment | ||
for key in prefix: | ||
current = current.setdefault(key, {}) | ||
current[last] = value | ||
|
||
# Return the combined mappings | ||
return {key: value for mapping in mappings if mapping for key, value in mapping.items()} # type: ignore # mapping can't be string here | ||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we slowly deprecate this field in favor of
field_path
and therefore add this information in the description/title?