Iceberg sync
SergeiPatiakin committed Dec 31, 2024
1 parent 54bc304 commit a95ca93
Showing 7 changed files with 180 additions and 23 deletions.
15 changes: 4 additions & 11 deletions src/context/iceberg.rs
@@ -1,4 +1,5 @@
use core::str;
use iceberg_datafusion::IcebergTableProvider;
use itertools::izip;
use std::collections::HashMap;
use std::error::Error;
@@ -12,7 +13,7 @@ use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion_common::{DataFusionError, TableReference};
use datafusion_common::DataFusionError;
use futures::stream::select_all;
use futures::{pin_mut, StreamExt, TryStream, TryStreamExt};
use iceberg::io::FileIO;
@@ -35,7 +36,7 @@ use tracing::info;
use url::Url;
use uuid::Uuid;

use super::{LakehouseTableProvider, SeafowlContext};
use super::SeafowlContext;

use thiserror::Error;

@@ -384,17 +385,9 @@ pub async fn record_batches_to_iceberg(
impl SeafowlContext {
pub async fn plan_to_iceberg_table(
&self,
name: impl Into<TableReference>,
provider: &IcebergTableProvider,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<()> {
let provider = match self.get_lakehouse_table_provider(name).await? {
LakehouseTableProvider::Iceberg(p) => p,
_ => {
return Err(DataFusionError::Internal(
"Expected iceberg provider".to_string(),
));
}
};
let table = provider.table();
let schema = plan.schema();
let mut streams: Vec<Pin<Box<dyn RecordBatchStream + Send>>> = vec![];
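The name-based provider lookup moves out of this function and into the caller. A minimal sketch of the new call shape, assuming ctx, table_name and physical are in scope (the physical.rs hunk below is the actual call site):

// Sketch: resolve the Iceberg provider first, then hand it to plan_to_iceberg_table.
match ctx.get_lakehouse_table_provider(table_name).await? {
    LakehouseTableProvider::Iceberg(provider) => {
        ctx.plan_to_iceberg_table(&provider, &physical).await?;
    }
    _ => {
        return Err(DataFusionError::Internal(
            "Expected iceberg provider".to_string(),
        ));
    }
}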
5 changes: 2 additions & 3 deletions src/context/physical.rs
@@ -206,9 +206,8 @@ impl SeafowlContext {
.await?;
Ok(make_dummy_exec())
}
LakehouseTableProvider::Iceberg(_) => {
self.plan_to_iceberg_table(table_name.clone(), &physical)
.await?;
LakehouseTableProvider::Iceberg(provider) => {
self.plan_to_iceberg_table(&provider, &physical).await?;
Ok(make_dummy_exec())
}
}
51 changes: 46 additions & 5 deletions src/frontend/flight/handler.rs
@@ -1,5 +1,6 @@
use crate::catalog::memory::MemoryStore;
use crate::catalog::metastore::Metastore;
use crate::catalog::DEFAULT_SCHEMA;
use arrow::record_batch::RecordBatch;
use arrow_flight::sql::metadata::{SqlInfoData, SqlInfoDataBuilder};
use arrow_flight::sql::{ProstMessageExt, SqlInfo, TicketStatementQuery};
@@ -10,7 +11,12 @@ use dashmap::DashMap;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use iceberg::io::FileIO;
use iceberg::table::StaticTable;
use iceberg::TableIdent;
use iceberg_datafusion::IcebergTableProvider;
use lazy_static::lazy_static;
use object_store_factory::object_store_opts_to_file_io_props;
use prost::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -23,7 +29,7 @@ use url::Url;
use crate::context::SeafowlContext;
use crate::sync::schema::SyncSchema;
use crate::sync::writer::SeafowlDataSyncWriter;
use crate::sync::{LakehouseSyncTarget, SyncError, SyncResult};
use crate::sync::{IcebergSyncTarget, LakehouseSyncTarget, SyncError, SyncResult};

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
@@ -163,7 +169,7 @@ impl SeafowlFlightHandler {
});
}

let (sync_target, url) = match cmd.format() {
let sync_target = match cmd.format() {
TableFormat::Delta => {
let log_store = match cmd.store {
None => self
@@ -186,14 +192,49 @@
.await?
}
};
let url = log_store.root_uri();
(LakehouseSyncTarget::Delta(log_store), url)
LakehouseSyncTarget::Delta(log_store)
}
TableFormat::Iceberg => {
return Err(SyncError::NotImplemented);
let (location, file_io) = match cmd.store {
None => {
return Err(SyncError::NotImplemented);
}
Some(store_loc) => {
let location = store_loc.location;
let options = store_loc.options;
let file_io_props = object_store_opts_to_file_io_props(&options);
let file_io = FileIO::from_path(&location)
.unwrap()
.with_props(file_io_props)
.build()?;
(location, file_io)
}
};

// Create the full path to table metadata by combining the object store location and
// relative table metadata path
let absolute_path = format!(
"{}/{}",
location.trim_end_matches("/"),
cmd.path.trim_start_matches("/")
);
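// Illustrative example using the values from the test below: location
// "s3://seafowl-test-bucket/test-data/iceberg/default.db" and path
// "iceberg_table_2/metadata/v1.metadata.json" combine into
// "s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table_2/metadata/v1.metadata.json"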
let iceberg_table = StaticTable::from_metadata_file(
&absolute_path,
TableIdent::from_strs(vec![DEFAULT_SCHEMA, "dummy_name"]).unwrap(),
file_io,
)
.await?
.into_table();
let table_provider =
IcebergTableProvider::try_new_from_table(iceberg_table).await?;
LakehouseSyncTarget::Iceberg(IcebergSyncTarget {
table_provider,
url: absolute_path.clone(),
})
}
};

let url = sync_target.get_url();
let num_batches = batches.len();

debug!("Processing data change with {num_rows} rows, {num_batches} batches, descriptor {sync_schema}, url {url} from origin {:?} at position {:?}",
16 changes: 15 additions & 1 deletion src/sync/mod.rs
@@ -1,5 +1,6 @@
use crate::sync::writer::SeafowlDataSyncWriter;
use deltalake::logstore::LogStore;
use iceberg_datafusion::IcebergTableProvider;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
@@ -38,6 +39,9 @@ pub enum SyncError {
#[error(transparent)]
DeltaTableError(#[from] deltalake::errors::DeltaTableError),

#[error(transparent)]
IcebergError(#[from] iceberg::Error),

#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),
}
@@ -59,7 +63,8 @@ pub(super) struct SyncCommitInfo {

#[derive(Clone, Debug)]
pub struct IcebergSyncTarget {
url: String,
pub table_provider: IcebergTableProvider,
pub url: String,
}

#[derive(Clone, Debug)]
@@ -68,6 +73,15 @@ pub enum LakehouseSyncTarget {
Iceberg(IcebergSyncTarget),
}

impl LakehouseSyncTarget {
pub fn get_url(&self) -> String {
match self {
LakehouseSyncTarget::Iceberg(IcebergSyncTarget { url, .. }) => url.clone(),
LakehouseSyncTarget::Delta(log_store) => log_store.root_uri(),
}
}
}

impl SyncCommitInfo {
pub(super) fn new(
origin: impl Into<Origin>,
25 changes: 25 additions & 0 deletions src/sync/planner.rs
@@ -48,6 +48,31 @@ impl SeafowlSyncPlanner {
}
}

pub(super) async fn plan_iceberg_syncs(
&self,
syncs: &[DataSyncItem],
table_schema: Arc<arrow_schema::Schema>,
table_provider: Arc<dyn TableProvider>,
) -> SyncResult<Arc<dyn ExecutionPlan>> {
// Convert the custom Iceberg table provider into a base logical plan
let base_plan = LogicalPlanBuilder::scan(
LOWER_REL,
provider_as_source(table_provider),
None,
)?
.build()?;

let base_df = DataFrame::new(self.session_state(), base_plan);

let (sync_schema, sync_df) = self.squash_syncs(syncs)?;
let (sync_schema, sync_df) = self.normalize_syncs(&sync_schema, sync_df)?;
let input_df = self
.apply_syncs(table_schema, base_df, sync_df, &sync_schema)
.await?;
let input_plan = input_df.create_physical_plan().await?;
Ok(input_plan)
}

// Construct a plan for flushing the pending syncs to the provided table.
// Return the plan and the files that are re-written by it (to be removed from the table state).
pub(super) async fn plan_delta_syncs(
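For reference, a minimal self-contained sketch of the DataFusion idiom plan_iceberg_syncs relies on: wrap any TableProvider with provider_as_source, scan it into a base logical plan, and keep building on it as a DataFrame. A MemTable and placeholder names stand in for the Iceberg provider and LOWER_REL here.

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::{provider_as_source, MemTable, TableProvider};
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::*;

async fn scan_provider_sketch() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Any TableProvider works here; a MemTable stands in for the IcebergTableProvider.
    let schema = Arc::new(Schema::new(vec![Field::new("key", DataType::Int32, true)]));
    let provider: Arc<dyn TableProvider> =
        Arc::new(MemTable::try_new(schema, vec![vec![]])?);

    // Scan the provider into a base logical plan, then continue as a DataFrame.
    let base_plan =
        LogicalPlanBuilder::scan("base_table", provider_as_source(provider), None)?.build()?;
    let base_df = DataFrame::new(ctx.state(), base_plan);

    // plan_iceberg_syncs joins the squashed syncs against this frame before this step.
    let _physical = base_df.create_physical_plan().await?;
    Ok(())
}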
24 changes: 21 additions & 3 deletions src/sync/writer.rs
@@ -5,6 +5,7 @@ use deltalake::kernel::{Action, Schema};
use deltalake::operations::create::CreateBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
use iceberg::arrow::schema_to_arrow_schema;
use indexmap::IndexMap;
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
@@ -20,7 +21,9 @@ use crate::sync::metrics::SyncWriterMetrics;
use crate::sync::planner::SeafowlSyncPlanner;
use crate::sync::schema::SyncSchema;
use crate::sync::utils::{get_size_and_rows, squash_batches};
use crate::sync::{Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult};
use crate::sync::{
IcebergSyncTarget, Origin, SequenceNumber, SyncCommitInfo, SyncError, SyncResult,
};

use super::LakehouseSyncTarget;

@@ -491,8 +494,23 @@ impl SeafowlDataSyncWriter {
"Committed data sync up to {new_sync_commit:?} for location {url}"
);
}
LakehouseSyncTarget::Iceberg(..) => {
return Err(SyncError::NotImplemented);
LakehouseSyncTarget::Iceberg(IcebergSyncTarget {
table_provider, ..
}) => {
let table = table_provider.table();
let schema =
schema_to_arrow_schema(table.metadata().current_schema()).unwrap();
let planner = SeafowlSyncPlanner::new(self.context.clone());
let plan = planner
.plan_iceberg_syncs(
&entry.syncs,
Arc::new(schema),
Arc::new(table_provider.clone()),
)
.await?;
self.context
.plan_to_iceberg_table(table_provider, &plan)
.await?;
}
};

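As background, a small sketch of the schema conversion used above, assuming the iceberg-rust spec builder API: iceberg::arrow::schema_to_arrow_schema maps an Iceberg schema to its Arrow equivalent. Field names and ids below are made up for illustration.

use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};

fn iceberg_schema_to_arrow_sketch() -> iceberg::Result<arrow_schema::Schema> {
    // An Iceberg schema with one required and one optional column (illustrative only).
    let iceberg_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "key", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(2, "value", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;
    // The same conversion the writer applies to table.metadata().current_schema().
    schema_to_arrow_schema(&iceberg_schema)
}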
67 changes: 67 additions & 0 deletions tests/flight/sync.rs
@@ -609,3 +609,70 @@ async fn test_sync_custom_store(

Ok(())
}

#[tokio::test]
async fn test_sync_iceberg_custom_store(
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let (_ctx, mut client) = flight_server(TestServerType::Memory).await;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
Field::new("c", DataType::Utf8, true),
]));
let column_descriptors = vec![
ColumnDescriptor {
role: ColumnRole::OldPk as _,
name: "key".to_string(),
},
ColumnDescriptor {
role: ColumnRole::NewPk as _,
name: "key".to_string(),
},
ColumnDescriptor {
role: ColumnRole::Value as _,
name: "value".to_string(),
},
];

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::new_null(100_000)),
Arc::new(Int32Array::from((0..100_000).collect::<Vec<i32>>())),
Arc::new(StringArray::from(vec!["a"; 100_000])),
],
)?;

let location = "s3://seafowl-test-bucket/test-data/iceberg/default.db";
let path = "iceberg_table_2/metadata/v1.metadata.json";
let options = minio_options();

let store = StorageLocation {
name: "custom-store".to_string(),
location: location.to_string(),
options,
};

let cmd = DataSyncCommand {
path: path.to_string(),
store: Some(store),
column_descriptors,
origin: "42".to_string(),
sequence_number: Some(1000),
format: TableFormat::Iceberg.into(),
};

let sync_result = do_put_sync(cmd.clone(), batch.clone(), &mut client).await?;
assert_eq!(
sync_result,
DataSyncResponse {
accepted: true,
memory_sequence_number: Some(1000),
durable_sequence_number: Some(1000),
first: true,
}
);

Ok(())
}
