Skip to content

Commit

Permalink
WIP iceberg: simple parser for unresolved_partition_spec
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Jan 16, 2025
1 parent fe9fa9b commit 9c7d055
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 0 deletions.
226 changes: 226 additions & 0 deletions src/v/iceberg/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,232 @@ std::ostream& operator<<(std::ostream& o, const unresolved_partition_spec& ps) {
return o;
}

namespace {

// Outcome of one successful parsing step: the value that was recognised plus
// the part of the input the step left untouched.
template<class T>
struct parse_result {
    // Value produced by the parsing step.
    T val;
    // Remainder of the input following the parsed value.
    std::string_view unparsed;
};

// Drops leading whitespace (as classified by std::isspace) from `str` in
// place.
//
// Returns true if at least one character was removed, false if `str` did not
// start with whitespace (including when it is empty).
bool skip_space(std::string_view& str) {
    std::string_view::size_type skipped = 0;
    // std::isspace is undefined for negative values other than EOF, so the
    // character is converted to unsigned char before classification.
    while (skipped < str.size()
           && std::isspace(static_cast<unsigned char>(str[skipped])) != 0) {
        ++skipped;
    }
    // Report based on the count taken before the view is modified. Comparing
    // against str.begin() after reassigning `str` would always yield false.
    str.remove_prefix(skipped);
    return skipped > 0;
}

// Consumes `expected` from the front of `str` when `str` begins with it.
//
// Returns true if the prefix was present and has been removed; returns false
// and leaves `str` unchanged otherwise.
bool skip_expected(std::string_view& str, const std::string_view& expected) {
    // substr clamps the length, so a too-short `str` simply compares unequal.
    const bool has_prefix = str.substr(0, expected.size()) == expected;
    if (has_prefix) {
        str.remove_prefix(expected.size());
    }
    return has_prefix;
}

// Parses an identifier: the longest non-empty prefix of `str` consisting of
// '_' and characters accepted by std::isalnum.
//
// Returns the identifier together with the rest of the input, or std::nullopt
// if `str` does not start with an identifier character.
std::optional<parse_result<ss::sstring>>
parse_identifier(const std::string_view& str) {
    // std::isalnum is undefined for negative values other than EOF, which is
    // what a plain (signed) char holds for bytes >= 0x80, so the character is
    // converted to unsigned char before classification.
    const auto is_identifier_char = [](char c) {
        return c == '_' || std::isalnum(static_cast<unsigned char>(c)) != 0;
    };

    auto it = str.begin();
    while (it != str.end() && is_identifier_char(*it)) {
        ++it;
    }

    if (it == str.begin()) {
        return std::nullopt;
    }

    return parse_result<ss::sstring>{
      .val = ss::sstring{str.begin(), it},
      .unparsed = std::string_view{it, str.end()},
    };
}

// Parses a qualified identifier: one or more identifiers separated by '.',
// e.g. `foo` or `foo.bar`.
//
// Returns the list of identifier components together with the rest of the
// input, or std::nullopt if `str` does not start with an identifier. A '.'
// that is not followed by an identifier is not part of the qualified
// identifier and is left in the unparsed remainder.
std::optional<parse_result<std::vector<ss::sstring>>>
parse_qualified_identifier(const std::string_view& str) {
    auto unparsed = str;

    std::vector<ss::sstring> result;
    while (true) {
        // Work on a copy so that a separator is only committed once the
        // identifier after it has been parsed; otherwise a dangling '.'
        // (as in `foo.`) would be silently swallowed.
        auto cursor = unparsed;
        if (!result.empty() && !skip_expected(cursor, ".")) {
            break;
        }

        auto id = parse_identifier(cursor);
        if (!id) {
            break;
        }
        result.push_back(std::move(id->val));
        unparsed = id->unparsed;
    }

    if (result.empty()) {
        return std::nullopt;
    }

    return parse_result<std::vector<ss::sstring>>{
      .val = std::move(result),
      .unparsed = unparsed,
    };
}

// Intermediate parse result for a single partition field: the source field it
// refers to and the transform to apply to it.
struct transform_field {
// Identifier components of the source field, as produced by
// parse_qualified_identifier.
std::vector<ss::sstring> source;
// Transform parsed for the field; parse_partition_field sets it to
// identity_transform when the field is written without a transform.
transform transform;
};

// Parses a transform applied to a source field: `<transform>(<source>)`, where
// <transform> is one of `hour`, `day` or `identity` and <source> is a
// qualified identifier. Whitespace is allowed before '(' and on both sides of
// <source>.
//
// Returns the parsed transform/source pair together with the rest of the
// input, or std::nullopt if `str` does not start with such an expression
// (including when the transform name is not recognised).
std::optional<parse_result<transform_field>>
parse_transform_field(const std::string_view& str) {
    auto unparsed = str;

    auto transform_id = parse_identifier(unparsed);
    if (!transform_id) {
        return std::nullopt;
    }
    // Named differently from the type so that the type name stays usable
    // inside this function.
    transform parsed_transform;
    if (transform_id->val == "hour") {
        parsed_transform = hour_transform{};
    } else if (transform_id->val == "day") {
        parsed_transform = day_transform{};
    } else if (transform_id->val == "identity") {
        parsed_transform = identity_transform{};
    } else {
        return std::nullopt;
    }
    unparsed = transform_id->unparsed;

    skip_space(unparsed);
    if (!skip_expected(unparsed, "(")) {
        return std::nullopt;
    }

    // Whitespace is accepted after '(' as well as before ')', so that
    // `hour( foo )` is treated the same as `hour(foo )`.
    skip_space(unparsed);
    auto source = parse_qualified_identifier(unparsed);
    if (!source) {
        return std::nullopt;
    }
    unparsed = source->unparsed;

    skip_space(unparsed);
    if (!skip_expected(unparsed, ")")) {
        return std::nullopt;
    }

    auto result = transform_field{
      .source = std::move(source->val),
      .transform = parsed_transform,
    };

    return parse_result<transform_field>{
      .val = std::move(result),
      .unparsed = unparsed,
    };
}

std::optional<parse_result<unresolved_partition_spec::field>>
parse_partition_field(const std::string_view& str) {
auto unparsed = str;
skip_space(unparsed);

transform_field tf;
if (auto parsed_tf = parse_transform_field(unparsed); parsed_tf) {
tf = std::move(parsed_tf->val);
unparsed = parsed_tf->unparsed;
} else if (auto parsed_sf = parse_qualified_identifier(unparsed);
parsed_sf) {
tf.source = std::move(parsed_sf->val);
tf.transform = identity_transform{};
unparsed = parsed_sf->unparsed;
} else {
return std::nullopt;
}

ss::sstring source_field_str;
{
// TODO: better
bool first = true;
for (const auto& id : tf.source) {
if (!first) {
source_field_str += ".";
} else {
first = false;
}
source_field_str += id;
}

if (tf.transform != identity_transform{}) {
source_field_str += fmt::format("_{}", tf.transform);
}
}

// TODO: AS <name>

unresolved_partition_spec::field val{
.source_name = std::move(tf.source),
.transform = tf.transform,
.name = std::move(source_field_str),
};

return parse_result<unresolved_partition_spec::field>{
.val = std::move(val),
.unparsed = unparsed,
};
}

// Parses a parenthesised, comma-separated list of partition fields, e.g.
// `(foo.bar, hour(ts))`. The list may be empty (`()`), and whitespace is
// allowed before '(', around ',' and before ')'.
//
// Returns the partition spec together with the rest of the input, or
// std::nullopt if the input is not a well-formed field list. A ',' that is not
// followed by a field (as in `(foo,)`) makes the list malformed.
std::optional<parse_result<unresolved_partition_spec>>
parse_partition_field_list(const std::string_view& str) {
    auto unparsed = str;
    skip_space(unparsed);

    if (!skip_expected(unparsed, "(")) {
        return std::nullopt;
    }

    unresolved_partition_spec result;
    while (true) {
        // Work on a copy so that a separator is only committed once the field
        // after it has been parsed; otherwise a dangling ',' would be
        // silently swallowed and the closing ')' check below would pass.
        auto cursor = unparsed;
        if (!result.fields.empty()) {
            skip_space(cursor);
            if (!skip_expected(cursor, ",")) {
                break;
            }
        }

        auto field = parse_partition_field(cursor);
        if (!field) {
            break;
        }
        result.fields.push_back(std::move(field->val));
        unparsed = field->unparsed;
    }

    skip_space(unparsed);
    if (!skip_expected(unparsed, ")")) {
        return std::nullopt;
    }

    return parse_result<unresolved_partition_spec>{
      .val = std::move(result),
      .unparsed = unparsed,
    };
}

} // namespace

// Parses a complete partition spec string such as `(foo, hour(bar.ts))`.
//
// The whole input must be a single field list, optionally surrounded by
// whitespace. Returns std::nullopt if the list is malformed or if anything
// other than whitespace follows it.
std::optional<unresolved_partition_spec>
unresolved_partition_spec::parse(const std::string_view& str) {
    auto parsed = parse_partition_field_list(str);
    if (!parsed) {
        return std::nullopt;
    }

    // Only trailing whitespace may remain after the closing parenthesis.
    auto trailing = parsed->unparsed;
    skip_space(trailing);
    if (trailing.empty()) {
        return std::move(parsed->val);
    }
    return std::nullopt;
}

std::ostream& operator<<(std::ostream& o, const partition_field& f) {
fmt::print(
o,
Expand Down
8 changes: 8 additions & 0 deletions src/v/iceberg/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@ struct unresolved_partition_spec {
transform transform;
ss::sstring name;

friend bool operator==(const field&, const field&) = default;
friend std::ostream& operator<<(std::ostream&, const field&);
};

chunked_vector<field> fields;

friend bool operator==(
const unresolved_partition_spec&, const unresolved_partition_spec&)
= default;

friend std::ostream&
operator<<(std::ostream&, const unresolved_partition_spec&);

static std::optional<unresolved_partition_spec>
parse(const std::string_view&);
};

struct partition_field {
Expand Down
55 changes: 55 additions & 0 deletions src/v/iceberg/tests/partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,58 @@ TEST(PartitionTest, TestSpecResolve) {
ASSERT_EQ(resolved, std::nullopt);
}
}

// Checks unresolved_partition_spec::parse on well-formed inputs: an empty
// list, a single field, several (nested) fields with surrounding whitespace,
// and a field wrapped in a transform.
TEST(PartitionTest, TestSpecParse) {
    using spec_t = unresolved_partition_spec;
    using field_t = unresolved_partition_spec::field;

    // Empty field list.
    {
        auto parsed = spec_t::parse("()");
        ASSERT_TRUE(parsed.has_value());
        ASSERT_EQ(parsed.value(), spec_t{});
    }

    // Single top-level field, implicit identity transform.
    {
        auto parsed = spec_t::parse("(foo)");
        ASSERT_TRUE(parsed.has_value());
        auto want_fields = chunked_vector<field_t>{
          field_t{
            .source_name = {"foo"},
            .transform = identity_transform{},
            .name = "foo"},
        };
        ASSERT_EQ(parsed.value(), spec_t{.fields = std::move(want_fields)});
    }

    // Nested and top-level fields with whitespace around the list.
    {
        auto parsed = spec_t::parse(" (foo.bar, baz ) ");
        ASSERT_TRUE(parsed.has_value());
        auto want_fields = chunked_vector<field_t>{
          field_t{
            .source_name = {"foo", "bar"},
            .transform = identity_transform{},
            .name = "foo.bar"},
          field_t{
            .source_name = {"baz"},
            .transform = identity_transform{},
            .name = "baz"},
        };
        ASSERT_EQ(parsed.value(), spec_t{.fields = std::move(want_fields)});
    }

    // Explicit transform: the name gets a "_<transform>" suffix.
    {
        auto parsed = spec_t::parse(" (hour(redpanda.timestamp)) ");
        ASSERT_TRUE(parsed.has_value());
        auto want_fields = chunked_vector<field_t>{
          field_t{
            .source_name = {"redpanda", "timestamp"},
            .transform = hour_transform{},
            .name = "redpanda.timestamp_hour"},
        };
        ASSERT_EQ(parsed.value(), spec_t{.fields = std::move(want_fields)});
    }
}

0 comments on commit 9c7d055

Please sign in to comment.