diff --git a/contentctl/input/director.py b/contentctl/input/director.py
index 1b637101..993296e2 100644
--- a/contentctl/input/director.py
+++ b/contentctl/input/director.py
@@ -255,6 +255,25 @@ def createSecurityContent(self, contentType: SecurityContentType) -> None:
             end="",
             flush=True,
         )
+        if contentType == SecurityContentType.data_sources:
+            # After resolving all data_sources, we need to complete mappings
+            # that can ONLY be done after all data_sources have been parsed.
+            # This is because datasources may reference each other
+            # and we cannot resolve those references until all data sources have been parsed.
+            for ds in self.output_dto.data_sources:
+                try:
+                    ds.resolveDataSourceObject(self.output_dto)
+                except (ValidationError, ValueError) as e:
+                    if ds.file_path is None:
+                        # There is no file to report against, so use a placeholder path;
+                        # this branch has nothing to derive relative_path from.
+                        relative_path = Path("PATH_NOT_FOUND")
+                        validation_errors.append((relative_path, ValueError(f"File path for DataSource {ds.name} was None.")))
+                    else:
+                        relative_path = ds.file_path.absolute().relative_to(
+                            self.input_dto.path.absolute()
+                        )
+                    validation_errors.append((relative_path, e))
         print("Done!")
 
         if len(validation_errors) > 0:
diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py
index ada9ad33..86dab457 100644
--- a/contentctl/objects/data_source.py
+++ b/contentctl/objects/data_source.py
@@ -1,29 +1,75 @@
 from __future__ import annotations
+from typing import Any, TYPE_CHECKING
+if TYPE_CHECKING:
+    from contentctl.input.director import DirectorOutputDto
 
-from typing import Any, Optional
+from enum import StrEnum, auto
 
-from pydantic import BaseModel, Field, HttpUrl, model_serializer
+
+from pydantic import BaseModel, Field, HttpUrl, model_serializer, ConfigDict, computed_field
+from functools import cached_property
 
 from contentctl.objects.security_content_object import SecurityContentObject
 
 
 class TA(BaseModel):
+    model_config = ConfigDict(extra="forbid")
     name: str
-    url: HttpUrl | None = None
+    url: HttpUrl
     version: str
 
 
+class DataSourceDataModel(StrEnum):
+    ocsf = auto()
+    custom_cim = auto()
+    cim = auto()
+
+
+class Field_Mapping(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+    data_model: DataSourceDataModel
+    data_set: str | None = None
+    mapping: dict[str, str]
+
+
+class LogConvert(BaseModel):
+    model_config = ConfigDict(extra="forbid")
+    # This should really be a DataSource object,
+    # but the order in which they are defined makes
+    # this challenging.
+    # We will need to keep both these fields around for now
+    data_source: str
+    _data_source_object: DataSource | None = None
+    mapping: dict[str, str]
+
+    # NOTE(review): computed fields are included when the model is serialized;
+    # confirm that dumping the nested DataSource from here is intended.
+    @computed_field
+    @cached_property
+    def data_source_object(self) -> DataSource:
+        """Return the DataSource stored by resolveDataSourceObject; raise ValueError if it is unset."""
+        if self._data_source_object is None:
+            raise ValueError(f"Error - LogConvert.data_source object {self.data_source} "
+                             "has not been resolved. Please ensure that 'resolveDataSourceObject' has been called")
+        return self._data_source_object
+
+    def resolveDataSourceObject(self, director: DirectorOutputDto | None) -> None:
+        """Store the first object that mapNamesToSecurityContentObjects returns for data_source."""
+        self._data_source_object = DataSource.mapNamesToSecurityContentObjects([self.data_source], director)[0]
+
+
 class DataSource(SecurityContentObject):
+    model_config = ConfigDict(extra="forbid")
     source: str = Field(...)
     sourcetype: str = Field(...)
-    separator: Optional[str] = None
-    configuration: Optional[str] = None
-    supported_TA: list[TA] = []
-    fields: None | list = None
-    field_mappings: None | list = None
-    convert_to_log_source: None | list = None
+    separator: None | str = None
+    configuration: None | str = None
+    supported_TA: list[TA]
+    fields: list[str] = []
+    field_mappings: list[Field_Mapping] = []
+    convert_to_log_source: list[LogConvert] = []
     example_log: None | str = None
-    output_fields: list[str] = None
+    output_fields: list[str] = []
 
     @model_serializer
     def serialize_model(self):
@@ -48,3 +94,18 @@ def serialize_model(self):
 
         # return the model
         return super_fields
+
+    def resolveDataSourceObject(self, director: DirectorOutputDto | None) -> None:
+        """Call resolveDataSourceObject on every convert_to_log_source entry.
+
+        Raises:
+            ValueError: if resolving any entry raises; the original
+                exception is chained as the cause.
+        """
+        for index, log in enumerate(self.convert_to_log_source):
+            try:
+                log.resolveDataSourceObject(director)
+            except Exception as e:
+                raise ValueError(
+                    f"Error encountered when resolving field 'convert_to_log_source[{index}].data_source: {log.data_source}'. No DataSource by the name '{log.data_source}' exists"
+                ) from e
\ No newline at end of file