Skip to content

Commit

Permalink
register executors for bq.source
Browse files Browse the repository at this point in the history
  • Loading branch information
karakanb committed Jan 10, 2025
1 parent 91ca82c commit 8c84f0e
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,10 @@ func setupExecutors(
mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner
mainExecutors[pipeline.AssetTypeBigqueryQuery][scheduler.TaskInstanceTypeMetadataPush] = metadataPushOperator

mainExecutors[pipeline.AssetTypeBigquerySource][scheduler.TaskInstanceTypeMetadataPush] = metadataPushOperator
mainExecutors[pipeline.AssetTypeBigquerySource][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner
mainExecutors[pipeline.AssetTypeBigquerySource][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner

mainExecutors[pipeline.AssetTypeBigquerySeed][scheduler.TaskInstanceTypeMain] = seedOperator
mainExecutors[pipeline.AssetTypeBigquerySeed][scheduler.TaskInstanceTypeColumnCheck] = bqCheckRunner
mainExecutors[pipeline.AssetTypeBigquerySeed][scheduler.TaskInstanceTypeCustomCheck] = customCheckRunner
Expand Down
1 change: 1 addition & 0 deletions pkg/bigquery/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (m *mockQuerierWithResult) IsSameClustering(meta *bigquery.TableMetadata, a
args := m.Called(meta, asset)
return args.Bool(0)
}

func (m *mockQuerierWithResult) DeleteTableIfMaterializationTypeMismatch(ctx context.Context, tableName string, asset *pipeline.Asset) error {
args := m.Called(ctx, tableName, asset)
return args.Error(0)
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var DefaultExecutorsV2 = map[pipeline.AssetType]Config{
pipeline.AssetTypeBigquerySource: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
scheduler.TaskInstanceTypeMetadataPush: NoOpOperator{},
scheduler.TaskInstanceTypeColumnCheck: NoOpOperator{},
scheduler.TaskInstanceTypeCustomCheck: NoOpOperator{},
},
pipeline.AssetTypeBigquerySeed: {
scheduler.TaskInstanceTypeMain: NoOpOperator{},
Expand Down
1 change: 1 addition & 0 deletions pkg/snowflake/checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (m *mockQuerierWithResult) Ping(ctx context.Context) error {
args := m.Called(ctx)
return args.Error(0)
}

func (m *mockQuerierWithResult) CreateSchemaIfNotExist(ctx context.Context, asset *pipeline.Asset) error {
args := m.Called(asset, ctx)
return args.Error(0)
Expand Down

0 comments on commit 8c84f0e

Please sign in to comment.