Skip to content

Commit

Permalink
Fix configuration migrations to work for peripheral connectors (#1830)
Browse files: browse the repository at this point in the history
  • Loading branch information
navarone-feekery authored Oct 24, 2023
1 parent 5263e67 commit 03e2728
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 116 deletions.
175 changes: 71 additions & 104 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,35 +743,27 @@ async def prepare(self, config, sources):

configured_connector_id = config.get("connector_id", "")
configured_service_type = config.get("service_type", "")
is_main_connector = self.id == configured_connector_id

if self.id != configured_connector_id:
# check configuration for native and other peripheral connectors
if is_main_connector:
if not configured_service_type:
self.log_error("Service type is not configured")
raise ServiceTypeNotConfiguredError("Service type is not configured.")

if configured_service_type not in sources:
raise ServiceTypeNotSupportedError(configured_service_type)
else:
if self.service_type not in sources:
self.log_debug(
f"Peripheral connector has invalid service type {self.service_type}, cannot check configuration formatting."
)
return

await self.validate_configuration_formatting(
sources[self.service_type], self.service_type
)

return

if not configured_service_type:
self.log_error("Service type is not configured")
raise ServiceTypeNotConfiguredError("Service type is not configured.")

if configured_service_type not in sources:
raise ServiceTypeNotSupportedError(configured_service_type)

if self.service_type is not None and not self.configuration.is_empty():
await self.validate_configuration_formatting(
sources[configured_service_type], configured_service_type
)

doc = {}
fqn = sources[configured_service_type]
fqn = (
sources[configured_service_type]
if is_main_connector
else sources[self.service_type]
)
try:
source_klass = get_source_klass(fqn)
except Exception as e:
Expand All @@ -780,25 +772,12 @@ async def prepare(self, config, sources):
f"Could not instantiate {fqn} for {configured_service_type}"
) from e

if self.service_type is None:
doc = self.validated_doc(source_klass)
if is_main_connector and self.service_type is None:
doc["service_type"] = configured_service_type
self.log_debug(f"Populated service type {configured_service_type}")

simple_config = source_klass.get_simple_configuration()
current_config = self.configuration.to_dict()
missing_keys = simple_config.keys() - current_config.keys()
if self.configuration.is_empty():
# sets the defaults and the flag to NEEDS_CONFIGURATION
doc["configuration"] = simple_config
doc["status"] = Status.NEEDS_CONFIGURATION.value
self.log_debug("Populated configuration")
elif missing_keys:
doc["configuration"] = self.updated_configuration(
missing_keys, current_config, simple_config
)
# doc["status"] = Status.NEEDS_CONFIGURATION.value # not setting status, because it may be that default values are sufficient

if self.features.features != source_klass.features():
if is_main_connector and self.features.features != source_klass.features():
doc["features"] = source_klass.features()
self.log_debug("Populated features")

Expand All @@ -813,7 +792,41 @@ async def prepare(self, config, sources):
)
await self.reload()

def updated_configuration(
def validated_doc(self, source_klass):
    """Build the partial document that brings the stored configuration up to date.

    Args:
        source_klass: data source class; its ``get_simple_configuration()``
            supplies the default configuration.

    Returns:
        dict: one of
            * ``{"configuration": <defaults>, "status": NEEDS_CONFIGURATION}``
              when the connector has no configuration at all;
            * ``{}`` when no fields are missing and no fields are reported
              as missing properties;
            * ``{"configuration": <merged updates>}`` otherwise.
    """
    defaults = source_klass.get_simple_configuration()
    existing = self.configuration.to_dict()

    if self.configuration.is_empty():
        # Nothing stored yet: seed with the defaults and flag the connector
        # as still needing configuration.
        self.log_debug("Populated configuration")
        return {
            "configuration": defaults,
            "status": Status.NEEDS_CONFIGURATION.value,
        }

    # Fields known to the defaults but absent from the stored configuration.
    absent_fields = defaults.keys() - existing.keys()
    # Stored fields selected by filter_nested_dict_by_keys against the
    # DEFAULT_CONFIGURATION keys -- presumably those lacking some of the
    # default field properties; confirm against the helper.
    incomplete_fields = filter_nested_dict_by_keys(
        DEFAULT_CONFIGURATION.keys(), existing
    )
    if not (absent_fields or incomplete_fields):
        return {}

    configuration = {}
    if absent_fields:
        configuration = self.updated_configuration_fields(
            absent_fields, existing, defaults
        )
    if incomplete_fields:
        property_updates = self.updated_configuration_field_properties(
            incomplete_fields, defaults
        )
        configuration = deep_merge_dicts(configuration, property_updates)

    return {"configuration": configuration}

def updated_configuration_fields(
self, missing_keys, current_config, simple_default_config
):
self.log_warning(
Expand All @@ -834,6 +847,25 @@ def updated_configuration(
draft_config[config_name] = draft_config_obj
return draft_config

def updated_configuration_field_properties(
    self, fields_missing_properties, simple_config
):
    """Fill in default properties for fields that are missing some of them.

    Args:
        fields_missing_properties (dict): stored configuration fields
            identified as missing field properties, keyed by field name.
        simple_config (dict): the default configuration, keyed by field name.

    Returns:
        The result of deep-merging ``fields_missing_properties`` into the
        defaults of those same fields.
    """
    self.log_info(
        f"Connector {self.id} ({self.service_type}) is missing configuration field properties. Generating defaults."
    )

    # Keep only the defaults of the fields being repaired; the stored values
    # are then merged on top of them.
    defaults_for_fields = {}
    for field_name, field_defaults in simple_config.items():
        if field_name in fields_missing_properties:
            defaults_for_fields[field_name] = field_defaults

    return deep_merge_dicts(defaults_for_fields, fields_missing_properties)

@with_concurrency_control()
async def validate_filtering(self, validator):
await self.reload()
Expand Down Expand Up @@ -877,71 +909,6 @@ async def document_count(self):
)
return result["count"]

async def validate_configuration_formatting(self, fqn, service_type):
    """Resolve the source class and repair missing configuration field properties.

    Args:
        fqn (string): the source fqn for a service, from config file
        service_type (string): service type of the connector

    Raises:
        DataSourceError: if the source class for ``fqn`` cannot be loaded.
    """
    try:
        klass = get_source_klass(fqn)
    except Exception as exc:
        # Log the underlying failure, then surface it as a DataSourceError.
        self.log_critical(exc, exc_info=True)
        message = f"Could not instantiate {fqn} for {service_type}"
        raise DataSourceError(message) from exc

    await self.add_missing_configuration_field_properties(
        service_type,
        klass.get_simple_configuration(),
        self.configuration.to_dict(),
    )

async def add_missing_configuration_field_properties(
    self, service_type, default_config, current_config
):
    """Add default values for any missing configuration field properties.

    When nothing is reported as missing, the method returns without
    touching the index.

    Args:
        service_type (string): service type of the connector
        default_config (dict): the default configuration for the connector
        current_config (dict): the currently existing configuration for the connector
    """
    incomplete_fields = filter_nested_dict_by_keys(
        DEFAULT_CONFIGURATION.keys(), current_config
    )
    if not incomplete_fields:
        return

    self.log_info(
        f"Connector for {service_type} is missing configuration field properties. Generating defaults."
    )

    # Start from the defaults of the affected fields only, then merge the
    # stored values on top of them.
    defaults_subset = {}
    for field_name, field_defaults in default_config.items():
        if field_name in incomplete_fields:
            defaults_subset[field_name] = field_defaults
    merged_configuration = deep_merge_dicts(defaults_subset, incomplete_fields)

    # The update is conditional on the stored sequence number and primary
    # term (presumably for optimistic concurrency control -- confirm against
    # the index client).
    await self.index.update(
        doc_id=self.id,
        doc={"configuration": merged_configuration},
        if_seq_no=self._seq_no,
        if_primary_term=self._primary_term,
    )
    await self.reload()

def _prefix(self):
return f"[Connector id: {self.id}, index name: {self.index_name}]"

Expand Down
Loading

0 comments on commit 03e2728

Please sign in to comment.