Skip to content

Commit

Permalink
feat: generate sync agg indexes after update (databendlabs#13353)
Browse files Browse the repository at this point in the history
* generate sync agg indexes after update

* add sqllogic test
  • Loading branch information
ariesdevil authored Oct 20, 2023
1 parent ba277ad commit a057943
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
119 changes: 119 additions & 0 deletions src/query/ee/tests/it/aggregating_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,131 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> {

/// Entry point for the sync aggregating-index scenarios.
///
/// The three scenarios (after `UPDATE`, after `INSERT`, after `COPY INTO`)
/// are awaited one after another from this single test; the first scenario
/// that returns an error aborts the run and fails the test.
// NOTE(review): presumably the scenarios are driven from one test (rather
// than three separate `#[tokio::test]`s) so they never run concurrently —
// confirm against the helpers' use of shared state.
#[tokio::test(flavor = "multi_thread")]
async fn test_sync_agg_index() -> Result<()> {
    // Scenario 1: index is regenerated after an UPDATE statement.
    test_sync_agg_index_after_update().await?;
    // Scenario 2: index is regenerated after an INSERT statement.
    test_sync_agg_index_after_insert().await?;
    // Scenario 3: index is regenerated after a COPY INTO statement.
    test_sync_agg_index_after_copy_into().await?;

    Ok(())
}

/// Checks that a sync aggregating index is regenerated by an `UPDATE` when
/// `enable_refresh_aggregating_index_after_write` is switched on.
///
/// Flow:
/// 1. Create table `t0` and aggregating index `index0` over it.
/// 2. Insert rows, then assert that the block files and the index files have
///    identical names and that the first index file holds the same data as
///    `SUM_STATE` evaluated over the table.
/// 3. Run an `UPDATE`, then repeat both checks against the newly written
///    block file and its index file.
///
/// Returns the first error raised by any SQL execution or file-system helper;
/// panics (via `unwrap` / assertions) when an expected path or file is missing
/// or the compared blocks differ.
async fn test_sync_agg_index_after_update() -> Result<()> {
    let (guard, query_ctx, root) = create_ee_query_context(None).await.unwrap();
    // The setting must be enabled before the fixture takes over the context.
    query_ctx
        .get_settings()
        .set_enable_refresh_aggregating_index_after_write(true)?;
    let fixture = TestFixture::new_with_ctx(guard, query_ctx).await;
    let ctx = fixture.ctx();

    // --- Schema: table plus aggregating index `index0` -------------------
    execute_sql(
        ctx.clone(),
        "CREATE TABLE t0 (a int, b int, c int) storage_format = 'parquet'",
    )
    .await?;

    // NOTE(review): the trailing `true` presumably marks the index as sync —
    // confirm against `create_index`.
    let index_id0 = create_index(
        ctx.clone(),
        "index0",
        "SELECT b, SUM(a) from t0 WHERE c > 1 GROUP BY b",
        true,
    )
    .await?;

    // --- Initial data ----------------------------------------------------
    execute_sql(
        ctx.clone(),
        "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)",
    )
    .await?;

    let block_path = find_block_path(&root)?.unwrap();
    let blocks_before = collect_file_names(&block_path)?;

    let agg_index_path_0 = find_agg_index_path(&root, index_id0)?.unwrap();
    let indexes_before = collect_file_names(&agg_index_path_0)?;

    // Every block file must have an index file of the same name.
    assert_eq!(blocks_before, indexes_before);

    // --- Index content after the insert ------------------------------------
    let expected_after_insert: Vec<DataBlock> = execute_sql(
        ctx.clone(),
        "SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b",
    )
    .await?
    .try_collect()
    .await?;

    let first_index_file = agg_index_path_0.join(&indexes_before[0]);
    let read_first_index = format!(
        "SELECT * FROM 'fs://{}'",
        first_index_file.to_str().unwrap()
    );
    let actual_after_insert: Vec<DataBlock> = execute_sql(ctx.clone(), &read_first_index)
        .await?
        .try_collect()
        .await?;

    assert_two_blocks_sorted_eq_with_name(
        "test_sync_agg_index_after_update",
        &expected_after_insert,
        &actual_after_insert,
    );

    // --- Update ----------------------------------------------------------
    execute_sql(ctx.clone(), "UPDATE t0 SET c = 2 WHERE b = 2").await?;

    // Names of the files that existed before the update; anything else found
    // afterwards is treated as the freshly written block / index.
    let first_block = &blocks_before[0];
    let first_agg_index = &indexes_before[0];

    let blocks_after = collect_file_names(&block_path)?;
    let indexes_after = collect_file_names(&agg_index_path_0)?;
    assert_eq!(blocks_after, indexes_after);

    // --- Index content after the update ----------------------------------
    // Assumes the pre-update files are still listed next to the new ones, so
    // the first name differing from the original identifies the new file.
    let updated_block = blocks_after
        .iter()
        .find(|name| !name.eq_ignore_ascii_case(first_block))
        .unwrap();
    let updated_agg_index = indexes_after
        .iter()
        .find(|name| !name.eq_ignore_ascii_case(first_agg_index))
        .unwrap();

    // Expected state is computed straight from the new block file.
    let updated_block_file = block_path.join(updated_block);
    let aggregate_updated_block = format!(
        "SELECT b, SUM_STATE(a) from 'fs://{}' WHERE c > 1 GROUP BY b",
        updated_block_file.to_str().unwrap()
    );
    let expected_after_update: Vec<DataBlock> =
        execute_sql(ctx.clone(), &aggregate_updated_block)
            .await?
            .try_collect()
            .await?;

    let updated_index_file = agg_index_path_0.join(updated_agg_index);
    let read_updated_index = format!(
        "SELECT * FROM 'fs://{}'",
        updated_index_file.to_str().unwrap()
    );
    let actual_after_update: Vec<DataBlock> = execute_sql(ctx.clone(), &read_updated_index)
        .await?
        .try_collect()
        .await?;

    assert_two_blocks_sorted_eq_with_name(
        "test_sync_agg_index_after_update",
        &expected_after_update,
        &actual_after_update,
    );

    Ok(())
}

async fn test_sync_agg_index_after_insert() -> Result<()> {
let (_guard, ctx, root) = create_ee_query_context(None).await.unwrap();
ctx.get_settings()
Expand Down
18 changes: 18 additions & 0 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use log::debug;
use table_lock::TableLockHandlerWrapper;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::common::hook_refresh_agg_index;
use crate::interpreters::common::RefreshAggIndexDesc;
use crate::interpreters::interpreter_delete::replace_subquery;
use crate::interpreters::interpreter_delete::subquery_filter;
use crate::interpreters::Interpreter;
Expand Down Expand Up @@ -180,6 +182,22 @@ impl Interpreter for UpdateInterpreter {
)
.await?;

// generate sync aggregating indexes if `enable_refresh_aggregating_index_after_write` on.
{
let refresh_agg_index_desc = RefreshAggIndexDesc {
catalog: catalog_name.to_string(),
database: db_name.to_string(),
table: tbl_name.to_string(),
};

hook_refresh_agg_index(
self.ctx.clone(),
&mut build_res.main_pipeline,
refresh_agg_index_desc,
)
.await?;
}

if build_res.main_pipeline.is_empty() {
heartbeat.shutdown().await?;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ CREATE SYNC AGGREGATING INDEX testi AS select b, sum(a) from t where c > 1 group
statement ok
INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5), (1,3,3)

statement ok
UPDATE t SET C = 1 WHERE b = 1

# query: eval-agg-eval-scan, index: eval-agg-eval-scan

query II
SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b ORDER BY b
----
1 1
2 3
3 1

Expand Down

0 comments on commit a057943

Please sign in to comment.