diff --git a/.changeset/honest-scissors-live.md b/.changeset/honest-scissors-live.md new file mode 100644 index 0000000000..0905043985 --- /dev/null +++ b/.changeset/honest-scissors-live.md @@ -0,0 +1,7 @@ +--- +'hive': minor +--- + +Added a new environment variable `OPENTELEMETRY_TRACE_USAGE_REQUESTS` for `rate-limit` and `tokens` services. + +Self-hosters who wish to report telemetry information for the `usage` service can opt in by setting `OPENTELEMETRY_TRACE_USAGE_REQUESTS=1` on these services. This will skip sampling and will always trace requests originating from the `usage` service. diff --git a/.changeset/tender-maps-shout.md b/.changeset/tender-maps-shout.md new file mode 100644 index 0000000000..4a7f4b7806 --- /dev/null +++ b/.changeset/tender-maps-shout.md @@ -0,0 +1,9 @@ +--- +'hive': minor +--- + +Added OpenTelemetry traces to the Usage service using a new `OPENTELEMETRY_COLLECTOR_ENDPOINT` env var. + +This option is disabled by default for self-hosting; you can opt in by setting `OPENTELEMETRY_COLLECTOR_ENDPOINT`. 
+ + diff --git a/deployment/index.ts b/deployment/index.ts index 6828b95307..bd7ae18656 100644 --- a/deployment/index.ts +++ b/deployment/index.ts @@ -190,6 +190,7 @@ const usage = deployUsage({ dbMigrations, rateLimit, sentry, + observability, }); const usageIngestor = deployUsageIngestor({ diff --git a/deployment/services/observability.ts b/deployment/services/observability.ts index b2896a88fc..3f0685dabd 100644 --- a/deployment/services/observability.ts +++ b/deployment/services/observability.ts @@ -3,6 +3,9 @@ import { serviceLocalHost } from '../utils/local-endpoint'; import { Observability as ObservabilityInstance } from '../utils/observability'; import { deployGrafana } from './grafana'; +// Change this to control OTEL tracing for usage service +const enableTracingForUsageService = true; + export function deployObservability(config: { envName: string; /** @@ -57,6 +60,7 @@ export function deployObservability(config: { observability: observabilityInstance, grafana: useLocal ? undefined : deployGrafana(config.envName, config.tableSuffix), enabled: true, + enabledForUsageService: enableTracingForUsageService, }; } diff --git a/deployment/services/rate-limit.ts b/deployment/services/rate-limit.ts index ba0d85ff84..ea97753990 100644 --- a/deployment/services/rate-limit.ts +++ b/deployment/services/rate-limit.ts @@ -47,6 +47,7 @@ export function deployRateLimit({ USAGE_ESTIMATOR_ENDPOINT: serviceLocalEndpoint(usageEstimator.service), EMAILS_ENDPOINT: serviceLocalEndpoint(emails.service), WEB_APP_URL: `https://${environment.appDns}/`, + OPENTELEMETRY_TRACE_USAGE_REQUESTS: observability.enabledForUsageService ? '1' : '', OPENTELEMETRY_COLLECTOR_ENDPOINT: observability.enabled && observability.tracingEndpoint ? 
observability.tracingEndpoint diff --git a/deployment/services/tokens.ts b/deployment/services/tokens.ts index e6fbf65eb9..685d9d6a01 100644 --- a/deployment/services/tokens.ts +++ b/deployment/services/tokens.ts @@ -45,6 +45,7 @@ export function deployTokens({ ...environment.envVars, SENTRY: sentry.enabled ? '1' : '0', HEARTBEAT_ENDPOINT: heartbeat ?? '', + OPENTELEMETRY_TRACE_USAGE_REQUESTS: observability.enabledForUsageService ? '1' : '', OPENTELEMETRY_COLLECTOR_ENDPOINT: observability.enabled && observability.tracingEndpoint ? observability.tracingEndpoint diff --git a/deployment/services/usage.ts b/deployment/services/usage.ts index d688333408..96e6a92e4f 100644 --- a/deployment/services/usage.ts +++ b/deployment/services/usage.ts @@ -5,6 +5,7 @@ import { DbMigrations } from './db-migrations'; import { Docker } from './docker'; import { Environment } from './environment'; import { Kafka } from './kafka'; +import { Observability } from './observability'; import { RateLimitService } from './rate-limit'; import { Sentry } from './sentry'; import { Tokens } from './tokens'; @@ -19,8 +20,10 @@ export function deployUsage({ rateLimit, image, docker, + observability, sentry, }: { + observability: Observability; image: string; environment: Environment; tokens: Tokens; @@ -58,6 +61,12 @@ export function deployUsage({ KAFKA_TOPIC: kafka.config.topic, TOKENS_ENDPOINT: serviceLocalEndpoint(tokens.service), RATE_LIMIT_ENDPOINT: serviceLocalEndpoint(rateLimit.service), + OPENTELEMETRY_COLLECTOR_ENDPOINT: + observability.enabled && + observability.enabledForUsageService && + observability.tracingEndpoint + ? 
observability.tracingEndpoint + : '', }, exposesMetrics: true, port: 4000, diff --git a/deployment/utils/observability.ts b/deployment/utils/observability.ts index 706042bbce..6c99af9bc1 100644 --- a/deployment/utils/observability.ts +++ b/deployment/utils/observability.ts @@ -111,8 +111,8 @@ export class Observability { replicaCount: 1, resources: { limits: { - cpu: '256m', - memory: '512Mi', + cpu: '512m', + memory: '1000Mi', }, }, podAnnotations: { @@ -196,12 +196,11 @@ export class Observability { 'attributes["component"] == "proxy" and attributes["http.method"] == "GET" and attributes["http.url"] == "/_health"', 'attributes["component"] == "proxy" and attributes["http.method"] == "GET" and IsMatch(attributes["http.url"], ".*/_health") == true', // Ignore Contour/Envoy traces for /usage requests - 'attributes["component"] == "proxy" and attributes["http.method"] == "POST" and attributes["http.url"] == "/usage"', - 'attributes["component"] == "proxy" and attributes["http.method"] == "POST" and IsMatch(attributes["upstream_cluster.name"], "default_usage-service-.*") == true', - 'attributes["component"] == "proxy" and attributes["http.method"] == "POST" and IsMatch(attributes["upstream_cluster.name"], "default_app-.*") == true', + 'attributes["component"] == "proxy" and attributes["http.method"] == "POST" and attributes["http.url"] == "/usage" and (attributes["http.status_code"] == "200" or attributes["http.status_code"] == "429")', // Ignore metrics scraping 'attributes["component"] == "proxy" and attributes["http.method"] == "GET" and attributes["http.url"] == "/metrics"', // Ignore webapp HTTP calls + 'attributes["component"] == "proxy" and attributes["http.method"] == "POST" and IsMatch(attributes["upstream_cluster.name"], "default_app-.*") == true', 'attributes["component"] == "proxy" and attributes["http.method"] == "GET" and IsMatch(attributes["upstream_cluster.name"], "default_app-.*") == true', ], }, diff --git a/packages/services/rate-limit/.env.template 
b/packages/services/rate-limit/.env.template index f8bae2314c..653b852d2e 100644 --- a/packages/services/rate-limit/.env.template +++ b/packages/services/rate-limit/.env.template @@ -8,4 +8,5 @@ USAGE_ESTIMATOR_ENDPOINT=http://localhost:4011 EMAILS_ENDPOINT=http://localhost:6260 WEB_APP_URL=http://localhost:3000 OPENTELEMETRY_COLLECTOR_ENDPOINT="" -LIMIT_CACHE_UPDATE_INTERVAL_MS=2000 \ No newline at end of file +LIMIT_CACHE_UPDATE_INTERVAL_MS=2000 +OPENTELEMETRY_TRACE_USAGE_REQUESTS=1 \ No newline at end of file diff --git a/packages/services/rate-limit/README.md b/packages/services/rate-limit/README.md index 7038fd720a..907654f5d8 100644 --- a/packages/services/rate-limit/README.md +++ b/packages/services/rate-limit/README.md @@ -5,23 +5,24 @@ you don't need this service. ## Configuration -| Name | Required | Description | Example Value | -| ----------------------------------- | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- | -| `PORT` | **Yes** | The HTTP port of the service. | `4012` | -| `LIMIT_CACHE_UPDATE_INTERVAL_MS` | No | The cache update interval limit in milliseconds. | `60_000` | -| `POSTGRES_HOST` | **Yes** | Host of the postgres database | `127.0.0.1` | -| `POSTGRES_PORT` | **Yes** | Port of the postgres database | `5432` | -| `POSTGRES_DB` | **Yes** | Name of the postgres database. | `registry` | -| `POSTGRES_USER` | **Yes** | User name for accessing the postgres database. | `postgres` | -| `POSTGRES_PASSWORD` | **Yes** | Password for accessing the postgres database. | `postgres` | -| `USAGE_ESTIMATOR_ENDPOINT` | **Yes** | The endpoint of the usage estimator service. | `http://127.0.0.1:4011` | -| `EMAILS_ENDPOINT` | No (if not provided no limit emails will be sent.) | The endpoint of the GraphQL Hive Email service. 
| `http://127.0.0.1:6260` | -| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` | -| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) | -| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` | -| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) | -| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `rate-limit` | -| `WEB_APP_URL` | No | The base url of the web app | `https://your-instance.com` | -| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | -| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | -| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | +| Name | Required | Description | Example Value | +| ------------------------------------ | -------------------------------------------------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- | +| `PORT` | **Yes** | The HTTP port of the service. | `4012` | +| `LIMIT_CACHE_UPDATE_INTERVAL_MS` | No | The cache update interval limit in milliseconds. | `60_000` | +| `POSTGRES_HOST` | **Yes** | Host of the postgres database | `127.0.0.1` | +| `POSTGRES_PORT` | **Yes** | Port of the postgres database | `5432` | +| `POSTGRES_DB` | **Yes** | Name of the postgres database. | `registry` | +| `POSTGRES_USER` | **Yes** | User name for accessing the postgres database. | `postgres` | +| `POSTGRES_PASSWORD` | **Yes** | Password for accessing the postgres database. 
| `postgres` | +| `USAGE_ESTIMATOR_ENDPOINT` | **Yes** | The endpoint of the usage estimator service. | `http://127.0.0.1:4011` | +| `EMAILS_ENDPOINT` | No (if not provided no limit emails will be sent.) | The endpoint of the GraphQL Hive Email service. | `http://127.0.0.1:6260` | +| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` | +| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) | +| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` | +| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) | +| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `rate-limit` | +| `WEB_APP_URL` | No | The base url of the web app | `https://your-instance.com` | +| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | +| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | +| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | +| `OPENTELEMETRY_TRACE_USAGE_REQUESTS` | No | If enabled, requests send to this service from `usage` service will be monitored with OTEL. 
| `1` (enabled, or ``) | diff --git a/packages/services/rate-limit/src/environment.ts b/packages/services/rate-limit/src/environment.ts index 8bca21e6a6..4fe4b6bcf2 100644 --- a/packages/services/rate-limit/src/environment.ts +++ b/packages/services/rate-limit/src/environment.ts @@ -82,8 +82,13 @@ const configs = { prometheus: PrometheusModel.safeParse(process.env), // eslint-disable-next-line no-process-env log: LogModel.safeParse(process.env), - // eslint-disable-next-line no-process-env - tracing: OpenTelemetryConfigurationModel.safeParse(process.env), + tracing: zod + .object({ + ...OpenTelemetryConfigurationModel.shape, + OPENTELEMETRY_TRACE_USAGE_REQUESTS: emptyString(zod.literal('1').optional()), + }) + // eslint-disable-next-line no-process-env + .safeParse(process.env), }; const environmentErrors: Array = []; @@ -129,6 +134,7 @@ export const env = { tracing: { enabled: !!tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, collectorEndpoint: tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, + traceRequestsFromUsageService: tracing.OPENTELEMETRY_TRACE_USAGE_REQUESTS === '1', }, postgres: { host: postgres.POSTGRES_HOST, diff --git a/packages/services/rate-limit/src/index.ts b/packages/services/rate-limit/src/index.ts index cebaf59d06..57e48eb2a5 100644 --- a/packages/services/rate-limit/src/index.ts +++ b/packages/services/rate-limit/src/index.ts @@ -24,8 +24,11 @@ async function main() { tracing = configureTracing({ collectorEndpoint: env.tracing.collectorEndpoint, serviceName: 'rate-limit', - sampler(ctx, traceId, spanName, spanKind, attributes) { - if (attributes['requesting.service'] === 'usage') { + sampler: (ctx, traceId, spanName, spanKind, attributes) => { + if ( + attributes['requesting.service'] === 'usage' && + !env.tracing.traceRequestsFromUsageService + ) { return { decision: SamplingDecision.NOT_RECORD, }; diff --git a/packages/services/service-common/src/tracing.ts b/packages/services/service-common/src/tracing.ts index c351a6e4fb..d84104eaf2 100644 --- 
a/packages/services/service-common/src/tracing.ts +++ b/packages/services/service-common/src/tracing.ts @@ -42,7 +42,6 @@ import openTelemetryPlugin, { OpenTelemetryPluginOptions } from './fastify-traci export { trace, context, Span, SpanKind, SamplingDecision, SpanStatusCode }; type Instrumentations = NodeSDKConfiguration['instrumentations']; - export class TracingInstance { private instrumentations: Instrumentations = []; private sdk: NodeSDK | undefined; diff --git a/packages/services/tokens/.env.template b/packages/services/tokens/.env.template index e950d62275..0a900b60fc 100644 --- a/packages/services/tokens/.env.template +++ b/packages/services/tokens/.env.template @@ -8,4 +8,5 @@ REDIS_PORT="6379" REDIS_PASSWORD="" PORT=6001 OPENTELEMETRY_COLLECTOR_ENDPOINT="" -LOG_LEVEL="debug" \ No newline at end of file +LOG_LEVEL="debug" +OPENTELEMETRY_TRACE_USAGE_REQUESTS=1 \ No newline at end of file diff --git a/packages/services/tokens/README.md b/packages/services/tokens/README.md index b7840c8b21..69c2965604 100644 --- a/packages/services/tokens/README.md +++ b/packages/services/tokens/README.md @@ -5,26 +5,27 @@ APIs (usage service and GraphQL API). ## Configuration -| Name | Required | Description | Example Value | -| ----------------------------------- | -------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- | -| `PORT` | **Yes** | The port this service is running on. | `6001` | -| `POSTGRES_HOST` | **Yes** | Host of the postgres database | `127.0.0.1` | -| `POSTGRES_PORT` | **Yes** | Port of the postgres database | `5432` | -| `POSTGRES_DB` | **Yes** | Name of the postgres database. | `registry` | -| `POSTGRES_USER` | **Yes** | User name for accessing the postgres database. | `postgres` | -| `POSTGRES_PASSWORD` | **Yes** | Password for accessing the postgres database. 
| `postgres` | -| `POSTGRES_SSL` | No | Whether the postgres connection should be established via SSL. | `1` (enabled) or `0` (disabled) | -| `REDIS_HOST` | **Yes** | The host of your redis instance. | `"127.0.0.1"` | -| `REDIS_PORT` | **Yes** | The port of your redis instance. | `6379` | -| `REDIS_PASSWORD` | **Yes** | The password of your redis instance. | `"apollorocks"` | -| `REDIS_TLS_ENABLED` | **No** | Enable TLS for redis connection (rediss://). | `"0"` | -| `RATE_LIMIT_ENDPOINT` | **Yes** | The endpoint of the rate limiting service. | `http://127.0.0.1:4012` | -| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` | -| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) | -| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` | -| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) | -| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `tokens` | -| `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` | -| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | -| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | -| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | +| Name | Required | Description | Example Value | +| ------------------------------------ | -------- | -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------- | +| `PORT` | **Yes** | The port this service is running on. 
| `6001` | +| `POSTGRES_HOST` | **Yes** | Host of the postgres database | `127.0.0.1` | +| `POSTGRES_PORT` | **Yes** | Port of the postgres database | `5432` | +| `POSTGRES_DB` | **Yes** | Name of the postgres database. | `registry` | +| `POSTGRES_USER` | **Yes** | User name for accessing the postgres database. | `postgres` | +| `POSTGRES_PASSWORD` | **Yes** | Password for accessing the postgres database. | `postgres` | +| `POSTGRES_SSL` | No | Whether the postgres connection should be established via SSL. | `1` (enabled) or `0` (disabled) | +| `REDIS_HOST` | **Yes** | The host of your redis instance. | `"127.0.0.1"` | +| `REDIS_PORT` | **Yes** | The port of your redis instance. | `6379` | +| `REDIS_PASSWORD` | **Yes** | The password of your redis instance. | `"apollorocks"` | +| `REDIS_TLS_ENABLED` | **No** | Enable TLS for redis connection (rediss://). | `"0"` | +| `RATE_LIMIT_ENDPOINT` | **Yes** | The endpoint of the rate limiting service. | `http://127.0.0.1:4012` | +| `ENVIRONMENT` | No | The environment of your Hive app. (**Note:** This will be used for Sentry reporting.) | `staging` | +| `SENTRY` | No | Whether Sentry error reporting should be enabled. | `1` (enabled) or `0` (disabled) | +| `SENTRY_DSN` | No | The DSN for reporting errors to Sentry. | `https://dooobars@o557896.ingest.sentry.io/12121212` | +| `PROMETHEUS_METRICS` | No | Whether Prometheus metrics should be enabled | `1` (enabled) or `0` (disabled) | +| `PROMETHEUS_METRICS_LABEL_INSTANCE` | No | The instance label added for the prometheus metrics. | `tokens` | +| `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` | +| `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | +| `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | +| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. 
The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | +| `OPENTELEMETRY_TRACE_USAGE_REQUESTS` | No | If enabled, requests send to this service from `usage` service will be monitored with OTEL. | `1` (enabled, or ``) | diff --git a/packages/services/tokens/src/environment.ts b/packages/services/tokens/src/environment.ts index b26e220500..d907904d31 100644 --- a/packages/services/tokens/src/environment.ts +++ b/packages/services/tokens/src/environment.ts @@ -89,8 +89,13 @@ const configs = { prometheus: PrometheusModel.safeParse(process.env), // eslint-disable-next-line no-process-env log: LogModel.safeParse(process.env), - // eslint-disable-next-line no-process-env - tracing: OpenTelemetryConfigurationModel.safeParse(process.env), + tracing: zod + .object({ + ...OpenTelemetryConfigurationModel.shape, + OPENTELEMETRY_TRACE_USAGE_REQUESTS: emptyString(zod.literal('1').optional()), + }) + // eslint-disable-next-line no-process-env + .safeParse(process.env), }; const environmentErrors: Array = []; @@ -131,6 +136,7 @@ export const env = { tracing: { enabled: !!tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, collectorEndpoint: tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, + traceRequestsFromUsageService: tracing.OPENTELEMETRY_TRACE_USAGE_REQUESTS === '1', }, postgres: { host: postgres.POSTGRES_HOST, diff --git a/packages/services/usage/.env.template b/packages/services/usage/.env.template index 012a60a604..1a74f2f6a1 100644 --- a/packages/services/usage/.env.template +++ b/packages/services/usage/.env.template @@ -7,3 +7,4 @@ KAFKA_BUFFER_DYNAMIC=1 KAFKA_TOPIC="usage_reports_v2" PORT=4001 RATE_LIMIT_ENDPOINT="http://localhost:4012" +OPENTELEMETRY_COLLECTOR_ENDPOINT="" diff --git a/packages/services/usage/README.md b/packages/services/usage/README.md index 00b244c16f..9c2bb6c95e 100644 --- a/packages/services/usage/README.md +++ b/packages/services/usage/README.md @@ -33,3 +33,4 @@ The data is written to a Kafka broker, form Kafka the data is feed 
into clickhou | `PROMETHEUS_METRICS_PORT` | No | Port on which prometheus metrics are exposed | Defaults to `10254` | | `REQUEST_LOGGING` | No | Log http requests | `1` (enabled) or `0` (disabled) | | `LOG_LEVEL` | No | The verbosity of the service logs. One of `trace`, `debug`, `info`, `warn` ,`error`, `fatal` or `silent` | `info` (default) | +| `OPENTELEMETRY_COLLECTOR_ENDPOINT` | No | OpenTelemetry Collector endpoint. The expected traces transport is HTTP (port `4318`). | `http://localhost:4318/v1/traces` | diff --git a/packages/services/usage/src/environment.ts b/packages/services/usage/src/environment.ts index af79fdb8e4..c9f1b598a0 100644 --- a/packages/services/usage/src/environment.ts +++ b/packages/services/usage/src/environment.ts @@ -1,5 +1,6 @@ import * as fs from 'fs'; import zod from 'zod'; +import { OpenTelemetryConfigurationModel } from '@hive/service-common'; const isNumberString = (input: unknown) => zod.string().regex(/^\d+$/).safeParse(input).success; @@ -100,6 +101,8 @@ const configs = { prometheus: PrometheusModel.safeParse(process.env), // eslint-disable-next-line no-process-env log: LogModel.safeParse(process.env), + // eslint-disable-next-line no-process-env + tracing: OpenTelemetryConfigurationModel.safeParse(process.env), }; const environmentErrors: Array = []; @@ -128,6 +131,7 @@ const sentry = extractConfig(configs.sentry); const kafka = extractConfig(configs.kafka); const prometheus = extractConfig(configs.prometheus); const log = extractConfig(configs.log); +const tracing = extractConfig(configs.tracing); export const env = { environment: base.ENVIRONMENT, @@ -135,6 +139,10 @@ export const env = { http: { port: base.PORT ?? 
5000, }, + tracing: { + enabled: !!tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, + collectorEndpoint: tracing.OPENTELEMETRY_COLLECTOR_ENDPOINT, + }, hive: { tokens: { endpoint: base.TOKENS_ENDPOINT, diff --git a/packages/services/usage/src/index.ts b/packages/services/usage/src/index.ts index 70f6aad547..9675662af0 100644 --- a/packages/services/usage/src/index.ts +++ b/packages/services/usage/src/index.ts @@ -1,11 +1,15 @@ #!/usr/bin/env node import { hostname } from 'os'; import { + configureTracing, createServer, maskToken, registerShutdown, reportReadiness, + SpanStatusCode, startMetrics, + traceInline, + TracingInstance, } from '@hive/service-common'; import * as Sentry from '@sentry/node'; import { env } from './environment'; @@ -35,6 +39,19 @@ declare module 'fastify' { } async function main() { + let tracing: TracingInstance | undefined; + + if (env.tracing.enabled && env.tracing.collectorEndpoint) { + tracing = configureTracing({ + collectorEndpoint: env.tracing.collectorEndpoint, + serviceName: 'usage', + }); + + tracing.instrumentNodeFetch(); + tracing.build(); + tracing.start(); + } + if (env.sentry) { Sentry.init({ serverName: hostname(), @@ -55,6 +72,10 @@ async function main() { }, }); + if (tracing) { + await server.register(...tracing.instrumentFastify()); + } + try { const { collect, readiness, start, stop } = createUsage({ logger: server.log, @@ -71,6 +92,10 @@ async function main() { const shutdown = registerShutdown({ logger: server.log, async onShutdown() { + server.log.info('Stopping tracing handler...'); + await tracing?.shutdown(); + + server.log.info('Stopping service handler...'); await Promise.all([stop(), server.close()]); }, }); @@ -111,204 +136,239 @@ async function main() { httpRequestDuration.observe(delta[0] + delta[1] / 1e9); done(); }, - handler: measureHandler(async function usageHandler(req, res) { - httpRequests.inc(); - let token: string | undefined; - const legacyToken = req.headers['x-api-token'] as string; - const apiVersion = 
Array.isArray(req.headers['x-usage-api-version']) - ? req.headers['x-usage-api-version'][0] - : req.headers['x-usage-api-version']; - - if (apiVersion) { - if (apiVersion === '1') { - usedAPIVersion.labels({ version: '1' }).inc(); - } else if (apiVersion === '2') { - usedAPIVersion.labels({ version: '2' }).inc(); + handler: traceInline( + 'Usage Handler', + {}, + measureHandler(async function usageHandler(req, res) { + const { activeSpan } = req.openTelemetry ? req.openTelemetry() : { activeSpan: null }; + httpRequests.inc(); + let token: string | undefined; + const legacyToken = req.headers['x-api-token'] as string; + const apiVersion = Array.isArray(req.headers['x-usage-api-version']) + ? req.headers['x-usage-api-version'][0] + : req.headers['x-usage-api-version']; + + if (apiVersion) { + if (apiVersion === '1') { + usedAPIVersion.labels({ version: '1' }).inc(); + activeSpan?.setAttribute('hive.usage.api_version', '1'); + } else if (apiVersion === '2') { + activeSpan?.setAttribute('hive.usage.api_version', '2'); + usedAPIVersion.labels({ version: '2' }).inc(); + } else { + activeSpan?.setAttribute('hive.usage.api_version', apiVersion); + activeSpan?.setStatus({ + code: SpanStatusCode.ERROR, + message: "Invalid 'x-api-version' header value.", + }); + + usedAPIVersion.labels({ version: 'invalid' }).inc(); + } } else { - usedAPIVersion.labels({ version: 'invalid' }).inc(); + usedAPIVersion.labels({ version: 'none' }).inc(); + activeSpan?.setAttribute('hive.usage.api_version', 'none'); } - } else { - usedAPIVersion.labels({ version: 'none' }).inc(); - } - - if (legacyToken) { - // TODO: add metrics to track legacy x-api-token header - token = legacyToken; - } else { - const authValue = req.headers.authorization; - - if (authValue) { - token = authValue.replace(/^Bearer\s+/, ''); + + if (legacyToken) { + // TODO: add metrics to track legacy x-api-token header + token = legacyToken; + } else { + const authValue = req.headers.authorization; + + if (authValue) { + token 
= authValue.replace(/^Bearer\s+/, ''); + } + } + + if (!token) { + void res.status(401).send('Missing token'); + httpRequestsWithoutToken.inc(); + activeSpan?.recordException('Missing token in request'); + return; } - } - if (!token) { - void res.status(401).send('Missing token'); - httpRequestsWithoutToken.inc(); - return; - } + if (token.length !== 32) { + void res.status(401).send('Invalid token'); + httpRequestsWithoutToken.inc(); + activeSpan?.recordException('Invalid token'); + return; + } - if (token.length !== 32) { - void res.status(401).send('Invalid token'); - httpRequestsWithoutToken.inc(); - return; - } + const stopTokensDurationTimer = tokensDuration.startTimer(); + const tokenInfo = await tokens.fetch(token); + const maskedToken = maskToken(token); + activeSpan?.setAttribute('hive.usage.masked_token', maskedToken); - const stopTokensDurationTimer = tokensDuration.startTimer(); - const tokenInfo = await tokens.fetch(token); - const maskedToken = maskToken(token); + if (tokens.isNotFound(tokenInfo)) { + stopTokensDurationTimer({ + status: 'not_found', + }); + httpRequestsWithNonExistingToken.inc(); + req.log.info('Token not found (token=%s)', maskedToken); + void res.status(401).send('Missing token'); + activeSpan?.recordException('Token not found'); + return; + } - if (tokens.isNotFound(tokenInfo)) { - stopTokensDurationTimer({ - status: 'not_found', - }); - httpRequestsWithNonExistingToken.inc(); - req.log.info('Token not found (token=%s)', maskedToken); - void res.status(401).send('Missing token'); - return; - } - - // We treat collected operations as part of registry - if (tokens.isNoAccess(tokenInfo)) { - stopTokensDurationTimer({ - status: 'no_access', - }); - httpRequestsWithNoAccess.inc(); - req.log.info('No access (token=%s)', maskedToken); - void res.status(403).send('No access'); - return; - } - - const authenticatedRequestLogger = req.log.child({ - token: maskedToken, - target: tokenInfo.target, - organization: tokenInfo.organization, - }); 
- - stopTokensDurationTimer({ - status: 'success', - }); - - const isRateLimited = await rateLimit - .isRateLimited({ - targetId: tokenInfo.target, - token, - }) - .catch(error => { - authenticatedRequestLogger.error('Failed to check rate limit'); - authenticatedRequestLogger.error(error); - Sentry.captureException(error, { - level: 'error', + // We treat collected operations as part of registry + if (tokens.isNoAccess(tokenInfo)) { + stopTokensDurationTimer({ + status: 'no_access', }); + httpRequestsWithNoAccess.inc(); + req.log.info('No access (token=%s)', maskedToken); + void res.status(403).send('No access'); + activeSpan?.recordException('No access'); + return; + } - // If we can't check rate limit, we should not drop the report - return false; + const authenticatedRequestLogger = req.log.child({ + token: maskedToken, + target: tokenInfo.target, + organization: tokenInfo.organization, }); - if (isRateLimited) { - droppedReports - .labels({ targetId: tokenInfo.target, orgId: tokenInfo.organization }) - .inc(); - authenticatedRequestLogger.info( - 'Rate limited', - maskedToken, - tokenInfo.target, - tokenInfo.organization, - ); - void res.status(429).send(); - - return; - } - - const retentionInfo = await rateLimit - .getRetentionForTargetId(tokenInfo.target) - .catch(error => { - authenticatedRequestLogger.error(error); - Sentry.captureException(error, { - level: 'error', - }); - return null; + stopTokensDurationTimer({ + status: 'success', }); - if (typeof retentionInfo !== 'number') { - authenticatedRequestLogger.error('Failed to get retention info'); - } + activeSpan?.setAttribute('hive.input.target', tokenInfo.target); + activeSpan?.setAttribute('hive.input.organization', tokenInfo.organization); + activeSpan?.setAttribute('hive.input.project', tokenInfo.project); + activeSpan?.setAttribute('hive.token.scopes', tokenInfo.scopes.join(',')); + + const isRateLimited = await rateLimit + .isRateLimited({ + targetId: tokenInfo.target, + token, + }) + .catch(error 
=> { + authenticatedRequestLogger.error('Failed to check rate limit'); + authenticatedRequestLogger.error(error); + Sentry.captureException(error, { + level: 'error', + }); + activeSpan?.recordException(error); - const stopTimer = collectDuration.startTimer(); - try { - if (readiness() === false) { - authenticatedRequestLogger.warn('Not ready to collect report'); - stopTimer({ - status: 'not_ready', + // If we can't check rate limit, we should not drop the report + return false; }); - // 503 - Service Unavailable - // The server is currently unable to handle the request due being not ready. - // This tells the gateway to retry the request and not to drop it. - void res.status(503).send(); + + if (isRateLimited) { + activeSpan?.addEvent('rate-limited'); + droppedReports + .labels({ targetId: tokenInfo.target, orgId: tokenInfo.organization }) + .inc(); + authenticatedRequestLogger.info( + 'Rate limited', + maskedToken, + tokenInfo.target, + tokenInfo.organization, + ); + void res.status(429).send(); + return; } - if (apiVersion === undefined || apiVersion === '1') { - const result = measureParsing( - () => usageProcessorV1(server.log, req.body as any, tokenInfo, retentionInfo), - 'v1', - ); - collect(result.report); - stopTimer({ - status: 'success', - }); - void res.status(200).send({ - id: result.report.id, - operations: result.operations, + const retentionInfo = await rateLimit + .getRetentionForTargetId(tokenInfo.target) + .catch(error => { + authenticatedRequestLogger.error(error); + Sentry.captureException(error, { + level: 'error', + }); + activeSpan?.addEvent(error); + return null; }); - } else if (apiVersion === '2') { - const result = measureParsing( - () => usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo), - 'v2', - ); - if (result.success === false) { + if (typeof retentionInfo !== 'number') { + authenticatedRequestLogger.error('Failed to get retention info'); + } + + const stopTimer = collectDuration.startTimer(); + try { + if 
(readiness() === false) { + authenticatedRequestLogger.warn('Not ready to collect report'); stopTimer({ - status: 'error', + status: 'not_ready', + }); + activeSpan?.recordException('Not ready to collect report, status is not ready'); + // 503 - Service Unavailable + // The server is currently unable to handle the request due being not ready. + // This tells the gateway to retry the request and not to drop it. + void res.status(503).send(); + return; + } + + if (apiVersion === undefined || apiVersion === '1') { + const result = measureParsing( + () => usageProcessorV1(server.log, req.body as any, tokenInfo, retentionInfo), + 'v1', + ); + collect(result.report); + stopTimer({ + status: 'success', }); - authenticatedRequestLogger.info( - 'Report validation failed (errors=%j)', - result.errors.map(error => error.path + ': ' + error.message), + void res.status(200).send({ + id: result.report.id, + operations: result.operations, + }); + } else if (apiVersion === '2') { + const result = measureParsing( + () => usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo), + 'v2', ); - void res.status(400).send({ - errors: result.errors, + if (result.success === false) { + stopTimer({ + status: 'error', + }); + authenticatedRequestLogger.info( + 'Report validation failed (errors=%j)', + result.errors.map(error => error.path + ': ' + error.message), + ); + + activeSpan?.addEvent('Failed to parse report object'); + result.errors.forEach(error => + activeSpan?.recordException(error.path + ': ' + error.message), + ); + + void res.status(400).send({ + errors: result.errors, + }); + return; + } + + collect(result.report); + stopTimer({ + status: 'success', }); - return; + void res.status(200).send({ + id: result.report.id, + operations: result.operations, + }); + } else { + authenticatedRequestLogger.debug("Invalid 'x-api-version' header value."); + stopTimer({ + status: 'error', + }); + activeSpan?.recordException("Invalid 'x-api-version' header value."); + void 
res.status(401).send("Invalid 'x-api-version' header value."); } - - collect(result.report); - stopTimer({ - status: 'success', - }); - void res.status(200).send({ - id: result.report.id, - operations: result.operations, - }); - } else { - authenticatedRequestLogger.debug("Invalid 'x-api-version' header value."); + } catch (error) { stopTimer({ status: 'error', }); - void res.status(401).send("Invalid 'x-api-version' header value."); + authenticatedRequestLogger.error('Failed to collect report'); + authenticatedRequestLogger.error(error, 'Failed to collect'); + Sentry.captureException(error, { + level: 'error', + }); + activeSpan?.recordException(error as Error); + void res.status(500).send(); } - } catch (error) { - stopTimer({ - status: 'error', - }); - authenticatedRequestLogger.error('Failed to collect report'); - authenticatedRequestLogger.error(error, 'Failed to collect'); - Sentry.captureException(error, { - level: 'error', - }); - void res.status(500).send(); - } - }), + }), + ), }); server.route({ @@ -332,6 +392,7 @@ async function main() { if (env.prometheus) { await startMetrics(env.prometheus.labels.instance, env.prometheus.port); } + await server.listen({ port: env.http.port, host: '::', diff --git a/packages/services/usage/src/usage-processor-1.ts b/packages/services/usage/src/usage-processor-1.ts index c0703882d6..e8ea68f363 100644 --- a/packages/services/usage/src/usage-processor-1.ts +++ b/packages/services/usage/src/usage-processor-1.ts @@ -3,7 +3,7 @@ import type { JSONSchemaType } from 'ajv'; import Ajv from 'ajv'; import { parse } from 'graphql'; import { LRUCache } from 'lru-cache'; -import type { ServiceLogger as Logger } from '@hive/service-common'; +import { traceInlineSync, type ServiceLogger as Logger } from '@hive/service-common'; import { RawReport } from '@hive/usage-common'; import { invalidRawOperations, @@ -19,116 +19,131 @@ const DAY_IN_MS = 86_400_000; /** * Process Usage for API Version 1 */ -export function usageProcessorV1( - 
logger: Logger, - incomingReport: IncomingReport | IncomingLegacyReport, - token: TokensResponse, - targetRetentionInDays: number | null, -): { - report: RawReport; - operations: { - rejected: number; - accepted: number; - }; -} { - const now = Date.now(); - - const incoming = ensureReportFormat(incomingReport); - ensureIncomingMessageValidity(incoming); - - const size = incoming.operations.length; - totalReports.inc(); - totalOperations.inc(size); - rawOperationsSize.observe(size); - - const report: RawReport = { - id: randomUUID(), - target: token.target, - organization: token.organization, - size: 0, - map: {}, - operations: [], - }; - - const oldNewKeyMapping = new Map(); - - for (const rawKey in incoming.map) { - const record = incoming.map[rawKey]; - const validationResult = validateOperationMapRecord(record); - - if (validationResult.valid) { - // The key is used for lru cache (usage-ingestor) so we need to make sure, the record is unique per target, operation body, name and the list of fields - const key = createHash('md5') - .update(report.target) - .update(record.operation) - .update(record.operationName ?? 
'') - .update(JSON.stringify(record.fields.sort())) - .digest('hex'); - - oldNewKeyMapping.set(rawKey, key); +export const usageProcessorV1 = traceInlineSync( + 'usageProcessorV1', + { + initAttributes: (_logger, _incomingReport, token) => ({ + 'hive.input.target': token.target, + 'hive.input.project': token.project, + 'hive.input.organization': token.organization, + }), + resultAttributes: result => ({ + 'hive.result.reportId': result.report.id, + 'hive.result.operations.accepted': result.operations.accepted, + 'hive.result.operations.rejected': result.operations.rejected, + }), + }, + ( + logger: Logger, + incomingReport: IncomingReport | IncomingLegacyReport, + token: TokensResponse, + targetRetentionInDays: number | null, + ): { + report: RawReport; + operations: { + rejected: number; + accepted: number; + }; + } => { + const now = Date.now(); + + const incoming = ensureReportFormat(incomingReport); + ensureIncomingMessageValidity(incoming); + + const size = incoming.operations.length; + totalReports.inc(); + totalOperations.inc(size); + rawOperationsSize.observe(size); + + const report: RawReport = { + id: randomUUID(), + target: token.target, + organization: token.organization, + size: 0, + map: {}, + operations: [], + }; - report.map[key] = { - key, - operation: record.operation, - operationName: record.operationName, - fields: record.fields, - }; + const oldNewKeyMapping = new Map(); + + for (const rawKey in incoming.map) { + const record = incoming.map[rawKey]; + const validationResult = validateOperationMapRecord(record); + + if (validationResult.valid) { + // The key is used for lru cache (usage-ingestor) so we need to make sure, the record is unique per target, operation body, name and the list of fields + const key = createHash('md5') + .update(report.target) + .update(record.operation) + .update(record.operationName ?? 
'') + .update(JSON.stringify(record.fields.sort())) + .digest('hex'); + + oldNewKeyMapping.set(rawKey, key); + + report.map[key] = { + key, + operation: record.operation, + operationName: record.operationName, + fields: record.fields, + }; + } } - } - for (const operation of incoming.operations) { - // The validateOperation function drops the operation if the operationMapKey does not exist, we can safely pass the old key in case the new key is missing. - operation.operationMapKey = - oldNewKeyMapping.get(operation.operationMapKey) ?? operation.operationMapKey; - const validationResult = validateOperation(operation, report.map); - - if (validationResult.valid) { - // Increase size - report.size += 1; - - // Add operation - const ts = operation.timestamp ?? now; - report.operations.push({ - operationMapKey: operation.operationMapKey, - timestamp: ts, - expiresAt: targetRetentionInDays ? ts + targetRetentionInDays * DAY_IN_MS : undefined, - execution: { - ok: operation.execution.ok, - duration: operation.execution.duration, - errorsTotal: operation.execution.errorsTotal, - }, - metadata: { - client: { - name: operation.metadata?.client?.name, - version: operation.metadata?.client?.version, + for (const operation of incoming.operations) { + // The validateOperation function drops the operation if the operationMapKey does not exist, we can safely pass the old key in case the new key is missing. + operation.operationMapKey = + oldNewKeyMapping.get(operation.operationMapKey) ?? operation.operationMapKey; + const validationResult = validateOperation(operation, report.map); + + if (validationResult.valid) { + // Increase size + report.size += 1; + + // Add operation + const ts = operation.timestamp ?? now; + report.operations.push({ + operationMapKey: operation.operationMapKey, + timestamp: ts, + expiresAt: targetRetentionInDays ? 
ts + targetRetentionInDays * DAY_IN_MS : undefined, + execution: { + ok: operation.execution.ok, + duration: operation.execution.duration, + errorsTotal: operation.execution.errorsTotal, }, - }, - }); - } else { - logger.warn( - `Detected invalid operation (target=%s): %o`, - token.target, - validationResult.errors, - ); - invalidRawOperations - .labels({ - reason: - 'reason' in validationResult && validationResult.reason - ? validationResult.reason - : 'unknown', - }) - .inc(1); + metadata: { + client: { + name: operation.metadata?.client?.name, + version: operation.metadata?.client?.version, + }, + }, + }); + } else { + logger.warn( + `Detected invalid operation (target=%s): %o`, + token.target, + validationResult.errors, + ); + invalidRawOperations + .labels({ + reason: + 'reason' in validationResult && validationResult.reason + ? validationResult.reason + : 'unknown', + }) + .inc(1); + } } - } - return { - report: report, - operations: { - accepted: size - report.size, - rejected: report.size, - }, - }; -} + return { + report: report, + operations: { + accepted: size - report.size, + rejected: report.size, + }, + }; + }, +); function ensureIncomingMessageValidity(incoming: Partial) { if (!incoming || !incoming.operations || !Array.isArray(incoming.operations)) { diff --git a/packages/services/usage/src/usage-processor-2.ts b/packages/services/usage/src/usage-processor-2.ts index 300cb0b4ad..abbb104962 100644 --- a/packages/services/usage/src/usage-processor-2.ts +++ b/packages/services/usage/src/usage-processor-2.ts @@ -1,5 +1,5 @@ import { createHash, randomUUID } from 'node:crypto'; -import { ServiceLogger as Logger } from '@hive/service-common'; +import { ServiceLogger as Logger, traceInlineSync } from '@hive/service-common'; import { type ClientMetadata, type RawOperation, @@ -12,204 +12,221 @@ import { invalidRawOperations, rawOperationsSize, totalOperations, totalReports import { TokensResponse } from './tokens'; import { isValidOperationBody } from 
'./usage-processor-1'; -export function usageProcessorV2( - logger: Logger, - incomingReport: unknown, - token: TokensResponse, - targetRetentionInDays: number | null, -): - | { success: false; errors: Array } - | { - success: true; - report: RawReport; - operations: { - rejected: number; - accepted: number; +export const usageProcessorV2 = traceInlineSync( + 'usageProcessorV2', + { + initAttributes: (_logger, _incomingReport, token) => ({ + 'hive.input.target': token.target, + 'hive.input.project': token.project, + 'hive.input.organization': token.organization, + }), + resultAttributes: result => ({ + 'hive.result.success': result.success, + 'hive.result.reportId': result.success ? result.report.id : undefined, + 'hive.result.operations.accepted': result.success ? result.operations.accepted : undefined, + 'hive.result.operations.rejected': result.success ? result.operations.rejected : undefined, + 'hive.result.error.count': result.success ? undefined : result.errors.length, + }), + }, + ( + logger: Logger, + incomingReport: unknown, + token: TokensResponse, + targetRetentionInDays: number | null, + ): + | { success: false; errors: Array } + | { + success: true; + report: RawReport; + operations: { + rejected: number; + accepted: number; + }; + } => { + const reportResult = decodeReport(incomingReport); + + if (reportResult.success === false) { + return { + success: false, + errors: reportResult.errors, }; - } { - const reportResult = decodeReport(incomingReport); - - if (reportResult.success === false) { - return { - success: false, - errors: reportResult.errors, - }; - } - - const incoming = reportResult.report; - - const incomingOperations = incoming.operations ?? []; - const incomingSubscriptionOperations = incoming.subscriptionOperations ?? 
[]; - - const size = incomingOperations.length + incomingSubscriptionOperations.length; - totalReports.inc(); - totalOperations.inc(size); - rawOperationsSize.observe(size); - - const rawOperations: RawOperation[] = []; - const rawSubscriptionOperations: RawSubscriptionOperation[] = []; - - const lastAppDeploymentUsage = new Map<`${string}/${string}`, number>(); - - function upsertClientUsageTimestamp( - clientName: string, - clientVersion: string, - timestamp: number, - ) { - const key = `${clientName}/${clientVersion}` as const; - let latestTimestamp = lastAppDeploymentUsage.get(key); - if (!latestTimestamp || timestamp > latestTimestamp) { - lastAppDeploymentUsage.set(key, timestamp); } - } - - const report: RawReport = { - id: randomUUID(), - target: token.target, - organization: token.organization, - size: 0, - map: {}, - operations: rawOperations, - subscriptionOperations: rawSubscriptionOperations, - }; - const newKeyMappings = new Map(); + const incoming = reportResult.report; - function getOperationMapRecordKey(operationMapKey: string): string | null { - const operationMapRecord = incoming.map[operationMapKey] as OperationMapRecord | undefined; + const incomingOperations = incoming.operations ?? []; + const incomingSubscriptionOperations = incoming.subscriptionOperations ?? []; - if (!operationMapRecord) { - logger.warn( - `Detected invalid operation. Operation map key could not be found. 
(target=%s): %s`, - token.target, - operationMapKey, - ); - invalidRawOperations - .labels({ - reason: 'operation_map_key_not_found', - }) - .inc(1); - return null; - } - - let newOperationMapKey = newKeyMappings.get(operationMapRecord); + const size = incomingOperations.length + incomingSubscriptionOperations.length; + totalReports.inc(); + totalOperations.inc(size); + rawOperationsSize.observe(size); - if (!isValidOperationBody(operationMapRecord.operation)) { - logger.warn(`Detected invalid operation (target=%s): %s`, token.target, operationMapKey); - invalidRawOperations - .labels({ - reason: 'invalid_operation_body', - }) - .inc(1); - return null; - } + const rawOperations: RawOperation[] = []; + const rawSubscriptionOperations: RawSubscriptionOperation[] = []; - if (newOperationMapKey === undefined) { - const sortedFields = operationMapRecord.fields.sort(); - newOperationMapKey = createHash('md5') - .update(token.target) - .update(operationMapRecord.operation) - .update(operationMapRecord.operationName ?? 
'') - .update(JSON.stringify(sortedFields)) - .digest('hex'); - - report.map[newOperationMapKey] = { - key: newOperationMapKey, - operation: operationMapRecord.operation, - operationName: operationMapRecord.operationName, - fields: sortedFields, - }; + const lastAppDeploymentUsage = new Map<`${string}/${string}`, number>(); - newKeyMappings.set(operationMapRecord, newOperationMapKey); + function upsertClientUsageTimestamp( + clientName: string, + clientVersion: string, + timestamp: number, + ) { + const key = `${clientName}/${clientVersion}` as const; + let latestTimestamp = lastAppDeploymentUsage.get(key); + if (!latestTimestamp || timestamp > latestTimestamp) { + lastAppDeploymentUsage.set(key, timestamp); + } } - return newOperationMapKey; - } - - for (const operation of incomingOperations) { - const operationMapKey = getOperationMapRecordKey(operation.operationMapKey); + const report: RawReport = { + id: randomUUID(), + target: token.target, + organization: token.organization, + size: 0, + map: {}, + operations: rawOperations, + subscriptionOperations: rawSubscriptionOperations, + }; - // if the record does not exist -> skip the operation - if (operationMapKey === null) { - continue; + const newKeyMappings = new Map(); + + function getOperationMapRecordKey(operationMapKey: string): string | null { + const operationMapRecord = incoming.map[operationMapKey] as OperationMapRecord | undefined; + + if (!operationMapRecord) { + logger.warn( + `Detected invalid operation. Operation map key could not be found. 
(target=%s): %s`, + token.target, + operationMapKey, + ); + invalidRawOperations + .labels({ + reason: 'operation_map_key_not_found', + }) + .inc(1); + return null; + } + + let newOperationMapKey = newKeyMappings.get(operationMapRecord); + + if (!isValidOperationBody(operationMapRecord.operation)) { + logger.warn(`Detected invalid operation (target=%s): %s`, token.target, operationMapKey); + invalidRawOperations + .labels({ + reason: 'invalid_operation_body', + }) + .inc(1); + return null; + } + + if (newOperationMapKey === undefined) { + const sortedFields = operationMapRecord.fields.sort(); + newOperationMapKey = createHash('md5') + .update(token.target) + .update(operationMapRecord.operation) + .update(operationMapRecord.operationName ?? '') + .update(JSON.stringify(sortedFields)) + .digest('hex'); + + report.map[newOperationMapKey] = { + key: newOperationMapKey, + operation: operationMapRecord.operation, + operationName: operationMapRecord.operationName, + fields: sortedFields, + }; + + newKeyMappings.set(operationMapRecord, newOperationMapKey); + } + + return newOperationMapKey; } - let client: ClientMetadata | undefined; - if (operation.persistedDocumentHash) { - const [name, version] = operation.persistedDocumentHash.split('~'); - client = { - name, - version, - }; - upsertClientUsageTimestamp(name, version, operation.timestamp); - } else { - client = operation.metadata?.client ?? undefined; + for (const operation of incomingOperations) { + const operationMapKey = getOperationMapRecordKey(operation.operationMapKey); + + // if the record does not exist -> skip the operation + if (operationMapKey === null) { + continue; + } + + let client: ClientMetadata | undefined; + if (operation.persistedDocumentHash) { + const [name, version] = operation.persistedDocumentHash.split('~'); + client = { + name, + version, + }; + upsertClientUsageTimestamp(name, version, operation.timestamp); + } else { + client = operation.metadata?.client ?? 
undefined; + } + + report.size += 1; + rawOperations.push({ + operationMapKey, + timestamp: operation.timestamp, + expiresAt: targetRetentionInDays + ? operation.timestamp + targetRetentionInDays * DAY_IN_MS + : undefined, + execution: { + ok: operation.execution.ok, + duration: operation.execution.duration, + errorsTotal: operation.execution.errorsTotal, + }, + metadata: { + client, + }, + }); } - report.size += 1; - rawOperations.push({ - operationMapKey, - timestamp: operation.timestamp, - expiresAt: targetRetentionInDays - ? operation.timestamp + targetRetentionInDays * DAY_IN_MS - : undefined, - execution: { - ok: operation.execution.ok, - duration: operation.execution.duration, - errorsTotal: operation.execution.errorsTotal, - }, - metadata: { - client, - }, - }); - } - - for (const operation of incomingSubscriptionOperations) { - const operationMapKey = getOperationMapRecordKey(operation.operationMapKey); - - // if the record does not exist -> skip the operation - if (operationMapKey === null) { - continue; + for (const operation of incomingSubscriptionOperations) { + const operationMapKey = getOperationMapRecordKey(operation.operationMapKey); + + // if the record does not exist -> skip the operation + if (operationMapKey === null) { + continue; + } + + let client: ClientMetadata | undefined; + if (operation.persistedDocumentHash) { + const [name, version] = operation.persistedDocumentHash.split('/'); + client = { + name, + version, + }; + upsertClientUsageTimestamp(name, version, operation.timestamp); + } else { + client = operation.metadata?.client ?? undefined; + } + + report.size += 1; + rawSubscriptionOperations.push({ + operationMapKey, + timestamp: operation.timestamp, + expiresAt: targetRetentionInDays + ? 
operation.timestamp + targetRetentionInDays * DAY_IN_MS + : undefined, + metadata: { + client, + }, + }); } - let client: ClientMetadata | undefined; - if (operation.persistedDocumentHash) { - const [name, version] = operation.persistedDocumentHash.split('/'); - client = { - name, - version, - }; - upsertClientUsageTimestamp(name, version, operation.timestamp); - } else { - client = operation.metadata?.client ?? undefined; + if (lastAppDeploymentUsage.size) { + report.appDeploymentUsageTimestamps = Object.fromEntries(lastAppDeploymentUsage); } - report.size += 1; - rawSubscriptionOperations.push({ - operationMapKey, - timestamp: operation.timestamp, - expiresAt: targetRetentionInDays - ? operation.timestamp + targetRetentionInDays * DAY_IN_MS - : undefined, - metadata: { - client, + return { + success: true, + report, + operations: { + rejected: size - report.size, + accepted: report.size, }, - }); - } - - if (lastAppDeploymentUsage.size) { - report.appDeploymentUsageTimestamps = Object.fromEntries(lastAppDeploymentUsage); - } - - return { - success: true, - report, - operations: { - rejected: size - report.size, - accepted: report.size, - }, - }; -} + }; + }, +); // The idea behind this function is to make sure we use Optional on top of the Union. // If the order is different, the field will be required. 
diff --git a/packages/services/usage/src/usage.ts b/packages/services/usage/src/usage.ts index eb9214f266..81feee4451 100644 --- a/packages/services/usage/src/usage.ts +++ b/packages/services/usage/src/usage.ts @@ -1,7 +1,8 @@ import { CompressionTypes, Kafka, logLevel, Partitioners, RetryOptions } from 'kafkajs'; -import type { ServiceLogger } from '@hive/service-common'; +import { traceInlineSync, type ServiceLogger } from '@hive/service-common'; import type { RawOperationMap, RawReport } from '@hive/usage-common'; import { compress } from '@hive/usage-common'; +import * as Sentry from '@sentry/node'; import { calculateChunkSize, createKVBuffer, isBufferTooBigError } from './buffer'; import type { KafkaEnvironment } from './environment'; import { @@ -197,6 +198,12 @@ export function createUsage(config: { if (meta[0].errorCode) { rawOperationFailures.inc(numOfOperations); logger.error(`Failed to flush (id=%s, errorCode=%s)`, batchId, meta[0].errorCode); + Sentry.setTags({ + batchId, + errorCode: meta[0].errorCode, + numOfOperations, + }); + Sentry.captureException(new Error(`Failed to flush usage reports to Kafka`)); } else { rawOperationWrites.inc(numOfOperations); logger.info(`Flushed (id=%s, operations=%s)`, batchId, numOfOperations); @@ -211,6 +218,13 @@ export function createUsage(config: { } else { status = Status.Unhealthy; logger.error(`Failed to flush (id=%s, error=%s)`, batchId, error.message); + Sentry.setTags({ + batchId, + message: error.message, + numOfOperations, + }); + Sentry.captureException(error); + scheduleReconnect(); } @@ -291,13 +305,23 @@ export function createUsage(config: { } return { - collect(report: RawReport) { - if (status !== Status.Ready) { - throw new Error('Usage is not ready yet'); - } + collect: traceInlineSync( + 'collect', + { + initAttributes: report => ({ + 'hive.service.ready': status == Status.Ready, + 'hive.input.report.id': report.id, + 'hive.input.report.size': Object.keys(report.map).length, + }), + }, + (report: 
RawReport) => { + if (status !== Status.Ready) { + throw new Error('Usage is not ready yet'); + } - buffer.add(report); - }, + buffer.add(report); + }, + ), readiness() { return status === Status.Ready; }, diff --git a/packages/web/docs/src/pages/docs/self-hosting/telemetry.mdx b/packages/web/docs/src/pages/docs/self-hosting/telemetry.mdx index bbabb5c5f3..46e4509f6a 100644 --- a/packages/web/docs/src/pages/docs/self-hosting/telemetry.mdx +++ b/packages/web/docs/src/pages/docs/self-hosting/telemetry.mdx @@ -5,6 +5,10 @@ import { Steps } from '@theguild/components' When self-hosting Hive, you can enable telemetry to help us understand how Hive is being used and how we can improve it. Telemetry is disabled by default. +> By default, all services emit OpenTelemetry traces, except for the `usage` service. To enable +> tracing for it, you need to set the `OPENTELEMETRY_COLLECTOR_ENDPOINT=...` environment variable +> explicitly for this service. + ## OpenTelemetry Traces @@ -58,6 +62,11 @@ docker run \ --config otel-local-config.yaml ``` +Alternatively, you can use +[the following instructions for running Grafana with Tempo locally using Docker](https://grafana.com/docs/tempo/latest/getting-started/docker-example/). +In that case, use `OPENTELEMETRY_COLLECTOR_ENDPOINT="http://localhost:4318/v1/traces"` as the OTLP +endpoint.
+ ### Configure Hive to send traces to the OTLP Collector To instruct Hive to send traces to the OTLP Collector, you need to set the following environment diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6eeb971d0a..4f05b542de 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -16500,8 +16500,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.596.0 - '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) + '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) + '@aws-sdk/client-sts': 3.596.0 '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16608,11 +16608,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.596.0': + '@aws-sdk/client-sso-oidc@3.596.0(@aws-sdk/client-sts@3.596.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) + '@aws-sdk/client-sts': 3.596.0 '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16651,6 +16651,7 @@ snapshots: '@smithy/util-utf8': 3.0.0 tslib: 2.8.1 transitivePeerDependencies: + - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso-oidc@3.723.0(@aws-sdk/client-sts@3.723.0)': @@ -16784,11 +16785,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)': + '@aws-sdk/client-sts@3.596.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.596.0 + '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) '@aws-sdk/core': 3.592.0 '@aws-sdk/credential-provider-node': 
3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0) '@aws-sdk/middleware-host-header': 3.577.0 @@ -16827,7 +16828,6 @@ snapshots: '@smithy/util-utf8': 3.0.0 tslib: 2.8.1 transitivePeerDependencies: - - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.723.0': @@ -16941,7 +16941,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.596.0(@aws-sdk/client-sso-oidc@3.596.0)(@aws-sdk/client-sts@3.596.0)': dependencies: - '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) + '@aws-sdk/client-sts': 3.596.0 '@aws-sdk/credential-provider-env': 3.587.0 '@aws-sdk/credential-provider-http': 3.596.0 '@aws-sdk/credential-provider-process': 3.587.0 @@ -17060,7 +17060,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.587.0(@aws-sdk/client-sts@3.596.0)': dependencies: - '@aws-sdk/client-sts': 3.596.0(@aws-sdk/client-sso-oidc@3.596.0) + '@aws-sdk/client-sts': 3.596.0 '@aws-sdk/types': 3.577.0 '@smithy/property-provider': 3.1.11 '@smithy/types': 3.7.2 @@ -17235,7 +17235,7 @@ snapshots: '@aws-sdk/token-providers@3.587.0(@aws-sdk/client-sso-oidc@3.596.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.596.0 + '@aws-sdk/client-sso-oidc': 3.596.0(@aws-sdk/client-sts@3.596.0) '@aws-sdk/types': 3.577.0 '@smithy/property-provider': 3.1.11 '@smithy/shared-ini-file-loader': 3.1.12