From 37d96fdc3af6ed24bf88ea877c3c8bf3d9ff1946 Mon Sep 17 00:00:00 2001
From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Date: Sat, 8 Feb 2025 18:13:52 +0800
Subject: [PATCH 1/5] feat(source): support iceberg source with snowflake
catalog (#20421)
---
java/connector-node/risingwave-sink-iceberg/pom.xml | 6 ++++++
src/connector/src/connector_common/iceberg/mod.rs | 2 ++
src/connector/src/sink/iceberg/mod.rs | 3 +++
src/jni_core/src/jvm_runtime.rs | 1 +
4 files changed, 12 insertions(+)
diff --git a/java/connector-node/risingwave-sink-iceberg/pom.xml b/java/connector-node/risingwave-sink-iceberg/pom.xml
index b4d2e87e404b1..7732ad3d62e9d 100644
--- a/java/connector-node/risingwave-sink-iceberg/pom.xml
+++ b/java/connector-node/risingwave-sink-iceberg/pom.xml
@@ -147,5 +147,11 @@
assertj-core
test
+
+
+ org.apache.iceberg
+ iceberg-snowflake
+ 1.7.1
+
diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs
index 936177668d765..e47732d1ef94f 100644
--- a/src/connector/src/connector_common/iceberg/mod.rs
+++ b/src/connector/src/connector_common/iceberg/mod.rs
@@ -436,6 +436,7 @@ impl IcebergCommon {
}
catalog_type
if catalog_type == "hive"
+ || catalog_type == "snowflake"
|| catalog_type == "jdbc"
|| catalog_type == "rest"
|| catalog_type == "glue" =>
@@ -446,6 +447,7 @@ impl IcebergCommon {
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
+ "snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
_ => unreachable!(),
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index a18287062bd47..4d22887ec9db9 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -370,6 +370,9 @@ impl Sink for IcebergSink {
const SINK_NAME: &'static str = ICEBERG_SINK;
async fn validate(&self) -> Result<()> {
+ if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
+ bail!("Snowflake catalog only supports iceberg sources");
+ }
if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
risingwave_common::license::Feature::IcebergSinkWithGlue
.check_available()
diff --git a/src/jni_core/src/jvm_runtime.rs b/src/jni_core/src/jvm_runtime.rs
index 3345ba0c6c0c1..fa896508a3e9a 100644
--- a/src/jni_core/src/jvm_runtime.rs
+++ b/src/jni_core/src/jvm_runtime.rs
@@ -123,6 +123,7 @@ impl JavaVmWrapper {
.version(JNIVersion::V8)
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
+ .option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));
From 1578865ce611c7e1a5276d9c9c566de8f685304e Mon Sep 17 00:00:00 2001
From: Li0k
Date: Sat, 8 Feb 2025 20:54:33 +0800
Subject: [PATCH 2/5] fix(storage): reset partition_vnode_count after merge
(#20426)
---
.../compaction/compaction_group_schedule.rs | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
index 2dba2909ecd87..01c43ca1660dc 100644
--- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
+++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs
@@ -293,6 +293,20 @@ impl HummockManager {
);
}
+ // clear `partition_vnode_count` for the hybrid group
+ {
+ if let Err(err) = compaction_groups_txn.update_compaction_config(
+ &[left_group_id],
+ &[MutableConfig::SplitWeightByVnode(0)], // default
+ ) {
+ tracing::error!(
+ error = %err.as_report(),
+ "failed to update compaction config for group-{}",
+ left_group_id
+ );
+ }
+ }
+
new_version_delta.pre_apply();
// remove right_group_id
From e928b30965c87826f811798e2eca45d7981ab6dc Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Sun, 9 Feb 2025 01:43:19 +0800
Subject: [PATCH 3/5] feat: allow pass-by source enumerator check (#20352)
Co-authored-by: tabversion
---
ci/scripts/e2e-source-test.sh | 6 ++
e2e_test/debug_mode_only/debug_splits.slt | 73 ++++++++++++++++++
src/meta/src/stream/source_manager/worker.rs | 78 ++++++++++++++++++--
3 files changed, 151 insertions(+), 6 deletions(-)
create mode 100644 e2e_test/debug_mode_only/debug_splits.slt
diff --git a/ci/scripts/e2e-source-test.sh b/ci/scripts/e2e-source-test.sh
index f516558c5029e..16ed8b426297c 100755
--- a/ci/scripts/e2e-source-test.sh
+++ b/ci/scripts/e2e-source-test.sh
@@ -38,6 +38,12 @@ apt-get -y install jq
echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test
+
+# check if run debug only test
+if [ "$profile" == "ci-dev" ]; then
+ echo "--- Run debug mode only tests"
+ risedev slt './e2e_test/debug_mode_only/debug_splits.slt'
+fi
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
echo "--- Kill cluster"
diff --git a/e2e_test/debug_mode_only/debug_splits.slt b/e2e_test/debug_mode_only/debug_splits.slt
new file mode 100644
index 0000000000000..90c3c16f34f04
--- /dev/null
+++ b/e2e_test/debug_mode_only/debug_splits.slt
@@ -0,0 +1,73 @@
+control substitution on
+
+system ok
+rpk topic delete 'test_debug_splits' || true;
+
+system ok
+rpk topic create test_debug_splits -p 2
+
+system ok
+cat <>,
}
@@ -38,6 +46,8 @@ pub struct ConnectorSourceWorker {
connector_properties: ConnectorProperties,
fail_cnt: u32,
source_is_up: LabelGuardedIntGauge<2>,
+
+ debug_splits: Option>,
}
fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult {
@@ -48,8 +58,24 @@ fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult ConnectorResult {
- let options_with_secret =
- WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
+ let options_with_secret = WithOptionsSecResolved::new(
+ {
+ let mut with_properties = source.with_properties.clone();
+ let _removed = with_properties.remove(DEBUG_SPLITS_KEY);
+
+ #[cfg(not(debug_assertions))]
+ {
+ if _removed.is_some() {
+ return Err(ConnectorError::from(anyhow::anyhow!(
+ "`debug_splits` is not allowed in release mode"
+ )));
+ }
+ }
+
+ with_properties
+ },
+ source.secret_refs.clone(),
+ );
let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
properties.init_from_pb_source(source);
Ok(properties)
@@ -220,6 +246,38 @@ impl ConnectorSourceWorker {
connector_properties,
fail_cnt: 0,
source_is_up,
+ debug_splits: {
+ let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
+ #[cfg(not(debug_assertions))]
+ {
+ if debug_splits.is_some() {
+ return Err(ConnectorError::from(anyhow::anyhow!(
+ "`debug_splits` is not allowed in release mode"
+ ))
+ .into());
+ }
+ None
+ }
+
+ #[cfg(debug_assertions)]
+ {
+ use risingwave_common::types::JsonbVal;
+ if let Some(debug_splits) = debug_splits {
+ let mut splits = Vec::new();
+ let debug_splits_value =
+ jsonbb::serde_json::from_str::(debug_splits)
+ .context("failed to parse split impl")?;
+ for split_impl_value in debug_splits_value.as_array().unwrap() {
+ splits.push(SplitImpl::restore_from_json(JsonbVal::from(
+ split_impl_value.clone(),
+ ))?);
+ }
+ Some(splits)
+ } else {
+ None
+ }
+ }
+ },
})
}
@@ -272,10 +330,18 @@ impl ConnectorSourceWorker {
let source_is_up = |res: i64| {
self.source_is_up.set(res);
};
- let splits = self.enumerator.list_splits().await.inspect_err(|_| {
- source_is_up(0);
- self.fail_cnt += 1;
- })?;
+
+ let splits = {
+ if let Some(debug_splits) = &self.debug_splits {
+ debug_splits.clone()
+ } else {
+ self.enumerator.list_splits().await.inspect_err(|_| {
+ source_is_up(0);
+ self.fail_cnt += 1;
+ })?
+ }
+ };
+
source_is_up(1);
self.fail_cnt = 0;
let mut current_splits = self.current_splits.lock().await;
From 0e5aa83266974d9b45b56120fa05453cca3f5abf Mon Sep 17 00:00:00 2001
From: Dylan
Date: Sun, 9 Feb 2025 18:54:59 +0800
Subject: [PATCH 4/5] feat(iceberg): make iceberg engine a paid feature
(#20324)
---
src/frontend/src/handler/create_table.rs | 3 +++
src/license/src/feature.rs | 1 +
2 files changed, 4 insertions(+)
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 65a6fa553b7a2..d6eed9f9e4053 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -1452,6 +1452,9 @@ pub async fn create_iceberg_engine_table(
constraints: Vec,
table_name: ObjectName,
) -> Result<()> {
+ risingwave_common::license::Feature::IcebergEngine
+ .check_available()
+ .map_err(|e| anyhow::anyhow!(e))?;
// 1. fetch iceberg engine options from the meta node.
// 2. create a hummock table
// 3. create an iceberg sink
diff --git a/src/license/src/feature.rs b/src/license/src/feature.rs
index 0d10d76869b0f..a0040a2c00d2b 100644
--- a/src/license/src/feature.rs
+++ b/src/license/src/feature.rs
@@ -57,6 +57,7 @@ macro_rules! for_all_features {
{ SqlServerCdcSource, Paid, "CDC source connector for Sql Server." },
{ CdcAutoSchemaChange, Paid, "Auto replicate upstream DDL to CDC Table." },
{ IcebergSinkWithGlue, Paid, "Delivering data to Iceberg with Glue catalog." },
+ { IcebergEngine, Paid, "Creating table with an iceberg engine." },
}
};
}
From 3fdfb522e01dfa76e896bf38ede72ca091afa7ee Mon Sep 17 00:00:00 2001
From: Li0k
Date: Mon, 10 Feb 2025 11:48:41 +0800
Subject: [PATCH 5/5] fix(storage): fix state store sync metrics (#20420)
---
src/storage/src/monitor/monitored_store.rs | 43 ++++++++++++++--------
src/storage/src/monitor/traced_store.rs | 40 +++++++++++++-------
src/storage/src/store_impl.rs | 24 ++++++++++--
src/stream/src/task/barrier_manager.rs | 13 ++++---
4 files changed, 81 insertions(+), 39 deletions(-)
diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs
index 09fca278335d5..3ed30d77b37a8 100644
--- a/src/storage/src/monitor/monitored_store.rs
+++ b/src/storage/src/monitor/monitored_store.rs
@@ -18,7 +18,8 @@ use std::sync::Arc;
use await_tree::InstrumentAwait;
use bytes::Bytes;
-use futures::{Future, TryFutureExt};
+use futures::future::BoxFuture;
+use futures::{Future, FutureExt, TryFutureExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
@@ -37,6 +38,7 @@ use crate::hummock::{HummockStorage, SstableObjectIdManagerRef};
use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
use crate::store::*;
+use crate::store_impl::AsHummock;
/// A state store wrapper for monitoring metrics.
#[derive(Clone)]
@@ -377,25 +379,34 @@ impl MonitoredStateStore {
pub fn sstable_object_id_manager(&self) -> SstableObjectIdManagerRef {
self.inner.sstable_object_id_manager().clone()
}
+}
+
+impl AsHummock for MonitoredStateStore {
+ fn as_hummock(&self) -> Option<&HummockStorage> {
+ self.inner.as_hummock()
+ }
- pub async fn sync(
+ fn sync(
&self,
sync_table_epochs: Vec<(HummockEpoch, HashSet)>,
- ) -> StorageResult {
- let future = self
- .inner
- .sync(sync_table_epochs)
- .instrument_await("store_sync");
- let timer = self.storage_metrics.sync_duration.start_timer();
- let sync_size = self.storage_metrics.sync_size.clone();
- let sync_result = future
- .await
- .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
- timer.observe_duration();
- if sync_result.sync_size != 0 {
- sync_size.observe(sync_result.sync_size as _);
+ ) -> BoxFuture<'_, StorageResult> {
+ async move {
+ let future = self
+ .inner
+ .sync(sync_table_epochs)
+ .instrument_await("store_sync");
+ let timer = self.storage_metrics.sync_duration.start_timer();
+ let sync_size = self.storage_metrics.sync_size.clone();
+ let sync_result = future
+ .await
+ .inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
+ timer.observe_duration();
+ if sync_result.sync_size != 0 {
+ sync_size.observe(sync_result.sync_size as _);
+ }
+ Ok(sync_result)
}
- Ok(sync_result)
+ .boxed()
}
}
diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs
index ea178350c5c4e..dff1cba16bb1c 100644
--- a/src/storage/src/monitor/traced_store.rs
+++ b/src/storage/src/monitor/traced_store.rs
@@ -16,6 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use bytes::Bytes;
+use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
@@ -32,6 +33,7 @@ use crate::error::StorageResult;
use crate::hummock::sstable_store::SstableStoreRef;
use crate::hummock::{HummockStorage, SstableObjectIdManagerRef};
use crate::store::*;
+use crate::store_impl::AsHummock;
#[derive(Clone)]
pub struct TracedStateStore {
@@ -369,23 +371,33 @@ impl TracedStateStore {
pub fn sstable_object_id_manager(&self) -> &SstableObjectIdManagerRef {
self.inner.sstable_object_id_manager()
}
+}
+
+impl AsHummock for TracedStateStore {
+ fn as_hummock(&self) -> Option<&HummockStorage> {
+ self.inner.as_hummock()
+ }
- pub async fn sync(
+ fn sync(
&self,
sync_table_epochs: Vec<(HummockEpoch, HashSet)>,
- ) -> StorageResult {
- let span: MayTraceSpan = TraceSpan::new_sync_span(&sync_table_epochs, self.storage_type);
-
- let future = self.inner.sync(sync_table_epochs);
-
- future
- .map(move |sync_result| {
- span.may_send_result(OperationResult::Sync(
- sync_result.as_ref().map(|res| res.sync_size).into(),
- ));
- sync_result
- })
- .await
+ ) -> BoxFuture<'_, StorageResult> {
+ async move {
+ let span: MayTraceSpan =
+ TraceSpan::new_sync_span(&sync_table_epochs, self.storage_type);
+
+ let future = self.inner.sync(sync_table_epochs);
+
+ future
+ .map(move |sync_result| {
+ span.may_send_result(OperationResult::Sync(
+ sync_result.as_ref().map(|res| res.sync_size).into(),
+ ));
+ sync_result
+ })
+ .await
+ }
+ .boxed()
}
}
diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs
index e6db419127ab5..c675f7ee4d126 100644
--- a/src/storage/src/store_impl.rs
+++ b/src/storage/src/store_impl.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
@@ -20,10 +21,13 @@ use enum_as_inner::EnumAsInner;
use foyer::{
DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RateLimitPicker,
};
+use futures::future::BoxFuture;
+use futures::FutureExt;
use mixtrics::registry::prometheus::PrometheusMetricsRegistry;
+use risingwave_common::catalog::TableId;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common_service::RpcNotificationClient;
-use risingwave_hummock_sdk::HummockSstableObjectId;
+use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId, SyncResult};
use risingwave_object_store::object::build_remote_object_store;
use crate::compaction_catalog_manager::{CompactionCatalogManager, RemoteTableAccessor};
@@ -272,7 +276,7 @@ pub mod verify {
pub _phantom: PhantomData,
}
- impl AsHummock for VerifyStateStore {
+ impl AsHummock for VerifyStateStore {
fn as_hummock(&self) -> Option<&HummockStorage> {
self.actual.as_hummock()
}
@@ -778,8 +782,22 @@ impl StateStoreImpl {
}
}
-pub trait AsHummock {
+pub trait AsHummock: Send + Sync {
fn as_hummock(&self) -> Option<&HummockStorage>;
+
+ fn sync(
+ &self,
+ sync_table_epochs: Vec<(HummockEpoch, HashSet)>,
+ ) -> BoxFuture<'_, StorageResult> {
+ async move {
+ if let Some(hummock) = self.as_hummock() {
+ hummock.sync(sync_table_epochs).await
+ } else {
+ Ok(SyncResult::default())
+ }
+ }
+ .boxed()
+ }
}
impl AsHummock for HummockStorage {
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 41d3f00fc4ba4..6c8164e467915 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -31,6 +31,7 @@ use risingwave_pb::stream_service::barrier_complete_response::{
PbCreateMviewProgress, PbLocalSstableInfo,
};
use risingwave_rpc_client::error::{ToTonicStatus, TonicStatusWrapper};
+use risingwave_storage::store_impl::AsHummock;
use thiserror_ext::AsReport;
use tokio::select;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -625,7 +626,7 @@ mod await_epoch_completed_future {
use await_epoch_completed_future::*;
use risingwave_common::catalog::{DatabaseId, TableId};
-use risingwave_storage::StateStoreImpl;
+use risingwave_storage::{dispatch_state_store, StateStoreImpl};
fn sync_epoch(
state_store: &StateStoreImpl,
@@ -634,14 +635,14 @@ fn sync_epoch(
table_ids: HashSet,
) -> BoxFuture<'static, StreamResult> {
let timer = streaming_metrics.barrier_sync_latency.start_timer();
- let hummock = state_store.as_hummock().cloned();
+
+ let state_store = state_store.clone();
let future = async move {
- if let Some(hummock) = hummock {
+ dispatch_state_store!(state_store, hummock, {
hummock.sync(vec![(prev_epoch, table_ids)]).await
- } else {
- Ok(SyncResult::default())
- }
+ })
};
+
future
.instrument_await(format!("sync_epoch (epoch {})", prev_epoch))
.inspect_ok(move |_| {