Add support for legacy Date in Hive for Orc
marcinsbd committed Jan 14, 2025
1 parent 9c9f15b commit 68abef8
Showing 8 changed files with 139 additions and 14 deletions.
4 changes: 3 additions & 1 deletion lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
@@ -72,6 +72,7 @@
import static io.trino.orc.OrcWriterStats.FlushReason.DICTIONARY_FULL;
import static io.trino.orc.OrcWriterStats.FlushReason.MAX_BYTES;
import static io.trino.orc.OrcWriterStats.FlushReason.MAX_ROWS;
import static io.trino.orc.metadata.CalendarKind.PROLEPTIC_GREGORIAN;
import static io.trino.orc.metadata.ColumnEncoding.ColumnEncodingKind.DIRECT;
import static io.trino.orc.metadata.OrcColumnId.ROOT_COLUMN;
import static io.trino.orc.metadata.PostScript.MAGIC;
@@ -531,7 +532,8 @@ private List<OrcDataOutput> bufferFileFooter()
orcTypes,
fileStats,
userMetadata,
Optional.empty()); // writer id will be set by MetadataWriter
Optional.empty(), // writer id will be set by MetadataWriter
PROLEPTIC_GREGORIAN);

closedStripes.clear();
closedStripesRetainedBytes = 0;
21 changes: 21 additions & 0 deletions lib/trino-orc/src/main/java/io/trino/orc/metadata/CalendarKind.java
@@ -0,0 +1,21 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.orc.metadata;

public enum CalendarKind
{
UNKNOWN_CALENDAR,
JULIAN_GREGORIAN,
PROLEPTIC_GREGORIAN;
}
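
For context: legacy Hive wrote DATE values as day counts on the hybrid Julian/Gregorian calendar (Julian before the 1582-10-15 cutover), while Trino's DATE type assumes the proleptic Gregorian calendar, so the same printed date can map to a different day number. A minimal sketch of the rebasing idea, assuming standard `java.time` and `java.util.GregorianCalendar` semantics; the class and method names are hypothetical, not from this commit:

```java
import java.time.LocalDate;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public final class LegacyDateRebase
{
    private static final long MILLIS_PER_DAY = 86_400_000L;

    private LegacyDateRebase() {}

    // Re-express a days-since-epoch value written under the hybrid
    // Julian/Gregorian calendar as a proleptic Gregorian day count.
    // BCE era handling is omitted for brevity.
    public static int rebaseJulianToProlepticDays(int hybridDays)
    {
        // GregorianCalendar defaults to the hybrid calendar with the
        // 1582-10-15 cutover, matching how legacy Hive labeled dates.
        GregorianCalendar hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        hybrid.setTimeInMillis(hybridDays * MILLIS_PER_DAY);
        // Keep the year/month/day label and look it up again on the
        // proleptic Gregorian calendar.
        LocalDate proleptic = LocalDate.of(
                hybrid.get(Calendar.YEAR),
                hybrid.get(Calendar.MONTH) + 1,
                hybrid.get(Calendar.DAY_OF_MONTH));
        return Math.toIntExact(proleptic.toEpochDay());
    }
}
```

Under this scheme, dates on or after the cutover map to themselves, which is why only files marked JULIAN_GREGORIAN need any work at read time.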
11 changes: 10 additions & 1 deletion lib/trino-orc/src/main/java/io/trino/orc/metadata/Footer.java
@@ -37,6 +37,7 @@ public class Footer
private final Optional<ColumnMetadata<ColumnStatistics>> fileStats;
private final Map<String, Slice> userMetadata;
private final Optional<Integer> writerId;
private final CalendarKind calendar;

public Footer(
long numberOfRows,
@@ -45,7 +46,8 @@ public Footer(
ColumnMetadata<OrcType> types,
Optional<ColumnMetadata<ColumnStatistics>> fileStats,
Map<String, Slice> userMetadata,
Optional<Integer> writerId)
Optional<Integer> writerId,
CalendarKind calendar)
{
this.numberOfRows = numberOfRows;
rowsInRowGroup.ifPresent(value -> checkArgument(value > 0, "rowsInRowGroup must be at least 1"));
@@ -56,6 +58,7 @@
requireNonNull(userMetadata, "userMetadata is null");
this.userMetadata = ImmutableMap.copyOf(transformValues(userMetadata, Slice::copy));
this.writerId = requireNonNull(writerId, "writerId is null");
this.calendar = requireNonNull(calendar, "calendar is null");
}

public long getNumberOfRows()
@@ -93,6 +96,11 @@ public Optional<Integer> getWriterId()
return writerId;
}

public CalendarKind getCalendar()
{
return calendar;
}

@Override
public String toString()
{
@@ -104,6 +112,7 @@ public String toString()
.add("columnStatistics", fileStats)
.add("userMetadata", userMetadata.keySet())
.add("writerId", writerId)
.add("calendar", calendar)
.toString();
}
}
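
The intended consumption pattern for the new accessor (mirroring the check added to OrcPageSourceFactory below), assuming `footer` comes from a decoded ORC file:

```java
// Only files explicitly marked as hybrid-calendar need date rebasing;
// Trino-written files carry PROLEPTIC_GREGORIAN (see OrcWriter above).
boolean convertDateToProleptic = footer.getCalendar() == CalendarKind.JULIAN_GREGORIAN;
```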
lib/trino-orc/src/main/java/io/trino/orc/metadata/OrcMetadataReader.java
@@ -150,7 +150,8 @@ public Footer readFooter(HiveWriterVersion hiveWriterVersion, InputStream inputS
toType(footer.getTypesList()),
toColumnStatistics(hiveWriterVersion, footer.getStatisticsList(), false),
toUserMetadata(footer.getMetadataList()),
Optional.of(footer.getWriter()));
Optional.of(footer.getWriter()),
toCalendarKind(footer.getCalendar()));
}

private static List<StripeInformation> toStripeInformation(List<OrcProto.StripeInformation> types)
@@ -409,6 +410,15 @@ private static BinaryStatistics toBinaryStatistics(OrcProto.BinaryStatistics bin
return new BinaryStatistics(binaryStatistics.getSum());
}

private static CalendarKind toCalendarKind(OrcProto.CalendarKind calendarKind)
{
return switch (calendarKind) {
case UNKNOWN_CALENDAR -> CalendarKind.UNKNOWN_CALENDAR;
case JULIAN_GREGORIAN -> CalendarKind.JULIAN_GREGORIAN;
case PROLEPTIC_GREGORIAN -> CalendarKind.PROLEPTIC_GREGORIAN;
};
}

private static Slice byteStringToSlice(ByteString value)
{
return Slices.wrappedBuffer(value.toByteArray());
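
Worth noting, as an assumption about the generated protobuf code rather than anything this diff states: an ORC file written before the calendar field existed reads back as the enum's default value, so it surfaces here as UNKNOWN_CALENDAR and is not rebased; only an explicit JULIAN_GREGORIAN marker triggers the conversion added below.

```java
// Assumed protobuf behavior: an absent enum field reads back as its
// default (first) value, UNKNOWN_CALENDAR.
OrcProto.Footer legacyFooter = OrcProto.Footer.newBuilder().build();
assert toCalendarKind(legacyFooter.getCalendar()) == CalendarKind.UNKNOWN_CALENDAR;
```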
lib/trino-orc/src/main/java/io/trino/orc/metadata/OrcMetadataWriter.java
@@ -141,7 +141,8 @@ public int writeFooter(SliceOutput output, Footer footer)
.collect(toList()))
.addAllMetadata(footer.getUserMetadata().entrySet().stream()
.map(OrcMetadataWriter::toUserMetadata)
.collect(toList()));
.collect(toList()))
.setCalendar(toCalendarKind(footer.getCalendar()));

setWriter(builder);

@@ -361,6 +362,15 @@ private static OrcProto.Stream.Kind toStreamKind(StreamKind streamKind)
throw new IllegalArgumentException("Unsupported stream kind: " + streamKind);
}

private static OrcProto.CalendarKind toCalendarKind(CalendarKind calendarKind)
{
return switch (calendarKind) {
case UNKNOWN_CALENDAR -> OrcProto.CalendarKind.UNKNOWN_CALENDAR;
case JULIAN_GREGORIAN -> OrcProto.CalendarKind.JULIAN_GREGORIAN;
case PROLEPTIC_GREGORIAN -> OrcProto.CalendarKind.PROLEPTIC_GREGORIAN;
};
}

private static OrcProto.ColumnEncoding toColumnEncoding(ColumnEncoding columnEncodings)
{
return OrcProto.ColumnEncoding.newBuilder()
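
The writer-side switch mirrors the reader's, which implies a simple round-trip property, spelled out conceptually here (both helpers are private in their classes, so this is illustrative rather than compilable as-is):

```java
// Writer's mapping followed by the reader's is the identity for every kind.
for (CalendarKind kind : CalendarKind.values()) {
    OrcProto.CalendarKind proto = OrcMetadataWriter.toCalendarKind(kind);
    assert OrcMetadataReader.toCalendarKind(proto) == kind;
}
```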
plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java
@@ -27,6 +27,7 @@
import io.trino.plugin.base.metrics.LongCount;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.plugin.hive.orc.OrcDeletedRows.MaskDeletedRowsFunction;
import io.trino.plugin.hive.util.ValueAdjuster;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
@@ -271,6 +272,16 @@ static ColumnAdaptation coercedColumn(int index, TypeCoercer<?, ?> typeCoercer)
return new CoercedColumn(sourceColumn(index), typeCoercer);
}

static ColumnAdaptation coercedColumn(int sourceIndex, TypeCoercer<?, ?> typeCoercer, ValueAdjuster<?> valueAdjuster)
{
return new CoercedColumn(new ValueAdjustedColumn(sourceColumn(sourceIndex), valueAdjuster), typeCoercer);
}

static ColumnAdaptation valueAdjustedColumn(int sourceIndex, ValueAdjuster<?> valueAdjuster)
{
return new ValueAdjustedColumn(sourceColumn(sourceIndex), valueAdjuster);
}

static ColumnAdaptation constantColumn(Block singleValueBlock)
{
return new ConstantAdaptation(singleValueBlock);
@@ -401,6 +412,35 @@ public String toString()
}
}

private static class ValueAdjustedColumn
implements ColumnAdaptation
{
private final ColumnAdaptation delegate;
private final ValueAdjuster<?> valueAdjuster;

public ValueAdjustedColumn(ColumnAdaptation delegate, ValueAdjuster<?> valueAdjuster)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.valueAdjuster = requireNonNull(valueAdjuster, "valueAdjuster is null");
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
Block block = delegate.block(sourcePage, maskDeletedRowsFunction, filePosition, startRowId);
return new LazyBlock(block.getPositionCount(), () -> valueAdjuster.apply(block.getLoadedBlock()));
}

@Override
public String toString()
{
return toStringHelper(this)
.add("delegate", delegate)
.add("forType", valueAdjuster.getForType())
.toString();
}
}

/*
* The rowId contains the ACID columns: originalTransaction, rowId, bucket
*/
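
ValueAdjuster and ValueAdjusters live in io.trino.plugin.hive.util and are not shown in this diff. From the call sites above, an adjuster maps a loaded Block to a rewritten Block and reports the file-side type it operates on. A hypothetical DATE adjuster consistent with that contract, reusing the rebase helper sketched earlier (the interface shape and class name are assumptions, not the real API):

```java
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;

import static io.trino.spi.type.DateType.DATE;

// Shape inferred from valueAdjuster.apply(...) and getForType() above;
// the real io.trino.plugin.hive.util.ValueAdjuster may differ.
interface ValueAdjuster<T>
{
    Block apply(Block block);

    Type getForType();
}

class JulianToProlepticDateAdjuster
        implements ValueAdjuster<Integer>
{
    @Override
    public Block apply(Block block)
    {
        BlockBuilder builder = DATE.createBlockBuilder(null, block.getPositionCount());
        for (int position = 0; position < block.getPositionCount(); position++) {
            if (block.isNull(position)) {
                builder.appendNull();
            }
            else {
                // Rebase the stored hybrid-calendar day count to proleptic
                // Gregorian (see the LegacyDateRebase sketch above).
                int hybridDays = Math.toIntExact(DATE.getLong(block, position));
                DATE.writeLong(builder, LegacyDateRebase.rebaseJulianToProlepticDays(hybridDays));
            }
        }
        return builder.build();
    }

    @Override
    public Type getForType()
    {
        return DATE;
    }
}
```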
plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java
@@ -32,6 +32,7 @@
import io.trino.orc.OrcRecordReader;
import io.trino.orc.TupleDomainOrcPredicate;
import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
import io.trino.orc.metadata.CalendarKind;
import io.trino.orc.metadata.OrcType.OrcTypeKind;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.hive.AcidInfo;
@@ -46,6 +47,8 @@
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
import io.trino.plugin.hive.util.ValueAdjuster;
import io.trino.plugin.hive.util.ValueAdjusters;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
@@ -107,7 +110,6 @@
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;

public class OrcPageSourceFactory
implements HivePageSourceFactory
@@ -188,7 +190,7 @@ public Optional<ReaderPageSource> createPageSource(
if (readerColumns.isPresent()) {
readerColumnHandles = readerColumns.get().get().stream()
.map(HiveColumnHandle.class::cast)
.collect(toUnmodifiableList());
.toList();
}

ConnectorPageSource orcPageSource = createOrcPageSource(
@@ -274,6 +276,7 @@ private ConnectorPageSource createOrcPageSource(
if (!originalFile && acidInfo.isPresent() && !acidInfo.get().isOrcAcidVersionValidated()) {
validateOrcAcidVersion(path, reader);
}
boolean convertDateToProleptic = reader.getFooter().getCalendar().equals(CalendarKind.JULIAN_GREGORIAN);

List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
int actualColumnCount = columns.size() + (isFullAcid ? 3 : 0);
@@ -359,10 +362,22 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) {
if (orcColumn != null) {
int sourceIndex = fileReadColumns.size();
Optional<TypeCoercer<?, ?>> coercer = createCoercer(orcColumn.getColumnType(), orcColumn.getNestedColumns(), readType);
if (coercer.isPresent()) {
Optional<ValueAdjuster<?>> valueAdjuster = Optional.empty();
if (convertDateToProleptic) {
valueAdjuster = ValueAdjusters.createValueAdjuster(column.getBaseType());
}
if (coercer.isPresent() && valueAdjuster.isPresent()) {
fileReadTypes.add(coercer.get().getFromType());
columnAdaptations.add(ColumnAdaptation.coercedColumn(sourceIndex, coercer.get(), valueAdjuster.get()));
}
else if (coercer.isPresent()) {
fileReadTypes.add(coercer.get().getFromType());
columnAdaptations.add(ColumnAdaptation.coercedColumn(sourceIndex, coercer.get()));
}
else if (valueAdjuster.isPresent()) {
fileReadTypes.add(valueAdjuster.get().getForType());
columnAdaptations.add(ColumnAdaptation.valueAdjustedColumn(sourceIndex, valueAdjuster.get()));
}
else {
columnAdaptations.add(ColumnAdaptation.sourceColumn(sourceIndex));
fileReadTypes.add(readType);
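
When both a coercer and an adjuster apply, the combined branch above delegates to the two-argument coercedColumn added in OrcPageSource, so the wrapping order is fixed: values are rebased in the file's own type first and only then coerced to the requested read type.

```java
// Effective composition (from coercedColumn in OrcPageSource above; both
// wrapper classes are private there, shown unqualified for illustration):
ColumnAdaptation adaptation = new CoercedColumn(
        new ValueAdjustedColumn(sourceColumn(sourceIndex), valueAdjuster),
        typeCoercer);
```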
@@ -52,11 +52,22 @@ public void setUp()
@Test(groups = {HIVE4_OSS, PROFILE_SPECIFIC_TESTS})
public void testHiveParquetLegacyDateCompatibility()
{
String hiveTableName = "test_hive_%s_legacy_date_compatibility_%s".formatted("PARQUET".toLowerCase(ENGLISH), randomNameSuffix());
testHiveLegacyDateCompatibility("PARQUET");
}

@Test(groups = {HIVE4_OSS, PROFILE_SPECIFIC_TESTS})
public void testHiveOrcLegacyDateCompatibility()
{
testHiveLegacyDateCompatibility("ORC");
}

private void testHiveLegacyDateCompatibility(String format)
{
String hiveTableName = "test_hive_%s_legacy_date_compatibility_%s".formatted(format.toLowerCase(ENGLISH), randomNameSuffix());
String trinoTableName = format("%s.default.%s", TRINO_CATALOG, hiveTableName);

try {
onHive().executeQuery(format("CREATE TABLE default.%s (value integer, date_col date) STORED AS %s LOCATION 's3://%s/%s'", hiveTableName, "PARQUET", bucketName, hiveTableName));
onHive().executeQuery(format("CREATE TABLE default.%s (value integer, date_col date) STORED AS %s LOCATION 's3://%s/%s'", hiveTableName, format, bucketName, hiveTableName));
onHive().executeQuery(format("INSERT INTO %s VALUES (1, '2022-04-13')", hiveTableName));
onHive().executeQuery(format("INSERT INTO %s VALUES (2, '1584-09-15')", hiveTableName));
onHive().executeQuery(format("INSERT INTO %s VALUES (3, '1584-09-10')", hiveTableName));
@@ -93,14 +104,21 @@ public void testHiveParquetLegacyDateCompatibility()
@Test(groups = {HIVE4_OSS, PROFILE_SPECIFIC_TESTS})
public void testHiveParquetLegacyTimestampCompatibility()
{
testHiveLegacyTimestampCompatibility(MILLISECONDS, "123");
testHiveLegacyTimestampCompatibility(MICROSECONDS, "123456");
testHiveLegacyTimestampCompatibility(NANOSECONDS, "123456789");
testHiveLegacyTimestampCompatibility("PARQUET", MILLISECONDS, "123");
testHiveLegacyTimestampCompatibility("PARQUET", MICROSECONDS, "123456");
testHiveLegacyTimestampCompatibility("PARQUET", NANOSECONDS, "123456789");
}

@Test(groups = {HIVE4_OSS, PROFILE_SPECIFIC_TESTS})
public void testHiveOrcLegacyTimestampCompatibility()
{
testHiveLegacyTimestampCompatibility("ORC", MILLISECONDS, "123");
testHiveLegacyTimestampCompatibility("ORC", MICROSECONDS, "123456");
testHiveLegacyTimestampCompatibility("ORC", NANOSECONDS, "123456789");
}

private void testHiveLegacyTimestampCompatibility(HiveTimestampPrecision hiveTimestampPrecision, String fractionalPart)
private void testHiveLegacyTimestampCompatibility(String format, HiveTimestampPrecision hiveTimestampPrecision, String fractionalPart)
{
String format = "PARQUET";
String hiveTableName = "test_hive_%s_legacy_tmst_compatibility_%s".formatted(format.toLowerCase(ENGLISH), randomNameSuffix());
String trinoTableName = format("%s.default.%s", TRINO_CATALOG, hiveTableName);

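A quick way to see why the rebasing matters for pre-cutover dates (illustrative only, not part of the test suite): the same printed date yields different epoch-day numbers under the two calendars.

```java
import java.time.LocalDate;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class CalendarCutoverDemo
{
    public static void main(String[] args)
    {
        // Interpret 1500-01-01 under the hybrid calendar, as legacy Hive did.
        GregorianCalendar hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        hybrid.clear();
        hybrid.set(1500, Calendar.JANUARY, 1);
        long hybridEpochDay = Math.floorDiv(hybrid.getTimeInMillis(), 86_400_000L);
        // Interpret the same label under the proleptic Gregorian calendar.
        long prolepticEpochDay = LocalDate.of(1500, 1, 1).toEpochDay();
        // The two counts differ for dates this far before the Gregorian
        // cutover, which is exactly what the reader-side rebasing reconciles.
        System.out.printf("hybrid=%d proleptic=%d%n", hybridEpochDay, prolepticEpochDay);
    }
}
```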

