Skip to content

Commit

Permalink
snapshot creation with v2checkpoints mvp
Browse files Browse the repository at this point in the history
This reverts commit 3b0b2a0.
  • Loading branch information
sebastiantia committed Feb 8, 2025
1 parent 08bbc01 commit e405e69
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 37 deletions.
8 changes: 4 additions & 4 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,15 +783,15 @@ mod tests {
}

#[test]
fn test_v2_checkpoint_unsupported() {
fn test_v2_checkpoint_supported() {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([ReaderFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
4,
Expand Down Expand Up @@ -821,7 +821,7 @@ mod tests {
Some(&empty_features),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol::try_new(
3,
Expand All @@ -839,7 +839,7 @@ mod tests {
Some([WriterFeatures::V2Checkpoint]),
)
.unwrap();
assert!(protocol.ensure_read_supported().is_err());
assert!(protocol.ensure_read_supported().is_ok());

let protocol = Protocol {
min_reader_version: 1,
Expand Down
8 changes: 4 additions & 4 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,13 @@ impl LogSegment {
/// Returns an iterator over checkpoint data, processing sidecar files when necessary.
///
/// Checkpoint data is returned directly if:
/// - Processing a multi-part checkpoint
/// - Schema does not contain file actions
/// - Processing multi-part checkpoints
/// - Schema doesn't contain file actions
///
/// For single-part checkpoints, any referenced sidecar files are processed. These
/// sidecar files contain the actual add/remove actions that would otherwise be
/// stored directly in the checkpoint. The sidecar file batches replace the checkpoint
/// batch in the top level iterator to be returned.
/// stored directly in the checkpoint, providing a more efficient storage mechanism
/// for large change sets.
fn create_checkpoint_stream(
engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
Expand Down
87 changes: 77 additions & 10 deletions kernel/src/log_segment/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn build_log_with_paths_and_checkpoint(
}

#[test]
fn build_snapshot_with_unsupported_uuid_checkpoint() {
fn build_snapshot_with_uuid_checkpoint_parquet() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
&[
delta_path_for_version(0, "json"),
Expand All @@ -130,18 +130,88 @@ fn build_snapshot_with_unsupported_uuid_checkpoint() {
],
None,
);

let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();
let commit_files = log_segment.ascending_commit_files;
let checkpoint_parts = log_segment.checkpoint_parts;

assert_eq!(checkpoint_parts.len(), 1);
assert_eq!(checkpoint_parts[0].version, 3);
assert_eq!(checkpoint_parts[0].version, 5);

let versions = commit_files.into_iter().map(|x| x.version).collect_vec();
let expected_versions = vec![4, 5, 6, 7];
let expected_versions = vec![6, 7];
assert_eq!(versions, expected_versions);
}

#[test]
fn build_snapshot_with_uuid_checkpoint_json() {
    // Log layout: classic single-part checkpoints at versions 1 and 3, plus a
    // UUID-named (v2) JSON checkpoint at version 5, followed by commits 6 and 7.
    let (client, log_root) = build_log_with_paths_and_checkpoint(
        &[
            delta_path_for_version(0, "json"),
            delta_path_for_version(1, "checkpoint.parquet"),
            delta_path_for_version(2, "json"),
            delta_path_for_version(3, "checkpoint.parquet"),
            delta_path_for_version(4, "json"),
            delta_path_for_version(5, "json"),
            delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json"),
            delta_path_for_version(6, "json"),
            delta_path_for_version(7, "json"),
        ],
        None,
    );

    let log_segment = LogSegment::for_snapshot(client.as_ref(), log_root, None, None).unwrap();

    // The newest checkpoint — the UUID JSON checkpoint at version 5 — should be
    // selected as the sole checkpoint part.
    let checkpoint_parts = log_segment.checkpoint_parts;
    assert_eq!(checkpoint_parts.len(), 1);
    assert_eq!(checkpoint_parts[0].version, 5);

    // Only the commits after the chosen checkpoint (6 and 7) remain to replay.
    let commit_versions: Vec<_> = log_segment
        .ascending_commit_files
        .into_iter()
        .map(|f| f.version)
        .collect();
    assert_eq!(commit_versions, vec![6, 7]);
}

#[test]
fn build_snapshot_with_correct_last_uuid_checkpoint() {
    // Hint from the `_last_checkpoint` file: a single-part checkpoint at version 5.
    let last_checkpoint_hint = CheckpointMetadata {
        version: 5,
        size: 10,
        parts: Some(1),
        size_in_bytes: None,
        num_of_add_files: None,
        checkpoint_schema: None,
        checksum: None,
    };

    // Log layout: classic single-part checkpoints at versions 1 and 3, a
    // UUID-named (v2) parquet checkpoint at version 5, then commits 6 and 7.
    let (client, log_root) = build_log_with_paths_and_checkpoint(
        &[
            delta_path_for_version(0, "json"),
            delta_path_for_version(1, "checkpoint.parquet"),
            delta_path_for_version(1, "json"),
            delta_path_for_version(2, "json"),
            delta_path_for_version(3, "checkpoint.parquet"),
            delta_path_for_version(3, "json"),
            delta_path_for_version(4, "json"),
            delta_path_for_version(5, "checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet"),
            delta_path_for_version(5, "json"),
            delta_path_for_version(6, "json"),
            delta_path_for_version(7, "json"),
        ],
        Some(&last_checkpoint_hint),
    );

    let log_segment =
        LogSegment::for_snapshot(client.as_ref(), log_root, last_checkpoint_hint, None).unwrap();

    let checkpoint_parts = log_segment.checkpoint_parts;
    let commit_files = log_segment.ascending_commit_files;

    // The hinted UUID checkpoint at version 5 is honored, leaving only
    // commits 6 and 7 to replay on top of it.
    assert_eq!(checkpoint_parts.len(), 1);
    assert_eq!(checkpoint_parts[0].version, 5);
    assert_eq!(commit_files.len(), 2);
    assert_eq!(commit_files[0].version, 6);
    assert_eq!(commit_files[1].version, 7);
}
#[test]
fn build_snapshot_with_multiple_incomplete_multipart_checkpoints() {
let (client, log_root) = build_log_with_paths_and_checkpoint(
Expand Down Expand Up @@ -641,8 +711,6 @@ fn test_sidecar_to_filemeta() -> DeltaResult<()> {
// This forms a valid URL but represents an invalid path since sidecar files
// are restricted to the `_delta_log/_sidecars` directory.
// Attempting to read this file will fail because it does not exist.

// TODO: Catch this error earlier to return a more informative error message.
(
"test/test/example.parquet",
"file:///var/_delta_log/_sidecars/test/test/example.parquet",
Expand Down Expand Up @@ -816,7 +884,7 @@ fn test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_actio
}

// TODO: Use a batch of sidecar actions to test that we do not visit batches when the schema has no file actions
// View multi-part checkpoint test for more details.
// View multi_part checkpoint test for more details.
#[tokio::test]
async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions(
) -> DeltaResult<()> {
Expand Down Expand Up @@ -1010,8 +1078,7 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch() -> DeltaRes
// More integration test than unit test below:

// Encapsulates logic that has already been tested but tests the interaction between the functions,
// such as performing a map operation on the returned sidecar batches from `process_single_checkpoint_batch`
// to include the is_log_batch flag
// such as performing a map operation on the returned sidecar batches to include the is_log_batch flag
#[tokio::test]
async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches(
) -> DeltaResult<()> {
Expand Down Expand Up @@ -1042,7 +1109,7 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
)
.await;

let checkpoint_file_path = mock_table
let checkpoint_one_file = mock_table
.log_root()
.join("00001.checkpoint.parquet")?
.to_string();
Expand All @@ -1051,7 +1118,7 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
&engine,
v2_checkpoint_read_schema.clone(),
None,
vec![create_file_meta(&checkpoint_file_path)],
vec![create_file_meta(&checkpoint_one_file)],
mock_table.log_root(),
)?
.into_iter();
Expand Down
20 changes: 6 additions & 14 deletions kernel/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,17 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints
matches!(
self.file_type,
LogPathFileType::SinglePartCheckpoint | LogPathFileType::MultiPartCheckpoint { .. }
LogPathFileType::SinglePartCheckpoint
| LogPathFileType::MultiPartCheckpoint { .. }
| LogPathFileType::UuidCheckpoint(_)
)
}

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[allow(dead_code)] // currently only used in tests, which don't "count"
fn is_unknown(&self) -> bool {
// TODO: Stop treating UuidCheckpoint as unknown once we support v2 checkpoints
matches!(
self.file_type,
LogPathFileType::Unknown | LogPathFileType::UuidCheckpoint(_)
)
matches!(self.file_type, LogPathFileType::Unknown)
}
}

Expand Down Expand Up @@ -357,10 +355,7 @@ mod tests {
LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
));
assert!(!log_path.is_commit());

// TODO: Support v2 checkpoints! Until then we can't treat these as checkpoint files.
assert!(!log_path.is_checkpoint());
assert!(log_path.is_unknown());
assert!(log_path.is_checkpoint());

let log_path = table_log_dir
.join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.json")
Expand All @@ -377,10 +372,7 @@ mod tests {
LogPathFileType::UuidCheckpoint(ref u) if u == "3a0d65cd-4056-49b8-937b-95f9e3ee90e5",
));
assert!(!log_path.is_commit());

// TODO: Support v2 checkpoints! Until then we can't treat these as checkpoint files.
assert!(!log_path.is_checkpoint());
assert!(log_path.is_unknown());
assert!(log_path.is_checkpoint());

let log_path = table_log_dir
.join("00000000000000000002.checkpoint.3a0d65cd-4056-49b8-937b-95f9e3ee90e5.foo")
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub struct TableChanges {
}

impl TableChanges {
/// Creates a new [`TableChanges`] instance for the given version range. This function checks
/// Creates a new [`TableChanges`] instance for the given version range. This function checks
/// these properties:
/// - The change data feed table feature must be enabled in both the start or end versions.
/// - Other than the deletion vector reader feature, no other reader features are enabled for the table.
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/table_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ mod test {
let protocol = Protocol::try_new(
3,
7,
Some([ReaderFeatures::V2Checkpoint]),
Some([WriterFeatures::V2Checkpoint]),
Some([ReaderFeatures::UnsupportedFeature]),
Some([WriterFeatures::UnsupportedFeature]),
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub enum ReaderFeatures {
/// vacuumProtocolCheck ReaderWriter feature ensures consistent application of reader and writer
/// protocol checks during VACUUM operations
VacuumProtocolCheck,
/// A dummy variant used to represent an unsupported feature for testing purposes
UnsupportedFeature,
}

/// Similar to reader features, writer features communicate capabilities that must be implemented
Expand Down Expand Up @@ -109,6 +111,8 @@ pub enum WriterFeatures {
/// vacuumProtocolCheck ReaderWriter feature ensures consistent application of reader and writer
/// protocol checks during VACUUM operations
VacuumProtocolCheck,
/// A dummy variant used to represent an unsupported feature for testing purposes
UnsupportedFeature,
}

impl From<ReaderFeatures> for String {
Expand All @@ -133,6 +137,7 @@ pub(crate) static SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
ReaderFeatures::TypeWidening,
ReaderFeatures::TypeWideningPreview,
ReaderFeatures::VacuumProtocolCheck,
ReaderFeatures::V2Checkpoint,
])
});

Expand All @@ -154,6 +159,7 @@ mod tests {
(ReaderFeatures::TypeWideningPreview, "typeWidening-preview"),
(ReaderFeatures::V2Checkpoint, "v2Checkpoint"),
(ReaderFeatures::VacuumProtocolCheck, "vacuumProtocolCheck"),
(ReaderFeatures::UnsupportedFeature, "unsupportedFeature"),
];

assert_eq!(ReaderFeatures::VARIANTS.len(), cases.len());
Expand Down Expand Up @@ -192,6 +198,7 @@ mod tests {
(WriterFeatures::IcebergCompatV1, "icebergCompatV1"),
(WriterFeatures::IcebergCompatV2, "icebergCompatV2"),
(WriterFeatures::VacuumProtocolCheck, "vacuumProtocolCheck"),
(WriterFeatures::UnsupportedFeature, "unsupportedFeature"),
];

assert_eq!(WriterFeatures::VARIANTS.len(), cases.len());
Expand Down
4 changes: 2 additions & 2 deletions kernel/tests/golden_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,8 @@ golden_test!("time-travel-start", latest_snapshot_test);
golden_test!("time-travel-start-start20", latest_snapshot_test);
golden_test!("time-travel-start-start20-start40", latest_snapshot_test);

skip_test!("v2-checkpoint-json": "v2 checkpoint not supported");
skip_test!("v2-checkpoint-parquet": "v2 checkpoint not supported");
golden_test!("v2-checkpoint-json", latest_snapshot_test);
golden_test!("v2-checkpoint-parquet", latest_snapshot_test);
// BUG:
// - AddFile: 'file:/some/unqualified/absolute/path'
// - RemoveFile: '/some/unqualified/absolute/path'
Expand Down

0 comments on commit e405e69

Please sign in to comment.