Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better logging and error handling for pollers #124

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdk/lib/__snapshots__/newswires.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,7 @@ exports[`The Newswires stack matches the snapshot 1`] = `
"FunctionName": "editorial-feeds-TEST-apPoller_poller_lambda",
"Handler": "index.handlers.apPoller",
"LoggingConfig": {
"LogFormat": "JSON",
"LogFormat": "Text",
},
"MemorySize": 128,
"RecursiveLoop": "Allow",
Expand Down Expand Up @@ -3615,7 +3615,7 @@ dpkg -i /newswires/newswires.deb",
"FunctionName": "editorial-feeds-TEST-reuters_poller_lambda",
"Handler": "index.handlers.reuters",
"LoggingConfig": {
"LogFormat": "JSON",
"LogFormat": "Text",
},
"MemorySize": 128,
"RecursiveLoop": "Allow",
Expand Down
3 changes: 2 additions & 1 deletion cdk/lib/constructs/pollerLambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
Metric,
TreatMissingData,
} from 'aws-cdk-lib/aws-cloudwatch';
import { RecursiveLoop } from 'aws-cdk-lib/aws-lambda';
import { LoggingFormat, RecursiveLoop } from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
import type { PollerConfig, PollerId } from '../../../shared/pollers';
Expand Down Expand Up @@ -100,6 +100,7 @@ export class PollerLambda {
timeout,
handler: `index.handlers.${pollerId}`, // see programmatically generated exports in poller-lambdas/src/index.ts
fileName: `poller-lambdas.zip`, // shared zip for all the poller-lambdas
loggingFormat: LoggingFormat.TEXT,
});

secret.grantRead(lambda);
Expand Down
31 changes: 24 additions & 7 deletions poller-lambdas/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import {
} from '@aws-sdk/client-secrets-manager';
import type { SendMessageCommandInput } from '@aws-sdk/client-sqs';
import { SendMessageCommand } from '@aws-sdk/client-sqs';
import {
POLLER_FAILURE_EVENT_TYPE,
POLLER_INVOCATION_EVENT_TYPE,
} from '../../shared/constants';
import { createLogger } from '../../shared/lambda-logging';
import type { PollerId } from '../../shared/pollers';
import { POLLER_LAMBDA_ENV_VAR_KEYS } from '../../shared/pollers';
import { queueNextInvocation, secretsManager, sqs } from './aws';
Expand All @@ -16,6 +21,13 @@ import { isFixedFrequencyPollOutput } from './types';
const pollerWrapper =
(pollerFunction: PollFunction) =>
async ({ Records }: HandlerInputSqsPayload) => {
const logger = createLogger({
sqsMessageId: Records.map((record) => record.messageId).join(', '),
});
logger.log({
message: `Poller lambda invoked with SQS message id: ${Records.map((record) => record.messageId).join(', ')}`,
eventType: POLLER_INVOCATION_EVENT_TYPE,
});
const startTimeEpochMillis = Date.now();
const secretName = getEnvironmentVariableOrCrash(
POLLER_LAMBDA_ENV_VAR_KEYS.SECRET_NAME,
Expand All @@ -37,7 +49,7 @@ const pollerWrapper =
}
for (const record of Records) {
const valueFromPreviousPoll = record.body;
await pollerFunction(secret, valueFromPreviousPoll)
await pollerFunction({ secret, input: valueFromPreviousPoll, logger })
.then(async (output) => {
const endTimeEpochMillis = Date.now();

Expand All @@ -64,11 +76,11 @@ const pollerWrapper =
},
};
await sqs.send(new SendMessageCommand(message)).catch((error) => {
console.error(
`sending to queue failed for ${externalId}`,
message,
error,
);
logger.error({
message: `Sending to queue failed for ${externalId}`,
error: error instanceof Error ? error.message : error,
queueMessage: JSON.stringify(message),
});
throw error; // we still expect this to be terminal for the poller lambda
});
}
Expand Down Expand Up @@ -107,7 +119,12 @@ const pollerWrapper =
}
})
.catch((error) => {
console.error('FAILED', error);
logger.error({
message: `Poller lambda failed with message: ${error instanceof Error ? error.message : error}`,
sqsMessageId: Records.map((record) => record.messageId).join(', '),
eventType: POLLER_FAILURE_EVENT_TYPE,
pollerName: pollerFunction.name,
});
// consider still queuing next (perhaps with default delay or 1min) to avoid the lambda from stopping entirely
throw error;
});
Expand Down
7 changes: 3 additions & 4 deletions poller-lambdas/src/pollers/ap/apPoller.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import type {
IngestorPayload,
LongPollFunction,
PollerInput,
SecretValue,
PollFunctionInput,
} from '../../types';
import type {
Contentitem,
Expand All @@ -19,7 +18,7 @@ type FeedItemWithContent = {
};

// https://api.ap.org/media/v/content/feed?page_size=10&in_my_plan=true&include=*
export const apPoller = (async (secret: SecretValue, input: PollerInput) => {
export const apPoller = (async ({ secret, input }: PollFunctionInput) => {
const baseUrl = 'https://api.ap.org/media/v';
const defaultFeedUrl = `${baseUrl}/content/feed?page_size=10&in_my_plan=true&include=*`;
const apiKey = secret;
Expand Down Expand Up @@ -204,7 +203,7 @@ function itemWithContentToDesiredOutput({
ednote,
imageIds: associations
? Object.keys(associations)
.filter(key => associations[key]?.type === 'picture')
.filter((key) => associations[key]?.type === 'picture')
.map((key) => associations[key]?.altids?.itemid)
.filter((item): item is string => item !== undefined)
: [],
Expand Down
44 changes: 31 additions & 13 deletions poller-lambdas/src/pollers/reuters/reutersPoller.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { parse } from 'node-html-parser';
import { z } from 'zod';
import { POLLER_FAILURE_EVENT_TYPE } from '../../../../shared/constants';
import { REUTERS_POLLING_FREQUENCY_IN_SECONDS } from '../../../../shared/pollers';
import type { IngestorInputBody } from '../../../../shared/types';
import type {
FixedFrequencyPollFunction,
PollerInput,
SecretValue,
PollFunctionInput,
} from '../../types';
import { auth } from './auth';

Expand Down Expand Up @@ -139,6 +139,8 @@ const itemResponseSchema = z.object({
}),
});

type ItemResponse = z.infer<typeof itemResponseSchema>;

function itemResponseToIngestionLambdaInput(
item: z.infer<typeof ReutersItemSchema>,
): IngestorInputBody {
Expand Down Expand Up @@ -177,6 +179,7 @@ function itemResponseToIngestionLambdaInput(
body_text: bodyHtml,
copyrightNotice: item.copyrightNotice,
language: item.language,
imageIds: [],
};
}

Expand All @@ -186,10 +189,11 @@ const SecretValueSchema = z.object({
ACCESS_TOKEN: z.string().optional(),
});

export const reutersPoller = (async (
secret: SecretValue,
input: PollerInput,
) => {
export const reutersPoller = (async ({
secret,
input,
logger,
}: PollFunctionInput) => {
const parsedSecret = SecretValueSchema.safeParse(JSON.parse(secret));
if (!parsedSecret.success) {
throw new Error('Failed to parse secret value for Reuters poller');
Expand Down Expand Up @@ -279,16 +283,30 @@ export const reutersPoller = (async (
.filter((guid): guid is string => guid !== undefined);

const itemResponses = await Promise.all(
itemsToFetch.map(async (itemId) =>
itemResponseSchema.parse(await fetchWithReauth(itemQuery(itemId))),
),
itemsToFetch.map(async (itemId) => {
const parsedItemResult = itemResponseSchema.safeParse(
await fetchWithReauth(itemQuery(itemId)),
);
if (!parsedItemResult.success) {
logger.log({
externalId: itemId,
supplier: 'Reuters',
eventType: POLLER_FAILURE_EVENT_TYPE,
errors: parsedItemResult.error.errors,
message: `Failed to parse item response for ${itemId}`,
});
}
return parsedItemResult.data;
}),
);

return {
payloadForIngestionLambda: itemResponses.map((response) => ({
externalId: response.data.item.versionedGuid,
body: itemResponseToIngestionLambdaInput(response.data.item),
})),
payloadForIngestionLambda: itemResponses
.filter((_): _ is ItemResponse => typeof _ !== 'undefined')
.map((response) => ({
externalId: response.data.item.versionedGuid,
body: itemResponseToIngestionLambdaInput(response.data.item),
})),
valueForNextPoll: input,
idealFrequencyInSeconds: REUTERS_POLLING_FREQUENCY_IN_SECONDS,
newSecretValue:
Expand Down
25 changes: 16 additions & 9 deletions poller-lambdas/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Logger } from '../../shared/lambda-logging';
import type { IngestorInputBody } from '../../shared/types';

export type SecretValue = string;
Expand All @@ -22,15 +23,19 @@ export const isFixedFrequencyPollOutput = (
output: LongPollOutput | FixedFrequencyPollOutput,
): output is FixedFrequencyPollOutput => 'idealFrequencyInSeconds' in output;

export type LongPollFunction = (
secret: SecretValue,
input: PollerInput,
) => Promise<LongPollOutput>;
export type PollFunctionInput = {
secret: SecretValue;
input: PollerInput;
logger: Logger;
};

export type LongPollFunction = ({
input,
}: PollFunctionInput) => Promise<LongPollOutput>;

export type FixedFrequencyPollFunction = (
secret: SecretValue,
input: PollerInput,
) => Promise<FixedFrequencyPollOutput>;
export type FixedFrequencyPollFunction = ({
input,
}: PollFunctionInput) => Promise<FixedFrequencyPollOutput>;

export type PollFunction = LongPollFunction | FixedFrequencyPollFunction;

Expand All @@ -39,4 +44,6 @@ export type IngestorPayload = {
body?: IngestorInputBody;
};

export type HandlerInputSqsPayload = { Records: Array<{ body: string }> };
export type HandlerInputSqsPayload = {
Records: Array<{ body: string; messageId: string }>;
};
2 changes: 2 additions & 0 deletions shared/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
export const STACK = 'editorial-feeds';

export const SUCCESSFUL_INGESTION_EVENT_TYPE = 'SUCCESSFUL_INGESTION';
export const POLLER_FAILURE_EVENT_TYPE = 'POLLER_FAILURE';
export const POLLER_INVOCATION_EVENT_TYPE = 'POLLER_INVOCATION';
40 changes: 40 additions & 0 deletions shared/lambda-logging.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
interface LogLine {
/**
* The message to log.
*/
message: string;
/**
* Any additional markers to log.
*/
[key: string]: unknown;
}

export interface Logger {
log: (line: LogLine) => void;
debug: (line: LogLine) => void;
warn: (line: LogLine) => void;
error: (line: LogLine) => void;
}

/**
* Produces a log message with markers compatible with https://github.com/guardian/cloudwatch-logs-management.
* Note: if using within AWS Lambda, the Lambda must also log in text format not JSON.
*
* @see https://github.com/guardian/cloudwatch-logs-management/issues/326
*/
export function createLogger(defaultFields: Omit<LogLine, 'message'>): Logger {
return {
log: (line: LogLine) => {
console.log(JSON.stringify({ ...defaultFields, ...line }));
},
debug: (line: LogLine) => {
console.debug(JSON.stringify({ ...defaultFields, ...line }));
},
warn: (line: LogLine) => {
console.warn(JSON.stringify({ ...defaultFields, ...line }));
},
error: (line: LogLine) => {
console.error(JSON.stringify({ ...defaultFields, ...line }));
},
};
}
Loading