Skip to content

Commit

Permalink
fire GenericExceptionOnRun for batch-level exception
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleArk committed Nov 13, 2024
1 parent 6e1f64f commit 771792c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
10 changes: 8 additions & 2 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
from dbt.events.types import (
GenericExceptionOnRun,
LogHookEndLine,
LogHookStartLine,
LogModelResult,
LogStartLine,
RunningOperationCaughtError,
)
from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
Expand Down Expand Up @@ -275,7 +275,13 @@ def print_batch_result_line(
level=level,
)
if exception:
fire_event(RunningOperationCaughtError(exc=str(exception)))
fire_event(

Check warning on line 278 in core/dbt/task/run.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/run.py#L278

Added line #L278 was not covered by tests
GenericExceptionOnRun(
unique_id=self.node.unique_id,
exc=f"Exception on worker thread. {str(exception)}",
node_info=self.node.node_info,
)
)

def print_batch_start_line(
self, batch_start: Optional[datetime], batch_idx: int, batch_total: int
Expand Down
11 changes: 9 additions & 2 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from dbt.events.types import (
GenericExceptionOnRun,
LogModelResult,
MicrobatchMacroOutsideOfBatchesDeprecation,
MicrobatchModelNoEventTimeInputs,
Expand Down Expand Up @@ -526,9 +527,15 @@ def models(self):
}

def test_run_with_event_time(self, project):
# run all partitions from start - 2 expected rows in output, one failed
event_catcher = EventCatcher(
GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None
)

with patch_microbatch_end_time("2020-01-03 13:57:00"):
run_dbt(["run"], expect_pass=False)
run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False)

assert len(event_catcher.caught_events) == 1
# run all partitions from start - 2 expected rows in output, one failed
self.assert_row_count(project, "microbatch_model", 2)

run_results = get_artifact(project.project_root, "target", "run_results.json")
Expand Down

0 comments on commit 771792c

Please sign in to comment.