Skip to content

Commit

Permalink
In-memory fallback queue and no manual reconnecting of Kafka in Usage…
Browse files Browse the repository at this point in the history
… service (#6531)
  • Loading branch information
kamilkisiela authored Feb 18, 2025
1 parent 457fc3d commit 2507a9b
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 127 deletions.
8 changes: 7 additions & 1 deletion deployment/services/usage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ export function deployUsage({
image,
imagePullSecret: docker.secret,
replicas,
readinessProbe: '/_readiness',
readinessProbe: {
initialDelaySeconds: 10,
periodSeconds: 5,
failureThreshold: 2,
timeoutSeconds: 5,
endpoint: '/_readiness',
},
livenessProbe: '/_health',
startupProbe: '/_health',
availabilityOnEveryNode: true,
Expand Down
1 change: 1 addition & 0 deletions packages/services/usage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"graphql": "16.9.0",
"kafkajs": "2.2.4",
"lru-cache": "11.0.2",
"p-limit": "6.2.0",
"pino-pretty": "11.3.0",
"zod": "3.24.1"
}
Expand Down
50 changes: 33 additions & 17 deletions packages/services/usage/src/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import { randomUUID } from 'node:crypto';
import type { ServiceLogger } from '@hive/service-common';

export class BufferTooBigError extends Error {
class BufferTooBigError extends Error {
constructor(public bytes: number) {
super(`Buffer too big: ${bytes}`);
}
}

export function isBufferTooBigError(error: unknown): error is BufferTooBigError {
return error instanceof BufferTooBigError;
}

/**
* @param totalLength Number of all items in a list
* @param numOfChunks How many chunks to split the list into
Expand Down Expand Up @@ -142,6 +138,7 @@ export function createKVBuffer<T>(config: {
const { logger } = config;
let buffer: T[] = [];
let timeoutId: ReturnType<typeof setTimeout> | null = null;

const estimator = createEstimator({
logger,
defaultBytesPerUnit: config.limitInBytes / config.size,
Expand Down Expand Up @@ -172,11 +169,10 @@ export function createKVBuffer<T>(config: {
reports: readonly T[],
size: number,
batchId: string,
isRetry = false,
isChunkedBuffer = false,
) {
logger.info(`Flushing (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);
const estimatedSizeInBytes = estimator.estimate(size);
buffer = [];
await config
.sender(reports, estimatedSizeInBytes, batchId, function validateSize(bytes) {
if (!config.useEstimator) {
Expand All @@ -203,7 +199,11 @@ export function createKVBuffer<T>(config: {
}
})
.catch(error => {
if (!isRetry && isBufferTooBigError(error)) {
if (isChunkedBuffer) {
return Promise.reject(error);
}

if (error instanceof BufferTooBigError) {
config.onRetry(reports);
logger.info(`Retrying (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);

Expand Down Expand Up @@ -241,17 +241,16 @@ export function createKVBuffer<T>(config: {
});
}

async function send(shouldSchedule = true): Promise<void> {
if (timeoutId !== null) {
clearTimeout(timeoutId);
}
async function send(options: { scheduleNextSend: boolean }): Promise<void> {
const { scheduleNextSend } = options;

if (buffer.length !== 0) {
const reports = buffer.slice();
const size = calculateBufferSize(reports);
const batchId = randomUUID();

try {
buffer = [];
await flushBuffer(reports, size, batchId);
} catch (error) {
logger.error(error);
Expand All @@ -262,13 +261,24 @@ export function createKVBuffer<T>(config: {
}
}

if (shouldSchedule) {
if (scheduleNextSend) {
schedule();
}
}

function schedule() {
timeoutId = setTimeout(() => send(true), config.interval);
if (timeoutId !== null) {
clearTimeout(timeoutId);
timeoutId = null;
}

timeoutId = setTimeout(
() =>
void send({
scheduleNextSend: true,
}),
config.interval,
);
}

function add(report: T) {
Expand All @@ -278,7 +288,9 @@ export function createKVBuffer<T>(config: {
const estimatedBufferSize = currentBufferSize + estimatedReportSize;

if (currentBufferSize >= config.limitInBytes || estimatedBufferSize >= config.limitInBytes) {
void send(true);
void send({
scheduleNextSend: true,
});
}

if (estimatedReportSize > config.limitInBytes) {
Expand All @@ -293,7 +305,9 @@ export function createKVBuffer<T>(config: {
} else {
buffer.push(report);
if (sumOfOperationsSizeInBuffer() >= config.size) {
void send(true);
void send({
scheduleNextSend: true,
});
}
}
}
Expand All @@ -309,7 +323,9 @@ export function createKVBuffer<T>(config: {
if (timeoutId) {
clearTimeout(timeoutId);
}
await send(false);
await send({
scheduleNextSend: false,
});
},
};
}
89 changes: 89 additions & 0 deletions packages/services/usage/src/fallback-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import pLimit from 'p-limit';
import type { ServiceLogger } from '@hive/service-common';

// Average message size is ~800 KB, so a full queue of 1000 messages holds roughly 800 MB.
const MAX_QUEUE_SIZE = 1000;

/**
 * Creates a bounded in-memory FIFO queue of messages that are (re)sent through
 * `config.send` in the background.
 *
 * - `start()` begins a loop that attempts to send one queued message every 200ms.
 * - `add()` enqueues a message; once `MAX_QUEUE_SIZE` is reached the oldest
 *   message is dropped to make room.
 * - `stop()` halts the loop and makes one final attempt to send everything that
 *   is still queued (at most 10 sends in parallel).
 * - `size()` reports the number of messages currently waiting.
 *
 * @param config.send Sends a single message. A rejection puts the message back
 *   at the end of the queue, unless the error carries `type === 'MESSAGE_TOO_LARGE'`,
 *   in which case the message is dropped.
 * @param config.logger Logger used to report dropped and failed messages.
 */
export function createFallbackQueue(config: {
  send: (msgValue: Buffer<ArrayBufferLike>, numOfOperations: number) => Promise<void>;
  logger: ServiceLogger;
}) {
  const queue: [Buffer<ArrayBufferLike>, number][] = [];

  let timeoutId: ReturnType<typeof setTimeout> | null = null;
  // Set by stop(). Without it, a flush that is still awaiting `send` when stop()
  // is called would re-arm the timer once it settles and the loop would keep
  // running after shutdown.
  let stopped = false;
  // The flush currently awaiting `send`, if any. Its message has already been
  // shifted out of `queue`, so stop() has to wait for it before the final drain.
  let inFlight: Promise<void> | null = null;

  /** Takes the oldest message off the queue and tries to send it. */
  async function flushSingle() {
    const msg = queue.shift();
    if (!msg) {
      return;
    }

    try {
      const [msgValue, numOfOperations] = msg;
      await config.send(msgValue, numOfOperations);
    } catch (error) {
      // NOTE(review): presumably the KafkaJS protocol error type — confirm against the sender.
      // Retrying an oversized message can never succeed, so it is dropped.
      if (error instanceof Error && 'type' in error && error.type === 'MESSAGE_TOO_LARGE') {
        config.logger.error('Message too large, dropping message');
        return;
      }

      config.logger.error(
        {
          error: error instanceof Error ? error.message : String(error),
        },
        'Failed to flush message, adding back to fallback queue',
      );
      // Re-queued at the end, so a failing message does not block the ones behind it.
      queue.push(msg);
    }
  }

  /** Arms the timer for the next single-message flush, replacing any pending one. */
  function schedule() {
    if (timeoutId !== null) {
      clearTimeout(timeoutId);
    }

    timeoutId = setTimeout(() => {
      timeoutId = null;
      inFlight = (async () => {
        try {
          await flushSingle();
        } finally {
          inFlight = null;
          // Only keep looping while the queue has not been stopped.
          if (!stopped) {
            schedule();
          }
        }
      })();
    }, 200);
  }

  return {
    start() {
      stopped = false;
      schedule();
    },
    async stop() {
      stopped = true;

      if (timeoutId !== null) {
        clearTimeout(timeoutId);
        timeoutId = null;
      }

      // Let a flush that is already running settle first: if it fails, its
      // message is pushed back and must be part of the final drain below.
      if (inFlight !== null) {
        await inFlight.catch(() => undefined);
      }

      // Take ownership of everything that is left, so nothing can be sent twice
      // and size() does not keep reporting messages that were already handled.
      const pending = queue.splice(0, queue.length);
      const limit = pLimit(10);

      return Promise.allSettled(
        pending.map(([msgValue, numOfOperations]) =>
          limit(() =>
            config.send(msgValue, numOfOperations).catch(error => {
              config.logger.error(
                {
                  error: error instanceof Error ? error.message : String(error),
                },
                'Failed to flush message before stopping',
              );
            }),
          ),
        ),
      );
    },
    add(msgValue: Buffer<ArrayBufferLike>, numOfOperations: number) {
      if (queue.length >= MAX_QUEUE_SIZE) {
        config.logger.error('Queue is full, dropping oldest message');
        queue.shift();
      }

      queue.push([msgValue, numOfOperations]);
    },
    size() {
      return queue.length;
    },
  };
}
51 changes: 28 additions & 23 deletions packages/services/usage/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ async function main() {
if (!token) {
httpRequestsWithoutToken.inc();
activeSpan?.recordException('Missing token in request');
void res.status(401).send('Missing token');
await res.status(401).send('Missing token');
return;
}

if (token.length !== 32) {
activeSpan?.recordException('Invalid token');
httpRequestsWithoutToken.inc();
void res.status(401).send('Invalid token');
await res.status(401).send('Invalid token');
return;
}

Expand All @@ -205,7 +205,7 @@ async function main() {
httpRequestsWithNonExistingToken.inc();
req.log.info('Token not found (token=%s)', maskedToken);
activeSpan?.recordException('Token not found');
void res.status(401).send('Missing token');
await res.status(401).send('Missing token');
return;
}

Expand All @@ -217,7 +217,7 @@ async function main() {
httpRequestsWithNoAccess.inc();
req.log.info('No access (token=%s)', maskedToken);
activeSpan?.recordException('No access');
void res.status(403).send('No access');
await res.status(403).send('No access');
return;
}

Expand Down Expand Up @@ -259,13 +259,13 @@ async function main() {
droppedReports
.labels({ targetId: tokenInfo.target, orgId: tokenInfo.organization })
.inc();
authenticatedRequestLogger.info(
authenticatedRequestLogger.debug(
'Rate limited',
maskedToken,
tokenInfo.target,
tokenInfo.organization,
);
void res.status(429).send();
await res.status(429).send();

return;
}
Expand Down Expand Up @@ -297,7 +297,7 @@ async function main() {
// 503 - Service Unavailable
// The server is currently unable to handle the request due being not ready.
// This tells the gateway to retry the request and not to drop it.
void res.status(503).send();
await res.status(503).send();
return;
}

Expand All @@ -311,11 +311,14 @@ async function main() {
stopTimer({
status: 'success',
});
void res.status(200).send({
await res.status(200).send({
id: result.report.id,
operations: result.operations,
});
} else if (apiVersion === '2') {
return;
}

if (apiVersion === '2') {
activeSpan?.addEvent('using v2');
const result = measureParsing(
() => usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo),
Expand All @@ -336,7 +339,7 @@ async function main() {
activeSpan?.recordException(error.path + ': ' + error.message),
);

void res.status(400).send({
await res.status(400).send({
errors: result.errors,
});

Expand All @@ -347,18 +350,20 @@ async function main() {
stopTimer({
status: 'success',
});
void res.status(200).send({
await res.status(200).send({
id: result.report.id,
operations: result.operations,
});
} else {
authenticatedRequestLogger.debug("Invalid 'x-api-version' header value.");
stopTimer({
status: 'error',
});
activeSpan?.recordException("Invalid 'x-api-version' header value.");
void res.status(401).send("Invalid 'x-api-version' header value.");
return;
}

authenticatedRequestLogger.debug("Invalid 'x-api-version' header value.");
stopTimer({
status: 'error',
});
activeSpan?.recordException("Invalid 'x-api-version' header value.");
await res.status(401).send("Invalid 'x-api-version' header value.");
return;
} catch (error) {
stopTimer({
status: 'error',
Expand All @@ -369,26 +374,26 @@ async function main() {
level: 'error',
});
activeSpan?.recordException(error as Error);
void res.status(500).send();
await res.status(500).send();
}
}),
});

server.route({
method: ['GET', 'HEAD'],
url: '/_health',
handler(_, res) {
void res.status(200).send();
async handler(_, res) {
await res.status(200).send();
},
});

server.route({
method: ['GET', 'HEAD'],
url: '/_readiness',
handler(_, res) {
async handler(_, res) {
const isReady = readiness();
reportReadiness(isReady);
void res.status(isReady ? 200 : 400).send();
await res.status(isReady ? 200 : 400).send();
},
});

Expand Down
Loading

0 comments on commit 2507a9b

Please sign in to comment.