feat(data-warehouse): Added a new temporal workflow for compacting deltalake tables #28118

Open
wants to merge 3 commits into master
Conversation

Gilbert09
Member

Problem

  • Context: thread on Slack
  • tl;dr: Delta Lake compaction and vacuuming blows up our pod memory; offloading it to a separate pod, outside the critical path of the external-data-job workflow, would be ideal

Changes

  • Spin up a new workflow (a minimal sketch follows this list)
  • Trigger this workflow from the external-data-job workflow
  • TODO:
    • Add some more unit tests for the new workflow
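
For orientation, here is a minimal sketch of the shape of the new workflow, pieced together from the snippets reviewed below; module layout and helper details in the actual PR may differ:

import dataclasses
import datetime as dt

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@dataclasses.dataclass
class DeltalakeCompactionJobWorkflowInputs:
    team_id: int
    external_data_job_id: str


@activity.defn
def run_compaction(inputs: DeltalakeCompactionJobWorkflowInputs) -> None:
    # Loads the ExternalDataJob and runs DeltaTableHelper.compact_table()
    # (see the review snippets further down).
    ...


@workflow.defn(name="deltalake-compaction-job")
class DeltalakeCompactionJobWorkflow:
    @workflow.run
    async def run(self, inputs: DeltalakeCompactionJobWorkflowInputs) -> None:
        # Compaction happens in a single activity: 5-minute timeout, no retries,
        # running on the dedicated compaction task queue.
        await workflow.execute_activity(
            run_compaction,
            inputs,
            start_to_close_timeout=dt.timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=1),
        )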

Does this work well for both Cloud and self-hosted?

Yes

How did you test this code?

End-to-end tests include triggering the new workflow.

@Gilbert09 requested a review from a team · January 30, 2025 21:05
@greptile-apps bot (Contributor) left a comment


PR Summary

This PR introduces a new temporal workflow for compacting Delta Lake tables to address memory issues in pod execution. The changes move memory-intensive operations out of the critical path and into a separate workflow.

  • Added new DeltalakeCompactionJobWorkflow in deltalake_compaction_job.py with 5-minute timeout and no retries
  • Added DATA_WAREHOUSE_COMPACTION_TASK_QUEUE constant for dedicated compaction queue
  • Modified DeltaTableHelper to handle compaction through new compact_table() method with 24h retention (see the sketch after this list)
  • Added trigger_compaction_job utility with basic error handling for workflow coordination
  • Removed subprocess-based approach in delta_table_subprocess.py in favor of workflow solution
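
The helper's new method is not shown in the excerpts below, but based on the summary above it boils down to an optimize-then-vacuum pass. A minimal sketch, assuming the deltalake Python package's DeltaTable API and an illustrative _get_delta_table() helper:

from deltalake import DeltaTable


def compact_table(self) -> None:
    # Hypothetical helper to resolve the Delta table for this job/schema.
    delta_table: DeltaTable = self._get_delta_table()

    # Rewrite many small files into fewer, larger ones.
    delta_table.optimize.compact()

    # Drop unreferenced files, keeping 24 hours of history (below the 7-day
    # default, hence enforce_retention_duration=False).
    delta_table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)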

8 file(s) reviewed, 13 comment(s)

@activity.defn
def run_compaction(inputs: DeltalakeCompactionJobWorkflowInputs):
    logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
    job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)

logic: No error handling for when job doesn't exist. Should wrap in try/except to handle ObjectDoesNotExist gracefully.

Suggested change
-    job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)
+    try:
+        job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)
+    except ExternalDataJob.DoesNotExist:
+        logger.error("External data job not found", external_data_job_id=inputs.external_data_job_id)
+        raise

        await workflow.execute_activity(
            run_compaction,
            inputs,
            start_to_close_timeout=dt.timedelta(minutes=5),

style: 5 minute timeout may be too short for large tables. Consider making this configurable or increasing default.
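
One option, sketched here, is to carry the timeout in the workflow inputs instead of hard-coding it; the extra field is illustrative, not part of the PR:

import dataclasses
import datetime as dt


@dataclasses.dataclass
class DeltalakeCompactionJobWorkflowInputs:
    team_id: int
    external_data_job_id: str
    # Hypothetical field so callers can size the timeout to the table.
    compaction_timeout_minutes: int = 5


# Inside the workflow's run method:
#     await workflow.execute_activity(
#         run_compaction,
#         inputs,
#         start_to_close_timeout=dt.timedelta(minutes=inputs.compaction_timeout_minutes),
#     )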

Comment on lines +45 to +47
            retry_policy=RetryPolicy(
                maximum_attempts=1,
            ),

style: Single attempt with no retries could lead to transient failures. Consider allowing retries with backoff for recoverable errors.
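
For reference, a less restrictive policy with backoff in the Temporal Python SDK could look roughly like this; the numbers are illustrative:

import datetime as dt

from temporalio.common import RetryPolicy

retry_policy = RetryPolicy(
    initial_interval=dt.timedelta(seconds=10),  # delay before the first retry
    backoff_coefficient=2.0,                    # double the delay on each attempt
    maximum_interval=dt.timedelta(minutes=1),   # cap the delay between attempts
    maximum_attempts=3,                         # allow a couple of retries for transient failures
)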


    delta_table_helper = DeltaTableHelper(resource_name=schema.name, job=job, logger=logger)

    delta_table_helper.compact_table()

logic: compact_table() errors are not caught or logged. Should handle potential DeltaLake errors.
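
A sketch of catching and logging those errors inside the activity; DeltaError is assumed to be the base exception exposed by the deltalake package, and re-raising keeps the failure visible to Temporal:

from deltalake.exceptions import DeltaError

try:
    delta_table_helper.compact_table()
except DeltaError:
    # Log with enough context to find the job, then re-raise so the activity fails loudly.
    logger.exception("Delta Lake compaction failed", external_data_job_id=inputs.external_data_job_id)
    raise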

    logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
    job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)

    assert job.schema is not None

style: Using assert for runtime checks is not recommended in production code. Replace with proper validation.

Suggested change
-    assert job.schema is not None
+    if job.schema is None:
+        logger.error("Job schema is None", job_id=job.id)
+        raise ValueError(f"Job {job.id} has no associated schema")

Comment on lines +164 to +165
        compaction_job_id = trigger_compaction_job(self._job, self._schema)
        self._logger.debug(f"Compaction workflow id: {compaction_job_id}")

logic: No error handling around trigger_compaction_job. Should catch and log potential failures to ensure main workflow completion.
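
A sketch of how the caller could shield itself; since compaction is deliberately outside the critical path, a failure to start it arguably should not fail the parent workflow:

try:
    compaction_job_id = trigger_compaction_job(self._job, self._schema)
    self._logger.debug(f"Compaction workflow id: {compaction_job_id}")
except Exception:
    # Best-effort: compaction can be retried on a later run, so log and carry on.
    self._logger.exception("Failed to trigger compaction workflow")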

Comment on lines +291 to +292
    except WorkflowAlreadyStartedError:
        pass

logic: silently ignoring WorkflowAlreadyStartedError could hide important issues - should at least log this case
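
A minimal change along those lines, shown as a fragment mirroring the excerpt above; it assumes a logger is available in this function, which the excerpt does not show:

    except WorkflowAlreadyStartedError:
        # A compaction run for this schema is already in flight; record it rather
        # than swallowing the error silently.
        logger.info("Compaction workflow already running", workflow_id=workflow_id)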

Comment on lines +285 to +288
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["NondeterminismError"],
            ),

style: retry policy is very restrictive - consider adding backoff strategy and increasing maximum attempts for transient failures


def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) -> str:
    temporal = sync_connect()
    workflow_id = f"{schema.id}-compaction"

style: workflow_id should include more uniqueness guarantees, like job.id or timestamp to prevent conflicts

Suggested change
-    workflow_id = f"{schema.id}-compaction"
+    workflow_id = f"{schema.id}-{job.id}-compaction"

Comment on lines +277 to +290
    asyncio.run(
        temporal.start_workflow(
            workflow="deltalake-compaction-job",
            arg=dataclasses.asdict(
                DeltalakeCompactionJobWorkflowInputs(team_id=job.team_id, external_data_job_id=job.id)
            ),
            id=workflow_id,
            task_queue=str(DATA_WAREHOUSE_COMPACTION_TASK_QUEUE),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["NondeterminismError"],
            ),
        )
    )

logic: asyncio.run() should have a timeout to prevent hanging
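
asyncio.run() has no timeout argument itself, but the coroutine can be wrapped in asyncio.wait_for; the 30-second figure below is an arbitrary illustrative choice:

import asyncio

asyncio.run(
    asyncio.wait_for(
        temporal.start_workflow(
            workflow="deltalake-compaction-job",
            arg=dataclasses.asdict(
                DeltalakeCompactionJobWorkflowInputs(team_id=job.team_id, external_data_job_id=job.id)
            ),
            id=workflow_id,
            task_queue=str(DATA_WAREHOUSE_COMPACTION_TASK_QUEUE),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["NondeterminismError"],
            ),
        ),
        timeout=30,  # give up if Temporal does not accept the start request within 30s
    )
)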

@EDsCODE (Member) left a comment


lgtm! consider the comments from slack thread https://posthog.slack.com/archives/C019RAX2XBN/p1738276797709289
