Skip to content

Commit

Permalink
Minor Fixups
Browse files Browse the repository at this point in the history
This commit fixes up a few things I noticed when previewing the PR to
Trino.

* Column/Field name casing should be preserved when writing
* Some missing operational/metrics calls in IonPageSource
* Throw clearer Exception for errors in IonFileWriter
* Move some tests from TestHiveFileFormats to IonPageSourceSmokeTest
* Add test for Timestamp Encoding
  • Loading branch information
rmarrowstone committed Dec 19, 2024
1 parent f790bae commit f18f95d
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.IntFunction;

Expand All @@ -64,7 +63,7 @@ private IonEncoderFactory() {}
public static IonEncoder buildEncoder(List<Column> columns)
{
return RowEncoder.forFields(columns.stream()
.map(c -> new RowType.Field(Optional.of(c.name().toLowerCase(Locale.ROOT)), c.type()))
.map(c -> new RowType.Field(Optional.of(c.name()), c.type()))
.toList());
}

Expand All @@ -89,8 +88,7 @@ private static BlockEncoder encoderForType(Type type)
case DecimalType t -> decimalEncoder(t);
case DateType _ -> dateEncoder;
case TimestampType t -> timestampEncoder(t);
case MapType t -> new MapEncoder(t, t.getKeyType(),
encoderForType(t.getValueType()));
case MapType t -> new MapEncoder(t, t.getKeyType(), encoderForType(t.getValueType()));
case RowType t -> RowEncoder.forFields(t.getFields());
case ArrayType t -> new ArrayEncoder(wrapEncoder(encoderForType(t.getElementType())));
default -> throw new IllegalArgumentException(String.format("Unsupported type: %s", type));
Expand Down Expand Up @@ -119,7 +117,7 @@ private static RowEncoder forFields(List<RowType.Field> fields)
ImmutableList.Builder<BlockEncoder> fieldEncodersBuilder = ImmutableList.builder();

for (RowType.Field field : fields) {
fieldNamesBuilder.add(field.getName().get().toLowerCase(Locale.ROOT));
fieldNamesBuilder.add(field.getName().get());
fieldEncodersBuilder.add(wrapEncoder(encoderForType(field.getType())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,27 @@ public void testEncode()
assertIonEquivalence(TEST_COLUMNS, page, ionText);
}

@Test
public void testEncodeTimestamp()
throws IOException
{
List<Column> timestampColumn = List.of(new Column("my_ts", TimestampType.TIMESTAMP_NANOS, 0));
Page page = toPage(timestampColumn, List.of(
toSqlTimestamp(TimestampType.TIMESTAMP_NANOS, LocalDateTime.of(2024, 11, 23, 1, 23, 45, 666777888))));
assertIonEquivalence(timestampColumn, page, "{ my_ts: 2024-11-23T01:23:45.666777888Z }");
}

@Test
public void testEncodeMixedCaseColumn()
throws IOException
{
List<Column> casedColumn = List.of(
new Column("TheAnswer", INTEGER, 0));

Page page = toPage(casedColumn, List.of(42));
assertIonEquivalence(casedColumn, page, "{ TheAnswer: 42 }");
}

@Test
public void testEncodeWithNullField()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.function.LongSupplier;

import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;

public class IonFileWriter
implements FileWriter
Expand Down Expand Up @@ -106,7 +107,7 @@ public void rollback()
writer.close();
}
catch (IOException e) {
throw new RuntimeException(e);
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
}
}

Expand All @@ -123,7 +124,7 @@ public void appendRows(Page page)
pageEncoder.encode(writer, page);
}
catch (IOException e) {
throw new RuntimeException(e);
throw new TrinoException(HIVE_WRITER_DATA_ERROR, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ public Optional<FileWriter> createFileWriter(

Closeable rollbackAction = () -> fileSystem.deleteFile(location);

// we take the column names from the schema, not what was input
// this is what the LineWriterFactory does, I don't understand why
List<String> fileColumnNames = getColumnNames(schema);
List<Type> fileColumnTypes = getColumnTypes(schema).stream()
.map(hiveType -> getType(hiveType, typeManager, getTimestampPrecision(session)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.ion;

import com.amazon.ion.IonBufferConfiguration;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonType;
import io.trino.hive.formats.ion.IonDecoder;
Expand All @@ -24,9 +25,13 @@
import java.util.OptionalLong;
import java.util.function.LongSupplier;

import static io.airlift.slice.SizeOf.instanceSize;

public class IonPageSource
implements ConnectorPageSource
{
private static final int INSTANCE_SIZE = instanceSize(IonPageSource.class);

private final IonReader ionReader;
private final PageBuilder pageBuilder;
private final IonDecoder decoder;
Expand Down Expand Up @@ -86,7 +91,10 @@ public Page getNextPage()
@Override
public long getMemoryUsage()
{
return 4096;
// we don't have the ability to ask an IonReader how many bytes it has buffered
// it will buffer as much as is needed for each top-level-value.
int assumedIonBufferSize = IonBufferConfiguration.DEFAULT.getInitialBufferSize() * 4;
return INSTANCE_SIZE + assumedIonBufferSize + pageBuilder.getRetainedSizeInBytes();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
Expand All @@ -157,8 +156,6 @@
import static io.trino.plugin.hive.HiveTestUtils.getHiveSession;
import static io.trino.plugin.hive.HiveTestUtils.mapType;
import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_ENCODING_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.TEXT_ENCODING;
import static io.trino.plugin.hive.util.HiveTypeTranslator.toHiveType;
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES;
Expand Down Expand Up @@ -234,7 +231,6 @@ public final class TestHiveFileFormats
private static final FileFormatDataSourceStats STATS = new FileFormatDataSourceStats();
private static final ConnectorSession PARQUET_SESSION = getHiveSession(createParquetHiveConfig(false));
private static final ConnectorSession PARQUET_SESSION_USE_NAME = getHiveSession(createParquetHiveConfig(true));
private static final String ERROR_ENCODING = "error_encoding";

@DataProvider(name = "rowCount")
public static Object[][] rowCountProvider()
Expand Down Expand Up @@ -377,7 +373,8 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding)
throws Exception
{
List<TestColumn> testColumns = TEST_COLUMNS.stream()
// todo: add support for maps to trino impl
// even though maps with text keys work with the native trino impl
// there is an error when testing against the hive serde
.filter(tc -> !(tc.type instanceof MapType))
.collect(toList());

Expand All @@ -394,54 +391,6 @@ public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding)
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig));
}

@Test(dataProvider = "validRowAndFileSizePadding")
public void testIonWithTextEncoding(int rowCount, long fileSizePadding)
throws Exception
{
List<TestColumn> testColumns = TEST_COLUMNS.stream()
// todo: add support for maps to trino impl
.filter(tc -> !(tc.type instanceof MapType))
.collect(toList());

HiveConfig hiveConfig = new HiveConfig();
// enable Ion native trino integration for testing while the implementation is in progress
// TODO: In future this flag should change to `true` as default and then the following statement can be removed.
hiveConfig.setIonNativeTrinoEnabled(true);

assertThatFileFormat(ION)
.withColumns(testColumns)
.withRowsCount(rowCount)
.withFileSizePadding(fileSizePadding)
.withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, TEXT_ENCODING))
.withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER))
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig));
}

@Test(dataProvider = "validRowAndFileSizePadding")
public void testInvalidIonEncoding(int rowCount, long fileSizePadding)
throws Exception
{
List<TestColumn> testColumns = TEST_COLUMNS.stream()
// todo: add support for maps to trino impl
.filter(tc -> !(tc.type instanceof MapType))
.collect(toList());

HiveConfig hiveConfig = new HiveConfig();
// enable Ion native trino integration for testing while the implementation is in progress
// TODO: In future this flag should change to `true` as default and then the following statement can be removed.
hiveConfig.setIonNativeTrinoEnabled(true);

assertTrinoExceptionThrownBy(() -> assertThatFileFormat(ION)
.withColumns(testColumns)
.withRowsCount(rowCount)
.withFileSizePadding(fileSizePadding)
.withTableProperties(ImmutableMap.of(ION_ENCODING_PROPERTY, ERROR_ENCODING))
.withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER))
.isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig)))
.hasErrorCode(HIVE_WRITER_OPEN_ERROR)
.hasMessage("Error creating Ion Output");
}

@Test(dataProvider = "validRowAndFileSizePadding")
public void testRcTextPageSource(int rowCount, long fileSizePadding)
throws Exception
Expand Down Expand Up @@ -1275,7 +1224,6 @@ private static class FileFormatAssertion
private boolean skipGenericWrite;
private HiveFileWriterFactory fileWriterFactory;
private long fileSizePadding;
private Map<String, String> customTableProperties = ImmutableMap.of();

private final TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory();

Expand Down Expand Up @@ -1333,12 +1281,6 @@ public FileFormatAssertion withRowsCount(int rowsCount)
return this;
}

public FileFormatAssertion withTableProperties(Map<String, String> tableProperties)
{
this.customTableProperties = requireNonNull(tableProperties, "customTableProperties is null");
return this;
}

public FileFormatAssertion withSession(ConnectorSession session)
{
this.session = requireNonNull(session, "session is null");
Expand Down Expand Up @@ -1397,7 +1339,7 @@ private void assertRead(HivePageSourceFactory pageSourceFactory)
if (fileWriterFactory == null) {
continue;
}
createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory, customTableProperties);
createTestFileTrino(location, storageFormat, compressionCodec, writeColumns, session, rowsCount, fileWriterFactory);
}
else {
if (skipGenericWrite) {
Expand Down Expand Up @@ -1427,8 +1369,7 @@ private static void createTestFileTrino(
List<TestColumn> testColumns,
ConnectorSession session,
int numRows,
HiveFileWriterFactory fileWriterFactory,
Map<String, String> customTableProperties)
HiveFileWriterFactory fileWriterFactory)
{
// filter out partition keys, which are not written to the file
testColumns = testColumns.stream()
Expand All @@ -1453,7 +1394,6 @@ private static void createTestFileTrino(
Map<String, String> tableProperties = ImmutableMap.<String, String>builder()
.put(LIST_COLUMNS, testColumns.stream().map(TestColumn::name).collect(Collectors.joining(",")))
.put(LIST_COLUMN_TYPES, testColumns.stream().map(TestColumn::type).map(HiveTypeTranslator::toHiveType).map(HiveType::toString).collect(Collectors.joining(",")))
.putAll(customTableProperties)
.buildOrThrow();

Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ public void testBinaryEncoding()
assertEncoding(tableColumns, BINARY_ENCODING);
}

@Test
public void testBadEncodingName()
throws IOException
{
TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS)
.withEncoding("unknown_encoding_name");

Assertions.assertThrows(TrinoException.class, fixture::getFileWriter);
}

private void assertEncoding(List<HiveColumnHandle> tableColumns,
String encoding)
throws IOException
Expand Down

0 comments on commit f18f95d

Please sign in to comment.