Skip to content

Commit

Permalink
WIP iceberg: simple error reporting for partition spec parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Jan 20, 2025
1 parent 9037ff2 commit fcf181a
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/v/iceberg/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ redpanda_cc_library(
deps = [
":datatypes",
":transform",
"//src/v/base",
"//src/v/container:fragmented_vector",
"//src/v/utils:named_type",
"@seastar",
Expand Down
42 changes: 33 additions & 9 deletions src/v/iceberg/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ struct parse_result {
std::string_view unparsed;
};

struct parse_ctx {
std::string_view original;
ss::sstring last_error;

void report_expected(std::string_view unparsed, std::string_view expected) {
last_error = fmt::format(
"col {}: expected {} (got instead: `{}')",
unparsed.data() - original.data(),
expected,
unparsed);
}
};

bool skip_space(std::string_view& str) {
auto it = str.begin();
while (it != str.end() && std::isspace(*it)) {
Expand Down Expand Up @@ -106,7 +119,7 @@ struct transform_field {
};

std::optional<parse_result<transform_field>>
parse_transform_field(const std::string_view& str) {
parse_transform_field(const std::string_view& str, parse_ctx& ctx) {
auto unparsed = str;

auto transform_id = parse_identifier(unparsed);
Expand All @@ -121,23 +134,27 @@ parse_transform_field(const std::string_view& str) {
} else if (transform_id->val == "identity") {
transform = identity_transform{};
} else {
ctx.report_expected(unparsed, "known transform name");
return std::nullopt;
}
unparsed = transform_id->unparsed;

skip_space(unparsed);
if (!skip_expected(unparsed, "(")) {
ctx.report_expected(unparsed, "'('");
return std::nullopt;
}

auto source = parse_qualified_identifier(unparsed);
if (!source) {
ctx.report_expected(unparsed, "qualified identifier");
return std::nullopt;
}
unparsed = source->unparsed;

skip_space(unparsed);
if (!skip_expected(unparsed, ")")) {
ctx.report_expected(unparsed, "')'");
return std::nullopt;
}

Expand All @@ -153,12 +170,12 @@ parse_transform_field(const std::string_view& str) {
}

std::optional<parse_result<unresolved_partition_spec::field>>
parse_partition_field(const std::string_view& str) {
parse_partition_field(const std::string_view& str, parse_ctx& ctx) {
auto unparsed = str;
skip_space(unparsed);

transform_field tf;
if (auto parsed_tf = parse_transform_field(unparsed); parsed_tf) {
if (auto parsed_tf = parse_transform_field(unparsed, ctx); parsed_tf) {
tf = std::move(parsed_tf->val);
unparsed = parsed_tf->unparsed;
} else if (auto parsed_sf = parse_qualified_identifier(unparsed);
Expand All @@ -167,6 +184,7 @@ parse_partition_field(const std::string_view& str) {
tf.transform = identity_transform{};
unparsed = parsed_sf->unparsed;
} else {
ctx.report_expected(unparsed, "qualified identifier or transform");
return std::nullopt;
}

Expand All @@ -175,11 +193,13 @@ parse_partition_field(const std::string_view& str) {
skip_space(unparsed)
&& (skip_expected(unparsed, "AS") || skip_expected(unparsed, "as"))) {
if (!skip_space(unparsed)) {
ctx.report_expected(unparsed, "whitespace");
return std::nullopt;
}

auto id = parse_identifier(unparsed);
if (!id) {
ctx.report_expected(unparsed, "identifier");
return std::nullopt;
}
source_field_str = std::move(id->val);
Expand Down Expand Up @@ -214,11 +234,12 @@ parse_partition_field(const std::string_view& str) {
}

std::optional<parse_result<unresolved_partition_spec>>
parse_partition_field_list(const std::string_view& str) {
parse_partition_field_list(const std::string_view& str, parse_ctx& ctx) {
auto unparsed = str;
skip_space(unparsed);

if (!skip_expected(unparsed, "(")) {
ctx.report_expected(unparsed, "'('");
return std::nullopt;
}

Expand All @@ -227,11 +248,12 @@ parse_partition_field_list(const std::string_view& str) {
if (!result.fields.empty()) {
skip_space(unparsed);
if (!skip_expected(unparsed, ",")) {
ctx.report_expected(unparsed, ",");
break;
}
}

auto field = parse_partition_field(unparsed);
auto field = parse_partition_field(unparsed, ctx);
if (!field) {
break;
}
Expand All @@ -241,6 +263,7 @@ parse_partition_field_list(const std::string_view& str) {

skip_space(unparsed);
if (!skip_expected(unparsed, ")")) {
ctx.report_expected(unparsed, "')'");
return std::nullopt;
}

Expand All @@ -252,15 +275,16 @@ parse_partition_field_list(const std::string_view& str) {

} // namespace

std::optional<unresolved_partition_spec>
checked<unresolved_partition_spec, ss::sstring>
unresolved_partition_spec::parse(const std::string_view& str) {
auto res = parse_partition_field_list(str);
parse_ctx ctx{.original = str};
auto res = parse_partition_field_list(str, ctx);
if (!res) {
return std::nullopt;
return ctx.last_error;
}
skip_space(res->unparsed);
if (!res->unparsed.empty()) {
return std::nullopt;
return fmt::format("unparsed: `{}'", res->unparsed);
}
return std::move(res->val);
}
Expand Down
3 changes: 2 additions & 1 deletion src/v/iceberg/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0
#pragma once

#include "base/outcome.h"
#include "container/fragmented_vector.h"
#include "iceberg/datatypes.h"
#include "iceberg/transform.h"
Expand Down Expand Up @@ -38,7 +39,7 @@ struct unresolved_partition_spec {
friend std::ostream&
operator<<(std::ostream&, const unresolved_partition_spec&);

static std::optional<unresolved_partition_spec>
static checked<unresolved_partition_spec, ss::sstring>
parse(const std::string_view&);
};

Expand Down
8 changes: 4 additions & 4 deletions src/v/iceberg/tests/partition_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ TEST(PartitionTest, TestSpecResolve) {
TEST(PartitionTest, TestSpecParse) {
{
auto res = unresolved_partition_spec::parse("()");
ASSERT_TRUE(res);
ASSERT_FALSE(res.has_error()) << res.error();
ASSERT_EQ(res.value(), unresolved_partition_spec{});
}

{
auto res = unresolved_partition_spec::parse("(foo)");
ASSERT_TRUE(res);
ASSERT_FALSE(res.has_error()) << res.error();
auto expected = chunked_vector<unresolved_partition_spec::field>{
unresolved_partition_spec::field{
.source_name = {"foo"},
Expand All @@ -258,7 +258,7 @@ TEST(PartitionTest, TestSpecParse) {

{
auto res = unresolved_partition_spec::parse(" (foo.bar, baz ) ");
ASSERT_TRUE(res);
ASSERT_FALSE(res.has_error()) << res.error();
auto expected = chunked_vector<unresolved_partition_spec::field>{
unresolved_partition_spec::field{
.source_name = {"foo", "bar"},
Expand All @@ -277,7 +277,7 @@ TEST(PartitionTest, TestSpecParse) {
{
auto res = unresolved_partition_spec::parse(
" (hour(redpanda.timestamp), day(my_ts) as my_day )");
ASSERT_TRUE(res);
ASSERT_FALSE(res.has_error()) << res.error();
auto expected = chunked_vector<unresolved_partition_spec::field>{
unresolved_partition_spec::field{
.source_name = {"redpanda", "timestamp"},
Expand Down

0 comments on commit fcf181a

Please sign in to comment.