Skip to content

Commit

Permalink
feat(cost-model): introduce attributes & stats methods in ORM (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored Nov 16, 2024
1 parent fe0041a commit 986cf00
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 128 deletions.
30 changes: 30 additions & 0 deletions optd-cost-model/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,33 @@ impl Display for EpochId {
write!(f, "Epoch#{}", self.0)
}
}

/// Exposes the raw value wrapped by a [`GroupId`] as an `i32`.
///
/// NOTE(review): the conversion is a plain `as` cast, which never reports
/// out-of-range values — confirm that group ids always fit in an `i32`.
impl From<GroupId> for i32 {
    fn from(id: GroupId) -> Self {
        let raw = id.0;
        raw as i32
    }
}

/// Exposes the raw value wrapped by an [`ExprId`] as an `i32`.
///
/// NOTE(review): the conversion is a plain `as` cast, which never reports
/// out-of-range values — confirm that expression ids always fit in an `i32`.
impl From<ExprId> for i32 {
    fn from(id: ExprId) -> Self {
        let raw = id.0;
        raw as i32
    }
}

/// Exposes the raw value wrapped by a [`TableId`] as an `i32`.
///
/// NOTE(review): the conversion is a plain `as` cast, which never reports
/// out-of-range values — confirm that table ids always fit in an `i32`.
impl From<TableId> for i32 {
    fn from(id: TableId) -> Self {
        let raw = id.0;
        raw as i32
    }
}

/// Exposes the raw value wrapped by an [`AttrId`] as an `i32`.
///
/// NOTE(review): the conversion is a plain `as` cast, which never reports
/// out-of-range values — confirm that attribute ids always fit in an `i32`.
impl From<AttrId> for i32 {
    fn from(id: AttrId) -> Self {
        let raw = id.0;
        raw as i32
    }
}

/// Exposes the raw value wrapped by an [`EpochId`] as an `i32`.
///
/// NOTE(review): the conversion is a plain `as` cast, which never reports
/// out-of-range values — confirm that epoch ids always fit in an `i32`.
impl From<EpochId> for i32 {
    fn from(id: EpochId) -> Self {
        let raw = id.0;
        raw as i32
    }
}
22 changes: 20 additions & 2 deletions optd-cost-model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use common::{
nodes::{ArcPredicateNode, PhysicalNodeType},
types::{AttrId, EpochId, ExprId, GroupId, TableId},
};
use optd_persistent::cost_model::interface::{Stat, StatType};
use optd_persistent::{
cost_model::interface::{Stat, StatType},
BackendError,
};

pub mod common;
pub mod cost;
Expand Down Expand Up @@ -32,10 +35,25 @@ pub struct EstimatedStatistic(pub u64);

pub type CostModelResult<T> = Result<T, CostModelError>;

/// Error payload carried by `CostModelError::SemanticError`.
///
/// NOTE(review): no call sites are visible in this view, so the per-variant
/// descriptions below are inferred from the variant names — confirm them
/// against the code that constructs these errors.
#[derive(Debug)]
pub enum SemanticError {
// TODO: Add more error types
/// Presumably: a statistic of an unrecognized type was requested or read.
UnknownStatisticType,
/// Presumably: no versioned statistic exists for the requested lookup.
VersionedStatisticNotFound,
/// Presumably: the table has no attribute at the given base index.
/// Fields are `(table_id, attribute_base_index)`.
AttributeNotFound(TableId, i32), // (table_id, attribute_base_index)
}

#[derive(Debug)]
pub enum CostModelError {
// TODO: Add more error types
ORMError,
ORMError(BackendError),
SemanticError(SemanticError),
}

impl From<BackendError> for CostModelError {
fn from(err: BackendError) -> Self {
CostModelError::ORMError(err)
}
}

pub trait CostModel: 'static + Send + Sync {
Expand Down
4 changes: 2 additions & 2 deletions optd-cost-model/src/stats/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};

/// The Counter structure to track exact frequencies of fixed elements.
#[serde_with::serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[derive(Default, Serialize, Deserialize, Debug)]
pub struct Counter<T: PartialEq + Eq + Hash + Clone + Serialize + DeserializeOwned> {
#[serde_as(as = "HashMap<serde_with::json::JsonString, _>")]
counts: HashMap<T, i32>, // The exact counts of an element T.
Expand All @@ -33,7 +33,7 @@ where
}

// Inserts an element in the Counter if it is being tracked.
pub fn insert_element(&mut self, elem: T, occ: i32) {
fn insert_element(&mut self, elem: T, occ: i32) {
if let Some(frequency) = self.counts.get_mut(&elem) {
*frequency += occ;
}
Expand Down
60 changes: 57 additions & 3 deletions optd-cost-model/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl MostCommonValues {
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum Distribution {
TDigest(tdigest::TDigest<Value>),
// Add more types here...
Expand Down Expand Up @@ -116,8 +117,61 @@ impl AttributeCombValueStats {
}
}

impl From<serde_json::Value> for AttributeCombValueStats {
fn from(value: serde_json::Value) -> Self {
serde_json::from_value(value).unwrap()
#[cfg(test)]
mod tests {
    use super::{Counter, MostCommonValues};
    use crate::{common::values::Value, stats::AttributeCombValue};

    /// Frequencies, predicate-based frequencies and the tracked-value count of a
    /// counter-backed `MostCommonValues` built from one `1` and three `2`s.
    #[test]
    fn test_most_common_values() {
        let elem1 = vec![Some(Value::Int32(1))];
        let elem2 = vec![Some(Value::Int32(2))];
        let mut counter = Counter::new(&[elem1.clone(), elem2.clone()]);

        let elems = vec![elem2.clone(), elem1.clone(), elem2.clone(), elem2.clone()];
        counter.aggregate(&elems);

        let mcvs = MostCommonValues::Counter(counter);
        assert_eq!(mcvs.freq(&elem1), Some(0.25));
        assert_eq!(mcvs.freq(&elem2), Some(0.75));
        assert_eq!(mcvs.total_freq(), 1.0);

        let elem1_cloned = elem1.clone();
        let pred1 = Box::new(move |x: &AttributeCombValue| x == &elem1_cloned);
        let pred2 = Box::new(move |x: &AttributeCombValue| x != &elem1);
        assert_eq!(mcvs.freq_over_pred(pred1), 0.25);
        assert_eq!(mcvs.freq_over_pred(pred2), 0.75);

        assert_eq!(mcvs.cnt(), 2);
    }

    /// Serializes a `MostCommonValues` to JSON, reads it back, and checks that the
    /// value that went through the round-trip still answers every query correctly.
    #[test]
    fn test_most_common_values_serde() {
        let elem1 = vec![Some(Value::Int32(1))];
        let elem2 = vec![Some(Value::Int32(2))];
        let mut counter = Counter::new(&[elem1.clone(), elem2.clone()]);

        let elems = vec![elem2.clone(), elem1.clone(), elem2.clone(), elem2.clone()];
        counter.aggregate(&elems);

        let mcvs = MostCommonValues::Counter(counter);
        let serialized = serde_json::to_value(&mcvs).unwrap();

        // Every assertion below targets `deserialized`: asserting on `mcvs`
        // would pass even if deserialization produced garbage.
        let deserialized: MostCommonValues = serde_json::from_value(serialized).unwrap();
        assert_eq!(deserialized.freq(&elem1), Some(0.25));
        assert_eq!(deserialized.freq(&elem2), Some(0.75));
        assert_eq!(deserialized.total_freq(), 1.0);

        let elem1_cloned = elem1.clone();
        let pred1 = Box::new(move |x: &AttributeCombValue| x == &elem1_cloned);
        let pred2 = Box::new(move |x: &AttributeCombValue| x != &elem1);
        assert_eq!(deserialized.freq_over_pred(pred1), 0.25);
        assert_eq!(deserialized.freq_over_pred(pred2), 0.75);

        assert_eq!(deserialized.cnt(), 2);
    }

    // TODO: Add tests for Distribution
}
119 changes: 117 additions & 2 deletions optd-cost-model/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
#![allow(unused_variables)]
use std::sync::Arc;

use optd_persistent::CostModelStorageLayer;
use optd_persistent::{
cost_model::interface::{Attr, StatType},
CostModelStorageLayer,
};

use crate::{
common::types::TableId,
stats::{counter::Counter, AttributeCombValueStats, Distribution, MostCommonValues},
CostModelResult,
};

/// TODO: documentation
pub struct CostModelStorageManager<S: CostModelStorageLayer> {
Expand All @@ -9,8 +19,113 @@ pub struct CostModelStorageManager<S: CostModelStorageLayer> {
}

impl<S: CostModelStorageLayer> CostModelStorageManager<S> {
/// TODO: documentation
pub fn new(backend_manager: Arc<S>) -> Self {
Self { backend_manager }
}

/// Gets the attribute information for a given table and attribute base index.
///
/// TODO: if we have memory cache,
/// we should add the reference. (&Attr)
pub async fn get_attribute_info(
&self,
table_id: TableId,
attr_base_index: i32,
) -> CostModelResult<Option<Attr>> {
Ok(self
.backend_manager
.get_attribute(table_id.into(), attr_base_index)
.await?)
}

/// Gets the latest statistics for a given table.
///
/// TODO: Currently, in `AttributeCombValueStats`, only `Distribution` is optional.
/// This poses a question about the behavior of the system if there is no corresponding
/// `MostCommonValues`, `ndistinct`, or other statistics. We should have a clear
/// specification about the behavior of the system in the presence of missing statistics.
///
/// TODO: if we have memory cache,
/// we should add the reference. (&AttributeCombValueStats)
///
/// TODO: Shall we pass in an epoch here to make sure that the statistics are from the same
/// epoch?
pub async fn get_attributes_comb_statistics(
&self,
table_id: TableId,
attr_base_indices: &[i32],
) -> CostModelResult<Option<AttributeCombValueStats>> {
let dist: Option<Distribution> = self
.backend_manager
.get_stats_for_attr_indices_based(
table_id.into(),
attr_base_indices.to_vec(),
StatType::Distribution,
None,
)
.await?
.map(|json| serde_json::from_value(json).unwrap());

let mcvs = self
.backend_manager
.get_stats_for_attr_indices_based(
table_id.into(),
attr_base_indices.to_vec(),
StatType::MostCommonValues,
None,
)
.await?
.map(|json| serde_json::from_value(json).unwrap())
.unwrap_or_else(|| MostCommonValues::Counter(Counter::default()));

let ndistinct = self
.backend_manager
.get_stats_for_attr_indices_based(
table_id.into(),
attr_base_indices.to_vec(),
StatType::Cardinality,
None,
)
.await?
.map(|json| serde_json::from_value(json).unwrap())
.unwrap_or(0);

let table_row_count = self
.backend_manager
.get_stats_for_attr_indices_based(
table_id.into(),
attr_base_indices.to_vec(),
StatType::TableRowCount,
None,
)
.await?
.map(|json| serde_json::from_value(json).unwrap())
.unwrap_or(0);
let non_null_count = self
.backend_manager
.get_stats_for_attr_indices_based(
table_id.into(),
attr_base_indices.to_vec(),
StatType::NonNullCount,
None,
)
.await?
.map(|json| serde_json::from_value(json).unwrap())
.unwrap_or(0);

// FIXME: Only minimal checks for invalid values is conducted here. We should have
// much clear specification about the behavior of the system in the presence of
// invalid statistics.
let null_frac = if table_row_count == 0 {
0.0
} else {
1.0 - (non_null_count as f64 / table_row_count as f64)
};

Ok(Some(AttributeCombValueStats::new(
mcvs, ndistinct, null_frac, dist,
)))
}
}

// TODO: add some tests, especially ones that cover the error cases.
4 changes: 2 additions & 2 deletions optd-persistent/src/bin/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> {
name: Set("user_id".to_owned()),
compression_method: Set("N".to_owned()),
variant_tag: Set(AttrType::Integer as i32),
base_attribute_number: Set(1),
base_attribute_number: Set(0),
is_not_null: Set(true),
};
let attribute2 = attribute::ActiveModel {
Expand All @@ -72,7 +72,7 @@ async fn init_all_tables() -> Result<(), sea_orm::error::DbErr> {
name: Set("username".to_owned()),
compression_method: Set("N".to_owned()),
variant_tag: Set(AttrType::Varchar as i32),
base_attribute_number: Set(2),
base_attribute_number: Set(1),
is_not_null: Set(true),
};
attribute::Entity::insert(attribute1)
Expand Down
4 changes: 2 additions & 2 deletions optd-persistent/src/cost_model/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ impl MockCatalog {
let statistics: Vec<MockStatistic> = vec![
MockStatistic {
id: 1,
stat_type: StatType::NotNullCount as i32,
stat_type: StatType::NonNullCount as i32,
stat_value: json!(100),
attr_ids: vec![1],
table_id: None,
name: "CountAttr1".to_string(),
},
MockStatistic {
id: 2,
stat_type: StatType::NotNullCount as i32,
stat_type: StatType::NonNullCount as i32,
stat_value: json!(200),
attr_ids: vec![2],
table_id: None,
Expand Down
Loading

0 comments on commit 986cf00

Please sign in to comment.