fire GenericExceptionOnRun for batch-level exception (#11003)
MichelleArk authored Nov 14, 2024
1 parent 1625eb0 commit 35c0920
Showing 3 changed files with 30 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241114-112535.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Emit batch-level exception with node_info on microbatch batch run failure
+time: 2024-11-14T11:25:35.050914-05:00
+custom:
+  Author: michelleark
+  Issue: "10840"
10 changes: 8 additions & 2 deletions core/dbt/task/run.py
@@ -27,11 +27,11 @@
 from dbt.contracts.graph.manifest import Manifest
 from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode
 from dbt.events.types import (
+    GenericExceptionOnRun,
     LogHookEndLine,
     LogHookStartLine,
     LogModelResult,
     LogStartLine,
-    RunningOperationCaughtError,
 )
 from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
 from dbt.graph import ResourceTypeSelector
@@ -275,7 +275,13 @@ def print_batch_result_line(
             level=level,
         )
         if exception:
-            fire_event(RunningOperationCaughtError(exc=str(exception)))
+            fire_event(
+                GenericExceptionOnRun(
+                    unique_id=self.node.unique_id,
+                    exc=f"Exception on worker thread. {str(exception)}",
+                    node_info=self.node.node_info,
+                )
+            )
 
     def print_batch_start_line(
         self, batch_start: Optional[datetime], batch_idx: int, batch_total: int
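The swap from RunningOperationCaughtError to GenericExceptionOnRun means a failed batch now emits a structured event carrying the model's unique_id and node_info rather than a bare error string. As a hedged sketch (the callback wiring below is an assumption for illustration, not part of this commit), a programmatic consumer could attribute batch failures to their model:

    # Sketch only: filter dbt's structured events for batch-level failures.
    from dbt.cli.main import dbtRunner

    def on_batch_failure(event) -> None:
        # GenericExceptionOnRun now carries unique_id and node_info (see diff above),
        # so the failure can be tied to a specific microbatch model.
        if event.info.name == "GenericExceptionOnRun" and event.data.node_info is not None:
            print(f"batch failure in {event.data.unique_id}: {event.data.exc}")

    runner = dbtRunner(callbacks=[on_batch_failure])
    runner.invoke(["run"])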
20 changes: 16 additions & 4 deletions tests/functional/microbatch/test_microbatch.py
@@ -3,6 +3,7 @@
 import pytest
 
 from dbt.events.types import (
+    GenericExceptionOnRun,
     LogModelResult,
     MicrobatchMacroOutsideOfBatchesDeprecation,
     MicrobatchModelNoEventTimeInputs,
@@ -516,7 +517,7 @@ def test_run_with_event_time_logs(self, project):
 """
 
 
-class TestMicrobatchIncrementalPartitionFailure(BaseMicrobatchTest):
+class TestMicrobatchIncrementalBatchFailure(BaseMicrobatchTest):
     @pytest.fixture(scope="class")
     def models(self):
         return {
@@ -526,9 +527,15 @@ def models(self):
         }
 
     def test_run_with_event_time(self, project):
+        event_catcher = EventCatcher(
+            GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None
+        )
+
         # run all partitions from start - 2 expected rows in output, one failed
         with patch_microbatch_end_time("2020-01-03 13:57:00"):
-            run_dbt(["run"], expect_pass=False)
+            run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False)
+
+        assert len(event_catcher.caught_events) == 1
         self.assert_row_count(project, "microbatch_model", 2)
 
         run_results = get_artifact(project.project_root, "target", "run_results.json")
@@ -626,7 +633,7 @@ def test_run_with_event_time(self, project):
 """
 
 
-class TestMicrobatchInitialPartitionFailure(BaseMicrobatchTest):
+class TestMicrobatchInitialBatchFailure(BaseMicrobatchTest):
     @pytest.fixture(scope="class")
     def models(self):
         return {
@@ -635,9 +642,14 @@
         }
 
     def test_run_with_event_time(self, project):
+        event_catcher = EventCatcher(
+            GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None
+        )
+
         # run all partitions from start - 2 expected rows in output, one failed
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
-            run_dbt(["run"])
+            run_dbt(["run"], callbacks=[event_catcher.catch])
+        assert len(event_catcher.caught_events) == 1
         self.assert_row_count(project, "microbatch_model", 2)


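Both new tests hinge on the EventCatcher/callback pattern: run_dbt forwards every structured event to the registered callbacks, and the predicate keeps only GenericExceptionOnRun events that carry node_info. A rough sketch of that pattern (an illustration under assumptions, not dbt's actual test helper) looks like:

    # Illustrative approximation of the catcher the tests construct above.
    from dataclasses import dataclass, field
    from typing import Any, Callable, List

    @dataclass
    class PredicateEventCatcher:
        event_name: str  # e.g. "GenericExceptionOnRun"
        predicate: Callable[[Any], bool] = lambda event: True
        caught_events: List[Any] = field(default_factory=list)

        def catch(self, event: Any) -> None:
            # run_dbt(..., callbacks=[catcher.catch]) calls this for every event fired.
            if event.info.name == self.event_name and self.predicate(event):
                self.caught_events.append(event)

With that shape, assert len(event_catcher.caught_events) == 1 checks that exactly one batch-level exception with node attribution was emitted for the failing batch.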
