Skip to content

Commit

Permalink
Add support for legacy Date in Hive for Parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinsbd committed Jan 17, 2025
1 parent 9b53e78 commit cac29f7
Show file tree
Hide file tree
Showing 13 changed files with 889 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.reader.ParquetReader;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.plugin.hive.util.ValueAdjuster;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
Expand Down Expand Up @@ -181,6 +182,18 @@ public Builder addCoercedColumn(int sourceChannel, TypeCoercer<?, ?> typeCoercer
return this;
}

public Builder addCoercedColumn(int sourceChannel, ValueAdjuster<?> valueAdjuster, TypeCoercer<?, ?> typeCoercer)
{
columns.add(new CoercedColumn(new ValueAdjustedColumn(new SourceColumn(sourceChannel), valueAdjuster), typeCoercer));
return this;
}

public Builder addValueAdjustedColumn(int sourceChannel, ValueAdjuster<?> valueAdjuster)
{
columns.add(new ValueAdjustedColumn(new SourceColumn(sourceChannel), valueAdjuster));
return this;
}

public ConnectorPageSource build(ParquetReader parquetReader)
{
return new ParquetPageSource(parquetReader, this.columns.build());
Expand Down Expand Up @@ -309,10 +322,10 @@ public Block getBlock(Page sourcePage, long startRowId)
private static class CoercedColumn
implements ParquetPageSource.ColumnAdaptation
{
private final ParquetPageSource.SourceColumn sourceColumn;
private final ColumnAdaptation sourceColumn;
private final TypeCoercer<?, ?> typeCoercer;

public CoercedColumn(ParquetPageSource.SourceColumn sourceColumn, TypeCoercer<?, ?> typeCoercer)
public CoercedColumn(ColumnAdaptation sourceColumn, TypeCoercer<?, ?> typeCoercer)
{
this.sourceColumn = requireNonNull(sourceColumn, "sourceColumn is null");
this.typeCoercer = requireNonNull(typeCoercer, "typeCoercer is null");
Expand All @@ -336,6 +349,35 @@ public String toString()
}
}

private static class ValueAdjustedColumn
implements ColumnAdaptation
{
private final ColumnAdaptation sourceColumn;
private final ValueAdjuster<?> valueAdjuster;

private ValueAdjustedColumn(ColumnAdaptation sourceColumn, ValueAdjuster<?> valueAdjuster)
{
this.sourceColumn = requireNonNull(sourceColumn, "sourceColumn is null");
this.valueAdjuster = requireNonNull(valueAdjuster, "valueAdjustable is null");
}

@Override
public Block getBlock(Page sourcePage, long startRowId)
{
Block block = sourceColumn.getBlock(sourcePage, startRowId);
return new LazyBlock(block.getPositionCount(), () -> valueAdjuster.apply(block.getLoadedBlock()));
}

@Override
public String toString()
{
return toStringHelper(this)
.add("sourceColumn", sourceColumn)
.add("forType", valueAdjuster.getForType())
.toString();
}
}

private static Block createRowNumberBlock(long baseIndex, int size)
{
long[] rowIndices = new long[size];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@
import io.trino.plugin.hive.Schema;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.coercions.TypeCoercer;
import io.trino.plugin.hive.util.ValueAdjuster;
import io.trino.plugin.hive.util.ValueAdjusters;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimestampType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.ColumnIO;
import org.apache.parquet.io.MessageColumnIO;
Expand Down Expand Up @@ -99,6 +102,8 @@
import static io.trino.plugin.hive.parquet.ParquetPageSource.handleException;
import static io.trino.plugin.hive.parquet.ParquetTypeTranslator.createCoercer;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static java.lang.Boolean.parseBoolean;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

Expand All @@ -118,6 +123,8 @@ public class ParquetPageSourceFactory
Optional.empty(),
HiveColumnHandle.ColumnType.SYNTHESIZED,
Optional.empty());
// Hive's key used in file footer's metadata to document which calendar (hybrid or proleptic Gregorian) was used for write Date type
public static final String HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC = "writer.date.proleptic";

private static final Set<String> PARQUET_SERDE_CLASS_NAMES = ImmutableSet.<String>builder()
.add(PARQUET_HIVE_SERDE_CLASS)
Expand Down Expand Up @@ -224,6 +231,8 @@ public static ReaderPageSource createPageSource(
FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
fileSchema = fileMetaData.getSchema();

boolean convertDateToProleptic = shouldConvertDateToProleptic(fileMetaData.getKeyValueMetaData());

Optional<MessageType> message = getParquetMessageType(columns, useColumnNames, fileSchema);

requestedSchema = message.orElse(new MessageType(fileSchema.getName(), ImmutableList.of()));
Expand Down Expand Up @@ -264,7 +273,7 @@ public static ReaderPageSource createPageSource(
List<HiveColumnHandle> baseColumns = readerProjections.map(projection ->
projection.get().stream()
.map(HiveColumnHandle.class::cast)
.collect(ImmutableList.toImmutableList()))
.collect(toImmutableList()))
.orElse(ImmutableList.copyOf(columns));

ParquetDataSourceId dataSourceId = dataSource.getId();
Expand All @@ -282,7 +291,7 @@ public static ReaderPageSource createPageSource(
// are not present in the Parquet files which are read with disjunct predicates.
parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.getFirst()) : Optional.empty(),
parquetWriteValidation);
ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider, convertDateToProleptic);
return new ReaderPageSource(parquetPageSource, readerProjections);
}
catch (Exception e) {
Expand Down Expand Up @@ -421,6 +430,18 @@ public static ConnectorPageSource createParquetPageSource(
boolean useColumnNames,
ParquetReaderProvider parquetReaderProvider)
throws IOException
{
return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider, false);
}

public static ConnectorPageSource createParquetPageSource(
List<HiveColumnHandle> baseColumns,
MessageType fileSchema,
MessageColumnIO messageColumn,
boolean useColumnNames,
ParquetReaderProvider parquetReaderProvider,
boolean convertDateToProleptic)
throws IOException
{
ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder();
ImmutableList.Builder<Column> parquetColumnFieldsBuilder = ImmutableList.builder();
Expand All @@ -439,10 +460,14 @@ public static ConnectorPageSource createParquetPageSource(
String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName();

Optional<TypeCoercer<?, ?>> coercer = Optional.empty();
Optional<ValueAdjuster<?>> valueAdjuster = Optional.empty();
ColumnIO columnIO = lookupColumnByName(messageColumn, columnName);
if (columnIO != null && columnIO.getType().isPrimitive()) {
PrimitiveType primitiveType = columnIO.getType().asPrimitiveType();
coercer = createCoercer(primitiveType.getPrimitiveTypeName(), primitiveType.getLogicalTypeAnnotation(), column.getBaseType());
if (convertDateToProleptic && (column.getBaseType().equals(DATE) || column.getBaseType() instanceof TimestampType)) {
valueAdjuster = ValueAdjusters.createValueAdjuster(column.getBaseType());
}
}

io.trino.spi.type.Type readType = coercer.map(TypeCoercer::getFromType).orElseGet(column::getBaseType);
Expand All @@ -452,10 +477,17 @@ public static ConnectorPageSource createParquetPageSource(
pageSourceBuilder.addNullColumn(readType);
continue;
}

parquetColumnFieldsBuilder.add(new Column(columnName, field.get()));
if (coercer.isPresent()) {
if (coercer.isPresent() && valueAdjuster.isPresent()) {
pageSourceBuilder.addCoercedColumn(sourceChannel, valueAdjuster.get(), coercer.get());
}
else if (coercer.isPresent()) {
pageSourceBuilder.addCoercedColumn(sourceChannel, coercer.get());
}
else if (valueAdjuster.isPresent()) {
pageSourceBuilder.addValueAdjustedColumn(sourceChannel, valueAdjuster.get());
}
else {
pageSourceBuilder.addSourceColumn(sourceChannel);
}
Expand All @@ -465,6 +497,12 @@ public static ConnectorPageSource createParquetPageSource(
return pageSourceBuilder.build(parquetReaderProvider.createParquetReader(parquetColumnFieldsBuilder.build()));
}

private static boolean shouldConvertDateToProleptic(Map<String, String> keyValueMetaData)
{
// if entry exists and explicitly states 'false' then we should convert to Proleptic, in other case no
return keyValueMetaData.containsKey(HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC) && !parseBoolean(keyValueMetaData.get(HIVE_METADATA_KEY_WRITER_DATE_PROLEPTIC));
}

private static Optional<org.apache.parquet.schema.Type> getBaseColumnParquetType(HiveColumnHandle column, MessageType messageType, boolean useParquetColumnNames)
{
if (useParquetColumnNames) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.util;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;

import static java.time.ZoneOffset.UTC;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public final class CalendarUtils
{
static final LocalDate GREGORIAN_START_DATE = LocalDate.of(1582, 10, 15);
static final LocalDate JULIAN_END_DATE = LocalDate.of(1582, 10, 4);

private static final TimeZone TZ_UTC = TimeZone.getTimeZone(UTC);
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";

static final ThreadLocal<SimpleDateFormat> HYBRID_CALENDAR_DATE_FORMAT = ThreadLocal.withInitial(() -> {
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
format.setCalendar(new GregorianCalendar(TZ_UTC));
return format;
});

static final ThreadLocal<SimpleDateFormat> HYBRID_CALENDAR_DATE_TIME_FORMAT = ThreadLocal.withInitial(() -> {
SimpleDateFormat format = new SimpleDateFormat(DATE_TIME_FORMAT);
format.setCalendar(new GregorianCalendar(TZ_UTC));
return format;
});

static final ThreadLocal<SimpleDateFormat> PROLEPTIC_CALENDAR_DATE_FORMAT = ThreadLocal.withInitial(() -> {
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
GregorianCalendar prolepticGregorianCalendar = new GregorianCalendar(TZ_UTC);
prolepticGregorianCalendar.setGregorianChange(new Date(Long.MIN_VALUE));
format.setCalendar(prolepticGregorianCalendar);
return format;
});

static final ThreadLocal<SimpleDateFormat> PROLEPTIC_CALENDAR_DATE_TIME_FORMAT = ThreadLocal.withInitial(() -> {
SimpleDateFormat format = new SimpleDateFormat(DATE_TIME_FORMAT);
GregorianCalendar prolepticGregorianCalendar = new GregorianCalendar(TZ_UTC);
prolepticGregorianCalendar.setGregorianChange(new Date(Long.MIN_VALUE));
format.setCalendar(prolepticGregorianCalendar);
return format;
});

private static final long LAST_SWITCH_JULIAN_DAY_MILLIS;
private static final long LAST_SWITCH_JULIAN_DAY;

static {
try {
LAST_SWITCH_JULIAN_DAY_MILLIS = HYBRID_CALENDAR_DATE_FORMAT.get().parse("1582-10-15").getTime();
LAST_SWITCH_JULIAN_DAY = MILLISECONDS.toDays(LAST_SWITCH_JULIAN_DAY_MILLIS);
}
catch (ParseException e) {
throw new RuntimeException(e);
}
}

private CalendarUtils() {}

public static int convertDaysToProlepticGregorian(int julianDays)
{
if (julianDays < LAST_SWITCH_JULIAN_DAY) {
return convertDaysToProlepticDaysInternal(julianDays);
}
return julianDays;
}

private static int convertDaysToProlepticDaysInternal(int hybridDays)
{
long hybridMillis = DAYS.toMillis(hybridDays);
String hybridDateInString = HYBRID_CALENDAR_DATE_FORMAT.get().format(new Date(hybridMillis));
long result;
try {
result = PROLEPTIC_CALENDAR_DATE_FORMAT.get().parse(hybridDateInString).getTime();
}
catch (ParseException e) {
throw new RuntimeException(e);
}
long prolepticMillis = result;
return (int) MILLISECONDS.toDays(prolepticMillis);
}

public static long convertTimestampToProlepticGregorian(long epochMillis)
{
if (epochMillis < LAST_SWITCH_JULIAN_DAY_MILLIS) {
String dateTimeInString = HYBRID_CALENDAR_DATE_TIME_FORMAT.get().format(new Date(epochMillis));
try {
return PROLEPTIC_CALENDAR_DATE_TIME_FORMAT.get().parse(dateTimeInString).getTime();
}
catch (ParseException e) {
throw new RuntimeException(e);
}
}
return epochMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.util;

import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.Type;

import java.util.function.Function;

import static java.util.Objects.requireNonNull;

public abstract class ValueAdjuster<T extends Type>
implements Function<Block, Block>
{
protected final T forType;

protected ValueAdjuster(T forType)
{
this.forType = requireNonNull(forType);
}

@Override
public Block apply(Block block)
{
BlockBuilder blockBuilder = forType.createBlockBuilder(null, block.getPositionCount());

for (int i = 0; i < block.getPositionCount(); i++) {
if (block.isNull(i)) {
blockBuilder.appendNull();
continue;
}
adjustValue(blockBuilder, block, i);
}
return blockBuilder.build();
}

protected abstract void adjustValue(BlockBuilder blockBuilder, Block block, int i);

public Type getForType()
{
return forType;
}
}
Loading

0 comments on commit cac29f7

Please sign in to comment.