feat(data-warehouse): Added a new temporal workflow for compacting deltalake tables #28118
base: master
Conversation
PR Summary
This PR introduces a new temporal workflow for compacting Delta Lake tables to address memory issues in pod execution. The changes move memory-intensive operations out of the critical path and into a separate workflow.
- Added new `DeltalakeCompactionJobWorkflow` in `deltalake_compaction_job.py` with a 5-minute timeout and no retries
- Added `DATA_WAREHOUSE_COMPACTION_TASK_QUEUE` constant for a dedicated compaction queue
- Modified `DeltaTableHelper` to handle compaction through a new `compact_table()` method with 24h retention
- Added `trigger_compaction_job` utility with basic error handling for workflow coordination
- Removed the subprocess-based approach in `delta_table_subprocess.py` in favor of the workflow solution
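For orientation, here is a minimal sketch of how a workflow and activity like this are typically wired up with the Temporal Python SDK. The class, activity, and input names are taken from the summary above; the activity body is elided, and the exact fields and defaults are assumptions rather than the PR's actual code.

```python
import dataclasses
import datetime as dt

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@dataclasses.dataclass
class DeltalakeCompactionJobWorkflowInputs:
    team_id: int
    external_data_job_id: str


@activity.defn
def run_compaction(inputs: DeltalakeCompactionJobWorkflowInputs) -> None:
    # Load the ExternalDataJob and compact its Delta table (see the reviewed fragments below).
    ...


@workflow.defn(name="deltalake-compaction-job")
class DeltalakeCompactionJobWorkflow:
    @workflow.run
    async def run(self, inputs: DeltalakeCompactionJobWorkflowInputs) -> None:
        await workflow.execute_activity(
            run_compaction,
            inputs,
            start_to_close_timeout=dt.timedelta(minutes=5),
            retry_policy=RetryPolicy(maximum_attempts=1),
        )
```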
8 files reviewed, 13 comments (Greptile)
@activity.defn
def run_compaction(inputs: DeltalakeCompactionJobWorkflowInputs):
    logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
    job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)
logic: No error handling for when job doesn't exist. Should wrap in try/except to handle ObjectDoesNotExist gracefully.
Suggested change (replacing the bare `.get()` call):
try:
    job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)
except ExternalDataJob.DoesNotExist:
    logger.error("External data job not found", external_data_job_id=inputs.external_data_job_id)
    raise
await workflow.execute_activity(
    run_compaction,
    inputs,
    start_to_close_timeout=dt.timedelta(minutes=5),
style: 5 minute timeout may be too short for large tables. Consider making this configurable or increasing default.
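One way to act on this, sketched below: carry the timeout in the workflow inputs so callers can raise it for large tables. The `timeout_minutes` field is hypothetical and not part of the PR.

```python
# Hypothetical sketch: make the activity timeout configurable via the workflow inputs.
@dataclasses.dataclass
class DeltalakeCompactionJobWorkflowInputs:
    team_id: int
    external_data_job_id: str
    timeout_minutes: int = 5  # hypothetical field, not in the PR

# ...inside the workflow's run method:
await workflow.execute_activity(
    run_compaction,
    inputs,
    start_to_close_timeout=dt.timedelta(minutes=inputs.timeout_minutes),
    retry_policy=RetryPolicy(maximum_attempts=1),
)
```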
retry_policy=RetryPolicy(
    maximum_attempts=1,
),
style: Single attempt with no retries could lead to transient failures. Consider allowing retries with backoff for recoverable errors.
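A less restrictive policy along the lines suggested might look like this; the specific intervals and attempt count are assumptions, not values from the PR.

```python
import datetime as dt

from temporalio.common import RetryPolicy

# Allow a few retries with exponential backoff for transient failures, while still
# refusing to retry errors that indicate a code bug rather than a flaky dependency.
retry_policy = RetryPolicy(
    initial_interval=dt.timedelta(seconds=10),
    backoff_coefficient=2.0,
    maximum_interval=dt.timedelta(minutes=2),
    maximum_attempts=3,
    non_retryable_error_types=["NondeterminismError"],
)
```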
delta_table_helper = DeltaTableHelper(resource_name=schema.name, job=job, logger=logger)

delta_table_helper.compact_table()
logic: compact_table() errors are not caught or logged. Should handle potential DeltaLake errors.
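A sketch of what catching this could look like, continuing the fragment above; the exact exception type raised by Delta Lake compaction isn't pinned down here, so a broad catch-log-and-re-raise is shown.

```python
try:
    delta_table_helper.compact_table()
except Exception:
    # Log the failure with context before re-raising so the activity still fails visibly.
    logger.exception("Delta table compaction failed")
    raise
```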
logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
job = ExternalDataJob.objects.get(id=inputs.external_data_job_id, team_id=inputs.team_id)

assert job.schema is not None
style: Using assert for runtime checks is not recommended in production code. Replace with proper validation.
Suggested change (replacing the assert):
if job.schema is None:
    logger.error("Job schema is None", job_id=job.id)
    raise ValueError(f"Job {job.id} has no associated schema")
compaction_job_id = trigger_compaction_job(self._job, self._schema)
self._logger.debug(f"Compaction workflow id: {compaction_job_id}")
logic: No error handling around trigger_compaction_job. Should catch and log potential failures to ensure main workflow completion.
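Since compaction is meant to be off the critical path, one way to address this (a sketch continuing the fragment above, not the PR's code) is to log and swallow scheduling failures so the main workflow still completes:

```python
try:
    compaction_job_id = trigger_compaction_job(self._job, self._schema)
    self._logger.debug(f"Compaction workflow id: {compaction_job_id}")
except Exception:
    # Compaction is best-effort; record the failure but let the import job finish.
    self._logger.exception("Failed to trigger deltalake compaction job")
```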
except WorkflowAlreadyStartedError:
    pass
logic: silently ignoring WorkflowAlreadyStartedError could hide important issues - should at least log this case
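A small sketch of the logged variant, assuming a logger is available at this point in `trigger_compaction_job` (the PR fragment does not show one):

```python
except WorkflowAlreadyStartedError:
    # A compaction workflow with this id is already running for the schema; expected when
    # jobs finish close together, but worth recording rather than silently dropping.
    logger.info("Deltalake compaction workflow already started", workflow_id=workflow_id)
```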
retry_policy=RetryPolicy(
    maximum_attempts=1,
    non_retryable_error_types=["NondeterminismError"],
),
style: retry policy is very restrictive - consider adding backoff strategy and increasing maximum attempts for transient failures
def trigger_compaction_job(job: ExternalDataJob, schema: ExternalDataSchema) -> str:
    temporal = sync_connect()
    workflow_id = f"{schema.id}-compaction"
style: workflow_id should include more uniqueness guarantees, like job.id or timestamp to prevent conflicts
workflow_id = f"{schema.id}-compaction" | |
workflow_id = f"{schema.id}-{job.id}-compaction" |
asyncio.run(
    temporal.start_workflow(
        workflow="deltalake-compaction-job",
        arg=dataclasses.asdict(
            DeltalakeCompactionJobWorkflowInputs(team_id=job.team_id, external_data_job_id=job.id)
        ),
        id=workflow_id,
        task_queue=str(DATA_WAREHOUSE_COMPACTION_TASK_QUEUE),
        retry_policy=RetryPolicy(
            maximum_attempts=1,
            non_retryable_error_types=["NondeterminismError"],
        ),
    )
)
logic: asyncio.run() should have a timeout to prevent hanging
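One way to bound this, sketched below with an assumed 30-second ceiling: wrap the start call in `asyncio.wait_for` so a slow or unreachable Temporal frontend cannot hang the caller.

```python
async def _start_compaction_with_timeout() -> None:
    # Hypothetical wrapper: cancel the start request if Temporal does not answer in time.
    await asyncio.wait_for(
        temporal.start_workflow(
            workflow="deltalake-compaction-job",
            arg=dataclasses.asdict(
                DeltalakeCompactionJobWorkflowInputs(team_id=job.team_id, external_data_job_id=job.id)
            ),
            id=workflow_id,
            task_queue=str(DATA_WAREHOUSE_COMPACTION_TASK_QUEUE),
            retry_policy=RetryPolicy(
                maximum_attempts=1,
                non_retryable_error_types=["NondeterminismError"],
            ),
        ),
        timeout=30,  # seconds; assumed value, not from the PR
    )

asyncio.run(_start_compaction_with_timeout())
```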
lgtm! consider the comments from slack thread https://posthog.slack.com/archives/C019RAX2XBN/p1738276797709289
Problem
Changes
Does this work well for both Cloud and self-hosted?
Yes
How did you test this code?
End-to-end tests include triggering the new workflow.