Skip to content

Commit

Permalink
Merge pull request #17 from lmnr-ai/eval-dashboard
Browse files Browse the repository at this point in the history
evaluation dashboards
  • Loading branch information
dinmukhamedm authored Oct 5, 2024
2 parents 7342d72 + d4cf313 commit d97bbde
Show file tree
Hide file tree
Showing 104 changed files with 3,668 additions and 3,175 deletions.
600 changes: 301 additions & 299 deletions app-server/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ env_logger = "0.10.0"
actix-web = "4"
anyhow = "1"
futures-util = "0.3.28"
tonic = "0.8"
tonic = "0.12.3"

# workaround [AFIT](https://rust-lang.github.io/rfcs/3185-static-async-fn-in-trait.html)
# while rust people have hot [debates](https://github.com/rust-lang/rust/pull/115822#issuecomment-1718261458)
# around it and are struggling to stabilize it.
async-trait = "0.1"

dotenv = "0.15"
prost = "0.11"
prost = "0.13"
tokio = { version = "1.24", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["net"] }
futures = "0.3"
Expand Down Expand Up @@ -66,4 +66,4 @@ serde_repr = "0.1.19"
num_cpus = "1.16.0"

[build-dependencies]
tonic-build = "0.8"
tonic-build = "0.12.3"
4 changes: 2 additions & 2 deletions app-server/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.build_client(true)
.build_server(false)
.out_dir("./src/semantic_search/")
.compile(&[proto_file], &["proto"])?;
.compile_protos(&[proto_file], &["proto"])?;

// NOTE: Currently need to manually enable this, fix errors with super::super::..., whenever changing proto.
tonic_build::configure()
Expand All @@ -15,7 +15,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.build_server(true)
.include_file("mod.rs")
.out_dir("./src/opentelemetry/")
.compile(
.compile_protos(
&[
"./proto/opentelemetry/common.proto",
"./proto/opentelemetry/resource.proto",
Expand Down
183 changes: 49 additions & 134 deletions app-server/src/api/v1/evaluations.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use actix_web::{post, web, HttpResponse};
use serde::Deserialize;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use uuid::Uuid;

use crate::{
ch::evaluation_scores::{insert_evaluation_scores, EvaluationScore},
db::{self, api_keys::ProjectApiKey, DB},
evaluations::stats::calculate_average_scores,
evaluations::utils::{get_columns_from_points, EvaluationDatapointResult},
names::NameGenerator,
routes::types::ResponseResult,
};
Expand All @@ -15,160 +15,75 @@ use crate::{
/// Request body for the `POST evaluations` endpoint.
#[serde(rename_all = "camelCase")]
struct CreateEvaluationRequest {
/// Optional evaluation name; when absent, a name is produced by the name generator.
name: Option<String>,
/// Optional group identifier; falls back to "default" when absent.
group_id: Option<String>,
/// Datapoint results to record; the handler rejects an empty list.
points: Vec<EvaluationDatapointResult>,
}

#[post("evaluations")]
async fn create_evaluation(
req: web::Json<CreateEvaluationRequest>,
db: web::Data<DB>,
clickhouse: web::Data<clickhouse::Client>,
project_api_key: ProjectApiKey,
name_generator: web::Data<Arc<NameGenerator>>,
) -> ResponseResult {
let project_id = project_api_key.project_id;
let req = req.into_inner();
let clickhouse = clickhouse.into_inner().as_ref().clone();
let db = db.into_inner();

let name = if let Some(name) = req.name {
name
} else {
name_generator.next().await
};
let group_id = req.group_id.unwrap_or("default".to_string());
let points = req.points;

let evaluation = db::evaluations::create_evaluation(
&db.pool,
&name,
db::evaluations::EvaluationStatus::Started,
project_id,
)
.await?;
Ok(HttpResponse::Ok().json(evaluation))
}

/// Request body for the `POST evaluations/{evaluation_id}` endpoint.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateEvaluationRequest {
/// New status for the evaluation; when set to `Finished`, the handler
/// computes and stores the average datapoint scores once.
status: db::evaluations::EvaluationStatus,
}

#[post("evaluations/{evaluation_id}")]
async fn update_evaluation(
path: web::Path<Uuid>,
req: web::Json<UpdateEvaluationRequest>,
db: web::Data<DB>,
project_api_key: ProjectApiKey,
) -> ResponseResult {
let project_id = project_api_key.project_id;
let evaluation_id = path.into_inner();
let req = req.into_inner();

let mut average_scores = None;
if req.status == db::evaluations::EvaluationStatus::Finished {
// Calculate average scores only once when the evaluation is finished to avoid recalculating them on each update and query
let datapoint_scores =
db::evaluations::get_evaluation_datapoint_scores(&db.pool, evaluation_id).await?;
let average_scores_json = serde_json::to_value(calculate_average_scores(datapoint_scores))
.map_err(|e| anyhow::anyhow!("Failed to serialize average scores: {}", e))?;
average_scores = Some(average_scores_json);
if points.is_empty() {
return Err(anyhow::anyhow!("Evaluation must have at least one datapoint result").into());
}

let evaluation = db::evaluations::update_evaluation_status(
&db.pool,
project_id,
evaluation_id,
req.status,
average_scores,
)
.await?;
Ok(HttpResponse::Ok().json(evaluation))
}

/// A single datapoint submitted via the `POST evaluation-datapoints` endpoint.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RequestEvaluationDatapoint {
/// Arbitrary JSON payload for the datapoint input.
data: Value,
/// Arbitrary JSON payload for the datapoint target.
target: Value,
/// Output produced by the executor, if any.
executor_output: Option<Value>,
/// Associated trace id; `#[serde(default)]` yields the nil UUID when omitted.
#[serde(default)]
trace_id: Uuid,
/// Error payload; when present, the datapoint's status is recorded as `Error`.
error: Option<Value>,
/// Numeric scores keyed by name — presumably one entry per evaluator; confirm with callers.
scores: HashMap<String, f64>,
}

/// Request body for the `POST evaluation-datapoints` endpoint.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UploadEvaluationDatapointsRequest {
/// Id of an existing evaluation the datapoints belong to;
/// the handler looks it up before recording results.
evaluation_id: Uuid,
/// Datapoints to record for the evaluation.
points: Vec<RequestEvaluationDatapoint>,
}

#[post("evaluation-datapoints")]
async fn upload_evaluation_datapoints(
req: web::Json<UploadEvaluationDatapointsRequest>,
db: web::Data<DB>,
project_api_key: ProjectApiKey,
) -> ResponseResult {
let project_id = project_api_key.project_id;
let db = db.into_inner();
let evaluation =
db::evaluations::get_evaluation(db.clone(), project_id, req.evaluation_id).await?;

let evaluation_id = evaluation.id;
let statuses = req
.points
.iter()
.map(|point| {
if point.error.is_some() {
db::evaluations::EvaluationDatapointStatus::Error
} else {
db::evaluations::EvaluationDatapointStatus::Success
}
})
.collect::<Vec<_>>();

let data = req
.points
.iter()
.map(|point| point.data.clone())
.collect::<Vec<_>>();
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_id).await?;

let columns = get_columns_from_points(&points);
let ids = points.iter().map(|_| Uuid::new_v4()).collect::<Vec<_>>();

let ids_clone = ids.clone();
let db_task = tokio::spawn(async move {
db::evaluations::set_evaluation_results(
db.clone(),
evaluation.id,
&ids_clone,
&columns.scores,
&columns.datas,
&columns.targets,
&columns.executor_outputs,
&columns.trace_ids,
)
.await
});

// Flattened scores from all evaluators to be recorded to Clickhouse
// Its length can be longer than the amount of evaluation datapoints since each evaluator can return multiple scores
let ch_evaluation_scores = EvaluationScore::from_evaluation_datapoint_results(
&points,
ids,
project_id,
group_id,
evaluation.id,
);

let target = req
.points
.iter()
.map(|point| point.target.clone())
.collect::<Vec<_>>();
let ch_task = tokio::spawn(insert_evaluation_scores(
clickhouse.clone(),
ch_evaluation_scores,
));

let executor_output = req
.points
.iter()
.map(|point| point.executor_output.clone())
.collect::<Vec<_>>();
let (db_result, ch_result) = tokio::join!(db_task, ch_task);

let error = req
.points
.iter()
.map(|point| point.error.clone())
.collect::<Vec<_>>();
let scores = req
.points
.iter()
.map(|point| point.scores.clone())
.collect::<Vec<_>>();
let trace_ids = req
.points
.iter()
.map(|point| point.trace_id)
.collect::<Vec<_>>();
db_result.map_err(|e| anyhow::anyhow!("Database task failed: {}", e))??;
ch_result.map_err(|e| anyhow::anyhow!("Clickhouse task failed: {}", e))??;

let evaluation_datapoint = db::evaluations::set_evaluation_results(
&db.pool,
evaluation_id,
&statuses,
&scores,
&data,
&target,
&executor_output,
&trace_ids,
&error,
)
.await?;
Ok(HttpResponse::Ok().json(evaluation_datapoint))
Ok(HttpResponse::Ok().json(evaluation))
}
26 changes: 7 additions & 19 deletions app-server/src/api/v1/pipelines.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{collections::HashMap, sync::Arc};

use actix_web::{post, web, HttpResponse};
use actix_web::{get, post, web, HttpResponse};
use serde::Deserialize;
use uuid::Uuid;

use crate::{
api::utils::query_target_pipeline_version,
cache::Cache,
db::{api_keys::ProjectApiKey, DB},
db::{api_keys::ProjectApiKey, trace::CurrentTraceAndSpan, DB},
pipeline::{
nodes::{GraphOutput, GraphRunOutput, NodeInput, RunEndpointEventError, StreamChunk},
runner::{PipelineRunner, PipelineRunnerError},
Expand All @@ -19,14 +19,6 @@ use crate::{
},
};

/// Trace context optionally supplied with a pipeline run request,
/// used to attach the run's spans to an existing trace.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CurrentTraceAndSpan {
/// Id of the existing trace to attach to.
trace_id: Uuid,
/// Optional parent span id; `None` when omitted from the request.
#[serde(default)]
parent_span_id: Option<Uuid>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GraphRequest {
Expand Down Expand Up @@ -58,11 +50,9 @@ async fn run_pipeline_graph(
let inputs = req.inputs;
let mut env = req.env;
let metadata = req.metadata;
let parent_span_id = req
.current_trace_and_span
.as_ref()
.and_then(|t| t.parent_span_id);
let trace_id = req.current_trace_and_span.map(|t| t.trace_id);

let current_trace_and_span = req.current_trace_and_span;

env.insert("collection_name".to_string(), project_id.to_string());

let pipeline_version =
Expand Down Expand Up @@ -94,8 +84,7 @@ async fn run_pipeline_graph(
&run_result,
&project_id,
&pipeline_version_name,
parent_span_id,
trace_id,
current_trace_and_span,
None,
)
.await
Expand Down Expand Up @@ -158,8 +147,7 @@ async fn run_pipeline_graph(
&run_result,
&project_id,
&pipeline_version_name,
parent_span_id,
trace_id,
current_trace_and_span,
None,
)
.await?;
Expand Down
Loading

0 comments on commit d97bbde

Please sign in to comment.