Skip to content

Commit

Permalink
feat: allow pass-by source enumerator check (#20352)
Browse files Browse the repository at this point in the history
Co-authored-by: tabversion <[email protected]>
  • Loading branch information
tabVersion and tabversion authored Feb 8, 2025
1 parent 1578865 commit e928b30
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 6 deletions.
6 changes: 6 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ apt-get -y install jq
echo "--- e2e, inline test"
RUST_LOG="debug,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info,risingwave_meta=info" \
risedev ci-start ci-inline-source-test

# debug-mode-only tests are run only under the ci-dev profile
if [ "$profile" == "ci-dev" ]; then
echo "--- Run debug mode only tests"
risedev slt './e2e_test/debug_mode_only/debug_splits.slt'
fi
risedev slt './e2e_test/source_inline/**/*.slt' -j16
risedev slt './e2e_test/source_inline/**/*.slt.serial'
echo "--- Kill cluster"
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/debug_mode_only/debug_splits.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Exercises the debug-only `debug_splits` source option: each table below is
# given an explicit list of splits and the row counts check that only the
# listed splits are read.
control substitution on

# start from a clean state: drop any leftover topic, then create it with 2 partitions
system ok
rpk topic delete 'test_debug_splits' || true;

system ok
rpk topic create test_debug_splits -p 2

# produce 3 messages into partition 0
system ok
cat <<EOF | rpk topic produce test_debug_splits -p 0
{"x":1}
{"x":3}
{"x":5}
EOF

# produce 3 messages into partition 1
system ok
cat <<EOF | rpk topic produce test_debug_splits -p 1
{"x":2}
{"x":4}
{"x":6}
EOF

# debug_splits lists both partitions, so all 6 messages are expected
statement ok
create table all_splits(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"},{"split_info": {"partition": 1, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"}]'
) format plain encode json;

# debug_splits lists only partition 0, so only its 3 messages are expected
statement ok
create table one_split(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits"}, "split_type": "kafka"}]'
) format plain encode json;

# same as `one_split`, except that the topic name inside debug_splits refers to a non-existent topic
statement ok
create table one_split_err(x int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_debug_splits',
debug_splits = '[{"split_info": {"partition": 0, "start_offset": -1, "stop_offset": null, "topic": "test_debug_splits_non_exist"}, "split_type": "kafka"}]'
) format plain encode json;

# wait for the tables to ingest the produced messages before counting
# NOTE(review): fixed 1s delay; may be flaky on a slow CI machine - confirm
sleep 1s

query I
select count(*) from all_splits;
----
6

query I
select count(*) from one_split;
----
3

# should not crash: the split points at a non-existent topic, so no rows are expected
query I
select count(*) from one_split_err;
----
0

# clean up the tables and the topic
statement ok
drop table all_splits;

statement ok
drop table one_split;

statement ok
drop table one_split_err;

system ok
rpk topic delete 'test_debug_splits'
78 changes: 72 additions & 6 deletions src/meta/src/stream/source_manager/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(not(debug_assertions))]
use risingwave_connector::error::ConnectorError;
use risingwave_connector::source::AnySplitEnumerator;

use super::*;

const MAX_FAIL_CNT: u32 = 10;
const DEFAULT_SOURCE_TICK_TIMEOUT: Duration = Duration::from_secs(10);

// Property key for supplying `SplitImpl`s directly through the source's WITH options.
// The value is a JSON array of serialized splits. When this key is present, the worker
// uses exactly those splits instead of listing them from the external source.
// Debug builds only: in release builds, specifying this key returns an error.
const DEBUG_SPLITS_KEY: &str = "debug_splits";

/// Holder for a set of source splits, keyed by [`SplitId`].
pub struct SharedSplitMap {
/// The splits, ordered by id.
/// NOTE(review): `None` presumably means no split list has been recorded yet - confirm against the worker's tick logic.
pub splits: Option<BTreeMap<SplitId, SplitImpl>>,
}
Expand All @@ -38,6 +46,8 @@ pub struct ConnectorSourceWorker {
connector_properties: ConnectorProperties,
fail_cnt: u32,
source_is_up: LabelGuardedIntGauge<2>,

debug_splits: Option<Vec<SplitImpl>>,
}

fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
Expand All @@ -48,8 +58,24 @@ fn extract_prop_from_existing_source(source: &Source) -> ConnectorResult<Connect
Ok(properties)
}
fn extract_prop_from_new_source(source: &Source) -> ConnectorResult<ConnectorProperties> {
let options_with_secret =
WithOptionsSecResolved::new(source.with_properties.clone(), source.secret_refs.clone());
let options_with_secret = WithOptionsSecResolved::new(
{
let mut with_properties = source.with_properties.clone();
let _removed = with_properties.remove(DEBUG_SPLITS_KEY);

#[cfg(not(debug_assertions))]
{
if _removed.is_some() {
return Err(ConnectorError::from(anyhow::anyhow!(
"`debug_splits` is not allowed in release mode"
)));
}
}

with_properties
},
source.secret_refs.clone(),
);
let mut properties = ConnectorProperties::extract(options_with_secret, true)?;
properties.init_from_pb_source(source);
Ok(properties)
Expand Down Expand Up @@ -220,6 +246,38 @@ impl ConnectorSourceWorker {
connector_properties,
fail_cnt: 0,
source_is_up,
debug_splits: {
let debug_splits = source.with_properties.get(DEBUG_SPLITS_KEY);
#[cfg(not(debug_assertions))]
{
if debug_splits.is_some() {
return Err(ConnectorError::from(anyhow::anyhow!(
"`debug_splits` is not allowed in release mode"
))
.into());
}
None
}

#[cfg(debug_assertions)]
{
use risingwave_common::types::JsonbVal;
if let Some(debug_splits) = debug_splits {
let mut splits = Vec::new();
let debug_splits_value =
jsonbb::serde_json::from_str::<serde_json::Value>(debug_splits)
.context("failed to parse split impl")?;
for split_impl_value in debug_splits_value.as_array().unwrap() {
splits.push(SplitImpl::restore_from_json(JsonbVal::from(
split_impl_value.clone(),
))?);
}
Some(splits)
} else {
None
}
}
},
})
}

Expand Down Expand Up @@ -272,10 +330,18 @@ impl ConnectorSourceWorker {
let source_is_up = |res: i64| {
self.source_is_up.set(res);
};
let splits = self.enumerator.list_splits().await.inspect_err(|_| {
source_is_up(0);
self.fail_cnt += 1;
})?;

let splits = {
if let Some(debug_splits) = &self.debug_splits {
debug_splits.clone()
} else {
self.enumerator.list_splits().await.inspect_err(|_| {
source_is_up(0);
self.fail_cnt += 1;
})?
}
};

source_is_up(1);
self.fail_cnt = 0;
let mut current_splits = self.current_splits.lock().await;
Expand Down

0 comments on commit e928b30

Please sign in to comment.