Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Feb 10, 2025
2 parents fb25fe0 + 3fdfb52 commit f70e622
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 45 deletions.
6 changes: 6 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ apt-get -y install jq
echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test

# check if run debug only test
if [ "$profile" == "ci-dev" ]; then
echo "--- Run debug mode only tests"
risedev slt './e2e_test/debug_mode_only/debug_splits.slt'
fi
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
echo "--- Kill cluster"
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/debug_mode_only/debug_splits.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
control substitution on

system ok
rpk topic delete 'test_debug_splits' || true;

system ok
rpk topic create test_debug_splits -p 2

system ok
cat <<EOF | rpk topic produce test_debug_splits -p 0
{"x":1}
{"x":3}
{"x":5}
EOF

system ok
cat <<EOF | rpk topic produce test_debug_splits -p 1
{"x":2}
{"x":4}
{"x":6}
EOF

statement ok
create table all_splits(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"},{"split_info": {"partition": 1, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"}]'
) format plain encode json;

statement ok
create table one_split(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"}]'
) format plain encode json;

# only change the topic name to non-exist inside debug_splits
statement ok
create table one_split_err(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits_non_exist"}, "split_type": "kafka"}]'
) format plain encode json;

sleep 1s

query I
select count(*) from all_splits;
----
6

query I
select count(*) from one_split;
----
3

# should not crash
query I
select count(*) from one_split_err;
----
0

statement ok
drop table all_splits;

statement ok
drop table one_split;

statement ok
drop table one_split_err;

system ok
rpk topic delete 'test_debug_splits'
6 changes: 6 additions & 0 deletions java/connector-node/risingwave-sink-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-snowflake -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-snowflake</artifactId>
<version>1.7.1</version>
</dependency>
</dependencies>
</project>
2 changes: 2 additions & 0 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ impl IcebergCommon {
}
catalog_type
if catalog_type == "hive"
|| catalog_type == "snowflake"
|| catalog_type == "jdbc"
|| catalog_type == "rest"
|| catalog_type == "glue" =>
Expand All @@ -449,6 +450,7 @@ impl IcebergCommon {
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
"snowflake" => "org.apache.iceberg.snowflake.SnowflakeCatalog",
"rest" => "org.apache.iceberg.rest.RESTCatalog",
"glue" => "org.apache.iceberg.aws.glue.GlueCatalog",
_ => unreachable!(),
Expand Down
3 changes: 3 additions & 0 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ impl Sink for IcebergSink {
const SINK_NAME: &'static str = ICEBERG_SINK;

async fn validate(&self) -> Result<()> {
if "snowflake".eq_ignore_ascii_case(self.config.catalog_type()) {
bail!("Snowflake catalog only supports iceberg sources");
}
if "glue".eq_ignore_ascii_case(self.config.catalog_type()) {
risingwave_common::license::Feature::IcebergSinkWithGlue
.check_available()
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,9 @@ pub async fn create_iceberg_engine_table(
constraints: Vec<TableConstraint>,
table_name: ObjectName,
) -> Result<()> {
risingwave_common::license::Feature::IcebergEngine
.check_available()
.map_err(|e| anyhow::anyhow!(e))?;
// 1. fetch iceberg engine options from the meta node. Or use iceberg engine connection provided by users.
// 2. create a hummock table
// 3. create an iceberg sink
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl JavaVmWrapper {
.version(JNIVersion::V8)
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED")
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));

Expand Down
1 change: 1 addition & 0 deletions src/license/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ macro_rules! for_all_features {
{ SqlServerCdcSource, Paid, "CDC source connector for Sql Server." },
{ CdcAutoSchemaChange, Paid, "Auto replicate upstream DDL to CDC Table." },
{ IcebergSinkWithGlue, Paid, "Delivering data to Iceberg with Glue catalog." },
{ IcebergEngine, Paid, "Creating table with an iceberg engine." },
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,20 @@ impl HummockManager {
);
}

// clear `partition_vnode_count` for the hybrid group
{
if let Err(err) = compaction_groups_txn.update_compaction_config(
&[left_group_id],
&[MutableConfig::SplitWeightByVnode(0)], // default
) {
tracing::error!(
error = %err.as_report(),
"failed to update compaction config for group-{}",
left_group_id
);
}
}

new_version_delta.pre_apply();

// remove right_group_id
Expand Down
78 changes: 72 additions & 6 deletions src/meta/src/stream/source_manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(not(debug_assertions))]
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::AnySplitEnumerator;

use super::*;

const MAX_FAIL_CNT: u32 = 10;
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

// The key used to load `SplitImpl` directly from source properties.
// When this key is present, the enumerator will only return the given ones
// instead of fetching them from the external source.
// Only valid in debug builds - will return an error in release builds.
const DEBUG_SPLITS_KEY: &str = "debug_splits";

pub struct SharedSplitMap {
pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}
Expand All @@ -38,6 +46,8 @@ pub struct ConnectorSourceWorker {
connector_properties: ConnectorProperties,
fail_cnt: u32,
source_is_up: LabelGuardedIntGauge<2>,

debug_splits: Option<Vec<SplitImpl>>,
}

fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
Expand All @@ -48,8 +58,24 @@ fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<Connect
Ok(properties)
}
fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
let options_with_secret =
WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
let options_with_secret = WithOptionsSecResolved::new(
{
let mut with_properties = source.with_properties.clone();
let _removed = with_properties.remove(DEBUG_SPLITS_KEY);

#[cfg(not(debug_assertions))]
{
if _removed.is_some() {
return Err(ConnectorError::from(anyhow::anyhow!(
"`debug_splits` is not allowed in release mode"
)));
}
}

with_properties
},
source.secret_refs.clone(),
);
let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
properties.init_from_pb_source(source);
Ok(properties)
Expand Down Expand Up @@ -220,6 +246,38 @@ impl ConnectorSourceWorker {
connector_properties,
fail_cnt: 0,
source_is_up,
debug_splits: {
let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
#[cfg(not(debug_assertions))]
{
if debug_splits.is_some() {
return Err(ConnectorError::from(anyhow::anyhow!(
"`debug_splits` is not allowed in release mode"
))
.into());
}
None
}

#[cfg(debug_assertions)]
{
use risingwave_common::types::JsonbVal;
if let Some(debug_splits) = debug_splits {
let mut splits = Vec::new();
let debug_splits_value =
jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
.context("failed to parse split impl")?;
for split_impl_value in debug_splits_value.as_array().unwrap() {
splits.push(SplitImpl::restore_from_json(JsonbVal::from(
split_impl_value.clone(),
))?);
}
Some(splits)
} else {
None
}
}
},
})
}

Expand Down Expand Up @@ -272,10 +330,18 @@ impl ConnectorSourceWorker {
let source_is_up = |res: i64| {
self.source_is_up.set(res);
};
let splits = self.enumerator.list_splits().await.inspect_err(|_| {
source_is_up(0);
self.fail_cnt += 1;
})?;

let splits = {
if let Some(debug_splits) = &self.debug_splits {
debug_splits.clone()
} else {
self.enumerator.list_splits().await.inspect_err(|_| {
source_is_up(0);
self.fail_cnt += 1;
})?
}
};

source_is_up(1);
self.fail_cnt = 0;
let mut current_splits = self.current_splits.lock().await;
Expand Down
43 changes: 27 additions & 16 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::sync::Arc;

use await_tree::InstrumentAwait;
use bytes::Bytes;
use futures::{Future, TryFutureExt};
use futures::future::BoxFuture;
use futures::{Future, FutureExt, TryFutureExt};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
Expand All @@ -37,6 +38,7 @@ use crate::hummock::{HummockStorage, SstableObjectIdManagerRef};
use crate::monitor::monitored_storage_metrics::StateStoreIterStats;
use crate::monitor::{StateStoreIterLogStats, StateStoreIterStatsTrait};
use crate::store::*;
use crate::store_impl::AsHummock;

/// A state store wrapper for monitoring metrics.
#[derive(Clone)]
Expand Down Expand Up @@ -377,25 +379,34 @@ impl MonitoredStateStore<HummockStorage> {
pub fn sstable_object_id_manager(&self) -> SstableObjectIdManagerRef {
self.inner.sstable_object_id_manager().clone()
}
}

impl<S: AsHummock> AsHummock for MonitoredStateStore<S> {
fn as_hummock(&self) -> Option<&HummockStorage> {
self.inner.as_hummock()
}

pub async fn sync(
fn sync(
&self,
sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
) -> StorageResult<SyncResult> {
let future = self
.inner
.sync(sync_table_epochs)
.instrument_await("store_sync");
let timer = self.storage_metrics.sync_duration.start_timer();
let sync_size = self.storage_metrics.sync_size.clone();
let sync_result = future
.await
.inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
timer.observe_duration();
if sync_result.sync_size != 0 {
sync_size.observe(sync_result.sync_size as _);
) -> BoxFuture<'_, StorageResult<SyncResult>> {
async move {
let future = self
.inner
.sync(sync_table_epochs)
.instrument_await("store_sync");
let timer = self.storage_metrics.sync_duration.start_timer();
let sync_size = self.storage_metrics.sync_size.clone();
let sync_result = future
.await
.inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
timer.observe_duration();
if sync_result.sync_size != 0 {
sync_size.observe(sync_result.sync_size as _);
}
Ok(sync_result)
}
Ok(sync_result)
.boxed()
}
}

Expand Down
Loading

0 comments on commit f70e622

Please sign in to comment.