Skip to content

Commit

Permalink
refactor(connector): make SplitEnumerator/Reader dyn (#20098)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jan 14, 2025
1 parent 4064587 commit 325404c
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 270 deletions.
90 changes: 72 additions & 18 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#[macro_export]
macro_rules! for_all_classified_sources {
($macro:path $(,$extra_args:tt)*) => {
($macro:path $(, $extra_args:tt)*) => {
$macro! {
// cdc sources
{
Expand Down Expand Up @@ -67,7 +67,7 @@ macro_rules! for_all_connections {
#[macro_export]
macro_rules! for_all_sources_inner {
(
{$({ $cdc_source_type:ident }),* },
{ $({ $cdc_source_type:ident }),* },
{ $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
$macro:tt $(, $extra_args:tt)*
) => {
Expand All @@ -79,13 +79,14 @@ macro_rules! for_all_sources_inner {
[< $cdc_source_type Cdc >],
$crate::source::cdc::[< $cdc_source_type CdcProperties >],
$crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
},
)*
}
),*
,
$(
{ $source_variant, $prop_name, $split }
),*
}
$(,$extra_args)*
$(, $extra_args)*
}
}
};
Expand All @@ -98,22 +99,55 @@ macro_rules! for_all_sources {
};
}

/// The invocation:
/// ```ignore
/// dispatch_source_enum_inner!(
/// {
/// {A1,B1,C1},
/// {A2,B2,C2}
/// },
/// EnumType, enum_value, inner_ident, body
/// );
/// ```
/// expands to:
/// ```ignore
/// match enum_value {
/// EnumType::A1(inner_ident) => {
/// #[allow(dead_code)]
/// type PropType = B1;
/// #[allow(dead_code)]
/// type SplitType = C1;
/// {
/// body
/// }
/// }
/// EnumType::A2(inner_ident) => {
/// #[allow(dead_code)]
/// type PropType = B2;
/// #[allow(dead_code)]
/// type SplitType = C2;
/// {
/// body
/// }
/// }
/// }
/// ```
#[macro_export]
macro_rules! dispatch_source_enum_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$enum_name:ident,
$impl:tt,
{$inner_name:ident, $prop_type_name:ident, $split_type_name:ident},
$enum_type:ident,
$enum_value:expr,
$inner_name:ident,
$body:expr
) => {{
match $impl {
match $enum_value {
$(
$enum_name::$source_variant($inner_name) => {
$enum_type::$source_variant($inner_name) => {
#[allow(dead_code)]
type $prop_type_name = $prop_name;
type PropType = $prop_name;
#[allow(dead_code)]
type $split_type_name = $split;
type SplitType = $split;
{
$body
}
Expand All @@ -123,10 +157,28 @@ macro_rules! dispatch_source_enum_inner {
}}
}

/// Usage: `dispatch_source_enum!(EnumType, enum_value, |inner_ident| body)`.
///
/// Inside `body`:
/// - use `inner_ident` to represent the matched variant.
/// - use `PropType` to represent the concrete property type.
/// - use `SplitType` to represent the concrete split type.
///
/// Expands to:
/// ```ignore
/// match enum_value {
/// EnumType::Variant1(inner_ident) => {
/// body
/// }
/// ...
/// }
/// ```
///
/// Note: `inner_ident` must be passed as an argument due to macro hygiene.
#[macro_export]
macro_rules! dispatch_source_enum {
($enum_name:ident, $impl:expr, $inner_name:tt, $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_name, { $impl }, $inner_name, $body}
($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
}};
}

Expand Down Expand Up @@ -167,11 +219,12 @@ macro_rules! match_source_name_str {
}};
}

/// [`dispatch_source_enum`] with `SplitImpl` as the enum type.
#[macro_export]
macro_rules! dispatch_split_impl {
($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => {{
($impl:expr, | $inner_name:ident | $body:expr) => {{
use $crate::source::SplitImpl;
$crate::dispatch_source_enum! {SplitImpl, { $impl }, {$inner_name, $prop_type_name, IgnoreSplitType}, $body}
$crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
}};
}

Expand Down Expand Up @@ -290,11 +343,12 @@ macro_rules! impl_split {
}
}

/// [`dispatch_source_enum`] with `ConnectorProperties` as the enum type.
#[macro_export]
macro_rules! dispatch_source_prop {
($impl:expr, $source_prop:tt, $body:expr) => {{
($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
use $crate::source::ConnectorProperties;
$crate::dispatch_source_enum! {ConnectorProperties, { $impl }, {$source_prop, IgnorePropType, IgnoreSplitType}, {$body}}
$crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
}};
}

Expand Down
147 changes: 124 additions & 23 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::Stream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
Expand All @@ -31,6 +32,7 @@ use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -113,21 +115,77 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties>(
/// Options controlling how split readers are created by `create_split_readers`.
#[derive(Default)]
pub struct CreateSplitReaderOpt {
    // When true, a single reader is built for all splits; otherwise one
    // reader is created per split and their streams are merged.
    pub support_multiple_splits: bool,
    // When true, each reader seeks to the latest offset after creation and
    // the resulting splits are reported back via `CreateSplitReaderResult`.
    pub seek_to_latest: bool,
}

/// Auxiliary outputs produced while creating split readers, returned
/// alongside the merged chunk stream.
#[derive(Default)]
pub struct CreateSplitReaderResult {
    // Splits positioned at the latest offsets; `Some` only when
    // `seek_to_latest` was requested.
    pub latest_splits: Option<Vec<SplitImpl>>,
    // Per-split backfill metadata collected from the readers.
    pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> Result<P::SplitReader> {
opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await
let mut res = CreateSplitReaderResult {
backfill_info: HashMap::new(),
latest_splits: None,
};
if opt.support_multiple_splits {
let mut reader = P::SplitReader::new(
prop.clone(),
splits,
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
.await?;
if opt.seek_to_latest {
res.latest_splits = Some(reader.seek_to_latest().await?);
}
res.backfill_info = reader.backfill_info();
Ok((reader.into_stream().boxed(), res))
} else {
let mut readers = try_join_all(splits.into_iter().map(|split| {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
P::SplitReader::new(
prop.clone(),
vec![split],
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
}))
.await?;
if opt.seek_to_latest {
let mut latest_splits = vec![];
for reader in &mut readers {
latest_splits.extend(reader.seek_to_latest().await?);
}
res.latest_splits = Some(latest_splits);
}
res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
res,
))
}
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized {
pub trait SplitEnumerator: Sized + Send {
type Split: SplitMetaData + Send;
type Properties;

Expand All @@ -139,6 +197,21 @@ pub trait SplitEnumerator: Sized {
pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
///
/// Erases the associated `Split`/`Properties` types of [`SplitEnumerator`]
/// so enumerators can be stored and used as `Box<dyn AnySplitEnumerator>`;
/// splits are returned as the type-erased [`SplitImpl`].
#[async_trait]
pub trait AnySplitEnumerator: Send {
    /// Fetches the current splits from the external source, converted to [`SplitImpl`].
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
}

/// Blanket impl: any [`SplitEnumerator`] whose split type converts into
/// [`SplitImpl`] is automatically usable as a dyn-compatible enumerator.
#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        // Call the typed enumerator explicitly (avoids recursing into this
        // trait method), then erase each split into `SplitImpl`.
        let typed_splits = SplitEnumerator::list_splits(self).await?;
        Ok(typed_splits.into_iter().map(Into::into).collect())
    }
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

Expand Down Expand Up @@ -484,12 +557,13 @@ impl ConnectorProperties {

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
dispatch_source_prop!(self, |prop| prop
.init_from_pb_cdc_table_desc(cdc_table_desc))
}

pub fn support_multiple_splits(&self) -> bool {
Expand All @@ -498,16 +572,52 @@ impl ConnectorProperties {
|| matches!(self, ConnectorProperties::Gcs(_))
|| matches!(self, ConnectorProperties::Azblob(_))
}

pub async fn create_split_enumerator(
self,
context: crate::source::base::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
<PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
));
Ok(enumerator)
}

pub async fn create_split_reader(
self,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
mut opt: crate::source::CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
opt.support_multiple_splits = self.support_multiple_splits();
tracing::debug!(
?splits,
support_multiple_splits = opt.support_multiple_splits,
"spawning connector split reader",
);

dispatch_source_prop!(self, |prop| create_split_readers(
*prop,
splits,
parser_config,
source_ctx,
columns,
opt
)
.await)
}
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
fn from(split: &SplitImpl) -> Self {
dispatch_split_impl!(split, inner, SourcePropType, {
dispatch_split_impl!(split, |inner| {
ConnectorSplit {
split_type: String::from(SourcePropType::SOURCE_NAME),
split_type: String::from(PropType::SOURCE_NAME),
encoded_split: inner.encode_to_bytes().to_vec(),
}
})
Expand Down Expand Up @@ -564,7 +674,7 @@ impl SplitImpl {

impl SplitMetaData for SplitImpl {
fn id(&self) -> SplitId {
dispatch_split_impl!(self, inner, IgnoreType, inner.id())
dispatch_split_impl!(self, |inner| inner.id())
}

fn encode_to_json(&self) -> JsonbVal {
Expand All @@ -587,31 +697,22 @@ impl SplitMetaData for SplitImpl {
}

fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
dispatch_split_impl!(
self,
inner,
IgnoreType,
inner.update_offset(last_seen_offset)
)
dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
}
}

impl SplitImpl {
pub fn get_type(&self) -> String {
dispatch_split_impl!(self, _ignored, PropType, {
PropType::SOURCE_NAME.to_owned()
})
dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
}

pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
dispatch_split_impl!(self, inner, IgnoreType, {
inner.update_offset(last_seen_offset)?
});
dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
Ok(())
}

pub fn encode_to_json_inner(&self) -> JsonbVal {
dispatch_split_impl!(self, inner, IgnoreType, inner.encode_to_json())
dispatch_split_impl!(self, |inner| inner.encode_to_json())
}
}

Expand Down
Loading

0 comments on commit 325404c

Please sign in to comment.