Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/add datadog tracing exporter #44

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 44 additions & 42 deletions scheduler/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ opentelemetry-otlp = { version = "=0.16.0", features = [
"http-proto",
"tls",
] }
tracing-bunyan-formatter = "0.3"
opentelemetry-datadog = { version = "0.11.0", features = ["reqwest-client"] }

openssl-probe = "0.1.2"
chrono = "0.4.19"
Expand All @@ -53,7 +53,6 @@ chrono-tz = "0.8.1"
[dev-dependencies]
nettu_scheduler_sdk = { path = "./clients/rust" }


[workspace.lints.rust]
unsafe_code = "forbid"

Expand Down
51 changes: 51 additions & 0 deletions scheduler/crates/api/src/http_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,57 @@ impl RootSpanBuilder for NitteiTracingRootSpanBuilder {
}

fn on_request_end<B: MessageBody>(span: Span, outcome: &Result<ServiceResponse<B>, Error>) {
// Log the outcome of the request
log_request(outcome);

DefaultRootSpanBuilder::on_request_end(span, outcome);
}
}

/// Log the outcome of the request
fn log_request(outcome: &Result<ServiceResponse<impl MessageBody>, Error>) {
// Log the outcome of the request
if let Ok(response) = outcome {
let status_code = response.status().as_u16();
let method = response.request().method().to_string();
let path = response.request().path().to_string();

// Ignore healthcheck endpoint
if path == "/api/v1/healthcheck" {
return;
}

// Log with custom fields in JSON format
let message = format!("{} {} => {}", method, path, status_code);

if status_code >= 500 {
tracing::error!(
method = method,
path = path,
status_code = status_code,
message,
);
} else if status_code >= 400 {
tracing::warn!(
method = method,
path = path,
status_code = status_code,
message,
);
} else {
tracing::info!(
method = method,
path = path,
status_code = status_code,
message,
);
};
} else if let Err(err) = outcome {
// Fallback in case we can't retrieve the request from the span
tracing::error!(
status_code = 500,
error = %err,
"HTTP request resulted in an error, but request details are missing"
);
}
}
8 changes: 7 additions & 1 deletion scheduler/crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ mod user;
use std::net::TcpListener;

use actix_cors::Cors;
use actix_web::{dev::Server, middleware, web, web::Data, App, HttpServer};
use actix_web::{
dev::Server,
middleware::{self},
web::{self, Data},
App,
HttpServer,
};
use http_logger::NitteiTracingRootSpanBuilder;
use job_schedulers::{start_reminder_generation_job_scheduler, start_send_reminders_job};
use nettu_scheduler_domain::{
Expand Down
9 changes: 8 additions & 1 deletion scheduler/crates/infra/src/repos/account/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use sqlx::{
FromRow,
PgPool,
};
use tracing::error;
use tracing::{error, instrument};

use super::IAccountRepo;

#[derive(Debug)]
pub struct PostgresAccountRepo {
pool: PgPool,
}
Expand Down Expand Up @@ -40,6 +41,7 @@ impl From<AccountRaw> for Account {

#[async_trait::async_trait]
impl IAccountRepo for PostgresAccountRepo {
#[instrument]
async fn insert(&self, account: &Account) -> anyhow::Result<()> {
sqlx::query!(
r#"
Expand All @@ -63,6 +65,7 @@ impl IAccountRepo for PostgresAccountRepo {
Ok(())
}

#[instrument]
async fn save(&self, account: &Account) -> anyhow::Result<()> {
sqlx::query!(
r#"
Expand All @@ -89,6 +92,7 @@ impl IAccountRepo for PostgresAccountRepo {
Ok(())
}

#[instrument]
async fn find(&self, account_id: &ID) -> Option<Account> {
let res: Option<AccountRaw> = sqlx::query_as!(
AccountRaw,
Expand All @@ -111,6 +115,7 @@ impl IAccountRepo for PostgresAccountRepo {
res.map(|account| account.into())
}

#[instrument]
async fn find_many(&self, accounts_ids: &[ID]) -> anyhow::Result<Vec<Account>> {
let ids = accounts_ids
.iter()
Expand All @@ -137,6 +142,7 @@ impl IAccountRepo for PostgresAccountRepo {
Ok(accounts_raw.into_iter().map(|acc| acc.into()).collect())
}

#[instrument]
async fn delete(&self, account_id: &ID) -> Option<Account> {
let res: Option<AccountRaw> = sqlx::query_as!(
AccountRaw,
Expand All @@ -160,6 +166,7 @@ impl IAccountRepo for PostgresAccountRepo {
res.map(|acc| acc.into())
}

#[instrument]
async fn find_by_apikey(&self, api_key: &str) -> Option<Account> {
let res: Option<AccountRaw> = sqlx::query_as!(
AccountRaw,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use nettu_scheduler_domain::{AccountIntegration, IntegrationProvider, ID};
use sqlx::{types::Uuid, FromRow, PgPool};
use tracing::error;
use tracing::{error, instrument};

use super::IAccountIntegrationRepo;

#[derive(Debug)]
pub struct PostgresAccountIntegrationRepo {
pool: PgPool,
}
Expand Down Expand Up @@ -37,6 +38,7 @@ impl From<AccountIntegrationRaw> for AccountIntegration {

#[async_trait::async_trait]
impl IAccountIntegrationRepo for PostgresAccountIntegrationRepo {
#[instrument]
async fn insert(&self, integration: &AccountIntegration) -> anyhow::Result<()> {
let provider: String = integration.provider.clone().into();
sqlx::query!(
Expand All @@ -62,6 +64,7 @@ impl IAccountIntegrationRepo for PostgresAccountIntegrationRepo {
Ok(())
}

#[instrument]
async fn find(&self, account_id: &ID) -> anyhow::Result<Vec<AccountIntegration>> {
let integrations: Vec<AccountIntegrationRaw> = sqlx::query_as!(
AccountIntegrationRaw,
Expand All @@ -83,6 +86,7 @@ impl IAccountIntegrationRepo for PostgresAccountIntegrationRepo {
Ok(integrations.into_iter().map(|i| i.into()).collect())
}

#[instrument]
async fn delete(&self, account_id: &ID, provider: IntegrationProvider) -> anyhow::Result<()> {
let provider: String = provider.into();
match sqlx::query!(
Expand Down
Loading