Skip to content

Commit

Permalink
feat: make probabilistic optimizations optional and tunable in the YA…
Browse files Browse the repository at this point in the history
…ML config (#1912)

Probabilistic optimization sacrifices accuracy in order to reduce memory consumption. In certain parts of the pipeline, a Bloom Filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in other parts, hash tables that store only the hash of the keys instead of the full keys are used ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit makes these optimizations disabled by default and offers user-configurable flags to enable each of these optimizations separately.

This is an example of how to turn on probabilistic optimizations for each processor in the Dozer configuration.

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```
  • Loading branch information
abcpro1 authored Aug 25, 2023
1 parent 058f1ae commit f5b6c7f
Show file tree
Hide file tree
Showing 30 changed files with 536 additions and 190 deletions.
2 changes: 1 addition & 1 deletion dozer-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub use dozer_ingestion::{
pub use dozer_sql::pipeline::builder::QueryContext;
pub use ui_helper::config_to_ui_dag;
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
let mut pipeline = AppPipeline::new();
let mut pipeline = AppPipeline::new_with_default_flags();
statement_to_pipeline(sql, &mut pipeline, None)
}

Expand Down
10 changes: 8 additions & 2 deletions dozer-cli/src/live/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use dozer_types::{
models::{
api_config::{ApiConfig, AppGrpcOptions},
api_endpoint::ApiEndpoint,
flags::Flags,
},
};
use tokio::{runtime::Runtime, sync::RwLock};
Expand Down Expand Up @@ -292,6 +293,7 @@ async fn create_dag(
dozer.config.sql.as_deref(),
endpoint_and_logs,
MultiProgress::new(),
Flags::default(),
);
let (_shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
builder.build(&dozer.runtime, shutdown_receiver).await
Expand Down Expand Up @@ -324,8 +326,12 @@ fn get_dozer_run_instance(
) -> Result<SimpleOrchestrator, LiveError> {
match req.request {
Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None)
.map_err(LiveError::PipelineError)?;
let context = statement_to_pipeline(
&req.sql,
&mut AppPipeline::new(dozer.config.flags.clone().unwrap_or_default().into()),
None,
)
.map_err(LiveError::PipelineError)?;

//overwrite sql
dozer.config.sql = Some(req.sql);
Expand Down
8 changes: 6 additions & 2 deletions dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
Expand Down Expand Up @@ -57,6 +58,7 @@ pub struct PipelineBuilder<'a> {
/// `ApiEndpoint` and its log.
endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
progress: MultiProgress,
flags: Flags,
}

impl<'a> PipelineBuilder<'a> {
Expand All @@ -66,13 +68,15 @@ impl<'a> PipelineBuilder<'a> {
sql: Option<&'a str>,
endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
progress: MultiProgress,
flags: Flags,
) -> Self {
Self {
connections,
sources,
sql,
endpoint_and_logs,
progress,
flags,
}
}

Expand Down Expand Up @@ -148,7 +152,7 @@ impl<'a> PipelineBuilder<'a> {
let mut original_sources = vec![];

let mut query_ctx = None;
let mut pipeline = AppPipeline::new();
let mut pipeline = AppPipeline::new((&self.flags).into());

let mut transformed_sources = vec![];

Expand Down Expand Up @@ -205,7 +209,7 @@ impl<'a> PipelineBuilder<'a> {

let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

let mut pipeline = AppPipeline::new();
let mut pipeline = AppPipeline::new(self.flags.into());

let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

Expand Down
2 changes: 2 additions & 0 deletions dozer-cli/src/pipeline/tests/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use dozer_types::models::config::Config;

use dozer_types::indicatif::MultiProgress;
use dozer_types::models::connection::{Connection, ConnectionConfig};
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;

fn get_default_config() -> Config {
Expand Down Expand Up @@ -66,6 +67,7 @@ fn load_multi_sources() {
.map(|endpoint| (endpoint, None))
.collect(),
MultiProgress::new(),
Flags::default(),
);

let runtime = tokio::runtime::Builder::new_current_thread()
Expand Down
3 changes: 3 additions & 0 deletions dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use dozer_cache::dozer_log::replication::Log;
use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions};
use dozer_core::processor_record::ProcessorRecordStore;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::flags::Flags;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -84,6 +85,7 @@ impl<'a> Executor<'a> {
runtime: &Arc<Runtime>,
executor_options: ExecutorOptions,
shutdown: ShutdownReceiver,
flags: Flags,
) -> Result<DagExecutor, OrchestrationError> {
let builder = PipelineBuilder::new(
self.connections,
Expand All @@ -94,6 +96,7 @@ impl<'a> Executor<'a> {
.map(|(endpoint, log)| (endpoint.clone(), Some(log.log.clone())))
.collect(),
self.multi_pb.clone(),
flags,
);

let dag = builder.build(runtime, shutdown).await?;
Expand Down
4 changes: 3 additions & 1 deletion dozer-cli/src/simple/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl SimpleOrchestrator {
&self.runtime,
get_executor_options(&self.config),
shutdown.clone(),
self.config.flags.clone().unwrap_or_default(),
))?;

let app_grpc_config = get_app_grpc_config(&self.config);
Expand Down Expand Up @@ -289,6 +290,7 @@ impl SimpleOrchestrator {
self.config.sql.as_deref(),
endpoint_and_logs,
self.multi_pb.clone(),
self.config.flags.clone().unwrap_or_default(),
);
let dag = self
.runtime
Expand Down Expand Up @@ -374,7 +376,7 @@ impl SimpleOrchestrator {
}

pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else(
statement_to_pipeline(&sql, &mut AppPipeline::new_with_default_flags(), None).map_or_else(
|e| {
error!(
"[sql][{}] Transforms validation error: {}",
Expand Down
12 changes: 9 additions & 3 deletions dozer-cli/src/ui_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dozer_core::{
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
use dozer_types::{
grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
models::{config::Config, connection::Connection, source::Source},
models::{config::Config, connection::Connection, flags::Flags, source::Source},
};

use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
Expand Down Expand Up @@ -53,8 +53,9 @@ fn prepare_pipeline_dag(
sql: String,
connection_sources: HashMap<Connection, Vec<Source>>,
connection_source_ports: HashMap<(&str, &str), u16>,
flags: Flags,
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
let mut pipeline = AppPipeline::new();
let mut pipeline = AppPipeline::new(flags.into());
let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
AppSourceManager::new();
connection_sources.iter().for_each(|cs| {
Expand Down Expand Up @@ -169,6 +170,11 @@ pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError
}
let source_builder = SourceBuilder::new(connection_sources.clone(), None);
let connection_source_ports = source_builder.get_ports();
let sql_dag = prepare_pipeline_dag(sql, connection_sources, connection_source_ports)?;
let sql_dag = prepare_pipeline_dag(
sql,
connection_sources,
connection_source_ports,
config.flags.unwrap_or_default(),
)?;
Ok(transform_to_ui_graph(&sql_dag))
}
47 changes: 40 additions & 7 deletions dozer-core/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use dozer_types::models::flags::{EnableProbabilisticOptimizations, Flags};
use dozer_types::node::NodeHandle;

use crate::appsource::{self, AppSourceManager};
Expand Down Expand Up @@ -28,12 +29,7 @@ pub struct AppPipeline<T> {
processors: Vec<(NodeHandle, Box<dyn ProcessorFactory<T>>)>,
sinks: Vec<(NodeHandle, Box<dyn SinkFactory<T>>)>,
entry_points: Vec<(NodeHandle, PipelineEntryPoint)>,
}

impl<T> Default for AppPipeline<T> {
fn default() -> Self {
Self::new()
}
flags: PipelineFlags,
}

impl<T> AppPipeline<T> {
Expand Down Expand Up @@ -79,21 +75,58 @@ impl<T> AppPipeline<T> {
self.edges.push(edge);
}

pub fn new() -> Self {
pub fn new(flags: PipelineFlags) -> Self {
Self {
processors: Vec::new(),
sinks: Vec::new(),
edges: Vec::new(),
entry_points: Vec::new(),
flags,
}
}

pub fn new_with_default_flags() -> Self {
Self::new(Default::default())
}

pub fn get_entry_points_sources_names(&self) -> Vec<String> {
self.entry_points
.iter()
.map(|(_, p)| p.source_name().to_string())
.collect()
}

pub fn flags(&self) -> &PipelineFlags {
&self.flags
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineFlags {
pub enable_probabilistic_optimizations: EnableProbabilisticOptimizations,
}

impl From<&Flags> for PipelineFlags {
fn from(flags: &Flags) -> Self {
Self {
enable_probabilistic_optimizations: flags
.enable_probabilistic_optimizations
.clone()
.unwrap_or_default(),
}
}
}

impl From<Flags> for PipelineFlags {
fn from(flags: Flags) -> Self {
Self::from(&flags)
}
}

impl Default for PipelineFlags {
fn default() -> Self {
Flags::default().into()
}
}

pub struct App<T> {
Expand Down
4 changes: 2 additions & 2 deletions dozer-core/src/tests/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async fn test_app_dag() {

let mut app = App::new(asm);

let mut p1 = AppPipeline::new();
let mut p1 = AppPipeline::new_with_default_flags();
p1.add_processor(
Box::new(NoopJoinProcessorFactory {}),
"join",
Expand All @@ -197,7 +197,7 @@ async fn test_app_dag() {

app.add_pipeline(p1);

let mut p2 = AppPipeline::new();
let mut p2 = AppPipeline::new_with_default_flags();
p2.add_processor(
Box::new(NoopJoinProcessorFactory {}),
"join",
Expand Down
10 changes: 9 additions & 1 deletion dozer-sql/src/pipeline/aggregation/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@ pub struct AggregationProcessorFactory {
id: String,
projection: Select,
_stateful: bool,
enable_probabilistic_optimizations: bool,
}

impl AggregationProcessorFactory {
pub fn new(id: String, projection: Select, stateful: bool) -> Self {
pub fn new(
id: String,
projection: Select,
stateful: bool,
enable_probabilistic_optimizations: bool,
) -> Self {
Self {
id,
projection,
_stateful: stateful,
enable_probabilistic_optimizations,
}
}

Expand Down Expand Up @@ -90,6 +97,7 @@ impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
planner.having,
input_schema.clone(),
planner.post_aggregation_schema,
self.enable_probabilistic_optimizations,
)?)
};
Ok(processor)
Expand Down
Loading

0 comments on commit f5b6c7f

Please sign in to comment.