Skip to content

Commit

Permalink
Implement Path Extraction (#19)
Browse files Browse the repository at this point in the history
This change implements support for path extraction SerDe Properties.
It uses the same ion-java-path-extraction library the Ion Hive SerDe
does. Unlike the Hive SerDe, this ensures that the "strict" and more
performant path extraction implementation is used.

I chose to use path extraction even in the absence of any defined path
extractors. When a path extractor is defined, you have to define all
columns as extractions. With the strict implementation, the field lookup
is effectively the same as the Decoder's lookup here. Given that, I would
rather cut modality unless there's a really compelling reason to keep it.
  • Loading branch information
rmarrowstone committed Jan 14, 2025
1 parent 5e4dde0 commit bba2adc
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 71 deletions.
6 changes: 6 additions & 0 deletions lib/trino-hive-formats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
<version>1.11.9</version>
</dependency>

<dependency>
<groupId>com.amazon.ion</groupId>
<artifactId>ion-java-path-extraction</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import io.trino.spi.PageBuilder;

public interface IonDecoder
{
Expand All @@ -25,6 +24,6 @@ public interface IonDecoder
* Expects that the calling code has called IonReader.next()
* to position the reader at the value to be decoded.
*/
void decode(IonReader reader, PageBuilder builder)
void decode(IonReader reader)
throws IonException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hive.formats.ion;

import java.util.Map;
import java.util.Objects;

/**
 * Captures the SerDe properties that affect decoding.
 * <p>
 * Instances are immutable: the path extractor map is defensively copied on
 * construction, and every {@code with*} method returns a new config instead of
 * modifying this one.
 *
 * @param pathExtractors map of column name => Ion search path;
 * for each entry in the map, the value bound to the column will be the result
 * of extracting the given search path. Must not be null, and must not contain
 * null keys or values.
 * @param strictTyping whether the path extractions should enforce type expectations.
 * This only affects type checking of path extractions; any value decoded into
 * a Trino column will be correctly typed or coerced for that column. Must not be null.
 * @param caseSensitive whether field name matching should be case-sensitive or not.
 * Must not be null.
 */
public record IonDecoderConfig(Map<String, String> pathExtractors, Boolean strictTyping, Boolean caseSensitive)
{
    /**
     * Validates the components and snapshots the path extractor map.
     *
     * @throws NullPointerException if any component is null, or if the map
     * contains a null key or value
     */
    public IonDecoderConfig
    {
        Objects.requireNonNull(pathExtractors, "pathExtractors is null");
        // The flags are boxed, so reject null here rather than failing later
        // with an anonymous NullPointerException at some unboxing site.
        Objects.requireNonNull(strictTyping, "strictTyping is null");
        Objects.requireNonNull(caseSensitive, "caseSensitive is null");
        // Snapshot the map so later changes to the caller's map cannot alter this config.
        pathExtractors = Map.copyOf(pathExtractors);
    }

    /**
     * Returns the baseline config: no path extractors, lax typing and
     * case-insensitive field matching.
     */
    static IonDecoderConfig defaultConfig()
    {
        return new IonDecoderConfig(Map.of(), false, false);
    }

    /**
     * Returns a copy of this config with strict typing enabled.
     */
    IonDecoderConfig withStrictTyping()
    {
        return new IonDecoderConfig(pathExtractors, true, caseSensitive);
    }

    /**
     * Returns a copy of this config with case-sensitive field matching enabled.
     */
    IonDecoderConfig withCaseSensitive()
    {
        return new IonDecoderConfig(pathExtractors, strictTyping, true);
    }

    /**
     * Returns a copy of this config that uses the given path extractors
     * in place of the current ones.
     *
     * @param pathExtractors map of column name => Ion search path
     */
    IonDecoderConfig withPathExtractors(Map<String, String> pathExtractors)
    {
        return new IonDecoderConfig(pathExtractors, strictTyping, caseSensitive);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import com.amazon.ion.IonWriter;
import com.amazon.ion.Timestamp;
import com.amazon.ion.system.IonTextWriterBuilder;
import com.amazon.ionpathextraction.PathExtractor;
import com.amazon.ionpathextraction.PathExtractorBuilder;
import com.amazon.ionpathextraction.pathcomponents.Text;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slices;
import io.trino.hive.formats.DistinctMapKeys;
import io.trino.hive.formats.line.Column;
import io.trino.spi.PageBuilder;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
Expand Down Expand Up @@ -65,8 +69,8 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.IntFunction;

public class IonDecoderFactory
Expand All @@ -79,39 +83,66 @@ private IonDecoderFactory() {}
* The decoder expects to decode the _current_ Ion Value.
* It also expects that the calling code will manage the PageBuilder.
* <p>
*
* @param strictPathing controls behavior when encountering mistyped
* values during path extraction. That is outside (before), the trino
* type model. The ion-hive-serde used path extraction for navigating
* the top-level-values even if no path extractions were configured.
* So, in absence of support for path extraction configurations this
* still affects the handling of mistyped top-level-values.
* todo: revisit the above once path extraction config is supported.
*/
public static IonDecoder buildDecoder(List<Column> columns, boolean strictPathing)
public static IonDecoder buildDecoder(
List<Column> columns,
IonDecoderConfig decoderConfig,
PageBuilder pageBuilder)
{
RowDecoder rowDecoder = RowDecoder.forFields(
columns.stream()
.map(c -> new RowType.Field(Optional.of(c.name()), c.type()))
.toList());
PathExtractorBuilder<PageExtractionContext> extractorBuilder = PathExtractorBuilder.<PageExtractionContext>standard()
.withMatchCaseInsensitive(!decoderConfig.caseSensitive());

return (ionReader, pageBuilder) -> {
IonType ionType = ionReader.getType();
IntFunction<BlockBuilder> blockSelector = pageBuilder::getBlockBuilder;
for (int pos = 0; pos < columns.size(); pos++) {
String name = columns.get(pos).name();
BlockDecoder decoder = decoderForType(columns.get(pos).type());
BiFunction<IonReader, PageExtractionContext, Integer> callback = callbackFor(decoder, pos);

if (ionType == IonType.STRUCT && !ionReader.isNullValue()) {
rowDecoder.decode(ionReader, blockSelector);
String extractionPath = decoderConfig.pathExtractors().get(name);
if (extractionPath == null) {
extractorBuilder.withSearchPath(List.of(new Text(name)), callback);
}
else if (ionType == IonType.STRUCT || ionType == IonType.NULL || !strictPathing) {
rowDecoder.appendNulls(blockSelector);
else {
extractorBuilder.withSearchPath(extractionPath, callback);
}
}
PathExtractor<PageExtractionContext> extractor = extractorBuilder.buildStrict(decoderConfig.strictTyping());
PageExtractionContext context = new PageExtractionContext(pageBuilder, new boolean[columns.size()]);

return (ionReader) -> {
extractor.matchCurrentValue(ionReader, context);
context.completeRowAndReset();
};
}

private static BiFunction<IonReader, PageExtractionContext, Integer> callbackFor(BlockDecoder decoder, int pos)
{
return (ionReader, context) -> {
BlockBuilder blockBuilder = context.pageBuilder.getBlockBuilder(pos);
if (context.encountered[pos]) {
blockBuilder.resetTo(blockBuilder.getPositionCount() - 1);
}
else {
throw new TrinoException(StandardErrorCode.GENERIC_USER_ERROR,
"Top-level-value of IonType %s is not valid with strict typing.".formatted(ionType));
context.encountered[pos] = true;
}

decoder.decode(ionReader, context.pageBuilder.getBlockBuilder(pos));
return 0;
};
}

/**
 * State shared with the path extraction callbacks while a row is decoded.
 *
 * @param pageBuilder the page builder whose block builders receive the decoded values
 * @param encountered one flag per column, set once a value has been written
 * for that column in the current row
 */
private record PageExtractionContext(PageBuilder pageBuilder, boolean[] encountered)
{
    /**
     * Finishes the current row: appends a null to every column that was not
     * flagged as encountered, and leaves all flags cleared for the next row.
     */
    private void completeRowAndReset()
    {
        for (int column = 0; column < encountered.length; column++) {
            if (encountered[column]) {
                // A value was already written for this column; only clear its flag.
                encountered[column] = false;
                continue;
            }
            pageBuilder.getBlockBuilder(column).appendNull();
        }
    }
}

private interface BlockDecoder
{
void decode(IonReader reader, BlockBuilder builder);
Expand Down Expand Up @@ -169,10 +200,6 @@ private static BlockDecoder wrapDecoder(BlockDecoder decoder, Type trinoType, Io
};
}

/**
* The RowDecoder is used as the BlockDecoder for nested RowTypes and is used for decoding
* top-level structs into pages.
*/
private record RowDecoder(Map<String, Integer> fieldPositions, List<BlockDecoder> fieldDecoders)
implements BlockDecoder
{
Expand Down Expand Up @@ -224,13 +251,6 @@ private void decode(IonReader ionReader, IntFunction<BlockBuilder> blockSelector

ionReader.stepOut();
}

private void appendNulls(IntFunction<BlockBuilder> blockSelector)
{
for (int i = 0; i < fieldDecoders.size(); i++) {
blockSelector.apply(i).appendNull();
}
}
}

private static class MapDecoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.amazon.ion.IonWriter;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.system.IonSystemBuilder;
import com.amazon.ionpathextraction.exceptions.PathExtractionException;
import com.google.common.collect.ImmutableMap;
import io.trino.hive.formats.line.Column;
import io.trino.spi.Page;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import static io.trino.hive.formats.FormatTestUtils.assertColumnValuesEquals;
Expand Down Expand Up @@ -108,18 +110,20 @@ public void testVariousTlvsStrict()
throws IOException
{
RowType rowType = RowType.rowType(field("foo", INTEGER), field("bar", VARCHAR));
IonDecoderConfig decoderConfig = IonDecoderConfig.defaultConfig().withStrictTyping();
List<Object> expected = new ArrayList<>(2);
expected.add(null);
expected.add(null);

assertValues(rowType,
decoderConfig,
// empty struct, untyped null, struct null, and explicitly typed null null, phew.
"{} null null.struct null.null",
expected, expected, expected, expected);

Assertions.assertThrows(TrinoException.class, () -> {
assertValues(rowType, "null.int", expected);
assertValues(rowType, "[]", expected);
Assertions.assertThrows(PathExtractionException.class, () -> {
assertValues(rowType, decoderConfig, "null.int", expected);
assertValues(rowType, decoderConfig, "[]", expected);
});
}

Expand All @@ -133,7 +137,6 @@ public void testVariousTlvsLax()
expected.add(null);

assertValues(rowType,
false,
"{} 37 null.list null.struct null spam false",
expected, expected, expected, expected, expected, expected, expected);
}
Expand Down Expand Up @@ -236,6 +239,20 @@ public void testCaseInsensitivityOfDuplicateKeys()
List.of(5, "baz"));
}

@Test
public void testCaseSensitiveExtraction()
        throws IOException
{
    RowType mixedCaseRow = RowType.rowType(field("Foo", INTEGER), field("Bar", VARCHAR));
    IonDecoderConfig caseSensitiveConfig = IonDecoderConfig.defaultConfig().withCaseSensitive();

    // Each column has an exact-case field followed by a lower-case twin. This relies on
    // duplicate fields overwriting (asserted in testCaseInsensitivityOfDuplicateKeys above):
    // if matching were case-insensitive, the later twins would win instead.
    assertValues(
            mixedCaseRow,
            caseSensitiveConfig,
            "{ Bar: baz, bar: blegh, Foo: 31, foo: 67 }",
            List.of(31, "baz"));
}

@Test
public void testStructWithNullAndMissingValues()
throws IOException
Expand Down Expand Up @@ -448,6 +465,53 @@ public void testNumbersTooBigForDecimal128()
}
}

@Test
public void testPathExtraction()
        throws IOException
{
    // "bar" and "baz" are pulled out of the nested "foo" struct;
    // "qux" has no configured search path and sits at the top level of the input.
    IonDecoderConfig nestedPathsConfig = IonDecoderConfig.defaultConfig()
            .withPathExtractors(Map.of("bar", "(foo bar)", "baz", "(foo baz)"));
    RowType rowType = RowType.rowType(
            field("qux", BOOLEAN),
            field("bar", INTEGER),
            field("baz", VARCHAR));

    assertValues(
            rowType,
            nestedPathsConfig,
            "{ foo: { bar: 31, baz: quux }, qux: true }",
            List.of(true, 31, "quux"));
}

@Test
public void testNonStructTlvPathExtraction()
        throws IOException
{
    // The empty search path "()" binds each whole top-level list to the single "tlv" column.
    IonDecoderConfig wholeValueConfig = IonDecoderConfig.defaultConfig()
            .withPathExtractors(Map.of("tlv", "()"));
    RowType rowType = RowType.rowType(field("tlv", new ArrayType(INTEGER)));

    assertValues(
            rowType,
            wholeValueConfig,
            "[13, 17] [19, 23]",
            List.of(List.of(13, 17)),
            List.of(List.of(19, 23)));
}

/**
 * Demonstrates mapping positions within an Ion sequence onto the columns of a Trino row
 * by configuring index-based search paths.
 */
@Test
public void testPositionalPathExtraction()
        throws IOException
{
    // Element 0 of each top-level list feeds "foo", element 1 feeds "bar".
    Map<String, String> positionalPaths = Map.of("foo", "(0)", "bar", "(1)");
    IonDecoderConfig positionalConfig = IonDecoderConfig.defaultConfig().withPathExtractors(positionalPaths);
    RowType rowType = RowType.rowType(field("foo", INTEGER), field("bar", VARCHAR));

    assertValues(
            rowType,
            positionalConfig,
            "[13, baz] [17, qux]",
            List.of(13, "baz"),
            List.of(17, "qux"));
}

@Test
public void testEncode()
throws IOException
Expand Down Expand Up @@ -531,10 +595,10 @@ public void testEncodeWithNullNestedField()
private void assertValues(RowType rowType, String ionText, List<Object>... expected)
throws IOException
{
assertValues(rowType, true, ionText, expected);
assertValues(rowType, IonDecoderConfig.defaultConfig(), ionText, expected);
}

private void assertValues(RowType rowType, Boolean strictTlvs, String ionText, List<Object>... expected)
private void assertValues(RowType rowType, IonDecoderConfig config, String ionText, List<Object>... expected)
throws IOException
{
List<RowType.Field> fields = rowType.getFields();
Expand All @@ -545,14 +609,14 @@ private void assertValues(RowType rowType, Boolean strictTlvs, String ionText, L
return new Column(field.getName().get(), field.getType(), i);
})
.toList();
IonDecoder decoder = IonDecoderFactory.buildDecoder(columns, strictTlvs);
PageBuilder pageBuilder = new PageBuilder(expected.length, rowType.getFields().stream().map(RowType.Field::getType).toList());
IonDecoder decoder = IonDecoderFactory.buildDecoder(columns, config, pageBuilder);

try (IonReader ionReader = IonReaderBuilder.standard().build(ionText)) {
for (int i = 0; i < expected.length; i++) {
assertThat(ionReader.next()).isNotNull();
pageBuilder.declarePosition();
decoder.decode(ionReader, pageBuilder);
decoder.decode(ionReader);
}
assertThat(ionReader.next()).isNull();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private boolean readNextValue()
}

pageBuilder.declarePosition();
decoder.decode(ionReader, pageBuilder);
decoder.decode(ionReader);
return true;
}
}
Loading

0 comments on commit bba2adc

Please sign in to comment.