Adopt types for table features, fix a few clippy warnings #684

Open · wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion kernel/src/actions/deletion_vector.rs
@@ -66,7 +66,7 @@ impl DeletionVectorDescriptor {
let path_len = self.path_or_inline_dv.len();
require!(
path_len >= 20,
Error::deletion_vector("Invalid length {path_len}, must be >= 20")
Error::deletion_vector(format!("Invalid length {path_len}, must be >= 20"))
);
let prefix_len = path_len - 20;
let decoded = z85::decode(&self.path_or_inline_dv[prefix_len..])
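The one-line change above fixes a silent formatting bug rather than a style nit: a plain string literal passed to `Error::deletion_vector` keeps the text `{path_len}` verbatim, because only the formatting macros interpolate inline arguments. A minimal, self-contained illustration (not the kernel's actual `Error` type):

```rust
// A bare literal never interpolates `{path_len}`; `format!` does.
fn main() {
    let path_len = 12;
    let literal = "Invalid length {path_len}, must be >= 20";
    let formatted = format!("Invalid length {path_len}, must be >= 20");
    assert_eq!(literal, "Invalid length {path_len}, must be >= 20");
    assert_eq!(formatted, "Invalid length 12, must be >= 20");
}
```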
149 changes: 100 additions & 49 deletions kernel/src/actions/mod.rs
@@ -1,10 +1,8 @@
//! Provides parsing and manipulation of the various actions defined in the [Delta
//! specification](https://github.com/delta-io/delta/blob/master/PROTOCOL.md)

use std::any::type_name;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::LazyLock;

@@ -159,11 +157,11 @@ pub struct Protocol {
/// A collection of features that a client must implement in order to correctly
/// read this table (exist only when minReaderVersion is set to 3)
#[serde(skip_serializing_if = "Option::is_none")]
reader_features: Option<Vec<String>>,
reader_features: Option<Vec<ReaderFeatures>>,
/// A collection of features that a client must implement in order to correctly
/// write this table (exist only when minWriterVersion is set to 7)
#[serde(skip_serializing_if = "Option::is_none")]
writer_features: Option<Vec<String>>,
writer_features: Option<Vec<WriterFeatures>>,
}

impl Protocol {
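The typed fields above lean on the `ReaderFeatures`/`WriterFeatures` enums from `kernel/src/table_features`. The sketch below is a simplified assumption of their shape, inferred from how this diff uses them: known variants, an `Unknown(String)` catch-all, a `FromStr` that maps unrecognized names to `Unknown` rather than erroring (consistent with the `Unknown(_)` checks added below), and a `Display` matching the protocol's camelCase names. The real definitions likely use serde/strum derives instead of hand-written impls:

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderFeatures {
    ColumnMapping,
    DeletionVectors,
    // Unrecognized feature names are preserved so they can be reported
    // back to the user instead of being silently dropped.
    Unknown(String),
}

impl FromStr for ReaderFeatures {
    type Err = std::convert::Infallible;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "columnMapping" => ReaderFeatures::ColumnMapping,
            "deletionVectors" => ReaderFeatures::DeletionVectors,
            other => ReaderFeatures::Unknown(other.to_string()),
        })
    }
}

impl fmt::Display for ReaderFeatures {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReaderFeatures::ColumnMapping => write!(f, "columnMapping"),
            ReaderFeatures::DeletionVectors => write!(f, "deletionVectors"),
            ReaderFeatures::Unknown(s) => write!(f, "{s}"),
        }
    }
}
```

`WriterFeatures` is assumed to follow the same pattern with writer-side variants such as `Invariants` and `AppendOnly`.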
@@ -191,8 +189,16 @@ impl Protocol {
)
);
}
let reader_features = reader_features.map(|f| f.into_iter().map(Into::into).collect());
let writer_features = writer_features.map(|f| f.into_iter().map(Into::into).collect());
let reader_features: Option<Vec<ReaderFeatures>> = reader_features.and_then(|f| {
f.into_iter()
.map(|f| ReaderFeatures::from_str(&f.into()).ok())
.collect::<Option<Vec<ReaderFeatures>>>()
});
let writer_features = writer_features.and_then(|f| {
f.into_iter()
.map(|f| WriterFeatures::from_str(&f.into()).ok())
.collect::<Option<Vec<WriterFeatures>>>()
});
Ok(Protocol {
min_reader_version,
min_writer_version,
@@ -220,25 +226,25 @@ impl Protocol {
}

/// Get the reader features for the protocol
pub fn reader_features(&self) -> Option<&[String]> {
pub fn reader_features(&self) -> Option<&[ReaderFeatures]> {
self.reader_features.as_deref()
}

/// Get the writer features for the protocol
pub fn writer_features(&self) -> Option<&[String]> {
pub fn writer_features(&self) -> Option<&[WriterFeatures]> {
self.writer_features.as_deref()
}

/// True if this protocol has the requested reader feature
pub fn has_reader_feature(&self, feature: &ReaderFeatures) -> bool {
pub fn has_reader_feature(&self, feature: ReaderFeatures) -> bool {
self.reader_features()
.is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
.is_some_and(|features| features.iter().any(|f| *f == feature))
}

/// True if this protocol has the requested writer feature
pub fn has_writer_feature(&self, feature: &WriterFeatures) -> bool {
pub fn has_writer_feature(&self, feature: WriterFeatures) -> bool {
self.writer_features()
.is_some_and(|features| features.iter().any(|f| f == feature.as_ref()))
.is_some_and(|features| features.iter().any(|f| *f == feature))
}
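With the accessors returning typed slices, `has_reader_feature`/`has_writer_feature` now take the feature by value and compare enum values directly instead of string representations. A hypothetical call site (the variants used in practice are unit-like, so there is nothing to borrow or clone):

```rust
// Sketch only: mirrors how table_configuration.rs calls these methods below.
fn supports_deletion_vectors(protocol: &Protocol) -> bool {
    protocol.has_reader_feature(ReaderFeatures::DeletionVectors)
        && protocol.has_writer_feature(WriterFeatures::DeletionVectors)
}
```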

/// Check if reading a table with this protocol is supported. That is: does the kernel support
@@ -248,7 +254,7 @@ impl Protocol {
match &self.reader_features {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if self.min_reader_version == 3 => {
ensure_supported_features(reader_features, &SUPPORTED_READER_FEATURES)
ensure_reader_supported_features(reader_features, SUPPORTED_READER_FEATURES.clone())
}
// if min_reader_version = 3 and no reader features => ERROR
// NOTE this is caught by the protocol parsing.
@@ -276,9 +282,12 @@ impl Protocol {
/// support the specified protocol writer version and all enabled writer features?
pub fn ensure_write_supported(&self) -> DeltaResult<()> {
match &self.writer_features {
Some(writer_features) if self.min_writer_version == 7 => {
// if we're on version 7, make sure we support all the specified features
ensure_supported_features(writer_features, &SUPPORTED_WRITER_FEATURES)
// if min_reader_version = 3 and min_writer_version = 7 and all writer features are
// supported => OK
Some(writer_features)
if self.min_reader_version == 3 && self.min_writer_version == 7 =>
{
ensure_writer_supported_features(writer_features, SUPPORTED_WRITER_FEATURES.clone())
}
Some(_) => {
// there are features, but we're not on 7, so the protocol is actually broken
@@ -300,39 +309,79 @@ impl Protocol {
}
}

// given unparsed `table_features`, parse and check if they are subset of `supported_features`
pub(crate) fn ensure_supported_features<T>(
table_features: &[String],
supported_features: &HashSet<T>,
) -> DeltaResult<()>
// Build the error reported for unknown or unsupported table features.
#[inline]
fn create_feature_error<T>(
unsupported: Vec<T>,
unsupported_or_unknown: &str,
feature_kind: &str,
features: HashSet<T>,
) -> Error
where
<T as FromStr>::Err: Display,
T: Debug + FromStr + Hash + Eq,
T: ToString + Debug,
{
let error = |unsupported, unsupported_or_unknown| {
let supported = supported_features.iter().collect::<Vec<_>>();
let features_type = type_name::<T>()
.rsplit("::")
.next()
.unwrap_or("table features");
Error::Unsupported(format!(
"{} {} {:?}. Supported {} are {:?}",
unsupported_or_unknown, features_type, unsupported, features_type, supported
))
};
let parsed_features: HashSet<T> = table_features
let unsupported = unsupported
.iter()
.map(|s| T::from_str(s).map_err(|_| error(vec![s.to_string()], "Unknown")))
.collect::<Result<_, Error>>()?;
.map(ToString::to_string)
.collect::<Vec<_>>();

Error::Unsupported(format!(
"{} reader features {:?}. Supported reader features are {:?}",
unsupported_or_unknown, unsupported, features
))
}

// Check that the table's reader features are all known and a subset of `supported_features`.
pub(crate) fn ensure_reader_supported_features(
table_features: &[ReaderFeatures],
supported_features: HashSet<ReaderFeatures>,
) -> DeltaResult<()> {
let (unknown_features, other_features): (Vec<_>, Vec<_>) = table_features
.iter()
.cloned()
.partition(|f| matches!(f, ReaderFeatures::Unknown(_)));
if !unknown_features.is_empty() {
return Err(create_feature_error(
unknown_features,
"Unknown",
supported_features,
));
}
let parsed_features = HashSet::from_iter(other_features);
parsed_features
.is_subset(&supported_features)
.then_some(())
.ok_or_else(|| {
let unsupported = parsed_features
.difference(&supported_features)
.cloned()
.collect::<Vec<_>>();
create_feature_error(unsupported, "Unsupported", "reader features", supported_features)
})
}

// Check that the table's writer features are all known and a subset of `supported_features`.
pub(crate) fn ensure_writer_supported_features(
table_features: &[WriterFeatures],
supported_features: HashSet<WriterFeatures>,
) -> DeltaResult<()> {
let (unknown_features, other_features): (Vec<_>, Vec<_>) = table_features
.iter()
.cloned()
.partition(|f| matches!(f, WriterFeatures::Unknown(_)));
if !unknown_features.is_empty() {
return Err(create_feature_error(
unknown_features,
"Unknown",
supported_features,
));
}
let parsed_features = HashSet::from_iter(other_features);
parsed_features
.is_subset(supported_features)
.is_subset(&supported_features)
.then_some(())
.ok_or_else(|| {
let unsupported = parsed_features
.difference(supported_features)
.map(|f| format!("{:?}", f))
.difference(&supported_features)
.cloned()
.collect::<Vec<_>>();
error(unsupported, "Unsupported")
create_feature_error(unsupported, "Unsupported", "writer features", supported_features)
})
}
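The split into reader/writer variants keeps the two-stage check from the original generic helper: unknown features fail first, then known-but-unsupported features fail the subset test. The unit test further below only exercises the `Unknown` path; a hypothetical companion test (not part of this PR) for the other branch could look like this:

```rust
#[test]
fn test_unsupported_known_feature() {
    // Illustrative supported set; the kernel's real SUPPORTED_READER_FEATURES
    // may differ.
    let supported: HashSet<ReaderFeatures> =
        [ReaderFeatures::DeletionVectors].into_iter().collect();
    // ColumnMapping parses as a known feature, so it skips the Unknown check
    // and fails the subset test instead.
    let table_features = vec![ReaderFeatures::ColumnMapping];
    let error =
        ensure_reader_supported_features(&table_features, supported).unwrap_err();
    assert!(matches!(error, Error::Unsupported(_)));
}
```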

@@ -889,24 +938,26 @@ mod tests {

#[test]
fn test_ensure_supported_features() {
let supported_features = [
let supported_features: HashSet<_> = [
ReaderFeatures::ColumnMapping,
ReaderFeatures::DeletionVectors,
]
.into_iter()
.collect();
let table_features = vec![ReaderFeatures::ColumnMapping.to_string()];
ensure_supported_features(&table_features, &supported_features).unwrap();
let table_features = vec![ReaderFeatures::ColumnMapping];
ensure_reader_supported_features(&table_features, supported_features.clone()).unwrap();

// test unknown features
let table_features = vec![ReaderFeatures::ColumnMapping.to_string(), "idk".to_string()];
let error = ensure_supported_features(&table_features, &supported_features).unwrap_err();
let table_features = vec![ReaderFeatures::Unknown("idk".into())];
let error =
ensure_reader_supported_features(&table_features, supported_features).unwrap_err();
dbg!(&error);
match error {
Error::Unsupported(e) if e ==
"Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [ColumnMapping, DeletionVectors]"
"Unknown reader features [\"idk\"]. Supported reader features are {ColumnMapping, DeletionVectors}"
=> {},
Error::Unsupported(e) if e ==
"Unknown ReaderFeatures [\"idk\"]. Supported ReaderFeatures are [DeletionVectors, ColumnMapping]"
"Unknown reader features [\"idk\"]. Supported reader features are {DeletionVectors, ColumnMapping}"
=> {},
_ => panic!("Expected unsupported error"),
}
5 changes: 3 additions & 2 deletions kernel/src/actions/visitors.rs
@@ -512,6 +512,7 @@ pub(crate) fn visit_deletion_vector_at<'a>(

#[cfg(test)]
mod tests {
use crate::table_features::{ReaderFeatures, WriterFeatures};
use std::sync::Arc;

use crate::arrow::array::{RecordBatch, StringArray};
@@ -559,8 +560,8 @@ mod tests {
let expected = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec!["deletionVectors".into()]),
writer_features: Some(vec!["deletionVectors".into()]),
reader_features: Some(vec![ReaderFeatures::DeletionVectors]),
writer_features: Some(vec![WriterFeatures::DeletionVectors]),
};
assert_eq!(parsed, expected);
Ok(())
15 changes: 9 additions & 6 deletions kernel/src/engine/arrow_data.rs
@@ -302,6 +302,7 @@ mod tests {
use crate::{
actions::{get_log_schema, Metadata, Protocol},
engine::sync::SyncEngine,
table_features::{ReaderFeatures, WriterFeatures},
DeltaResult, Engine, EngineData,
};

@@ -339,20 +340,22 @@
let engine = SyncEngine::new();
let handler = engine.get_json_handler();
let json_strings: StringArray = vec![
r#"{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7, "readerFeatures": ["rw1"], "writerFeatures": ["rw1", "w2"]}}"#,
r#"{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7, "readerFeatures": ["deletionVectors"], "writerFeatures": ["invariants", "appendOnly"]}}"#,
]
.into();
let output_schema = get_log_schema().project(&["protocol"])?;
let parsed = handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
let parsed =
handler.parse_json(string_array_to_engine_data(json_strings), output_schema)?;
let protocol = Protocol::try_new_from_data(parsed.as_ref())?.unwrap();
assert_eq!(protocol.min_reader_version(), 3);
assert_eq!(protocol.min_writer_version(), 7);
assert_eq!(protocol.reader_features(), Some(["rw1".into()].as_slice()));
assert_eq!(
protocol.reader_features(),
Some([ReaderFeatures::DeletionVectors].as_slice())
);
assert_eq!(
protocol.writer_features(),
Some(["rw1".into(), "w2".into()].as_slice())
Some([WriterFeatures::Invariants, WriterFeatures::AppendOnly].as_slice())
);
Ok(())
}
2 changes: 1 addition & 1 deletion kernel/src/path.rs
@@ -88,7 +88,7 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
let filename = url
.path_segments()
.ok_or_else(|| Error::invalid_log_path(url))?
.last()
.next_back()
.unwrap() // "the iterator always contains at least one string (which may be empty)"
.to_string();
if filename.is_empty() {
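The `.last()` to `.next_back()` swap in `path.rs` is presumably one of the clippy fixes (likely the `double_ended_iterator_last` lint): for a `DoubleEndedIterator` such as the one `Url::path_segments()` returns, `next_back()` yields the final segment directly instead of walking the whole iterator. A small standalone sketch with an illustrative path:

```rust
use url::Url;

fn main() {
    let url = Url::parse("s3://bucket/table/_delta_log/00000000000000000000.json").unwrap();
    let filename = url
        .path_segments()
        .expect("URL has a path")
        .next_back() // same result as .last(), without consuming from the front
        .unwrap();
    assert_eq!(filename, "00000000000000000000.json");
}
```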
4 changes: 2 additions & 2 deletions kernel/src/table_changes/mod.rs
@@ -37,7 +37,7 @@ use std::sync::{Arc, LazyLock};
use scan::TableChangesScanBuilder;
use url::Url;

use crate::actions::{ensure_supported_features, Protocol};
use crate::actions::{ensure_reader_supported_features, Protocol};
use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
@@ -257,7 +257,7 @@ fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
match &protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if protocol.min_reader_version() == 3 => {
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES)
ensure_reader_supported_features(reader_features, CDF_SUPPORTED_READER_FEATURES.clone())
}
// if min_reader_version = 1 and there are no reader features => OK
None if protocol.min_reader_version() == 1 => Ok(()),
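`ensure_reader_supported_features` now takes the supported set by value as a `HashSet<ReaderFeatures>`, which is why `CDF_SUPPORTED_READER_FEATURES` is cloned at this call site and again in `table_configuration.rs`. A hypothetical sketch of how such a set is shaped after this change; the actual members of the CDF set are not shown in this diff:

```rust
use std::collections::HashSet;
use std::sync::LazyLock;

// Illustrative only: a typed feature set behind a LazyLock, cloned per check.
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
    LazyLock::new(|| {
        [ReaderFeatures::DeletionVectors, ReaderFeatures::ColumnMapping]
            .into_iter()
            .collect()
    });
```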
12 changes: 8 additions & 4 deletions kernel/src/table_configuration.rs
@@ -13,7 +13,7 @@ use std::sync::{Arc, LazyLock};

use url::Url;

use crate::actions::{ensure_supported_features, Metadata, Protocol};
use crate::actions::{ensure_reader_supported_features, Metadata, Protocol};
use crate::schema::{Schema, SchemaRef};
use crate::table_features::{
column_mapping_mode, validate_schema_column_mapping, ColumnMappingMode, ReaderFeatures,
@@ -142,7 +142,11 @@ impl TableConfiguration {
let protocol_supported = match self.protocol.reader_features() {
// if min_reader_version = 3 and all reader features are subset of supported => OK
Some(reader_features) if self.protocol.min_reader_version() == 3 => {
ensure_supported_features(reader_features, &CDF_SUPPORTED_READER_FEATURES).is_ok()
ensure_reader_supported_features(
reader_features,
CDF_SUPPORTED_READER_FEATURES.clone(),
)
.is_ok()
}
// if min_reader_version = 1 and there are no reader features => OK
None => self.protocol.min_reader_version() == 1,
@@ -169,11 +173,11 @@
pub(crate) fn is_deletion_vector_supported(&self) -> bool {
let read_supported = self
.protocol()
.has_reader_feature(&ReaderFeatures::DeletionVectors)
.has_reader_feature(ReaderFeatures::DeletionVectors)
&& self.protocol.min_reader_version() == 3;
let write_supported = self
.protocol()
.has_writer_feature(&WriterFeatures::DeletionVectors)
.has_writer_feature(WriterFeatures::DeletionVectors)
&& self.protocol.min_writer_version() == 7;
read_supported && write_supported
}
2 changes: 1 addition & 1 deletion kernel/src/table_features/column_mapping.rs
@@ -36,7 +36,7 @@ pub(crate) fn column_mapping_mode(
// (but should be ignored) even when the feature is not supported. For details see
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping
(Some(mode), 2) => mode,
(Some(mode), 3) if protocol.has_reader_feature(&ReaderFeatures::ColumnMapping) => mode,
(Some(mode), 3) if protocol.has_reader_feature(ReaderFeatures::ColumnMapping) => mode,
_ => ColumnMappingMode::None,
}
}