diff --git a/apps/coordinator/src/index.ts b/apps/coordinator/src/index.ts
index ce849adf09..9581ee78f1 100644
--- a/apps/coordinator/src/index.ts
+++ b/apps/coordinator/src/index.ts
@@ -30,7 +30,7 @@
 const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030;
 const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
 const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");
-const logger = new SimpleStructuredLogger(NODE_NAME);
+const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME });
 const chaosMonkey = new ChaosMonkey(
   !!process.env.CHAOS_MONKEY_ENABLED,
   !!process.env.CHAOS_MONKEY_DISABLE_ERRORS,
@@ -105,7 +105,7 @@ class TaskCoordinator {
   // MARK: SOCKET: PLATFORM
   #createPlatformSocket() {
     if (!PLATFORM_ENABLED) {
-      console.log("INFO: platform connection disabled");
+      logger.log("INFO: platform connection disabled");
       return;
     }
@@ -116,8 +116,12 @@ class TaskCoordinator {
     const host = PLATFORM_HOST;
     const port = Number(PLATFORM_WS_PORT);
 
-    logger.log(`connecting to platform: ${host}:${port}`);
-    logger.debug(`connecting with extra headers`, { extraHeaders });
+    const platformLogger = new SimpleStructuredLogger("socket-platform", undefined, {
+      namespace: "coordinator",
+    });
+
+    platformLogger.log("connecting", { host, port });
+    platformLogger.debug("connecting with extra headers", { extraHeaders });
 
     const platformConnection = new ZodSocketConnection({
       namespace: "coordinator",
@@ -131,14 +135,17 @@ class TaskCoordinator {
       logHandlerPayloads: false,
       handlers: {
         RESUME_AFTER_DEPENDENCY: async (message) => {
-          const log = logger.child({
+          const log = platformLogger.child({
             eventName: "RESUME_AFTER_DEPENDENCY",
             ...omit(message, "completions", "executions"),
-            completions: message.completions.map((c) => c.id),
+            completions: message.completions.map((c) => ({
+              id: c.id,
+              ok: c.ok,
+            })),
             executions: message.executions.length,
           });
 
-          log.debug("Handling RESUME_AFTER_DEPENDENCY");
+          log.log("Handling RESUME_AFTER_DEPENDENCY");
 
           const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
 
@@ -147,7 +154,7 @@ class TaskCoordinator {
             return;
           }
 
-          log.addFields({ socketData: taskSocket.data });
+          log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data });
           log.log("Found task socket for RESUME_AFTER_DEPENDENCY");
 
           await chaosMonkey.call();
@@ -158,12 +165,17 @@ class TaskCoordinator {
           taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
         },
         RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
-          const log = logger.child({
+          const log = platformLogger.child({
             eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK",
             ...omit(message, "completions", "executions"),
+            completions: message.completions.map((c) => ({
+              id: c.id,
+              ok: c.ok,
+            })),
+            executions: message.executions.length,
           });
 
-          log.debug("Handling RESUME_AFTER_DEPENDENCY_WITH_ACK");
+          log.log("Handling RESUME_AFTER_DEPENDENCY_WITH_ACK");
 
           const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
 
@@ -178,7 +190,7 @@ class TaskCoordinator {
             };
           }
 
-          log.addFields({ socketData: taskSocket.data });
+          log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data });
           log.log("Found task socket for RESUME_AFTER_DEPENDENCY_WITH_ACK");
 
           //if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
@@ -207,23 +219,21 @@ class TaskCoordinator {
           };
         },
         RESUME_AFTER_DURATION: async (message) => {
-          const log = logger.child({
+          const log = platformLogger.child({
"RESUME_AFTER_DURATION", ...message, }); - log.debug("Handling RESUME_AFTER_DURATION"); + log.log("Handling RESUME_AFTER_DURATION"); const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + log.debug("Socket for attempt not found"); return; } - log.addFields({ socketData: taskSocket.data }); + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); log.log("Found task socket for RESUME_AFTER_DURATION"); await chaosMonkey.call(); @@ -231,41 +241,41 @@ class TaskCoordinator { taskSocket.emit("RESUME_AFTER_DURATION", message); }, REQUEST_ATTEMPT_CANCELLATION: async (message) => { - const log = logger.child({ + const log = platformLogger.child({ eventName: "REQUEST_ATTEMPT_CANCELLATION", ...message, }); + log.log("Handling REQUEST_ATTEMPT_CANCELLATION"); + const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - attemptFriendlyId: message.attemptFriendlyId, - }); + logger.debug("Socket for attempt not found"); return; } - log.addFields({ socketData: taskSocket.data }); + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); log.log("Found task socket for REQUEST_ATTEMPT_CANCELLATION"); taskSocket.emit("REQUEST_ATTEMPT_CANCELLATION", message); }, REQUEST_RUN_CANCELLATION: async (message) => { - const log = logger.child({ + const log = platformLogger.child({ eventName: "REQUEST_RUN_CANCELLATION", ...message, }); + log.log("Handling REQUEST_RUN_CANCELLATION"); + const taskSocket = await this.#getRunSocket(message.runId); if (!taskSocket) { - logger.log("Socket for run not found", { - runId: message.runId, - }); + logger.debug("Socket for run not found"); return; } - log.addFields({ socketData: taskSocket.data }); + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); log.log("Found task socket for REQUEST_RUN_CANCELLATION"); this.#cancelCheckpoint(message.runId); @@ -283,7 +293,7 @@ class TaskCoordinator { } }, READY_FOR_RETRY: async (message) => { - const log = logger.child({ + const log = platformLogger.child({ eventName: "READY_FOR_RETRY", ...message, }); @@ -291,13 +301,11 @@ class TaskCoordinator { const taskSocket = await this.#getRunSocket(message.runId); if (!taskSocket) { - logger.log("Socket for attempt not found", { - runId: message.runId, - }); + logger.debug("Socket for attempt not found"); return; } - log.addFields({ socketData: taskSocket.data }); + log.addFields({ socketId: taskSocket.id, socketData: taskSocket.data }); log.log("Found task socket for READY_FOR_RETRY"); await chaosMonkey.call(); @@ -305,11 +313,13 @@ class TaskCoordinator { taskSocket.emit("READY_FOR_RETRY", message); }, DYNAMIC_CONFIG: async (message) => { - const log = logger.child({ + const log = platformLogger.child({ eventName: "DYNAMIC_CONFIG", ...message, }); + log.log("Handling DYNAMIC_CONFIG"); + this.#delayThresholdInMs = message.checkpointThresholdInMs; // The first time we receive a dynamic config, the worker namespace will be created @@ -392,16 +402,26 @@ class TaskCoordinator { next(); }, onConnection: async (socket, handler, sender) => { - const logger = new SimpleStructuredLogger("prod-worker", undefined, { + const logger = new SimpleStructuredLogger("ns-prod-worker", undefined, { + namespace: "prod-worker", socketId: socket.id, + socketData: socket.data, }); + const getSocketMetadata = () => { + return { + 
+          attemptFriendlyId: socket.data.attemptFriendlyId,
+          attemptNumber: socket.data.attemptNumber,
+          requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage,
+        };
+      };
+
       const getAttemptNumber = () => {
         return socket.data.attemptNumber ? parseInt(socket.data.attemptNumber) : undefined;
       };
 
       const exitRun = () => {
-        logger.log("exitRun", { runId: socket.data.runId });
+        logger.log("exitRun", getSocketMetadata());
 
         socket.emit("REQUEST_EXIT", {
           version: "v1",
@@ -409,6 +429,8 @@ class TaskCoordinator {
       };
 
      const crashRun = async (error: { name: string; message: string; stack?: string }) => {
+        logger.error("crashRun", { ...getSocketMetadata(), error });
+
         try {
           this.#platformSocket?.send("RUN_CRASHED", {
             version: "v1",
@@ -435,7 +457,9 @@ class TaskCoordinator {
           reason?: string;
         }
       > => {
-        logger.log("readyToCheckpoint", { runId: socket.data.runId, reason });
+        const log = logger.child(getSocketMetadata());
+
+        log.log("readyToCheckpoint", { runId: socket.data.runId, reason });
 
         if (checkpointInProgress()) {
           return {
@@ -466,7 +490,7 @@ class TaskCoordinator {
             success: true,
           };
         } catch (error) {
-          logger.error("Error while waiting for checkpointable state", { error });
+          log.error("Error while waiting for checkpointable state", { error });
 
           if (error instanceof CheckpointReadinessTimeoutError) {
             await crashRun({
@@ -510,14 +534,20 @@ class TaskCoordinator {
       });
 
       socket.on("TEST", (message, callback) => {
-        logger.log("[TEST]", { runId: socket.data.runId, message });
+        logger.log("Handling TEST", { eventName: "TEST", ...getSocketMetadata(), ...message });
 
         callback();
       });
 
       // Deprecated: Only workers without support for lazy attempts use this
       socket.on("READY_FOR_EXECUTION", async (message) => {
-        logger.log("[READY_FOR_EXECUTION]", message);
+        const log = logger.child({
+          eventName: "READY_FOR_EXECUTION",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling READY_FOR_EXECUTION");
 
         try {
           const executionAck = await this.#platformSocket?.sendWithAck(
@@ -526,7 +556,7 @@ class TaskCoordinator {
           );
 
           if (!executionAck) {
-            logger.error("no execution ack", { runId: socket.data.runId });
+            log.error("no execution ack");
 
             await crashRun({
               name: "ReadyForExecutionError",
@@ -537,7 +567,7 @@ class TaskCoordinator {
           }
 
           if (!executionAck.success) {
-            logger.error("failed to get execution payload", { runId: socket.data.runId });
+            log.error("failed to get execution payload");
 
             await crashRun({
               name: "ReadyForExecutionError",
@@ -555,7 +585,7 @@ class TaskCoordinator {
           updateAttemptFriendlyId(executionAck.payload.execution.attempt.id);
           updateAttemptNumber(executionAck.payload.execution.attempt.number);
         } catch (error) {
-          logger.error("Error", { error });
+          log.error("Error", { error });
 
           await crashRun({
             name: "ReadyForExecutionError",
@@ -569,7 +599,13 @@ class TaskCoordinator {
 
       // MARK: LAZY ATTEMPT
       socket.on("READY_FOR_LAZY_ATTEMPT", async (message) => {
-        logger.log("[READY_FOR_LAZY_ATTEMPT]", message);
+        const log = logger.child({
+          eventName: "READY_FOR_LAZY_ATTEMPT",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling READY_FOR_LAZY_ATTEMPT");
 
         try {
           const lazyAttempt = await this.#platformSocket?.sendWithAck("READY_FOR_LAZY_ATTEMPT", {
@@ -578,7 +614,7 @@ class TaskCoordinator {
           });
 
           if (!lazyAttempt) {
-            logger.error("no lazy attempt ack", { runId: socket.data.runId });
+            log.error("no lazy attempt ack");
 
             await crashRun({
               name: "ReadyForLazyAttemptError",
@@ -589,7 +625,7 @@ class TaskCoordinator {
           }
 
           if (!lazyAttempt.success) {
lazy attempt payload", { runId: socket.data.runId }); + log.error("failed to get lazy attempt payload"); await crashRun({ name: "ReadyForLazyAttemptError", @@ -607,11 +643,11 @@ class TaskCoordinator { }); } catch (error) { if (error instanceof ChaosMonkey.Error) { - logger.error("ChaosMonkey error, won't crash run", { runId: socket.data.runId }); + log.error("ChaosMonkey error, won't crash run"); return; } - logger.error("Error", { error }); + log.error("Error", { error }); await crashRun({ name: "ReadyForLazyAttemptError", @@ -625,7 +661,13 @@ class TaskCoordinator { // MARK: RESUME READY socket.on("READY_FOR_RESUME", async (message) => { - logger.log("[READY_FOR_RESUME]", message); + const log = logger.child({ + eventName: "READY_FOR_RESUME", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_RESUME"); updateAttemptFriendlyId(message.attemptFriendlyId); @@ -638,9 +680,19 @@ class TaskCoordinator { // MARK: RUN COMPLETED socket.on("TASK_RUN_COMPLETED", async (message, callback) => { - const { completion, execution } = message; + const log = logger.child({ + eventName: "TASK_RUN_COMPLETED", + ...getSocketMetadata(), + ...omit(message, "completion", "execution"), + completion: { + id: message.completion.id, + ok: message.completion.ok, + }, + }); + + log.log("Handling TASK_RUN_COMPLETED"); - logger.log("completed task", { completionId: completion.id }); + const { completion, execution } = message; // Cancel all in-progress checkpoints (if any) this.#cancelCheckpoint(socket.data.runId); @@ -705,10 +757,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("RETRY"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } @@ -721,11 +770,13 @@ class TaskCoordinator { }); if (!checkpoint) { - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); completeWithoutCheckpoint(false); return; } + log.addFields({ checkpoint }); + this.#platformSocket?.send("TASK_RUN_COMPLETED", { version: "v1", execution, @@ -740,7 +791,16 @@ class TaskCoordinator { // MARK: TASK FAILED socket.on("TASK_RUN_FAILED_TO_RUN", async ({ completion }) => { - logger.log("task failed to run", { completionId: completion.id }); + const log = logger.child({ + eventName: "TASK_RUN_FAILED_TO_RUN", + ...getSocketMetadata(), + completion: { + id: completion.id, + ok: completion.ok, + }, + }); + + log.log("Handling TASK_RUN_FAILED_TO_RUN"); // Cancel all in-progress checkpoints (if any) this.#cancelCheckpoint(socket.data.runId); @@ -755,12 +815,18 @@ class TaskCoordinator { // MARK: CHECKPOINT socket.on("READY_FOR_CHECKPOINT", async (message) => { - logger.log("[READY_FOR_CHECKPOINT]", message); + const log = logger.child({ + eventName: "READY_FOR_CHECKPOINT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling READY_FOR_CHECKPOINT"); const checkpointable = this.#checkpointableTasks.get(socket.data.runId); if (!checkpointable) { - logger.error("No checkpoint scheduled", { runId: socket.data.runId }); + log.error("No checkpoint scheduled"); return; } @@ -769,7 +835,13 @@ class TaskCoordinator { // MARK: CXX CHECKPOINT socket.on("CANCEL_CHECKPOINT", async (message, callback) => { - logger.log("[CANCEL_CHECKPOINT]", message); + const log = logger.child({ + eventName: "CANCEL_CHECKPOINT", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling CANCEL_CHECKPOINT"); 
if (message.version === "v1") { this.#cancelCheckpoint(socket.data.runId); @@ -784,12 +856,18 @@ class TaskCoordinator { // MARK: DURATION WAIT socket.on("WAIT_FOR_DURATION", async (message, callback) => { - logger.log("[WAIT_FOR_DURATION]", message); + const log = logger.child({ + eventName: "WAIT_FOR_DURATION", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling WAIT_FOR_DURATION"); await chaosMonkey.call({ throwErrors: false }); if (checkpointInProgress()) { - logger.error("Checkpoint already in progress", { runId: socket.data.runId }); + log.error("Checkpoint already in progress"); callback({ willCheckpointAndRestore: false }); return; } @@ -807,10 +885,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("WAIT_FOR_DURATION"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } @@ -823,10 +898,12 @@ class TaskCoordinator { if (!checkpoint) { // The task container will keep running until the wait duration has elapsed - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); return; } + log.addFields({ checkpoint }); + const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", attemptFriendlyId: message.attemptFriendlyId, @@ -840,7 +917,7 @@ class TaskCoordinator { }); if (ack?.keepRunAlive) { - logger.log("keeping run alive after duration checkpoint", { runId: socket.data.runId }); + log.log("keeping run alive after duration checkpoint"); return; } @@ -851,12 +928,18 @@ class TaskCoordinator { // MARK: TASK WAIT socket.on("WAIT_FOR_TASK", async (message, callback) => { - logger.log("[WAIT_FOR_TASK]", message); + const log = logger.child({ + eventName: "WAIT_FOR_TASK", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling WAIT_FOR_TASK"); await chaosMonkey.call({ throwErrors: false }); if (checkpointInProgress()) { - logger.error("Checkpoint already in progress", { runId: socket.data.runId }); + log.error("Checkpoint already in progress"); callback({ willCheckpointAndRestore: false }); return; } @@ -876,10 +959,7 @@ class TaskCoordinator { const ready = await readyToCheckpoint("WAIT_FOR_TASK"); if (!ready.success) { - logger.error("Failed to become checkpointable", { - runId: socket.data.runId, - reason: ready.reason, - }); + log.error("Failed to become checkpointable", { reason: ready.reason }); return; } } @@ -892,21 +972,17 @@ class TaskCoordinator { }); if (!checkpoint) { - logger.error("Failed to checkpoint", { runId: socket.data.runId }); + log.error("Failed to checkpoint"); return; } - logger.log("WAIT_FOR_TASK checkpoint created", { - checkpoint, - socketData: socket.data, - }); + log.addFields({ checkpoint }); + + log.log("WAIT_FOR_TASK checkpoint created"); //setting this means we can only resume from a checkpoint socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`; - logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", { - checkpoint, - socketData: socket.data, - }); + log.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage"); const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", { version: "v1", @@ -921,7 +997,7 @@ class TaskCoordinator { if (ack?.keepRunAlive) { socket.data.requiresCheckpointResumeWithMessage = undefined; - logger.log("keeping run alive after task checkpoint", { runId: 
-          logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
+          log.log("keeping run alive after task checkpoint");
           return;
         }
@@ -932,12 +1008,18 @@ class TaskCoordinator {
 
       // MARK: BATCH WAIT
       socket.on("WAIT_FOR_BATCH", async (message, callback) => {
-        logger.log("[WAIT_FOR_BATCH]", message);
+        const log = logger.child({
+          eventName: "WAIT_FOR_BATCH",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling WAIT_FOR_BATCH");
 
         await chaosMonkey.call({ throwErrors: false });
 
         if (checkpointInProgress()) {
-          logger.error("Checkpoint already in progress", { runId: socket.data.runId });
+          log.error("Checkpoint already in progress");
           callback({ willCheckpointAndRestore: false });
           return;
         }
@@ -957,10 +1039,7 @@ class TaskCoordinator {
           const ready = await readyToCheckpoint("WAIT_FOR_BATCH");
 
           if (!ready.success) {
-            logger.error("Failed to become checkpointable", {
-              runId: socket.data.runId,
-              reason: ready.reason,
-            });
+            log.error("Failed to become checkpointable", { reason: ready.reason });
 
             return;
           }
         }
@@ -973,21 +1052,17 @@ class TaskCoordinator {
         });
 
         if (!checkpoint) {
-          logger.error("Failed to checkpoint", { runId: socket.data.runId });
+          log.error("Failed to checkpoint");
           return;
         }
 
-        logger.log("WAIT_FOR_BATCH checkpoint created", {
-          checkpoint,
-          socketData: socket.data,
-        });
+        log.addFields({ checkpoint });
+
+        log.log("WAIT_FOR_BATCH checkpoint created");
 
         //setting this means we can only resume from a checkpoint
         socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
 
-        logger.log("WAIT_FOR_BATCH set checkpoint", {
-          checkpoint,
-          socketData: socket.data,
-        });
+        log.log("WAIT_FOR_BATCH set checkpoint");
 
         const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
           version: "v1",
@@ -1003,7 +1078,7 @@ class TaskCoordinator {
 
         if (ack?.keepRunAlive) {
           socket.data.requiresCheckpointResumeWithMessage = undefined;
-          logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
+          log.log("keeping run alive after batch checkpoint");
           return;
         }
@@ -1014,7 +1089,13 @@ class TaskCoordinator {
 
       // MARK: INDEX
       socket.on("INDEX_TASKS", async (message, callback) => {
-        logger.log("[INDEX_TASKS]", message);
+        const log = logger.child({
+          eventName: "INDEX_TASKS",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling INDEX_TASKS");
 
         const workerAck = await this.#platformSocket?.sendWithAck("CREATE_WORKER", {
           version: "v2",
@@ -1030,7 +1111,7 @@ class TaskCoordinator {
         });
 
         if (!workerAck) {
-          logger.debug("no worker ack while indexing", message);
+          log.debug("no worker ack while indexing");
         }
 
         callback({ success: !!workerAck?.success });
@@ -1038,7 +1119,13 @@ class TaskCoordinator {
 
       // MARK: INDEX FAILED
       socket.on("INDEXING_FAILED", async (message) => {
-        logger.log("[INDEXING_FAILED]", message);
+        const log = logger.child({
+          eventName: "INDEXING_FAILED",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling INDEXING_FAILED");
 
         this.#platformSocket?.send("INDEXING_FAILED", {
           version: "v1",
@@ -1049,7 +1136,13 @@ class TaskCoordinator {
 
       // MARK: CREATE ATTEMPT
      socket.on("CREATE_TASK_RUN_ATTEMPT", async (message, callback) => {
-        logger.log("[CREATE_TASK_RUN_ATTEMPT]", message);
+        const log = logger.child({
+          eventName: "CREATE_TASK_RUN_ATTEMPT",
+          ...getSocketMetadata(),
+          ...message,
+        });
+
+        log.log("Handling CREATE_TASK_RUN_ATTEMPT");
 
         await chaosMonkey.call({ throwErrors: false });
 
@@ -1059,7 +1152,7 @@ class TaskCoordinator {
         });
 
         if (!createAttempt?.success) {
attempt", message); + log.debug("no ack while creating attempt"); callback({ success: false, reason: createAttempt?.reason }); return; } @@ -1074,13 +1167,25 @@ class TaskCoordinator { }); socket.on("UNRECOVERABLE_ERROR", async (message) => { - logger.log("[UNRECOVERABLE_ERROR]", message); + const log = logger.child({ + eventName: "UNRECOVERABLE_ERROR", + ...getSocketMetadata(), + error: message.error, + }); + + log.log("Handling UNRECOVERABLE_ERROR"); await crashRun(message.error); }); socket.on("SET_STATE", async (message) => { - logger.log("[SET_STATE]", message); + const log = logger.child({ + eventName: "SET_STATE", + ...getSocketMetadata(), + ...message, + }); + + log.log("Handling SET_STATE"); if (message.attemptFriendlyId) { updateAttemptFriendlyId(message.attemptFriendlyId);