Skip to content

Commit

Permalink
Automatically reattempt after internal errors (#1424)
Browse files Browse the repository at this point in the history
* refactor finalize run service

* refactor complete attempt service

* remove separate graceful exit handling

* refactor task status helpers

* clearly separate statuses in prisma schema

* all non-final statuses should be failable

* new import payload error code

* store default retry config if none set on task

* failed run service now respects retries

* fix merged task retry config indexing

* some errors should never be retried

* finalize run service takes care of acks now

* execution payload helper now with single object arg

* internal error code enum export

* unify failed and crashed run retries

* Prevent uncaught socket ack exceptions (#1415)

* catch all the remaining socket acks that could possibly throw

* wrap the remaining handlers in try catch

* New onboarding question (#1404)

* Updated “Twitter” to be “X (Twitter)”

* added Textarea to storybook

* Updated textarea styling to match input field

* WIP adding new text field to org creation page

* Added description to field

* Submit feedback to Plain when an org signs up

* Formatting improvement

* type improvement

* removed userId

* Moved submitting to Plain into its own file

* Change orgName with name

* use sendToPlain function for the help & feedback email form

* use name not orgName

* import cleanup

* Downgrading plan form uses sendToPlain

* Get the userId from requireUser only

* Added whitespace-pre-wrap to the message property on the run page

* use requireUserId

* Removed old Plain submit code

* Added a new Context page for the docs (#1416)

* Added a new context page with task context properties

* Removed code comments

* Added more crosslinks

* Fix updating many environment variables at once (#1413)

* Move code example to the side menu

* New docs example for creating a HN email summary

* doc: add instructions to create new reference project and run it locally (#1417)

* doc: add instructions to create new reference project and run it locally

* doc: Add instruction for running tunnel

* minor language improvement

* Fix several restore and resume bugs (#1418)

* try to correct resume messages with missing checkpoint

* prevent creating checkpoints for outdated task waits

* prevent creating checkpoints for outdated batch waits

* use heartbeats to check for and clean up any leftover containers

* lint

* improve exec logging

* improve resume attempt logs

* fix for resuming parents of canceled child runs

* separate SIGTERM from maybe OOM errors

* pretty errors can have magic dashboard links

* prevent uncancellable checkpoints

* simplify task run error code enum export

* grab the last, not the first child run

* Revert "prevent creating checkpoints for outdated batch waits"

This reverts commit f2b5c2a.

* Revert "grab the last, not the first child run"

This reverts commit 89ec5c8.

* Revert "prevent creating checkpoints for outdated task waits"

This reverts commit 11066b4.

* more logs for resume message handling

* add magic error link comment

* add changeset

* chore: Update version for release (#1410)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Release 3.0.13

* capture ffmpeg oom errors

* respect maxAttempts=1 when failing before first attempt creation

* request worker exit on fatal errors

* fix error code merge

* add new error code to should retry

* pretty segfault errors

* pretty internal errors for attempt spans

* decrease oom false positives

* fix timeline event color for failed runs

* auto-retry packet import and export

* add sdk version check and complete event while completing attempt

* all internal errors become crashes by default

* use pretty error helpers exclusively

* error to debug log

* zodfetch fixes

* rename import payload to task input error

* fix true non-zero exit error display

* fix retry config parsing

* correctly mark crashes as crashed

* add changeset

* remove non-zero exit comment

* pretend we don't support default default retry configs yet

---------

Co-authored-by: James Ritchie <[email protected]>
Co-authored-by: shubham yadav <[email protected]>
Co-authored-by: Tarun Pratap Singh <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
6 people authored Oct 24, 2024
1 parent 9a5e6e5 commit 212f853
Show file tree
Hide file tree
Showing 23 changed files with 1,111 additions and 401 deletions.
9 changes: 9 additions & 0 deletions .changeset/eight-turtles-itch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

- Include retries.default in task retry config when indexing
- New helpers for internal error retry mechanics
- Detection for segfaults and ffmpeg OOM errors
- Retries for packet import and export
5 changes: 3 additions & 2 deletions apps/webapp/app/components/runs/v3/RunInspector.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import {
} from "~/utils/pathBuilder";
import { TraceSpan } from "~/utils/taskEvent";
import { SpanLink } from "~/v3/eventRepository.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline";
import { RunTag } from "./RunTag";
import { TaskRunStatusCombo } from "./TaskRunStatus";
Expand Down Expand Up @@ -479,6 +479,7 @@ function RunTimeline({ run }: { run: RawRun }) {
const updatedAt = new Date(run.updatedAt);

const isFinished = isFinalRunStatus(run.status);
const isError = isFailedRunStatus(run.status);

return (
<div className="min-w-fit max-w-80">
Expand Down Expand Up @@ -535,7 +536,7 @@ function RunTimeline({ run }: { run: RawRun }) {
<RunTimelineEvent
title="Finished"
subtitle={<DateTimeAccurate date={updatedAt} />}
state="complete"
state={isError ? "error" : "complete"}
/>
</>
) : (
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { eventRepository } from "~/v3/eventRepository.server";
import { machinePresetFromName } from "~/v3/machinePresets.server";
import { FINAL_ATTEMPT_STATUSES, isFinalRunStatus } from "~/v3/taskStatus";
import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { getMaxDuration } from "~/v3/utils/maxDuration";

Expand Down Expand Up @@ -294,6 +294,7 @@ export class SpanPresenter extends BasePresenter {
usageDurationMs: run.usageDurationMs,
isFinished,
isRunning: RUNNING_STATUSES.includes(run.status),
isError: isFailedRunStatus(run.status),
payload,
payloadType: run.payloadType,
output,
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
const service = new CreateTaskRunAttemptService();

try {
const { execution } = await service.call(runParam, authenticationResult.environment);
const { execution } = await service.call({
runId: runParam,
authenticatedEnv: authenticationResult.environment,
});

return json(execution, { status: 200 });
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ function RunTimeline({ run }: { run: SpanRun }) {
<RunTimelineEvent
title="Finished"
subtitle={<DateTimeAccurate date={run.updatedAt} />}
state="complete"
state={run.isError ? "error" : "complete"}
/>
</>
) : (
Expand Down
277 changes: 248 additions & 29 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
import {
calculateNextRetryDelay,
RetryOptions,
TaskRunExecution,
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
} from "@trigger.dev/core/v3";
import { logger } from "~/services/logger.server";
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
import { BaseService } from "./services/baseService.server";
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
import type { Prisma, TaskRun } from "@trigger.dev/database";
import { CompleteAttemptService } from "./services/completeAttempt.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import * as semver from "semver";

const includeAttempts = {
attempts: {
orderBy: {
createdAt: "desc",
},
take: 1,
},
lockedBy: true, // task
lockedToVersion: true, // worker
} satisfies Prisma.TaskRunInclude;

type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
include: typeof includeAttempts;
}>;

export class FailedTaskRunService extends BaseService {
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion });

const isFriendlyId = anyRunId.startsWith("run_");

const taskRun = await this._prisma.taskRun.findUnique({
const taskRun = await this._prisma.taskRun.findFirst({
where: {
friendlyId: isFriendlyId ? anyRunId : undefined,
id: !isFriendlyId ? anyRunId : undefined,
Expand All @@ -25,7 +51,7 @@ export class FailedTaskRunService extends BaseService {
return;
}

if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
if (!isFailableRunStatus(taskRun.status)) {
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
taskRun,
completion,
Expand All @@ -34,33 +60,226 @@ export class FailedTaskRunService extends BaseService {
return;
}

// No more retries, we need to fail the task run
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
const retryHelper = new FailedTaskRunRetryHelper(this._prisma);
const retryResult = await retryHelper.call({
runId: taskRun.id,
completion,
});

logger.debug("[FailedTaskRunService] Completion result", {
runId: taskRun.id,
result: retryResult,
});
}
}

interface TaskRunWithWorker extends TaskRun {
lockedBy: { retryConfig: Prisma.JsonValue } | null;
lockedToVersion: { sdkVersion: string } | null;
}

const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRun.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
attemptStatus: "FAILED",
error: sanitizeError(completion.error),
export class FailedTaskRunRetryHelper extends BaseService {
async call({
runId,
completion,
isCrash,
}: {
runId: string;
completion: TaskRunFailedExecutionResult;
isCrash?: boolean;
}) {
const taskRun = await this._prisma.taskRun.findFirst({
where: {
id: runId,
},
include: includeAttempts,
});

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRun.spanId, {
endTime: new Date(),
attributes: {
isError: true,
if (!taskRun) {
logger.error("[FailedTaskRunRetryHelper] Task run not found", {
runId,
completion,
});

return "NO_TASK_RUN";
}

const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);

if (!retriableExecution) {
return "NO_EXECUTION";
}

logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });

const executionRetry =
completion.retry ??
(await FailedTaskRunRetryHelper.getExecutionRetry({
run: taskRun,
execution: retriableExecution,
}));

const completeAttempt = new CompleteAttemptService(this._prisma);
const completeResult = await completeAttempt.call({
completion: {
...completion,
retry: executionRetry,
},
events: [
{
name: "exception",
time: new Date(),
properties: {
exception: createExceptionPropertiesFromError(completion.error),
},
},
],
execution: retriableExecution,
isSystemFailure: !isCrash,
isCrash,
});

return completeResult;
}

async #getRetriableAttemptExecution(
run: TaskRunWithAttempts,
completion: TaskRunFailedExecutionResult
): Promise<TaskRunExecution | undefined> {
let attempt = run.attempts[0];

// We need to create an attempt if:
// - None exists yet
// - The last attempt has a final status, e.g. we failed between attempts
if (!attempt || isFinalAttemptStatus(attempt.status)) {
logger.debug("[FailedTaskRunRetryHelper] No attempts found", {
run,
completion,
});

const createAttempt = new CreateTaskRunAttemptService(this._prisma);

try {
const { execution } = await createAttempt.call({
runId: run.id,
// This ensures we correctly respect `maxAttempts = 1` when failing before the first attempt was created
startAtZero: true,
});
return execution;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", {
run,
completion,
error,
});

return;
}
}

// We already have an attempt with non-final status, let's use it
try {
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
id: attempt.id,
skipStatusChecks: true,
});

return executionPayload?.execution;
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", {
run,
completion,
error,
});

return;
}
}

static async getExecutionRetry({
run,
execution,
}: {
run: TaskRunWithWorker;
execution: TaskRunExecution;
}): Promise<TaskRunExecutionRetry | undefined> {
try {
const retryConfig = run.lockedBy?.retryConfig;

if (!retryConfig) {
if (!run.lockedToVersion) {
logger.error("[FailedTaskRunRetryHelper] Run not locked to version", {
run,
execution,
});

return;
}

const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
const isValid = semver.valid(sdkVersion);

if (!isValid) {
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
run,
execution,
});

return;
}

// With older SDK versions, tasks only have a retry config stored in the DB if it's explicitly defined on the task itself
// It won't get populated with retry.default in trigger.config.ts
if (semver.lt(sdkVersion, FailedTaskRunRetryHelper.DEFAULT_RETRY_CONFIG_SINCE_VERSION)) {
logger.warn(
"[FailedTaskRunRetryHelper] SDK version not recent enough to determine retry config",
{
run,
execution,
}
);

return;
}
}

const parsedRetryConfig = RetryOptions.nullable().safeParse(retryConfig);

if (!parsedRetryConfig.success) {
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
run,
execution,
});

return;
}

if (!parsedRetryConfig.data) {
logger.debug("[FailedTaskRunRetryHelper] No retry config", {
run,
execution,
});

return;
}

const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);

if (!delay) {
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
run,
execution,
});

return;
}

return {
timestamp: Date.now() + delay,
delay,
};
} catch (error) {
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
run,
execution,
error,
});

return;
}
}

// TODO: update this to the correct version
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.14";
}
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ function createCoordinatorNamespace(io: Server) {
}

const service = new CreateTaskRunAttemptService();
const { attempt } = await service.call(message.runId, environment, false);
const { attempt } = await service.call({
runId: message.runId,
authenticatedEnv: environment,
setToExecuting: false,
});

const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
id: attempt.id,
Expand Down
Loading

0 comments on commit 212f853

Please sign in to comment.