Skip to content

Commit

Permalink
Merge pull request #349 from lmnr-ai/dev
Browse files Browse the repository at this point in the history
Change ports, add clickhouse to lite version
  • Loading branch information
dinmukhamedm authored Jan 23, 2025
2 parents 95118b1 + 7e0c51b commit 2205bea
Show file tree
Hide file tree
Showing 30 changed files with 156 additions and 169 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ RABBITMQ_DEFAULT_USER=admin
RABBITMQ_DEFAULT_PASS=adminpasswd

CLICKHOUSE_USER=default
POSTGRES_PORT=5433
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
**/.env.local
**/.env
.DS_Store
**/..DS_Store
**/..DS_Store
.idea
6 changes: 3 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ We do this to avoid legal issues and disputes, and to stay compliant with releva

Don't get overwhelmed by the number of docker-compose files. Here's a quick overview:

- `docker-compose.yml` is the simplest one that spins up frontend, app-server, and postgres. Good for quickstarts.
- `docker-compose.yml` is the simplest one that spins up frontend, app-server, clickhouse, and postgres. Good for quickstarts.
- `docker-compose-full.yml` is the one you want to use for running the full stack locally. This is the best
for self-hosting.
- `docker-compose-local-dev-full.yml` full file for local development. To be used when you make changes
to the backend. It will only run the dependency services (postgres, qdrant, clickhouse, rabbitmq).
You will need to run `cargo r`, `pnpm run dev`, and `python server.py` manually.
- `docker-compose-local-dev.yml` is the one you want to use for local development. It will only
run postgres and app-server. Good for frontend changes.
run postgres, clickhouse, and app-server. Good for frontend changes.
- `docker-compose-local-build.yml` will build the services from the source and run them in production mode. This is good for self-hosting with your own changes,
or for testing the changes after developing on your own and before opening a PR.

| Service | docker-compose.yml | docker-compose-full.yml | docker-compose-local-dev-full.yml | docker-compose-local-dev.yml | docker-compose-local-build.yml |
|---------|-------------------|------------------------|------------------------------|----------------------------|------------------------------|
| postgres ||||||
| qdrant ||||||
| clickhouse | ||| ||
| clickhouse | ||| ||
| rabbitmq ||||||
| app-server | ℹ️ || 💻 | ℹ️ | 🔧 |
| frontend | ℹ️ || 💻 | 💻 | 🔧 |
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ cd lmnr
docker compose up -d
```

This will spin up a lightweight version of the stack with Postgres, app-server, and frontend. This is good for a quickstart
or for lightweight usage. You can access the UI at http://localhost:3000 in your browser.
This will spin up a lightweight version of the stack with Postgres, clickhouse, app-server, and frontend. This is good for a quickstart
or for lightweight usage. You can access the UI at http://localhost:5667 in your browser.

For production environment, we recommend using our [managed platform](https://www.lmnr.ai/projects) or `docker compose -f docker-compose-full.yml up -d`.

Expand Down
2 changes: 1 addition & 1 deletion app-server/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
SEMANTIC_SEARCH_URL=http://localhost:8080
# postgres://user:password@host:port/dbname
DATABASE_URL="postgres://postgres:postgres_passwordabc@localhost:5432/postgres"
DATABASE_URL="postgres://postgres:postgres_passwordabc@localhost:5433/postgres"

PORT=8000
GRPC_PORT=8001
Expand Down
3 changes: 3 additions & 0 deletions app-server/src/api/v1/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ pub async fn process_traces(
project_api_key: ProjectApiKey,
rabbitmq_connection: web::Data<Option<Arc<Connection>>>,
db: web::Data<DB>,
clickhouse: web::Data<clickhouse::Client>,
cache: web::Data<crate::cache::Cache>,
) -> ResponseResult {
let db = db.into_inner();
let cache = cache.into_inner();
let clickhouse = clickhouse.into_inner().as_ref().clone();
let request = ExportTraceServiceRequest::decode(body).map_err(|e| {
anyhow::anyhow!("Failed to decode ExportTraceServiceRequest from bytes. {e}")
})?;
Expand All @@ -57,6 +59,7 @@ pub async fn process_traces(
project_api_key.project_id,
rabbitmq_connection,
db,
clickhouse,
cache,
)
.await?;
Expand Down
13 changes: 1 addition & 12 deletions app-server/src/ch/evaluation_scores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@ use clickhouse::Row;
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;

use crate::{
evaluations::utils::EvaluationDatapointResult,
features::{is_feature_enabled, Feature},
};

use super::utils::chrono_to_nanoseconds;
use crate::evaluations::utils::EvaluationDatapointResult;

fn serialize_timestamp<S>(timestamp: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down Expand Up @@ -78,10 +74,6 @@ pub async fn insert_evaluation_scores(
return Ok(());
}

if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}

let ch_insert = clickhouse.insert("evaluation_scores");
match ch_insert {
Ok(mut ch_insert) => {
Expand Down Expand Up @@ -284,9 +276,6 @@ pub async fn delete_evaluation_score(
result_id: Uuid,
label_id: Uuid,
) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}
// Note, this does not immediately physically delete the data.
// https://clickhouse.com/docs/en/sql-reference/statements/delete
clickhouse
Expand Down
5 changes: 1 addition & 4 deletions app-server/src/ch/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use clickhouse::Row;
use serde::Serialize;
use uuid::Uuid;

use crate::{db::events::Event, features::is_feature_enabled, Feature};
use crate::db::events::Event;

use super::utils::chrono_to_nanoseconds;

Expand All @@ -30,9 +30,6 @@ impl CHEvent {
}

pub async fn insert_events(clickhouse: clickhouse::Client, events: Vec<CHEvent>) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}
if events.is_empty() {
return Ok(());
}
Expand Down
12 changes: 1 addition & 11 deletions app-server/src/ch/labels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use clickhouse::Row;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::{
db::labels::LabelSource,
features::{is_feature_enabled, Feature},
};
use crate::db::labels::LabelSource;

use super::utils::chrono_to_nanoseconds;

Expand Down Expand Up @@ -76,10 +73,6 @@ pub async fn insert_label(
value: f64,
span_id: Uuid,
) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}

let label = CHLabel::new(
project_id,
class_id,
Expand Down Expand Up @@ -120,9 +113,6 @@ pub async fn delete_label(
span_id: Uuid,
id: Uuid,
) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}
// Note, this does not immediately physically delete the data.
// https://clickhouse.com/docs/en/sql-reference/statements/delete
client
Expand Down
4 changes: 0 additions & 4 deletions app-server/src/ch/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use uuid::Uuid;

use crate::{
db::spans::{Span, SpanType},
features::{is_feature_enabled, Feature},
traces::spans::SpanUsage,
};

Expand Down Expand Up @@ -97,9 +96,6 @@ impl CHSpan {
}

pub async fn insert_span(clickhouse: clickhouse::Client, span: &CHSpan) -> Result<()> {
if !is_feature_enabled(Feature::FullBuild) {
return Ok(());
}
let ch_insert = clickhouse.insert("spans");
match ch_insert {
Ok(mut ch_insert) => {
Expand Down
12 changes: 1 addition & 11 deletions app-server/src/ch/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use clickhouse::Row;
use serde::Deserialize;
use uuid::Uuid;

use crate::{
db::utils::validate_sql_string,
features::{is_feature_enabled, Feature},
};
use crate::db::utils::validate_sql_string;

use super::modifiers::GroupByInterval;

Expand Down Expand Up @@ -101,13 +98,6 @@ async fn get_time_bounds(
return Err(anyhow::anyhow!("Invalid column name: {}", column_name));
}

if !is_feature_enabled(Feature::FullBuild) {
return Ok(TimeBounds {
min_time: chrono_to_nanoseconds(Utc::now() - chrono::Duration::days(1)),
max_time: chrono_to_nanoseconds(Utc::now()),
});
}

let query_string = format!(
"SELECT
MIN({column_name}) AS min_time,
Expand Down
36 changes: 16 additions & 20 deletions app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,28 +168,22 @@ fn main() -> anyhow::Result<()> {

let interrupt_senders = Arc::new(DashMap::<Uuid, mpsc::Sender<GraphInterruptMessage>>::new());

let clickhouse = if is_feature_enabled(Feature::FullBuild) {
let clickhouse_url = env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set");
let clickhouse_user = env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set");
let clickhouse_password = env::var("CLICKHOUSE_PASSWORD");
// https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts -> Create client which will wait for async inserts
// For now, we're not waiting for inserts to finish, but later need to add queue and batch on client-side
let mut client = clickhouse::Client::default()
.with_url(clickhouse_url)
.with_user(clickhouse_user)
.with_database("default")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
if let Ok(clickhouse_password) = clickhouse_password {
client = client.with_password(clickhouse_password);
} else {
let clickhouse_url = env::var("CLICKHOUSE_URL").expect("CLICKHOUSE_URL must be set");
let clickhouse_user = env::var("CLICKHOUSE_USER").expect("CLICKHOUSE_USER must be set");
let clickhouse_password = env::var("CLICKHOUSE_PASSWORD");
let client = clickhouse::Client::default()
.with_url(clickhouse_url)
.with_user(clickhouse_user)
.with_database("default")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");

let clickhouse = match clickhouse_password {
Ok(password) => client.with_password(password),
_ => {
log::warn!("CLICKHOUSE_PASSWORD not set, using without password");
client
}
client
} else {
// This client does not connect to ClickHouse, and the feature flag must be checked before using it
// TODO: wrap this in a dyn trait object
clickhouse::Client::default()
};

let mut rabbitmq_connection = None;
Expand Down Expand Up @@ -253,6 +247,7 @@ fn main() -> anyhow::Result<()> {
let runtime_handle_for_http = runtime_handle.clone();
let db_for_http = db.clone();
let cache_for_http = cache.clone();
let clickhouse_for_grpc = clickhouse.clone();
let http_server_handle = thread::Builder::new()
.name("http".to_string())
.spawn(move || {
Expand Down Expand Up @@ -555,6 +550,7 @@ fn main() -> anyhow::Result<()> {
db.clone(),
cache.clone(),
rabbitmq_connection_grpc.clone(),
clickhouse_for_grpc,
);

Server::builder()
Expand Down
6 changes: 0 additions & 6 deletions app-server/src/routes/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::sync::Arc;

use super::{GetMetricsQueryParams, ResponseResult};
use crate::ch::utils::get_bounds;
use crate::ch::MetricTimeValue;
use crate::features::{is_feature_enabled, Feature};
use crate::semantic_search::semantic_search_grpc::DateRanges;
use crate::semantic_search::SemanticSearch;
use crate::{
Expand Down Expand Up @@ -122,10 +120,6 @@ pub async fn get_traces_metrics(
past_hours: "all".to_string(),
}));

if !is_feature_enabled(Feature::FullBuild) {
return Ok(HttpResponse::Ok().json(Vec::<MetricTimeValue<f64>>::new()));
}

match defaulted_range {
DateRange::Relative(interval) => {
if interval.past_hours == "all" {
Expand Down
3 changes: 0 additions & 3 deletions app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ async fn inner_process_queue_spans<T: Storage + ?Sized>(
clickhouse: clickhouse::Client,
storage: Arc<T>,
) {
if !is_feature_enabled(Feature::FullBuild) {
return;
}
// Safe to unwrap because we checked is_feature_enabled above
let channel = rabbitmq_connection.unwrap().create_channel().await.unwrap();

Expand Down
4 changes: 4 additions & 0 deletions app-server/src/traces/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ pub struct ProcessTracesService {
db: Arc<DB>,
cache: Arc<Cache>,
rabbitmq_connection: Option<Arc<Connection>>,
clickhouse: clickhouse::Client,
}

impl ProcessTracesService {
pub fn new(
db: Arc<DB>,
cache: Arc<Cache>,
rabbitmq_connection: Option<Arc<Connection>>,
clickhouse: clickhouse::Client,
) -> Self {
Self {
db,
cache,
rabbitmq_connection,
clickhouse,
}
}
}
Expand Down Expand Up @@ -71,6 +74,7 @@ impl TraceService for ProcessTracesService {
project_id,
self.rabbitmq_connection.clone(),
self.db.clone(),
self.clickhouse.clone(),
self.cache.clone(),
)
.await
Expand Down
16 changes: 16 additions & 0 deletions app-server/src/traces/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use uuid::Uuid;
use crate::{
api::v1::traces::RabbitMqSpanMessage,
cache::Cache,
ch::{self, spans::CHSpan},
db::{events::Event, spans::Span, DB},
features::{is_feature_enabled, Feature},
opentelemetry::opentelemetry::proto::collector::trace::v1::{
Expand All @@ -25,6 +26,7 @@ pub async fn push_spans_to_queue(
project_id: Uuid,
rabbitmq_connection: Option<Arc<Connection>>,
db: Arc<DB>,
clickhouse: clickhouse::Client,
cache: Arc<Cache>,
) -> Result<ExportTraceServiceResponse> {
if !is_feature_enabled(Feature::FullBuild) {
Expand Down Expand Up @@ -54,6 +56,20 @@ pub async fn push_spans_to_queue(
e
);
}

let ch_span = CHSpan::from_db_span(&span, span_usage, project_id);

let insert_span_res =
ch::spans::insert_span(clickhouse.clone(), &ch_span).await;

if let Err(e) = insert_span_res {
log::error!(
"Failed to insert span into Clickhouse. span_id [{}], project_id [{}]: {:?}",
span.span_id,
project_id,
e
);
}
}
}
}
Expand Down
Loading

0 comments on commit 2205bea

Please sign in to comment.