Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDF-24184] 🚍 Strongly coupled views. #1490

Merged
merged 8 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
)
from cognite_toolkit._cdf_tk.utils.cdf import iterate_instances
from cognite_toolkit._cdf_tk.utils.diff_list import diff_list_identifiable, dm_identifier
from cognite_toolkit._cdf_tk.utils.tarjan import tarjan

from .auth_loaders import GroupAllScopedLoader

Expand Down Expand Up @@ -601,38 +602,77 @@ def create(self, items: Sequence[ViewApply]) -> ViewList:
if self._is_auto_retryable(e1):
# Fallback to creating one by one if the error is auto-retryable.
return self._fallback_create_one_by_one(items, e1)
elif self._is_deployment_order(e1, set(self.get_ids(items))):
return self._fallback_create_one_by_one(self._topological_sort(items), e1, warn=False)
Comment on lines -604 to -605
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled by the new method, so we no longer need it.

elif self._is_false_not_exists(e1, {item.as_id() for item in items}):
return self._try_to_recover_coupled(items, e1)
raise

@staticmethod
def _is_deployment_order(e: CogniteAPIError, ids: set[ViewId]) -> bool:
if match := re.match(
r"One or more views do not exist: '([a-zA-Z0-9_-]+):([a-zA-Z0-9_]+)/([.a-zA-Z0-9_-]+)'", e.message
):
return ViewId(*match.groups()) in ids
return False

def _topological_sort(self, items: Sequence[ViewApply]) -> Sequence[ViewApply]:
def _is_false_not_exists(e: CogniteAPIError, request_views: set[ViewId]) -> bool:
    """Check whether an API error is a spurious "view does not exist" error.

    The error is considered spurious when it is a client error (4xx) whose
    message mentions "not exist" and every view referenced in the message is
    itself part of the request that failed.

    Args:
        e: The API error raised when applying the views.
        request_views: The ids of all views that were part of the failed request.

    Returns:
        True if all views named in the error message are in ``request_views``,
        otherwise False.
    """
    # Both conditions must hold for the error to be a candidate: the message
    # talks about something not existing AND it is a client (4xx) error.
    # Reject as soon as either one fails.
    if "not exist" not in e.message or not (400 <= e.code < 500):
        return False
    # Quoted references of the form '<space>:<external_id>/<version>'.
    results = re.findall(r"'([a-zA-Z0-9_-]+):([a-zA-Z0-9_]+)/([.a-zA-Z0-9_-]+)'", e.message)
    if not results:
        # No view references in the message
        return False
    error_message_views = {ViewId(*result) for result in results}
    return error_message_views.issubset(request_views)

def _try_to_recover_coupled(self, items: Sequence[ViewApply], original_error: CogniteAPIError) -> ViewList:
    """The /models/views endpoint can give faulty 400 about missing views that are part of the request.

    This method tries to recover from such errors by identifying the strongly connected components in the graph
    defined by the implements and through properties of the views. We then create the components in topological
    order.

    Args:
        items: The items that failed to create.
        original_error: The original error that was raised. If the problem is not recoverable, this error is raised.

    Returns:
        The views that were created.

    Raises:
        ToolkitCycleError: If the implements relations between the given views contain a cycle.
        CogniteAPIError: The ``original_error``, if creating any of the components fails. Views created
            earlier in the recovery attempt are deleted before it is raised.

    """
    views_by_id = {self.get_id(item): item for item in items}
    parents_by_child: dict[ViewId, set[ViewId]] = {
        view_id: {parent for parent in view.implements or [] if parent in views_by_id}
        for view_id, view in views_by_id.items()
    }
    # Check for cycles in the implements graph.
    # NOTE: static_order() is a generator function, so merely calling it never
    # runs the cycle detection. prepare() performs the check eagerly.
    try:
        TopologicalSorter(parents_by_child).prepare()
    except CycleError as e:
        raise ToolkitCycleError(
            f"Failed to deploy views. This likely due to a cycle in implements. {e.args[1]}"
        ) from e

    # Full dependency graph: implements parents plus the source views of
    # reverse direct relations, restricted to views that are part of this request.
    dependencies_by_id: dict[ViewId, set[ViewId]] = defaultdict(set)
    for view_id, view in views_by_id.items():
        dependencies_by_id[view_id].update([parent for parent in view.implements or [] if parent in views_by_id])
        for properties in (view.properties or {}).values():
            if isinstance(properties, ReverseDirectRelationApply):
                if isinstance(properties.through.source, ViewId) and properties.through.source in views_by_id:
                    dependencies_by_id[view_id].add(properties.through.source)

    LowSeverityWarning(
        f"Failed to create {len(items)} views: {escape(original_error.message)}.\nAttempting to recover..."
    ).print_warning(include_timestamp=True, console=self.console)
    created = ViewList([])
    # Each strongly connected component is sent as a single request, in the
    # order returned by tarjan().
    for strongly_connected in tarjan(dependencies_by_id):
        to_create = [views_by_id[view_id] for view_id in strongly_connected]
        try:
            created_set = self.client.data_modeling.views.apply(to_create)
        except CogniteAPIError as error:
            # Roll back what this recovery attempt has created so far.
            self.client.data_modeling.views.delete(created.as_ids())
            HighSeverityWarning(
                f"Recovering attempt failed. Could not create views {self.get_ids(to_create)}: "
                f"{escape(error.message)}.\n Raising original error."
            ).print_warning(console=self.console)
            raise original_error from error
        created.extend(created_set)
    message = f"Recovery attempt succeeded. Created {len(created)} views."
    if self.console:
        self.console.print(message)
    else:
        print(message)
    return created

@staticmethod
def _is_auto_retryable(e: CogniteAPIError) -> bool:
Expand Down
46 changes: 46 additions & 0 deletions cognite_toolkit/_cdf_tk/utils/tarjan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import TypeVar

T = TypeVar("T")


def tarjan(dependencies_by_id: dict[T, set[T]]) -> list[set[T]]:
    """Returns the strongly connected components of the dependency graph
    in topological order.

    The traversal is iterative (explicit work stack), so arbitrarily deep
    dependency chains do not hit the interpreter's recursion limit.

    Args:
        dependencies_by_id: A dictionary where the keys are ids and the values are sets of ids that the key depends on.
            Ids that only appear as a dependency (and not as a key) are treated as having no dependencies.

    Returns:
        A list of sets of ids that are strongly connected components in the dependency graph.
        A component is emitted only after every component it depends on has been emitted.
    """
    index: dict[T, int] = {}  # discovery order of each visited id
    lowlink: dict[T, int] = {}  # smallest index reachable from the id via the DFS stack
    stack: list[T] = []  # ids visited but not yet assigned to a component
    on_stack: set[T] = set()  # same content as `stack`, for O(1) membership tests
    result: list[set[T]] = []

    def discover(node: T) -> None:
        """Register a newly reached id and push it on the component stack."""
        index[node] = lowlink[node] = len(index)
        stack.append(node)
        on_stack.add(node)

    for root in dependencies_by_id:
        if root in index:
            continue
        discover(root)
        # Each frame is (id, iterator over its remaining dependencies); this
        # replaces the call stack of the recursive formulation.
        work = [(root, iter(dependencies_by_id.get(root, ())))]
        while work:
            node, remaining = work[-1]
            descended = False
            for dependency in remaining:
                if dependency not in index:
                    # Unvisited: descend first, resume this iterator afterwards.
                    discover(dependency)
                    work.append((dependency, iter(dependencies_by_id.get(dependency, ()))))
                    descended = True
                    break
                if dependency in on_stack:
                    # Back edge into the component currently being built.
                    lowlink[node] = min(lowlink[node], index[dependency])
            if descended:
                continue

            # All dependencies of `node` are processed.
            work.pop()
            if lowlink[node] == index[node]:
                # `node` is the root of a component: everything above it on
                # the stack (inclusive) belongs to that component.
                scc: set[T] = set()
                while True:
                    member = stack.pop()
                    on_stack.remove(member)
                    scc.add(member)
                    if member == node:
                        break
                result.append(scc)
            if work:
                # Propagate the lowlink to the parent, as the recursive
                # version does right after the child call returns.
                parent = work[-1][0]
                lowlink[parent] = min(lowlink[parent], lowlink[node])
    return result
2 changes: 2 additions & 0 deletions tests/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
COMPLETE_ORG = DATA_FOLDER / "complete_org"
COMPLETE_ORG_ALPHA_FLAGS = DATA_FOLDER / "complete_org_alpha_flags"
CDF_TOML_DATA = DATA_FOLDER / "cdf_toml_data"
STRONGLY_COUPLED_MODEL = DATA_FOLDER / "strongly_coupled_model"

__all__ = [
"AUTH_DATA",
Expand All @@ -32,5 +33,6 @@
"PROJECT_WITH_DUPLICATES",
"RESOURCES_WITH_ENVIRONMENT_VARIABLES",
"RUN_DATA",
"STRONGLY_COUPLED_MODEL",
"TRANSFORMATION_CLI",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
space: strongly-coupled-model
externalId: Production
name: Production
description: Missing
properties:
id:
type:
list: false
collation: ucs_basic
type: text
immutable: false
nullable: true
autoIncrement: false
name: id
description: Missing
type:
type:
list: false
collation: ucs_basic
type: text
immutable: false
nullable: true
autoIncrement: false
name: type
constraints:
cogniteAssetPresent:
require:
space: cdf_cdm
externalId: CogniteAsset
type: container
constraintType: requires
indexes:
id:
properties:
- id
cursorable: true
indexType: btree
usedFor: node
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
space: strongly-coupled-model
externalId: SimNode
name: SimNode
usedFor: node
description: Missing
properties:
coordinate:
type:
type: direct
list: false
immutable: false
nullable: true
autoIncrement: false
name: coordinate
constraints:
cogniteDescribablePresent:
require:
space: strongly-coupled-model
externalId: SimNodeAndEdge
type: container
constraintType: requires
indexes: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
space: strongly-coupled-model
externalId: SimSubNode
name: SimSubNode
usedFor: node
description: Missing
properties:
parent:
type:
list: false
type: direct
immutable: false
nullable: true
autoIncrement: false
name: parent
constraints:
cogniteDescribablePresent:
require:
space: strongly-coupled-model
externalId: SimNode
type: container
constraintType: requires
indexes: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
space: strongly-coupled-model
externalId: SimEdge
name: SimEdge
usedFor: node
description: Missing
properties:
source:
type:
type: direct
list: false
immutable: false
nullable: true
autoIncrement: false
name: source
destination:
type:
type: direct
list: false
immutable: false
nullable: true
autoIncrement: false
name: destination
coordinates:
type:
type: direct
list: false
immutable: false
nullable: true
autoIncrement: false
name: coordinates
constraints:
cogniteDescribablePresent:
require:
space: strongly-coupled-model
externalId: SimNodeAndEdge
type: container
constraintType: requires
indexes: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
space: strongly-coupled-model
externalId: Property
name: Property
usedFor: node
description: Missing
properties:
type:
type:
type: enum
values:
Input: {}
Output: {}
immutable: true
nullable: false
autoIncrement: false
name: type
description: Missing
nodeOrEdge:
type:
list: false
type: direct
immutable: false
nullable: true
autoIncrement: false
name: nodeOrEdge
workflow:
type:
type: text
list: false
collation: ucs_basic
immutable: false
nullable: true
autoIncrement: false
name: workflow
unit:
type:
type: text
list: false
collation: ucs_basic
immutable: false
nullable: true
autoIncrement: false
name: unit
valueClassicRef:
type:
type: timeseries
list: false
immutable: false
nullable: true
autoIncrement: false
name: value
valueRef:
type:
list: false
type: direct
immutable: false
nullable: true
autoIncrement: false
name: valueRef
constraints:
cogniteDescribablePresent:
require:
space: cdf_cdm
externalId: CogniteDescribable
type: container
constraintType: requires
indexes: {}
Loading