From 3981abff68d76a52ca92e62883a5fffd2c68b3b5 Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Tue, 25 Feb 2025 18:44:25 -0500 Subject: [PATCH] Add NATS event callback metrics This commit adds NATS event callback metrics and other logging during events callback. This allows us to detect issues upon reconnection, sending messages after "publishing" them (i.e. sent in an internal channel to be sent to the server later), etc. As part of these changes, we also fix some "si.error.message" locations. Co-authored-by: Fletcher Nichol Signed-off-by: Nick Gerace --- Cargo.lock | 1 + lib/naxum/src/middleware/ack/on_failure.rs | 2 +- lib/naxum/src/middleware/ack/on_success.rs | 2 +- lib/naxum/src/serve.rs | 2 +- lib/si-data-nats/BUCK | 1 + lib/si-data-nats/Cargo.toml | 1 + lib/si-data-nats/src/lib.rs | 24 ++++++++++++++++++++++ lib/veritech-server/src/handlers.rs | 2 +- lib/veritech-server/src/server.rs | 2 +- 9 files changed, 32 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3697a56c0f..fe7ad0a865 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6707,6 +6707,7 @@ dependencies = [ "serde", "serde_json", "telemetry", + "telemetry-utils", "thiserror 2.0.11", "tokio", "tracing-subscriber", diff --git a/lib/naxum/src/middleware/ack/on_failure.rs b/lib/naxum/src/middleware/ack/on_failure.rs index f094ce5240..8cc3e49624 100644 --- a/lib/naxum/src/middleware/ack/on_failure.rs +++ b/lib/naxum/src/middleware/ack/on_failure.rs @@ -25,7 +25,7 @@ impl OnFailure for DefaultOnFailure { trace!("nacking message"); if let Err(err) = acker.ack_with(jetstream::AckKind::Nak(None)).await { warn!( - error = ?err, + si.error.message = ?err, subject = head.subject.as_str(), "failed to nack the message", ); diff --git a/lib/naxum/src/middleware/ack/on_success.rs b/lib/naxum/src/middleware/ack/on_success.rs index 30dd6ca8c4..e740826af2 100644 --- a/lib/naxum/src/middleware/ack/on_success.rs +++ b/lib/naxum/src/middleware/ack/on_success.rs @@ -25,7 +25,7 @@ impl OnSuccess for DefaultOnSuccess { trace!("double acking message"); if let Err(err) = acker.double_ack().await { warn!( - error = ?err, + si.error.message = ?err, subject = head.subject.as_str(), "failed to double ack the message", ); diff --git a/lib/naxum/src/serve.rs b/lib/naxum/src/serve.rs index f0ff64836c..c7e50136b4 100644 --- a/lib/naxum/src/serve.rs +++ b/lib/naxum/src/serve.rs @@ -271,7 +271,7 @@ where Err(err) => { if failed_count > MAX_FAILED_MESSAGES { warn!( - error = ?err, + si.error.message = ?err, "failed to read message in after {} consecutive failures; closing stream", failed_count, ); diff --git a/lib/si-data-nats/BUCK b/lib/si-data-nats/BUCK index b605e6b2d3..63542c2624 100644 --- a/lib/si-data-nats/BUCK +++ b/lib/si-data-nats/BUCK @@ -4,6 +4,7 @@ rust_library( name = "si-data-nats", deps = [ "//lib/telemetry-rs:telemetry", + "//lib/telemetry-utils-rs:telemetry-utils", "//third-party/rust:async-nats", "//third-party/rust:bytes", "//third-party/rust:futures", diff --git a/lib/si-data-nats/Cargo.toml b/lib/si-data-nats/Cargo.toml index 9ccca980aa..4adf7e7896 100644 --- a/lib/si-data-nats/Cargo.toml +++ b/lib/si-data-nats/Cargo.toml @@ -10,6 +10,7 @@ publish.workspace = true [dependencies] telemetry = { path = "../../lib/telemetry-rs" } +telemetry-utils = { path = "../../lib/telemetry-utils-rs" } async-nats = { workspace = true } bytes = { workspace = true } diff --git a/lib/si-data-nats/src/lib.rs b/lib/si-data-nats/src/lib.rs index c23e459a85..926099299e 100644 --- a/lib/si-data-nats/src/lib.rs +++ b/lib/si-data-nats/src/lib.rs @@ -13,6 +13,7 @@ use async_nats::{subject::ToSubject, ToServerAddrs}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use telemetry::prelude::*; +use telemetry_utils::metric; use thiserror::Error; use tokio::sync::Mutex; @@ -121,6 +122,29 @@ impl Client { if let Some(connection_name) = &config.connection_name { options = options.name(connection_name); } + + options = options.event_callback(|event| async move { + match event { + Event::Connected => metric!(counter.nats.event_callback.connected = 1), + Event::Disconnected => metric!(counter.nats.event_callback.disconnected = 1), + Event::Closed => metric!(counter.nats.event_callback.closed = 1), + Event::LameDuckMode => metric!(counter.nats.event_callback.lame_duck_mode = 1), + Event::Draining => metric!(counter.nats.event_callback.draining = 1), + Event::SlowConsumer(sid) => { + metric!(counter.nats.event_callback.slow_consumer = 1); + warn!(%sid, "nats event callback: slow consumers for subscription"); + } + Event::ServerError(server_error) => { + metric!(counter.nats.event_callback.server_error = 1); + error!(si.error.message = ?server_error, "nats event callback: server error") + } + Event::ClientError(client_error) => { + metric!(counter.nats.event_callback.client_error = 1); + error!(si.error.message = ?client_error, "nats event callback: client error") + } + } + }); + Self::connect_with_options(&config.url, config.subject_prefix.clone(), options).await } diff --git a/lib/veritech-server/src/handlers.rs b/lib/veritech-server/src/handlers.rs index f6a3705d11..2fee22cf25 100644 --- a/lib/veritech-server/src/handlers.rs +++ b/lib/veritech-server/src/handlers.rs @@ -309,7 +309,7 @@ where }; request.dec_run_metric(); if let Err(err) = publisher.publish_result(&func_result_error).await { - error!(error = ?err, "failed to publish errored result"); + error!(si.error.message = ?err, "failed to publish errored result"); } } } diff --git a/lib/veritech-server/src/server.rs b/lib/veritech-server/src/server.rs index d4cfa3fa0e..06a0bc8a69 100644 --- a/lib/veritech-server/src/server.rs +++ b/lib/veritech-server/src/server.rs @@ -165,7 +165,7 @@ impl Server { #[inline] pub async fn run(self) { if let Err(err) = self.try_run().await { - error!(error = ?err, "error while running veritech main loop"); + error!(si.error.message = ?err, "error while running veritech main loop"); } }