
OP Atlas Project asset failing to materialize #2987

Open · ccerv1 opened this issue Feb 6, 2025 · 6 comments
Labels: c:data Gathering data (e.g. indexing)

ccerv1 (Member) commented Feb 6, 2025

Which area(s) are affected? (leave empty if unsure)

Indexer

To Reproduce

Try to materialize the Project asset: https://admin-dagster.opensource.observer/assets/op_atlas/Project

Describe the Bug

Materialization fails. See the log below:

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "op_atlas_Project":

  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 245, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 499, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 183, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 87, in _process_asset_results_to_events
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/compute.py", line 197, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/compute.py", line 166, in _yield_compute_results
    for event in iterate_with_context(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_utils/__init__.py", line 480, in iterate_with_context
    with context_fn():
         ^^^^^^^^^^^^
  File "/usr/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1738851723.633488 with exception:

<class 'dlt.destinations.exceptions.DatabaseTransientException'>
400 POST https://bigquery.googleapis.com/bigquery/v2/projects/opensource-observer/queries?prettyPrint=false: Cannot add required fields to an existing schema. (field: is_submitted_to_oso)

  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.12/dist-packages/dagster/_utils/__init__.py", line 482, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/usr/src/app/warehouse/oso_dagster/factories/sql.py", line 87, in _asset
    yield from dlt.run(context=context, loader_file_format="jsonl", **kwargs)
  File "/usr/local/lib/python3.12/dist-packages/dagster_embedded_elt/dlt/dlt_event_iterator.py", line 77, in __next__
    return next(self._inner_iterator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster_embedded_elt/dlt/resource.py", line 286, in _run
    load_info = dlt_pipeline.run(dlt_source, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 746, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 165, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 614, in load
    raise PipelineStepFailed(

The above exception was caused by the following exception:
dlt.destinations.exceptions.DatabaseTransientException: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/opensource-observer/queries?prettyPrint=false: Cannot add required fields to an existing schema. (field: is_submitted_to_oso)

  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 607, in load
    runner.run_pool(load_step.config, load_step)
  File "/usr/local/lib/python3.12/dist-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/load.py", line 638, in run
    self.load_single_package(load_id, schema)
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/load.py", line 527, in load_single_package
    applied_update = init_client(
                     ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/utils.py", line 117, in init_client
    applied_update = _init_dataset_and_update_schema(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/utils.py", line 180, in _init_dataset_and_update_schema
    applied_update = job_client.update_stored_schema(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/job_client_impl.py", line 198, in update_stored_schema
    applied_update = self._execute_schema_update_sql(only_tables)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/job_client_impl.py", line 501, in _execute_schema_update_sql
    self.sql_client.execute_many(sql_scripts)
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/sql_client.py", line 190, in execute_many
    ret.append(self.execute_sql(sql_fragment, *args, **kwargs))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/impl/bigquery/sql_client.py", line 202, in execute_sql
    with self.execute_query(sql, *args, **kwargs) as curr:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/sql_client.py", line 418, in _wrap_gen
    raise self._make_database_exception(ex)

The above exception occurred during handling of the following exception:
google.cloud.bigquery.dbapi.exceptions.DatabaseError: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/opensource-observer/queries?prettyPrint=false: Cannot add required fields to an existing schema. (field: is_submitted_to_oso)

  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/sql_client.py", line 416, in _wrap_gen
    return (yield from f(self, *args, **kwargs))
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/destinations/impl/bigquery/sql_client.py", line 221, in execute_query
    curr.execute(query, db_args, job_config=self._session_query or self._default_query)
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/dbapi/_helpers.py", line 496, in with_closed_check
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/dbapi/cursor.py", line 189, in execute
    self._execute(
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/dbapi/cursor.py", line 226, in _execute
    raise exceptions.DatabaseError(exc)

The above exception occurred during handling of the following exception:
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/opensource-observer/queries?prettyPrint=false: Cannot add required fields to an existing schema. (field: is_submitted_to_oso)

  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/dbapi/cursor.py", line 220, in _execute
    rows = client.query_and_wait(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/client.py", line 3611, in query_and_wait
    return _job_helpers.query_and_wait(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/_job_helpers.py", line 509, in query_and_wait
    return job_retry(do_query)()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/_job_helpers.py", line 450, in do_query
    response = retry(client._call_api)(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.12/dist-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/bigquery/client.py", line 843, in _call_api
    return call()
           ^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)

Expected Behavior

The asset should materialize even when new fields are added to the upstream schema.
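For context, BigQuery refuses to add a REQUIRED (NOT NULL) column to an existing table, and that is exactly what dlt's schema update attempted for is_submitted_to_oso. One likely fix is to hint the new column as nullable so the migration becomes an ordinary ADD COLUMN. A minimal sketch, assuming a standalone dlt resource rather than the actual factory in warehouse/oso_dagster/factories/sql.py (the resource body and primary key below are illustrative):

```python
import dlt

@dlt.resource(
    name="project",
    write_disposition="merge",
    primary_key="id",  # assumption: the real asset's key may differ
    # Hint the new field as NULLABLE so BigQuery can add it to the existing
    # table; BigQuery rejects adding REQUIRED fields after table creation.
    columns={"is_submitted_to_oso": {"data_type": "bool", "nullable": True}},
)
def project():
    # Illustrative row; the real rows come from the OP Atlas database.
    yield {"id": "0x1", "name": "Example Project", "is_submitted_to_oso": True}

pipeline = dlt.pipeline(
    pipeline_name="op_atlas",
    destination="bigquery",
    dataset_name="op_atlas",
)
pipeline.run(project(), loader_file_format="jsonl")
```

If the resource comes from schema reflection (as this SQL factory presumably does, carrying the source's NOT NULL constraint over), the same hint can instead be applied after the fact with resource.apply_hints(columns={"is_submitted_to_oso": {"nullable": True}}).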


@github-project-automation github-project-automation bot moved this to Backlog in OSO Feb 6, 2025
@ccerv1 ccerv1 added this to the [c] Retro Funding S7 Metrics milestone Feb 6, 2025
@ccerv1 ccerv1 added the c:data Gathering data (e.g. indexing) label Feb 6, 2025
ryscheng (Member) commented Feb 7, 2025

@ccerv1 let me know if the dataset deletion doesn't fix it.

ccerv1 (Member, Author) commented Feb 7, 2025

It did. I'm running another materialization right now to confirm.

ccerv1 (Member, Author) commented Feb 7, 2025

All good! Closing the issue.

@ccerv1 ccerv1 closed this as completed Feb 7, 2025
@github-project-automation github-project-automation bot moved this from Backlog to Done in OSO Feb 7, 2025
@ccerv1 ccerv1 self-assigned this Feb 7, 2025
@ccerv1 ccerv1 reopened this Feb 10, 2025
@github-project-automation github-project-automation bot moved this from Done to Needs Review in OSO Feb 10, 2025
ccerv1 (Member, Author) commented Feb 10, 2025

I'm reopening. Here's the message I shared in the bugs channel, with a copy of a log below:

I have been trying to pull new data from the OP Atlas Project asset and have run into various issues over the weekend. First it was only returning a small set of projects. Then I tried dropping the table and rematerializing, and now the asset fails completely.

This seems like a different issue than what was failing previously.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "op_atlas_Project":

  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_plan.py", line 245, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 499, in core_dagster_event_sequence_for_step
    for user_event in _step_output_error_checked_user_event_sequence(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 183, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/execute_step.py", line 87, in _process_asset_results_to_events
    for user_event in user_event_sequence:
                      ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/compute.py", line 197, in execute_core_compute
    for step_output in _yield_compute_results(step_context, inputs, compute_fn, compute_context):
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/compute.py", line 166, in _yield_compute_results
    for event in iterate_with_context(
                 ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster/_utils/__init__.py", line 480, in iterate_with_context
    with context_fn():
         ^^^^^^^^^^^^
  File "/usr/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/utils.py", line 84, in op_execution_error_boundary
    raise error_cls(

The above exception was caused by the following exception:
dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load with exception:

<class 'dlt.load.exceptions.LoadClientJobFailed'>
Job for project.dce43b48ef.reference failed terminally in load 1739192914.9111016 with message {"error_result":{"reason":"notFound","message":"Not found: Table opensource-observer:op_atlas.project was not found in location US"},"errors":[{"reason":"notFound","message":"Not found: Table opensource-observer:op_atlas.project was not found in location US"}],"job_start":"2025-02-10T13:08:43.223000Z","job_end":"2025-02-10T13:08:43.223000Z","job_id":"project_dce43b48ef_0_reference"}. The package is aborted and cannot be retried.

  File "/usr/local/lib/python3.12/dist-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/usr/local/lib/python3.12/dist-packages/dagster/_utils/__init__.py", line 482, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/usr/src/app/warehouse/oso_dagster/factories/sql.py", line 87, in _asset
    yield from dlt.run(context=context, loader_file_format="jsonl", **kwargs)
  File "/usr/local/lib/python3.12/dist-packages/dagster_embedded_elt/dlt/dlt_event_iterator.py", line 77, in __next__
    return next(self._inner_iterator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dagster_embedded_elt/dlt/resource.py", line 286, in _run
    load_info = dlt_pipeline.run(dlt_source, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 746, in run
    return self.load(destination, dataset_name, credentials=credentials)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 225, in _wrap
    step_info = f(self, *args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 165, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 274, in _wrap
    return f(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 614, in load
    raise PipelineStepFailed(

The above exception was caused by the following exception:
dlt.load.exceptions.LoadClientJobFailed: Job for project.dce43b48ef.reference failed terminally in load 1739192914.9111016 with message {"error_result":{"reason":"notFound","message":"Not found: Table opensource-observer:op_atlas.project was not found in location US"},"errors":[{"reason":"notFound","message":"Not found: Table opensource-observer:op_atlas.project was not found in location US"}],"job_start":"2025-02-10T13:08:43.223000Z","job_end":"2025-02-10T13:08:43.223000Z","job_id":"project_dce43b48ef_0_reference"}. The package is aborted and cannot be retried.

  File "/usr/local/lib/python3.12/dist-packages/dlt/pipeline/pipeline.py", line 607, in load
    runner.run_pool(load_step.config, load_step)
  File "/usr/local/lib/python3.12/dist-packages/dlt/common/runners/pool_runner.py", line 91, in run_pool
    while _run_func():
          ^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/common/runners/pool_runner.py", line 84, in _run_func
    run_metrics = run_f.run(cast(TExecutor, pool))
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/load.py", line 638, in run
    self.load_single_package(load_id, schema)
  File "/usr/local/lib/python3.12/dist-packages/dlt/load/load.py", line 597, in load_single_package
    raise pending_exception

ryscheng (Member) commented

Here are the steps I just tried:

  • Re-ran just the Project asset. I got the same error as you.
  • Deleted the entire op_atlas BigQuery dataset and tried again.
  • It seems to run fine now.
  • Project has 4053 rows.

@ccerv1 I don't think you can delete a single table within the op_atlas dataset without confusing Dagster/dlt. Deleting the entire dataset seems to work.
Can you clarify why you deleted a single table in the first place? Is there a log or trace to reproduce the original issue?
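That also matches how dlt tracks destination state: the failed load package still references tables recorded in dlt's stored schema, so hand-deleting a single table leaves the pipeline state and the warehouse out of sync. A sketch of one way to reset a single resource while keeping the two in sync, assuming a dlt version that supports the refresh option (pipeline, source, and resource names are illustrative):

```python
import dlt

# Rebuild the pipeline handle the way the Dagster factory presumably does
# (names here are assumptions, not the actual OSO configuration).
pipeline = dlt.pipeline(
    pipeline_name="op_atlas",
    destination="bigquery",
    dataset_name="op_atlas",
)

# Let dlt drop the resource's tables AND its slice of stored schema/state
# together, then reload from scratch on this run (op_atlas_source is a
# hypothetical stand-in for the real source):
#
#   pipeline.run(op_atlas_source().with_resources("project"),
#                refresh="drop_resources")
#
# Equivalent one-off reset from the command line:
#
#   dlt pipeline op_atlas drop project
```

Either path updates dlt's bookkeeping (schema and pipeline state) together with the destination tables, which is what deleting one table by hand misses.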

Unrelated to this bug: I think Dagster is still opening too many parallel connections to the Postgres database, which is leading to contention and retries. I'll file a separate issue.
