Browser and evals (#358)
* eval pooler

* changed limits

* rename vars, add camelCase

* lint frontend, fixes build

* add evaluation index

* expose env var + wip: realtime

* updated rrweb-player

* improved browser ui

* correct time

* fixed timeline

* v0 realtime evals, also makes supabase client singleton

---------

Co-authored-by: Din <[email protected]>
skull8888888 and dinmukhamedm authored Feb 2, 2025
1 parent 59bb656 commit 8a1eca2
Showing 30 changed files with 3,585 additions and 465 deletions.
3 changes: 2 additions & 1 deletion app-server/src/api/v1/browser_sessions.rs
@@ -51,7 +51,8 @@ async fn create_session_event(
INSERT INTO browser_session_events (
event_id, session_id, trace_id, timestamp,
event_type, data, project_id
) VALUES ",
)
VALUES ",
);

let mut values = Vec::new();
81 changes: 81 additions & 0 deletions app-server/src/api/v1/evals.rs
@@ -0,0 +1,81 @@
use std::sync::Arc;

use crate::{
db::{self, project_api_keys::ProjectApiKey, DB},
evaluations::{save_evaluation_scores, utils::EvaluationDatapointResult},
names::NameGenerator,
routes::types::ResponseResult,
};
use actix_web::{
post,
web::{self, Json},
HttpResponse,
};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitEvalRequest {
pub name: Option<String>,
pub group_name: Option<String>,
}

#[post("/evals")]
pub async fn init_eval(
req: Json<InitEvalRequest>,
db: web::Data<DB>,
name_generator: web::Data<Arc<NameGenerator>>,
project_api_key: ProjectApiKey,
) -> ResponseResult {
let req = req.into_inner();
let group_name = req.group_name.unwrap_or_else(|| "default".to_string());
let project_id = project_api_key.project_id;

let name = if let Some(name) = req.name {
name
} else {
name_generator.next().await
};

let evaluation =
db::evaluations::create_evaluation(&db.pool, &name, project_id, &group_name).await?;

Ok(HttpResponse::Ok().json(evaluation))
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SaveEvalDatapointsRequest {
pub group_name: Option<String>,
pub points: Vec<EvaluationDatapointResult>,
}

#[post("/evals/{eval_id}/datapoints")]
pub async fn save_eval_datapoints(
eval_id: web::Path<Uuid>,
req: Json<SaveEvalDatapointsRequest>,
db: web::Data<DB>,
clickhouse: web::Data<clickhouse::Client>,
project_api_key: ProjectApiKey,
) -> ResponseResult {
let eval_id = eval_id.into_inner();
let req = req.into_inner();
let project_id = project_api_key.project_id;
let points = req.points;
let db = db.into_inner();
    let group_name = req.group_name.unwrap_or_else(|| "default".to_string());
let clickhouse = clickhouse.into_inner().as_ref().clone();

save_evaluation_scores(
db.clone(),
clickhouse,
points,
eval_id,
project_id,
&group_name,
)
.await?;

Ok(HttpResponse::Ok().json(eval_id))
}
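
For orientation, here is a minimal TypeScript sketch of how a client might drive the two new endpoints. Everything beyond the routes and request shapes visible in the diff is an assumption: the base URL, the Bearer-token auth scheme, the `id` field on the returned evaluation, and the exact datapoint fields (inferred from `EvaluationDatapointResult` below, camelCase per the commit message).

```typescript
// Sketch of the new eval flow; base URL, auth header, and response/point
// shapes are assumptions, not confirmed by this diff.
const BASE_URL = 'http://localhost:8000/v1'; // assumed host/port
const headers = {
  'Content-Type': 'application/json',
  // assumed: project API keys are sent as a Bearer token
  Authorization: `Bearer ${process.env.LMNR_PROJECT_API_KEY}`,
};

// POST /v1/evals — both fields are optional; the server generates a name
// and falls back to the "default" group when they are omitted.
const evaluation = await fetch(`${BASE_URL}/evals`, {
  method: 'POST',
  headers,
  body: JSON.stringify({ name: 'my-eval', groupName: 'default' }),
}).then((r) => r.json());

// POST /v1/evals/{eval_id}/datapoints — `index` is new in this commit and
// drives result ordering; #[serde(default)] means it falls back to 0 when
// omitted. Point fields below are inferred and assumed to be camelCase.
await fetch(`${BASE_URL}/evals/${evaluation.id}/datapoints`, {
  method: 'POST',
  headers,
  body: JSON.stringify({
    groupName: 'default',
    points: [
      {
        index: 0,
        traceId: crypto.randomUUID(), // assumed required, as in the columns below
        data: { question: 'What is 2 + 2?' },
        target: { answer: '4' },
        executorOutput: { answer: '4' },
        scores: { accuracy: 1.0 },
      },
    ],
  }),
});
```

The second call responds with the eval id, so a client can await it for confirmation or treat it as fire-and-forget.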
1 change: 1 addition & 0 deletions app-server/src/api/v1/mod.rs
@@ -1,5 +1,6 @@
pub mod browser_sessions;
pub mod datasets;
pub mod evals;
pub mod evaluations;
pub mod machine_manager;
pub mod metrics;
9 changes: 5 additions & 4 deletions app-server/src/db/evaluations.rs
@@ -79,6 +79,7 @@ pub async fn set_evaluation_results(
targets: &Vec<Value>,
executor_outputs: &Vec<Option<Value>>,
trace_ids: &Vec<Uuid>,
indices: &Vec<i32>,
) -> Result<()> {
let results = sqlx::query_as::<_, EvaluationDatapointPreview>(
r"INSERT INTO evaluation_results (
@@ -88,7 +89,7 @@ pub async fn set_evaluation_results(
target,
executor_output,
trace_id,
index_in_batch
index
)
SELECT
id,
@@ -97,10 +98,10 @@ pub async fn set_evaluation_results(
target,
executor_output,
trace_id,
index_in_batch
index
FROM
UNNEST ($1::uuid[], $2::jsonb[], $3::jsonb[], $4::jsonb[], $5::uuid[], $6::int8[])
AS tmp_table(id, data, target, executor_output, trace_id, index_in_batch)
AS tmp_table(id, data, target, executor_output, trace_id, index)
RETURNING id, created_at, evaluation_id, trace_id
",
)
@@ -109,7 +110,7 @@ pub async fn set_evaluation_results(
.bind(targets)
.bind(executor_outputs)
.bind(trace_ids)
.bind(&Vec::from_iter(0..ids.len() as i64))
.bind(indices)
.bind(evaluation_id)
.fetch_all(pool)
.await?;
5 changes: 3 additions & 2 deletions app-server/src/evaluations/mod.rs
@@ -18,7 +18,7 @@ pub async fn save_evaluation_scores(
points: Vec<EvaluationDatapointResult>,
evaluation_id: Uuid,
project_id: Uuid,
group_id: &String,
group_name: &String,
) -> Result<()> {
let columns = get_columns_from_points(&points);
let ids = points.iter().map(|_| Uuid::new_v4()).collect::<Vec<_>>();
@@ -41,6 +41,7 @@ pub async fn save_evaluation_scores(
&columns.targets,
&columns.executor_outputs,
&columns.trace_ids,
&columns.indices,
)
.await
});
@@ -52,7 +53,7 @@
&points,
&ids,
project_id,
group_id.clone(),
group_name.clone(),
evaluation_id,
Utc::now(),
);
6 changes: 6 additions & 0 deletions app-server/src/evaluations/utils.rs
@@ -27,6 +27,8 @@ pub struct EvaluationDatapointResult {
pub human_evaluators: Vec<HumanEvaluator>,
#[serde(default)]
pub executor_span_id: Uuid,
#[serde(default)]
pub index: i32,
}

pub struct DatapointColumns {
@@ -35,6 +37,7 @@ pub struct DatapointColumns {
pub executor_outputs: Vec<Option<Value>>,
pub trace_ids: Vec<Uuid>,
pub scores: Vec<HashMap<String, f64>>,
pub indices: Vec<i32>,
}

pub fn get_columns_from_points(points: &Vec<EvaluationDatapointResult>) -> DatapointColumns {
@@ -63,12 +66,15 @@ pub fn get_columns_from_points(points: &Vec<EvaluationDatapointResult>) -> DatapointColumns {
.map(|point| point.trace_id)
.collect::<Vec<_>>();

let indices = points.iter().map(|point| point.index).collect::<Vec<_>>();

DatapointColumns {
datas,
targets,
executor_outputs,
trace_ids,
scores,
indices,
}
}

21 changes: 14 additions & 7 deletions app-server/src/main.rs
@@ -78,8 +78,8 @@ mod storage;
mod traces;

const DEFAULT_CACHE_SIZE: u64 = 100; // entries
const HTTP_PAYLOAD_LIMIT: usize = 100 * 1024 * 1024; // 100MB
const GRPC_PAYLOAD_DECODING_LIMIT: usize = 100 * 1024 * 1024; // 100MB
const HTTP_PAYLOAD_LIMIT: usize = 5 * 1024 * 1024; // 5MB
const GRPC_PAYLOAD_DECODING_LIMIT: usize = 10 * 1024 * 1024; // 10MB

fn tonic_error_to_io_error(err: tonic::transport::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
@@ -361,9 +361,14 @@ fn main() -> anyhow::Result<()> {
cache_for_http.clone(),
));

// start 8 threads per core to process spans from RabbitMQ
let num_workers_per_thread = env::var("NUM_WORKERS_PER_THREAD")
.unwrap_or(String::from("8"))
.parse::<u8>()
.unwrap_or(8);

// start num_workers_per_thread threads per core to process spans from RabbitMQ
if is_feature_enabled(Feature::FullBuild) {
for _ in 0..8 {
for _ in 0..num_workers_per_thread {
tokio::spawn(process_queue_spans(
pipeline_runner.clone(),
db_for_http.clone(),
@@ -379,6 +384,8 @@
App::new()
.wrap(Logger::default())
.wrap(NormalizePath::trim())
.app_data(JsonConfig::default().limit(HTTP_PAYLOAD_LIMIT))
.app_data(PayloadConfig::new(HTTP_PAYLOAD_LIMIT))
.app_data(web::Data::from(cache_for_http.clone()))
.app_data(web::Data::from(db_for_http.clone()))
.app_data(web::Data::new(pipeline_runner.clone()))
@@ -421,8 +428,6 @@ fn main() -> anyhow::Result<()> {
.service(
web::scope("/v1")
.wrap(project_auth.clone())
.app_data(PayloadConfig::new(HTTP_PAYLOAD_LIMIT))
.app_data(JsonConfig::default().limit(HTTP_PAYLOAD_LIMIT))
.service(api::v1::pipelines::run_pipeline_graph)
.service(api::v1::pipelines::ping_healthcheck)
.service(api::v1::traces::process_traces)
@@ -434,7 +439,9 @@
.service(api::v1::machine_manager::start_machine)
.service(api::v1::machine_manager::terminate_machine)
.service(api::v1::machine_manager::execute_computer_action)
.service(api::v1::browser_sessions::create_session_event),
.service(api::v1::browser_sessions::create_session_event)
.service(api::v1::evals::init_eval)
.service(api::v1::evals::save_eval_datapoints),
)
// Scopes with generic auth
.service(
12 changes: 8 additions & 4 deletions app-server/src/traces/spans.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, env, sync::Arc};

use anyhow::Result;
use chrono::{TimeZone, Utc};
@@ -46,7 +46,7 @@ const HAS_BROWSER_SESSION_ATTRIBUTE_NAME: &str = "lmnr.internal.has_browser_session";
//
// We use 7/2 as an estimate of the number of characters per token.
// And 128K is a common input size for LLM calls.
const PAYLOAD_SIZE_THRESHOLD: usize = (7 / 2) * 128_000; // approx 448KB
const DEFAULT_PAYLOAD_SIZE_THRESHOLD: usize = 7 * 128_000 / 2; // approx 448KB; (7 / 2) would truncate to 3 in integer arithmetic

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -585,6 +585,10 @@ impl Span {
project_id: &Uuid,
storage: Arc<S>,
) -> Result<()> {
let payload_size_threshold = env::var("MAX_DB_SPAN_PAYLOAD_BYTES")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(DEFAULT_PAYLOAD_SIZE_THRESHOLD);
if let Some(input) = self.input.clone() {
let span_input = serde_json::from_value::<Vec<ChatMessage>>(input);
if let Ok(span_input) = span_input {
@@ -605,7 +609,7 @@
// but we don't need to be exact here.
} else {
let input_str = serde_json::to_string(&self.input).unwrap_or_default();
if input_str.len() > PAYLOAD_SIZE_THRESHOLD {
if input_str.len() > payload_size_threshold {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &self.input)?;
@@ -619,7 +623,7 @@
}
if let Some(output) = self.output.clone() {
let output_str = serde_json::to_string(&output).unwrap_or_default();
if output_str.len() > PAYLOAD_SIZE_THRESHOLD {
if output_str.len() > payload_size_threshold {
let key = crate::storage::create_key(project_id, &None);
let mut data = Vec::new();
serde_json::to_writer(&mut data, &output)?;
@@ -7,13 +7,18 @@ export async function GET(request: NextRequest, props: { params: Promise<{ projectId: string }> }) {
const { projectId } = params;

const traceId = request.nextUrl.searchParams.get('traceId');
console.log('traceId', traceId, 'projectId', projectId);

const res = await clickhouseClient.query({
query: 'SELECT * FROM browser_session_events WHERE trace_id = {id: UUID} AND project_id = {projectId: UUID} ORDER BY timestamp ASC',
query: `
SELECT *
FROM browser_session_events
WHERE trace_id = {id: UUID}
AND project_id = {projectId: UUID}
ORDER BY timestamp ASC`,
format: 'JSONEachRow',
query_params: {
id: traceId,
projectId: projectId
projectId: projectId,
}
});
const events = await res.json();
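
The file name for this route was not captured in the diff view above; assuming it is a Next.js route handler for browser session events, a client-side sketch of consuming it for replay (e.g. with the rrweb-player mentioned in the commit message) might look like this. The path below is a guess, not confirmed by the diff.

```typescript
// Hypothetical fetch of a trace's recorded browser session events;
// the route path is assumed, since the file name is not shown above.
async function fetchSessionEvents(projectId: string, traceId: string) {
  const res = await fetch(
    `/api/projects/${projectId}/browser-sessions/events?traceId=${traceId}`
  );
  // One JSON object per row (ClickHouse JSONEachRow), ordered by timestamp.
  return res.json();
}
```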
@@ -44,6 +44,7 @@ export async function GET(
target: sql<string>`SUBSTRING(${evaluationResults.target}::text, 0, 100)`.as('target'),
executorOutput: evaluationResults.executorOutput,
scores: subQueryScoreCte.cteScores,
index: evaluationResults.index
})
.from(evaluationResults)
.leftJoin(
@@ -52,8 +53,8 @@
)
.where(eq(evaluationResults.evaluationId, evaluationId))
.orderBy(
asc(evaluationResults.createdAt),
asc(evaluationResults.indexInBatch)
asc(evaluationResults.index),
asc(evaluationResults.createdAt)
);

const [evaluation, results] = await Promise.all([
@@ -20,7 +20,6 @@ export async function POST(
const parsed = updatePlaygroundSchema.safeParse(body);

if (!parsed.success) {
console.log(parsed.error.errors);
return new Response(JSON.stringify({ error: parsed.error.errors }), {
status: 400
});