diff --git a/src/query/ee/tests/it/aggregating_index/index_refresh.rs b/src/query/ee/tests/it/aggregating_index/index_refresh.rs index a8d5a8ca325a..59faef978667 100644 --- a/src/query/ee/tests/it/aggregating_index/index_refresh.rs +++ b/src/query/ee/tests/it/aggregating_index/index_refresh.rs @@ -307,12 +307,131 @@ async fn test_refresh_agg_index_with_limit() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn test_sync_agg_index() -> Result<()> { + test_sync_agg_index_after_update().await?; test_sync_agg_index_after_insert().await?; test_sync_agg_index_after_copy_into().await?; Ok(()) } +async fn test_sync_agg_index_after_update() -> Result<()> { + let (_guard, ctx, root) = create_ee_query_context(None).await.unwrap(); + ctx.get_settings() + .set_enable_refresh_aggregating_index_after_write(true)?; + let fixture = TestFixture::new_with_ctx(_guard, ctx).await; + let ctx = fixture.ctx(); + // Create table + execute_sql( + ctx.clone(), + "CREATE TABLE t0 (a int, b int, c int) storage_format = 'parquet'", + ) + .await?; + + // Create agg index `index0` + let index_name = "index0"; + + let index_id0 = create_index( + ctx.clone(), + index_name, + "SELECT b, SUM(a) from t0 WHERE c > 1 GROUP BY b", + true, + ) + .await?; + + // Insert data + execute_sql( + ctx.clone(), + "INSERT INTO t0 VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5)", + ) + .await?; + + let block_path = find_block_path(&root)?.unwrap(); + let blocks = collect_file_names(&block_path)?; + + // Get aggregating index files + let agg_index_path_0 = find_agg_index_path(&root, index_id0)?.unwrap(); + let indexes_0 = collect_file_names(&agg_index_path_0)?; + + assert_eq!(blocks, indexes_0); + + // Check aggregating index_0 is correct. 
+ { + let res = execute_sql( + ctx.clone(), + "SELECT b, SUM_STATE(a) from t0 WHERE c > 1 GROUP BY b", + ) + .await?; + let data_blocks: Vec<DataBlock> = res.try_collect().await?; + + let agg_res = execute_sql( + ctx.clone(), + &format!( + "SELECT * FROM 'fs://{}'", + agg_index_path_0.join(&indexes_0[0]).to_str().unwrap() + ), + ) + .await?; + let agg_data_blocks: Vec<DataBlock> = agg_res.try_collect().await?; + + assert_two_blocks_sorted_eq_with_name( + "test_sync_agg_index_after_update", + &data_blocks, + &agg_data_blocks, + ); + } + + // Update + execute_sql(ctx.clone(), "UPDATE t0 SET c = 2 WHERE b = 2").await?; + + let first_block = blocks[0].clone(); + let first_agg_index = indexes_0[0].clone(); + + let blocks = collect_file_names(&block_path)?; + + // check index0 + let indexes_0 = collect_file_names(&agg_index_path_0)?; + assert_eq!(blocks, indexes_0); + + // Check aggregating index_0 is correct after update. + { + let updated_block = blocks + .iter() + .find(|b| !b.eq_ignore_ascii_case(&first_block)) + .unwrap(); + let updated_agg_index = indexes_0 + .iter() + .find(|i| !i.eq_ignore_ascii_case(&first_agg_index)) + .unwrap(); + let res = execute_sql( + ctx.clone(), + &format!( + "SELECT b, SUM_STATE(a) from 'fs://{}' WHERE c > 1 GROUP BY b", + block_path.join(updated_block).to_str().unwrap() + ), + ) + .await?; + let data_blocks: Vec<DataBlock> = res.try_collect().await?; + + let agg_res = execute_sql( + ctx.clone(), + &format!( + "SELECT * FROM 'fs://{}'", + agg_index_path_0.join(updated_agg_index).to_str().unwrap() + ), + ) + .await?; + let agg_data_blocks: Vec<DataBlock> = agg_res.try_collect().await?; + + assert_two_blocks_sorted_eq_with_name( + "test_sync_agg_index_after_update", + &data_blocks, + &agg_data_blocks, + ); + } + + Ok(()) +} + async fn test_sync_agg_index_after_insert() -> Result<()> { let (_guard, ctx, root) = create_ee_query_context(None).await.unwrap(); ctx.get_settings() diff --git a/src/query/service/src/interpreters/interpreter_update.rs 
b/src/query/service/src/interpreters/interpreter_update.rs index 6644180f553b..062579340f2e 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -31,6 +31,8 @@ use log::debug; use table_lock::TableLockHandlerWrapper; use crate::interpreters::common::check_deduplicate_label; +use crate::interpreters::common::hook_refresh_agg_index; +use crate::interpreters::common::RefreshAggIndexDesc; use crate::interpreters::interpreter_delete::replace_subquery; use crate::interpreters::interpreter_delete::subquery_filter; use crate::interpreters::Interpreter; @@ -180,6 +182,22 @@ impl Interpreter for UpdateInterpreter { ) .await?; + // generate sync aggregating indexes if `enable_refresh_aggregating_index_after_write` on. + { + let refresh_agg_index_desc = RefreshAggIndexDesc { + catalog: catalog_name.to_string(), + database: db_name.to_string(), + table: tbl_name.to_string(), + }; + + hook_refresh_agg_index( + self.ctx.clone(), + &mut build_res.main_pipeline, + refresh_agg_index_desc, + ) + .await?; + } + if build_res.main_pipeline.is_empty() { heartbeat.shutdown().await?; } else { diff --git a/tests/sqllogictests/suites/ee/02_ee_aggregating_index/02_0002_sync_agg_index_base b/tests/sqllogictests/suites/ee/02_ee_aggregating_index/02_0002_sync_agg_index_base index 87fcb91706e7..2e4500d2db4b 100644 --- a/tests/sqllogictests/suites/ee/02_ee_aggregating_index/02_0002_sync_agg_index_base +++ b/tests/sqllogictests/suites/ee/02_ee_aggregating_index/02_0002_sync_agg_index_base @@ -23,12 +23,14 @@ CREATE SYNC AGGREGATING INDEX testi AS select b, sum(a) from t where c > 1 group statement ok INSERT INTO t VALUES (1,1,4), (1,2,1), (1,2,4), (2,2,5), (1,3,3) +statement ok +UPDATE t SET C = 1 WHERE b = 1 + # query: eval-agg-eval-scan, index: eval-agg-eval-scan query II SELECT b, SUM(a) from t WHERE c > 1 GROUP BY b ORDER BY b ---- -1 1 2 3 3 1