diff --git a/lib/trino-hive-formats/pom.xml b/lib/trino-hive-formats/pom.xml index 44bf90e72f63..6eae3c69c77a 100644 --- a/lib/trino-hive-formats/pom.xml +++ b/lib/trino-hive-formats/pom.xml @@ -17,6 +17,19 @@ + + + com.amazon.ion + ion-java + ${dep.ion.version} + + + + com.amazon.ion + ion-java-path-extraction + ${dep.ion-path-extraction.version} + + com.fasterxml.jackson.core jackson-core diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java index 43a357b59bd9..f381758603f8 100644 --- a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/HiveClassNames.java @@ -27,6 +27,9 @@ public final class HiveClassNames public static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT = "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"; public static final String HUDI_INPUT_FORMAT = "com.uber.hoodie.hadoop.HoodieInputFormat"; public static final String HUDI_REALTIME_INPUT_FORMAT = "com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat"; + public static final String ION_SERDE_CLASS = "com.amazon.ionhiveserde.IonHiveSerDe"; + public static final String ION_INPUT_FORMAT = "com.amazon.ionhiveserde.formats.IonInputFormat"; + public static final String ION_OUTPUT_FORMAT = "com.amazon.ionhiveserde.formats.IonOutputFormat"; public static final String JSON_SERDE_CLASS = "org.apache.hive.hcatalog.data.JsonSerDe"; public static final String LEGACY_JSON_SERDE_CLASS = "org.apache.hadoop.hive.serde2.JsonSerDe"; public static final String OPENX_JSON_SERDE_CLASS = "org.openx.data.jsonserde.JsonSerDe"; diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoder.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoder.java new file mode 100644 index 000000000000..4463bc79c500 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoder.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.ion; + +import com.amazon.ion.IonException; +import com.amazon.ion.IonReader; + +public interface IonDecoder +{ + /** + * Reads the _current_ top-level-value from the IonReader. + *
+ * Expects that the calling code has called IonReader.next() + * to position the reader at the value to be decoded. + */ + void decode(IonReader reader) + throws IonException; +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderConfig.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderConfig.java new file mode 100644 index 000000000000..755d00ae9010 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.ion; + +import java.util.Map; + +/** + * Captures the SerDe properties that affect decoding. + * + * @param pathExtractors Map of column name => ion paths + * for each entry in the map, the value bound to the column will be the result + * of extracting the given search path. + * @param strictTyping whether the path extractions should enforce type expectations. + * this only affects type checking of path extractions; any value decoded into + * a trino column will be correctly typed or coerced for that column. + * @param caseSensitive whether field name matching should be case-sensitive or not. + */ +public record IonDecoderConfig(Map pathExtractors, Boolean strictTyping, Boolean caseSensitive) +{ + static IonDecoderConfig defaultConfig() + { + return new IonDecoderConfig(Map.of(), false, false); + } + + IonDecoderConfig withStrictTyping() + { + return new IonDecoderConfig(pathExtractors, true, caseSensitive); + } + + IonDecoderConfig withCaseSensitive() + { + return new IonDecoderConfig(pathExtractors, strictTyping, true); + } + + IonDecoderConfig withPathExtractors(Map pathExtractors) + { + return new IonDecoderConfig(pathExtractors, strictTyping, caseSensitive); + } +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderFactory.java new file mode 100644 index 000000000000..d31963bd9301 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonDecoderFactory.java @@ -0,0 +1,487 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.hive.formats.ion; + +import com.amazon.ion.IonException; +import com.amazon.ion.IonReader; +import com.amazon.ion.IonType; +import com.amazon.ion.IonWriter; +import com.amazon.ion.Timestamp; +import com.amazon.ion.system.IonTextWriterBuilder; +import com.amazon.ionpathextraction.PathExtractor; +import com.amazon.ionpathextraction.PathExtractorBuilder; +import com.amazon.ionpathextraction.pathcomponents.Text; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slices; +import io.trino.hive.formats.DistinctMapKeys; +import io.trino.hive.formats.line.Column; +import io.trino.spi.PageBuilder; +import io.trino.spi.StandardErrorCode; +import io.trino.spi.TrinoException; +import io.trino.spi.block.ArrayBlockBuilder; +import io.trino.spi.block.BlockBuilder; +import io.trino.spi.block.MapBlockBuilder; +import io.trino.spi.block.RowBlockBuilder; +import io.trino.spi.block.ValueBlock; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.CharType; +import io.trino.spi.type.Chars; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.Int128; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.LongTimestamp; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.Timestamps; +import io.trino.spi.type.TinyintType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; +import io.trino.spi.type.Varchars; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.time.LocalDate; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.IntFunction; + +public class IonDecoderFactory +{ + private IonDecoderFactory() {} + + /** + * Builds a decoder for the given columns. + *
+ * The decoder expects to decode the _current_ Ion Value.
+ * It also expects that the calling code will manage the PageBuilder.
+ */ + public static IonDecoder buildDecoder( + List columns, + IonDecoderConfig decoderConfig, + PageBuilder pageBuilder) + { + PathExtractorBuilder extractorBuilder = PathExtractorBuilder.standard() + .withMatchCaseInsensitive(!decoderConfig.caseSensitive()); + + for (int pos = 0; pos < columns.size(); pos++) { + String name = columns.get(pos).name(); + BlockDecoder decoder = decoderForType(columns.get(pos).type()); + BiFunction callback = callbackFor(decoder, pos); + + String extractionPath = decoderConfig.pathExtractors().get(name); + if (extractionPath == null) { + extractorBuilder.withSearchPath(List.of(new Text(name)), callback); + } + else { + extractorBuilder.withSearchPath(extractionPath, callback); + } + } + PathExtractor extractor = extractorBuilder.buildStrict(decoderConfig.strictTyping()); + PageExtractionContext context = new PageExtractionContext(pageBuilder, new boolean[columns.size()]); + + return (ionReader) -> { + extractor.matchCurrentValue(ionReader, context); + context.completeRowAndReset(); + }; + } + + private static BiFunction callbackFor(BlockDecoder decoder, int pos) + { + return (ionReader, context) -> { + BlockBuilder blockBuilder = context.pageBuilder.getBlockBuilder(pos); + if (context.encountered[pos]) { + blockBuilder.resetTo(blockBuilder.getPositionCount() - 1); + } + else { + context.encountered[pos] = true; + } + + decoder.decode(ionReader, context.pageBuilder.getBlockBuilder(pos)); + return 0; + }; + } + + private record PageExtractionContext(PageBuilder pageBuilder, boolean[] encountered) + { + private void completeRowAndReset() + { + for (int i = 0; i < encountered.length; i++) { + if (!encountered[i]) { + pageBuilder.getBlockBuilder(i).appendNull(); + } + encountered[i] = false; + } + } + } + + private interface BlockDecoder + { + void decode(IonReader reader, BlockBuilder builder); + } + + private static BlockDecoder decoderForType(Type type) + { + return switch (type) { + case TinyintType t -> wrapDecoder(byteDecoder, t, IonType.INT); + case SmallintType t -> wrapDecoder(shortDecoder, t, IonType.INT); + case IntegerType t -> wrapDecoder(intDecoder, t, IonType.INT); + case BigintType t -> wrapDecoder(longDecoder, t, IonType.INT); + case RealType t -> wrapDecoder(realDecoder, t, IonType.FLOAT); + case DoubleType t -> wrapDecoder(floatDecoder, t, IonType.FLOAT); + case DecimalType t -> wrapDecoder(decimalDecoder(t), t, IonType.DECIMAL, IonType.INT); + case BooleanType t -> wrapDecoder(boolDecoder, t, IonType.BOOL); + case DateType t -> wrapDecoder(dateDecoder, t, IonType.TIMESTAMP); + case TimestampType t -> wrapDecoder(timestampDecoder(t), t, IonType.TIMESTAMP); + case VarcharType t -> wrapDecoder(varcharDecoder(t), t, IonType.values()); + case CharType t -> wrapDecoder(charDecoder(t), t, IonType.values()); + case VarbinaryType t -> wrapDecoder(binaryDecoder, t, IonType.BLOB, IonType.CLOB); + case RowType t -> wrapDecoder(RowDecoder.forFields(t.getFields()), t, IonType.STRUCT); + case ArrayType t -> wrapDecoder(new ArrayDecoder(decoderForType(t.getElementType())), t, IonType.LIST, IonType.SEXP); + case MapType t -> wrapDecoder(new MapDecoder(t), t, IonType.STRUCT); + default -> throw new IllegalArgumentException(String.format("Unsupported type: %s", type)); + }; + } + + /** + * Wraps decoders for common handling logic. + *
+ * Handles untyped and correctly typed null values.
+ * Throws for mistyped values, whether null or not.
+ * Delegates to the wrapped decoder for correctly-typed, non-null values.
+ *
+ * This code treats all values as nullable. + */ + private static BlockDecoder wrapDecoder(BlockDecoder decoder, Type trinoType, IonType... allowedTypes) + { + final Set allowedWithNull = new HashSet<>(Arrays.asList(allowedTypes)); + allowedWithNull.add(IonType.NULL); + + return (reader, builder) -> { + final IonType ionType = reader.getType(); + if (!allowedWithNull.contains(ionType)) { + throw new TrinoException(StandardErrorCode.GENERIC_USER_ERROR, + "Cannot coerce IonType %s to Trino type %s".formatted(ionType, trinoType)); + } + if (reader.isNullValue()) { + builder.appendNull(); + } + else { + decoder.decode(reader, builder); + } + }; + } + + private record RowDecoder(Map fieldPositions, List fieldDecoders) + implements BlockDecoder + { + private static RowDecoder forFields(List fields) + { + ImmutableList.Builder decoderBuilder = ImmutableList.builder(); + ImmutableMap.Builder fieldPositionBuilder = ImmutableMap.builder(); + for (int pos = 0; pos < fields.size(); pos++) { + RowType.Field field = fields.get(pos); + decoderBuilder.add(decoderForType(field.getType())); + fieldPositionBuilder.put(field.getName().get().toLowerCase(Locale.ROOT), pos); + } + return new RowDecoder(fieldPositionBuilder.buildOrThrow(), decoderBuilder.build()); + } + + @Override + public void decode(IonReader ionReader, BlockBuilder blockBuilder) + { + ((RowBlockBuilder) blockBuilder) + .buildEntry(fieldBuilders -> decode(ionReader, fieldBuilders::get)); + } + + // assumes that the reader is positioned on a non-null struct value + private void decode(IonReader ionReader, IntFunction blockSelector) + { + boolean[] encountered = new boolean[fieldDecoders.size()]; + ionReader.stepIn(); + + while (ionReader.next() != null) { + final Integer fieldIndex = fieldPositions.get(ionReader.getFieldName().toLowerCase(Locale.ROOT)); + if (fieldIndex == null) { + continue; + } + final BlockBuilder blockBuilder = blockSelector.apply(fieldIndex); + if (!encountered[fieldIndex]) { + fieldDecoders.get(fieldIndex).decode(ionReader, blockBuilder); + encountered[fieldIndex] = true; + } + } + + for (int i = 0; i < encountered.length; i++) { + if (!encountered[i]) { + blockSelector.apply(i).appendNull(); + } + } + + ionReader.stepOut(); + } + } + + private static class MapDecoder + implements BlockDecoder + { + private final BiConsumer keyConsumer; + private final BlockDecoder valueDecoder; + private final DistinctMapKeys distinctMapKeys; + private BlockBuilder keyBlockBuilder; + private BlockBuilder valueBlockBuilder; + + MapDecoder(MapType mapType) + { + Type keyType = mapType.getKeyType(); + Type valueType = mapType.getValueType(); + this.valueDecoder = decoderForType(valueType); + this.distinctMapKeys = new DistinctMapKeys(mapType, true); + this.keyBlockBuilder = keyType.createBlockBuilder(null, 128); + this.valueBlockBuilder = valueType.createBlockBuilder(null, 128); + + this.keyConsumer = switch (keyType) { + case VarcharType t -> { + yield (String fieldName, BlockBuilder blockBuilder) -> + t.writeSlice(blockBuilder, Varchars.truncateToLength(Slices.utf8Slice(fieldName), t)); + } + case CharType t -> { + yield (String fieldName, BlockBuilder blockBuilder) -> + t.writeSlice(blockBuilder, Chars.truncateToLengthAndTrimSpaces(Slices.utf8Slice(fieldName), t)); + } + default -> throw new UnsupportedOperationException("Unsupported map key type: " + keyType); + }; + } + + @Override + public void decode(IonReader ionReader, BlockBuilder builder) + { + ionReader.stepIn(); + // buffer the keys and values + while (ionReader.next() != 
null) { + keyConsumer.accept(ionReader.getFieldName(), keyBlockBuilder); + valueDecoder.decode(ionReader, valueBlockBuilder); + } + ValueBlock keys = keyBlockBuilder.buildValueBlock(); + ValueBlock values = valueBlockBuilder.buildValueBlock(); + keyBlockBuilder = keyBlockBuilder.newBlockBuilderLike(null); + valueBlockBuilder = valueBlockBuilder.newBlockBuilderLike(null); + + // copy the distinct key entries to the output + boolean[] distinctKeys = distinctMapKeys.selectDistinctKeys(keys); + + ((MapBlockBuilder) builder).buildEntry((keyBuilder, valueBuilder) -> { + for (int index = 0; index < distinctKeys.length; index++) { + boolean distinctKey = distinctKeys[index]; + if (distinctKey) { + keyBuilder.append(keys, index); + valueBuilder.append(values, index); + } + } + }); + ionReader.stepOut(); + } + } + + private record ArrayDecoder(BlockDecoder elementDecoder) + implements BlockDecoder + { + @Override + public void decode(IonReader ionReader, BlockBuilder blockBuilder) + { + ((ArrayBlockBuilder) blockBuilder) + .buildEntry(elementBuilder -> { + ionReader.stepIn(); + while (ionReader.next() != null) { + elementDecoder.decode(ionReader, elementBuilder); + } + ionReader.stepOut(); + }); + } + } + + private static BlockDecoder timestampDecoder(TimestampType type) + { + // Ion supports arbitrarily precise Timestamps. + // Other Hive formats are using the DecodedTimestamp and TrinoTimestampEncoders in + // io.trino.plugin.base.type but those don't cover picos. + // This code uses same pattern of splitting the parsed timestamp into (seconds, fraction) + // then rounding the fraction using Timestamps.round() ensures consistency with the others + // while capturing picos if present. Fractional precision beyond picos is ignored. + return (reader, builder) -> { + BigDecimal decimalSeconds = reader.timestampValue() + .getDecimalMillis() + .movePointLeft(3); + BigDecimal decimalPicos = decimalSeconds.remainder(BigDecimal.ONE) + .movePointRight(12); + + long fractionalPicos = Timestamps.round(decimalPicos.longValue(), 12 - type.getPrecision()); + long epochMicros = decimalSeconds.longValue() * Timestamps.MICROSECONDS_PER_SECOND + + fractionalPicos / Timestamps.PICOSECONDS_PER_MICROSECOND; + + if (type.isShort()) { + type.writeLong(builder, epochMicros); + } + else { + type.writeObject(builder, + new LongTimestamp(epochMicros, (int) (fractionalPicos % Timestamps.PICOSECONDS_PER_MICROSECOND))); + } + }; + } + + private static BlockDecoder decimalDecoder(DecimalType type) + { + int precision = type.getPrecision(); + int scale = type.getScale(); + + return (reader, builder) -> { + try { + BigDecimal decimal = reader.bigDecimalValue(); + BigInteger unscaled = decimal + .setScale(scale, RoundingMode.UNNECESSARY) + .unscaledValue(); + + if (Decimals.overflows(unscaled, precision)) { + throw new TrinoException(StandardErrorCode.NUMERIC_VALUE_OUT_OF_RANGE, + "Decimal value %s does not fit %d digits of precision and %d of scale!" 
+ .formatted(decimal, precision, scale)); + } + if (type.isShort()) { + type.writeLong(builder, unscaled.longValue()); + } + else { + type.writeObject(builder, Int128.valueOf(unscaled)); + } + } + catch (ArithmeticException e) { + throw new TrinoException(StandardErrorCode.NUMERIC_VALUE_OUT_OF_RANGE, + "Decimal value %s does not fit %d digits of scale!".formatted(reader.bigDecimalValue(), scale)); + } + }; + } + + private static String getCoercedValue(IonReader ionReader) + { + IonTextWriterBuilder textWriterBuilder = IonTextWriterBuilder.standard(); + StringBuilder stringBuilder = new StringBuilder(); + IonWriter writer = textWriterBuilder.build(stringBuilder); + try { + writer.writeValue(ionReader); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return stringBuilder.toString(); + } + + private static BlockDecoder varcharDecoder(VarcharType type) + { + return (ionReader, blockBuilder) -> { + IonType valueType = ionReader.getType(); + String value; + + if (valueType == IonType.SYMBOL || valueType == IonType.STRING) { + value = ionReader.stringValue(); + } + else { + // For any types other than IonType.SYMBOL and IonType.STRING, performs text coercion + value = getCoercedValue(ionReader); + } + type.writeSlice(blockBuilder, Varchars.truncateToLength(Slices.utf8Slice(value), type)); + }; + } + + private static BlockDecoder charDecoder(CharType type) + { + return (ionReader, blockBuilder) -> { + IonType valueType = ionReader.getType(); + String value; + + if (valueType == IonType.SYMBOL || valueType == IonType.STRING) { + value = ionReader.stringValue(); + } + else { + // For any types other than IonType.SYMBOL and IonType.STRING, performs text coercion + value = getCoercedValue(ionReader); + } + type.writeSlice(blockBuilder, Chars.truncateToLengthAndTrimSpaces(Slices.utf8Slice(value), type)); + }; + } + + private static final BlockDecoder byteDecoder = (ionReader, blockBuilder) -> + TinyintType.TINYINT.writeLong(blockBuilder, readLong(ionReader)); + + private static final BlockDecoder shortDecoder = (ionReader, blockBuilder) -> + SmallintType.SMALLINT.writeLong(blockBuilder, readLong(ionReader)); + + private static final BlockDecoder intDecoder = (ionReader, blockBuilder) -> + IntegerType.INTEGER.writeLong(blockBuilder, readLong(ionReader)); + + private static final BlockDecoder longDecoder = (ionReader, blockBuilder) -> + BigintType.BIGINT.writeLong(blockBuilder, readLong(ionReader)); + + private static long readLong(IonReader ionReader) + { + try { + return ionReader.longValue(); + } + catch (IonException e) { + throw new TrinoException(StandardErrorCode.GENERIC_USER_ERROR, e.getMessage()); + } + } + + private static final BlockDecoder realDecoder = (ionReader, blockBuilder) -> { + double readValue = ionReader.doubleValue(); + if (readValue == (float) readValue) { + RealType.REAL.writeFloat(blockBuilder, (float) readValue); + } + else { + throw new TrinoException(StandardErrorCode.GENERIC_USER_ERROR, + "Won't truncate double precise float to real!"); + } + }; + + private static final BlockDecoder floatDecoder = (ionReader, blockBuilder) -> + DoubleType.DOUBLE.writeDouble(blockBuilder, ionReader.doubleValue()); + + private static final BlockDecoder boolDecoder = (ionReader, blockBuilder) -> + BooleanType.BOOLEAN.writeBoolean(blockBuilder, ionReader.booleanValue()); + + private static final BlockDecoder dateDecoder = (ionReader, blockBuilder) -> { + Timestamp ionTs = ionReader.timestampValue(); + LocalDate localDate = LocalDate.of(ionTs.getZYear(), ionTs.getZMonth(), 
ionTs.getZDay()); + DateType.DATE.writeLong(blockBuilder, localDate.toEpochDay()); + }; + + private static final BlockDecoder binaryDecoder = (ionReader, blockBuilder) -> + VarbinaryType.VARBINARY.writeSlice(blockBuilder, Slices.wrappedBuffer(ionReader.newBytes())); +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoder.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoder.java new file mode 100644 index 000000000000..7e7fd899a8f3 --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoder.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.ion; + +import com.amazon.ion.IonWriter; +import io.trino.spi.Page; + +import java.io.IOException; + +public interface IonEncoder +{ + /** + * Encodes the Page into the IonWriter provided. + *
+ * Will flush() the writer after encoding the page. + * Expects that the calling code is responsible for closing + * the writer after all pages are written. + */ + void encode(IonWriter writer, Page page) + throws IOException; +} diff --git a/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java new file mode 100644 index 000000000000..904450b6cb0f --- /dev/null +++ b/lib/trino-hive-formats/src/main/java/io/trino/hive/formats/ion/IonEncoderFactory.java @@ -0,0 +1,291 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hive.formats.ion; + +import com.amazon.ion.IonType; +import com.amazon.ion.IonWriter; +import com.amazon.ion.Timestamp; +import com.google.common.collect.ImmutableList; +import io.trino.hive.formats.line.Column; +import io.trino.spi.Page; +import io.trino.spi.block.ArrayBlock; +import io.trino.spi.block.Block; +import io.trino.spi.block.RowBlock; +import io.trino.spi.block.SqlMap; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; +import io.trino.spi.type.CharType; +import io.trino.spi.type.DateType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.Int128; +import io.trino.spi.type.IntegerType; +import io.trino.spi.type.LongTimestamp; +import io.trino.spi.type.MapType; +import io.trino.spi.type.RealType; +import io.trino.spi.type.RowType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TimestampType; +import io.trino.spi.type.TinyintType; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.function.IntFunction; + +import static com.google.common.base.Preconditions.checkArgument; + +public class IonEncoderFactory +{ + private IonEncoderFactory() {} + + public static IonEncoder buildEncoder(List columns) + { + return RowEncoder.forFields(columns.stream() + .map(c -> new RowType.Field(Optional.of(c.name()), c.type())) + .toList()); + } + + private interface BlockEncoder + { + void encode(IonWriter writer, Block block, int position) + throws IOException; + } + + private static BlockEncoder encoderForType(Type type) + { + return switch (type) { + case TinyintType _ -> byteEncoder; + case SmallintType _ -> shortEncoder; + case IntegerType _ -> intEncoder; + case BigintType _ -> longEncoder; + case BooleanType _ -> boolEncoder; + case VarbinaryType _ -> binaryEncoder; + case RealType _ -> realEncoder; + case DoubleType _ -> doubleEncoder; + case VarcharType _, CharType _ -> stringEncoder; + case DecimalType t -> decimalEncoder(t); + case DateType _ -> 
dateEncoder; + case TimestampType t -> timestampEncoder(t); + case MapType t -> new MapEncoder(t, t.getKeyType(), encoderForType(t.getValueType())); + case RowType t -> RowEncoder.forFields(t.getFields()); + case ArrayType t -> new ArrayEncoder(wrapEncoder(encoderForType(t.getElementType()))); + default -> throw new IllegalArgumentException(String.format("Unsupported type: %s", type)); + }; + } + + private static BlockEncoder wrapEncoder(BlockEncoder encoder) + { + return (writer, block, position) -> + { + if (block.isNull(position)) { + writer.writeNull(); + } + else { + encoder.encode(writer, block, position); + } + }; + } + + private record RowEncoder(List fieldNames, List fieldEncoders) + implements BlockEncoder, IonEncoder + { + private static RowEncoder forFields(List fields) + { + ImmutableList.Builder fieldNamesBuilder = ImmutableList.builder(); + ImmutableList.Builder fieldEncodersBuilder = ImmutableList.builder(); + + for (RowType.Field field : fields) { + fieldNamesBuilder.add(field.getName().get()); + fieldEncodersBuilder.add(wrapEncoder(encoderForType(field.getType()))); + } + + return new RowEncoder(fieldNamesBuilder.build(), fieldEncodersBuilder.build()); + } + + @Override + public void encode(IonWriter writer, Block block, int position) + throws IOException + { + encodeStruct(writer, ((RowBlock) block)::getFieldBlock, position); + } + + @Override + public void encode(IonWriter writer, Page page) + throws IOException + { + for (int i = 0; i < page.getPositionCount(); i++) { + encodeStruct(writer, page::getBlock, i); + } + // todo: it's probably preferable to decouple ion writer flushes + // from page sizes, but it's convenient for now + writer.flush(); + } + + private void encodeStruct(IonWriter writer, IntFunction blockSelector, int position) + throws IOException + { + writer.stepIn(IonType.STRUCT); + for (int i = 0; i < fieldEncoders.size(); i++) { + // fields are omitted by default, as was true in the hive serde. + // there is an unimplemented hive legacy property of `ion.serialize_null` + // that could be used to specify typed or untyped ion nulls instead. 
+ Block block = blockSelector.apply(i); + if (block.isNull(position)) { + continue; + } + writer.setFieldName(fieldNames.get(i)); + fieldEncoders.get(i) + .encode(writer, block, position); + } + writer.stepOut(); + } + } + + private record MapEncoder(MapType mapType, Type keyType, + BlockEncoder encoder) + implements BlockEncoder + { + public MapEncoder(MapType mapType, Type keyType, BlockEncoder encoder) + + { + this.mapType = mapType; + if (!(keyType instanceof VarcharType _ || keyType instanceof CharType _)) { + throw new UnsupportedOperationException("Unsupported map key type: " + keyType); + } + this.keyType = keyType; + this.encoder = encoder; + } + + @Override + public void encode(IonWriter writer, Block block, int position) + throws IOException + { + SqlMap sqlMap = mapType.getObject(block, position); + int rawOffset = sqlMap.getRawOffset(); + Block rawKeyBlock = sqlMap.getRawKeyBlock(); + Block rawValueBlock = sqlMap.getRawValueBlock(); + + writer.stepIn(IonType.STRUCT); + for (int i = 0; i < sqlMap.getSize(); i++) { + checkArgument(!rawKeyBlock.isNull(rawOffset + i), "map key is null"); + writer.setFieldName(VarcharType.VARCHAR.getSlice(rawKeyBlock, rawOffset + i).toString(StandardCharsets.UTF_8)); + encoder.encode(writer, rawValueBlock, rawOffset + i); + } + writer.stepOut(); + } + } + + private record ArrayEncoder(BlockEncoder elementEncoder) + implements BlockEncoder + { + @Override + public void encode(IonWriter writer, Block block, int position) + throws IOException + { + writer.stepIn(IonType.LIST); + Block elementBlock = ((ArrayBlock) block).getArray(position); + for (int i = 0; i < elementBlock.getPositionCount(); i++) { + elementEncoder.encode(writer, elementBlock, i); + } + writer.stepOut(); + } + } + + private static BlockEncoder timestampEncoder(TimestampType type) + { + if (type.isShort()) { + return (writer, block, position) -> { + long epochMicros = type.getLong(block, position); + BigDecimal decimalMillis = BigDecimal.valueOf(epochMicros) + .movePointLeft(3) + .setScale(type.getPrecision() - 3, RoundingMode.UNNECESSARY); + + writer.writeTimestamp(Timestamp.forMillis(decimalMillis, 0)); + }; + } + else { + return (writer, block, position) -> { + LongTimestamp longTimestamp = (LongTimestamp) type.getObject(block, position); + BigDecimal picosOfMicros = BigDecimal.valueOf(longTimestamp.getPicosOfMicro()) + .movePointLeft(9); + BigDecimal decimalMillis = BigDecimal.valueOf(longTimestamp.getEpochMicros()) + .movePointLeft(3) + .add(picosOfMicros) + .setScale(type.getPrecision() - 3, RoundingMode.UNNECESSARY); + + writer.writeTimestamp(Timestamp.forMillis(decimalMillis, 0)); + }; + } + } + + private static BlockEncoder decimalEncoder(DecimalType type) + { + if (type.isShort()) { + return (writer, block, position) -> { + writer.writeDecimal(BigDecimal.valueOf(type.getLong(block, position), type.getScale())); + }; + } + else { + return (writer, block, position) -> { + writer.writeDecimal(new BigDecimal(((Int128) type.getObject(block, position)).toBigInteger(), type.getScale())); + }; + } + } + + private static final BlockEncoder byteEncoder = (writer, block, position) -> + writer.writeInt(TinyintType.TINYINT.getLong(block, position)); + + private static final BlockEncoder shortEncoder = (writer, block, position) -> + writer.writeInt(SmallintType.SMALLINT.getLong(block, position)); + + private static final BlockEncoder intEncoder = (writer, block, position) -> + writer.writeInt(IntegerType.INTEGER.getInt(block, position)); + + private static final BlockEncoder 
stringEncoder = (writer, block, position) -> + writer.writeString(VarcharType.VARCHAR.getSlice(block, position).toString(StandardCharsets.UTF_8)); + + private static final BlockEncoder boolEncoder = (writer, block, position) -> + writer.writeBool(BooleanType.BOOLEAN.getBoolean(block, position)); + + private static final BlockEncoder binaryEncoder = (writer, block, position) -> + writer.writeBlob(VarbinaryType.VARBINARY.getSlice(block, position).getBytes()); + + private static final BlockEncoder longEncoder = (writer, block, position) -> + writer.writeInt(BigintType.BIGINT.getLong(block, position)); + + private static final BlockEncoder realEncoder = (writer, block, position) -> + writer.writeFloat(RealType.REAL.getFloat(block, position)); + + private static final BlockEncoder doubleEncoder = (writer, block, position) -> + writer.writeFloat(DoubleType.DOUBLE.getDouble(block, position)); + + private static final BlockEncoder dateEncoder = (writer, block, position) -> + writer.writeTimestamp( + Timestamp.forDateZ( + Date.from( + LocalDate.ofEpochDay(DateType.DATE.getLong(block, position)) + .atStartOfDay(ZoneId.of("UTC")) + .toInstant()))); +} diff --git a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/FormatTestUtils.java b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/FormatTestUtils.java index 8efdf1e816e0..fa8454e04f87 100644 --- a/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/FormatTestUtils.java +++ b/lib/trino-hive-formats/src/test/java/io/trino/hive/formats/FormatTestUtils.java @@ -483,6 +483,18 @@ public static Page toSingleRowPage(List columns, List expectedValues) return page; } + public static Page toPage(List columns, List... expectedValues) + { + PageBuilder pageBuilder = new PageBuilder(columns.stream().map(Column::type).collect(toImmutableList())); + for (List expectedValue : expectedValues) { + pageBuilder.declarePosition(); + for (int col = 0; col < columns.size(); col++) { + writeTrinoValue(columns.get(col).type(), pageBuilder.getBlockBuilder(col), expectedValue.get(col)); + } + } + return pageBuilder.build(); + } + public static void writeTrinoValue(Type type, BlockBuilder blockBuilder, Object value) { if (value == null) { @@ -653,6 +665,11 @@ public static Object toHiveWriteValue(Type type, Object value, Optional TEST_COLUMNS = List.of( + new Column("magic_num", INTEGER, 0), + new Column("some_text", VARCHAR, 1), + new Column("is_summer", BOOLEAN, 2), + new Column("byte_clob", VARBINARY, 3), + new Column("sequencer", new ArrayType(INTEGER), 4), + new Column("struction", RowType.rowType( + field("foo", INTEGER), + field("bar", VARCHAR)), 5), + new Column("map", new MapType(VARCHAR, INTEGER, TYPE_OPERATORS), 6), + new Column("double_value", DOUBLE, 7), + new Column("decimal_value", DecimalType.createDecimalType(10, 2), 8), + new Column("tiny_int", TINYINT, 9), + new Column("small_int", SMALLINT, 10), + new Column("big_int", BIGINT, 11), + new Column("real_num", REAL, 12), + new Column("date", DATE, 13)); + + @Test + public void testSuperBasicStruct() + throws IOException + { + assertValues( + RowType.rowType( + field("foo", INTEGER), + field("bar", VARCHAR)), + "{ bar: baz, foo: 31, ignored: true }", + List.of(31, "baz")); + } + + @Test + public void testMap() + throws IOException + { + MapType mapType = new MapType(VarcharType.createVarcharType(3), INTEGER, TYPE_OPERATORS); + assertValues( + RowType.rowType(field("foo", mapType)), + "{ foo: { bar: 1, bar: 2, baz: 5, quxx: 8 } } { foo: { bar: 17, baz: 31, qux: 53 } }", + 
List.of(Map.of("bar", 2, "baz", 5, "qux", 8)), + List.of(Map.of("bar", 17, "baz", 31, "qux", 53))); + + mapType = new MapType(CharType.createCharType(3), INTEGER, TYPE_OPERATORS); + assertValues( + RowType.rowType(field("foo", mapType)), + "{ foo: { bar: 1, bar: 2, baz: 5, quxx: 8 } }", + List.of(Map.of("bar", 2, "baz", 5, "qux", 8))); + } + + @Test + public void testUnsupportedMapKeys() + throws IOException + { + MapType mapType = new MapType(INTEGER, INTEGER, TYPE_OPERATORS); + Assertions.assertThrows(UnsupportedOperationException.class, () -> + assertValues(RowType.rowType(field("bad_map", mapType)), "", List.of())); + } + + @Test + public void testVariousTlvsStrict() + throws IOException + { + RowType rowType = RowType.rowType(field("foo", INTEGER), field("bar", VARCHAR)); + IonDecoderConfig decoderConfig = IonDecoderConfig.defaultConfig().withStrictTyping(); + List expected = new ArrayList<>(2); + expected.add(null); + expected.add(null); + + assertValues(rowType, + decoderConfig, + // empty struct, untyped null, struct null, and explicitly typed null null, phew. + "{} null null.struct null.null", + expected, expected, expected, expected); + + Assertions.assertThrows(PathExtractionException.class, () -> { + assertValues(rowType, decoderConfig, "null.int", expected); + assertValues(rowType, decoderConfig, "[]", expected); + }); + } + + @Test + public void testVariousTlvsLax() + throws IOException + { + RowType rowType = RowType.rowType(field("foo", INTEGER), field("bar", VARCHAR)); + List expected = new ArrayList<>(2); + expected.add(null); + expected.add(null); + + assertValues(rowType, + "{} 37 null.list null.struct null spam false", + expected, expected, expected, expected, expected, expected, expected); + } + + @Test + public void testColumnMistypings() + { + RowType rowType = RowType.rowType(field("foo", INTEGER), field("bar", BOOLEAN)); + + List ions = List.of( + "{ foo: blarg, bar: false }", + "{ foo: 12345, bar: blarg }", + "{ foo: null.list, bar: false }", + "{ foo: 12345, bar: null.int }"); + + for (String ion : ions) { + Assertions.assertThrows(TrinoException.class, () -> { + assertValues(rowType, ion, List.of()); + }); + } + } + + @Test + public void testTextTruncation() + throws IOException + { + String ionText = """ + { my_text: 'abcdefghijk' } + { my_text: 'abcd ' } + { my_text: 'abcd ijk' }"""; + + assertValues(RowType.rowType(field("my_text", VarcharType.createVarcharType(8))), + ionText, + List.of("abcdefgh"), List.of("abcd "), List.of("abcd ")); + + assertValues(RowType.rowType(field("my_text", CharType.createCharType(8))), + ionText, + List.of("abcdefgh"), List.of("abcd "), List.of("abcd ")); + } + + @Test + public void testCaseInsensitivityOfKeys() + throws IOException + { + assertValues( + RowType.rowType( + field("Foo", INTEGER), + field("BAR", VARCHAR)), + "{ Bar: baz, foo: 31 }", + List.of(31, "baz")); + } + + @Test + public void testStringCoercions() + throws IOException + { + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: true }", + List.of("true")); + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: 31 }", + List.of("31")); + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: 31.50 }", + List.of("31.50")); + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: [1, 2, 3] }", + List.of("[1,2,3]")); + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: \"bar\" }", + List.of("bar")); + assertValues( + RowType.rowType( + field("foo", VARCHAR)), + "{ foo: { nested_foo: 12 
} }", + List.of("{nested_foo:12}")); + } + + @Test + public void testCaseInsensitivityOfDuplicateKeys() + throws IOException + { + // this test asserts that duplicate key behavior works as expected(i.e. capturing the last value), + // for duplicate keys with different casing. + assertValues( + RowType.rowType( + field("Foo", INTEGER), + field("BAR", VARCHAR)), + "{ bar: baz, Foo: 31, foo: 5 }", + List.of(5, "baz")); + } + + @Test + public void testCaseSensitiveExtraction() + throws IOException + { + assertValues( + RowType.rowType( + field("Foo", INTEGER), + field("Bar", VARCHAR)), + IonDecoderConfig.defaultConfig().withCaseSensitive(), + // assumes duplicate fields overwrite, which is asserted in the test above + "{ Bar: baz, bar: blegh, Foo: 31, foo: 67 }", + List.of(31, "baz")); + } + + @Test + public void testStructWithNullAndMissingValues() + throws IOException + { + assertValues( + RowType.rowType( + field("foo", INTEGER), + field("bar", VARCHAR)), + "{ bar: null.symbol }", + Arrays.asList(null, null)); + } + + @Test + public void testNestedList() + throws IOException + { + assertValues( + RowType.rowType( + field("primes", new ArrayType(INTEGER))), + "{ primes: [ 17, 31, 51 ] }", + List.of(List.of(17, 31, 51))); + } + + @Test + public void testNestedStruct() + throws IOException + { + assertValues( + RowType.rowType( + field("name", RowType.rowType( + field("first", VARCHAR), + field("last", VARCHAR)))), + "{ name: { first: Woody, last: Guthrie, superfluous: ignored } }", + List.of(List.of("Woody", "Guthrie"))); + } + + /** + * The Ion Hive SerDe captures the last value for fields with + * duplicate keys. There is different behavior for nested Rows, + * which you can see below. + */ + @Test + public void testTopLevelStructWithDuplicateKeys() + throws IOException + { + assertValues( + RowType.rowType(field("foo", INTEGER)), + "{ foo: 17, foo: 31, foo: 53 } { foo: 67 }", + List.of(53), List.of(67)); + } + + /** + * The Ion Hive SerDe captures the first value for duplicate fields in + * nested Rows, so that is what we default to here. 
+ */ + @Test + public void testNestedStructWithDuplicateAndMissingKeys() + throws IOException + { + assertValues( + RowType.rowType( + field("name", RowType.rowType( + field("first", VARCHAR), + field("last", VARCHAR)))), + """ + { name: { last: Guthrie, last: Godfrey } } + { name: { first: Joan, last: Baez } } + """, + List.of(Arrays.asList(null, "Guthrie")), + List.of(List.of("Joan", "Baez"))); + } + + @Test + public void testStructInList() + throws IOException + { + assertValues( + RowType.rowType( + field("elements", new ArrayType( + RowType.rowType( + field("foo", INTEGER))))), + "{ elements: [ { foo: 13 }, { foo: 17 } ] }", + // yes, there are three layers of list here: + // top-level struct (row), list of elements (array), then inner struct (row) + List.of( + List.of(List.of(13), List.of(17)))); + } + + @Test + public void testIntsOfVariousSizes() + throws IOException + { + List ions = List.of( + "{ ion_int: 0x7f }", // < one byte + "{ ion_int: 0x7fff }", // < two bytes + "{ ion_int: 0x7fffffff }", // < four bytes + "{ ion_int: 0x7fffffffffffffff }", // < eight bytes + "{ ion_int: 0x7fffffffffffffff1 }" // > eight bytes + ); + + List intTypes = List.of(TINYINT, SMALLINT, INTEGER, BIGINT); + List expected = List.of((byte) 0x7f, (short) 0x7fff, 0x7fffffff, 0x7fffffffffffffffL); + for (int i = 0; i < intTypes.size(); i++) { + RowType rowType = RowType.rowType(field("ion_int", intTypes.get(i))); + assertValues( + rowType, + ions.get(i), + List.of(expected.get(i))); + + int nextIon = i + 1; + Assertions.assertThrows(TrinoException.class, () -> { + assertValues(rowType, + ions.get(nextIon), + List.of()); + }); + } + } + + @Test + public void testFloat() + throws IOException + { + RowType rowType = RowType.rowType(field("my_double", DoubleType.DOUBLE)); + assertValues( + rowType, + "{ my_double: 4444e-4 }", + List.of(.4444)); + } + + @Test + public void testBytes() + throws IOException + { + RowType rowType = RowType.rowType(field("blobby", VARBINARY)); + assertValues( + rowType, + "{ blobby: {{ YmxvYmJ5IG1jYmxvYmZhY2U= }} }", + List.of(new SqlVarbinary("blobby mcblobface".getBytes(StandardCharsets.UTF_8)))); + } + + @Test + public void testDoubleAsFloat() + throws IOException + { + RowType rowType = RowType.rowType(field("my_float", REAL)); + assertValues( + rowType, + "{ my_float: 625e-3 }", + List.of(.625f)); + + Assertions.assertThrows(TrinoException.class, () -> { + assertValues( + rowType, + "{ my_float: 9e+99 }", + List.of()); + }); + } + + @Test + public void testDateDecoding() + throws IOException + { + RowType rowType = RowType.rowType(field("my_date", DATE)); + SqlDate expected = new SqlDate((int) LocalDate.of(2022, 2, 22).toEpochDay()); + + List ions = List.of( + "{ my_date: 2022-02-22T }", + "{ my_date: 2022-02-21T12:00-12:00 } ", + "{ my_date: 2022-02-22T22:22:22Z }", + "{ my_date: 2022-02-23T00:00+01:00 }", + "{ my_date: 2022-02-22T00:01Z }", + "{ my_date: 2022-02-22T00:00:01Z }", + "{ my_date: 2022-02-22T00:00:00.001Z }", + "{ my_date: 2022-02-22T23:59:59.999999999Z }"); + + for (String ion : ions) { + assertValues(rowType, ion, List.of(expected)); + } + } + + @Test + public void testTimestampDecoding() + throws IOException + { + List ions = List.of( + "{ my_ts: 2067-08-09T11:22:33Z }", + "{ my_ts: 2067-08-09T11:22:33.111Z }", + "{ my_ts: 2067-08-09T11:22:33.111222Z }", + "{ my_ts: 2067-08-09T11:22:33.111222333Z }", + "{ my_ts: 2067-08-09T11:22:33.111222333444Z }", + // fraction beyond picos is truncated + "{ my_ts: 2067-08-09T11:22:33.111222333444555Z }"); + + 
LocalDateTime dateTimeToSeconds = LocalDateTime.of(2067, 8, 9, 11, 22, 33); + List sqlTimestamps = List.of( + toSqlTimestamp(TimestampType.TIMESTAMP_SECONDS, dateTimeToSeconds), + toSqlTimestamp(TimestampType.TIMESTAMP_MILLIS, dateTimeToSeconds.plusNanos(111000000)), + toSqlTimestamp(TimestampType.TIMESTAMP_MICROS, dateTimeToSeconds.plusNanos(111222000)), + toSqlTimestamp(TimestampType.TIMESTAMP_NANOS, dateTimeToSeconds.plusNanos(111222333)), + toSqlTimestamp(TimestampType.TIMESTAMP_PICOS, dateTimeToSeconds.plusNanos(111222333), 444)); + + for (int i = 0; i < sqlTimestamps.size(); i++) { + SqlTimestamp sqlTimestamp = sqlTimestamps.get(i); + RowType rowType = RowType.rowType( + field("my_ts", TimestampType.createTimestampType(sqlTimestamp.getPrecision()))); + + for (int j = i; j < ions.size(); j++) { + assertValues(rowType, ions.get(j), List.of(sqlTimestamp)); + } + } + } + + @Test + public void testDecimalPrecisionAndScale() + throws IOException + { + assertValues( + RowType.rowType( + field("amount", DecimalType.createDecimalType(10, 2)), + field("big_amount", DecimalType.createDecimalType(25, 5))), + "{ amount: 1234.00, big_amount: 1234.00000 }" + + "{ amount: 1234d0, big_amount: 1234d0 }" + + "{ amount: 12d2, big_amount: 12d2 }" + + "{ amount: 1234.000, big_amount: 1234.000000 }" + + "{ amount: 1234, big_amount: 1234 }", // these are both IonInts + List.of(new SqlDecimal(BigInteger.valueOf(123400), 10, 2), new SqlDecimal(BigInteger.valueOf(123400000), 25, 5)), + List.of(new SqlDecimal(BigInteger.valueOf(123400), 10, 2), new SqlDecimal(BigInteger.valueOf(123400000), 25, 5)), + List.of(new SqlDecimal(BigInteger.valueOf(120000), 10, 2), new SqlDecimal(BigInteger.valueOf(120000000), 25, 5)), + List.of(new SqlDecimal(BigInteger.valueOf(123400), 10, 2), new SqlDecimal(BigInteger.valueOf(123400000), 25, 5)), + List.of(new SqlDecimal(BigInteger.valueOf(123400), 10, 2), new SqlDecimal(BigInteger.valueOf(123400000), 25, 5))); + } + + @Test + public void testNumbersTooBigForShortDecimal() + { + RowType rowType = RowType.rowType( + field("amount", DecimalType.createDecimalType(4, 2))); + + List ions = List.of( + "{ amount: 123.4 }", + "{ amount: 1.234 }", + "{ amount: 123 }", + "{ amount: 1234d-10 }", + "{ amount: 1234d2 }"); + + for (String ionText : ions) { + Assertions.assertThrows(TrinoException.class, () -> + assertValues(rowType, ionText, List.of())); + } + } + + @Test + public void testNumbersTooBigForDecimal128() + { + RowType rowType = RowType.rowType( + field("amount", DecimalType.createDecimalType(20, 2))); + + List ions = List.of( + "{ amount: 12345678901234567890.4 }", + "{ amount: 1.234 }", + "{ amount: 12345678901234567890 }", + "{ amount: 999999999999999999999999999999999999.999 }", // 39 nines + "{ amount: 1234d-10 }", + "{ amount: 1234d22 }"); + + for (String ionText : ions) { + Assertions.assertThrows(TrinoException.class, () -> + assertValues(rowType, ionText, List.of())); + } + } + + @Test + public void testPathExtraction() + throws IOException + { + Map pathExtractions = Map.of("bar", "(foo bar)", "baz", "(foo baz)"); + assertValues( + RowType.rowType(field("qux", BOOLEAN), field("bar", INTEGER), field("baz", VARCHAR)), + IonDecoderConfig.defaultConfig().withPathExtractors(pathExtractions), + "{ foo: { bar: 31, baz: quux }, qux: true }", + List.of(true, 31, "quux")); + } + + @Test + public void testNonStructTlvPathExtraction() + throws IOException + { + Map pathExtractions = Map.of("tlv", "()"); + assertValues( + RowType.rowType(field("tlv", new ArrayType(INTEGER))), + 
IonDecoderConfig.defaultConfig().withPathExtractors(pathExtractions), + "[13, 17] [19, 23]", + List.of(List.of(13, 17)), + List.of(List.of(19, 23))); + } + + /** + * Shows how users can configure mapping sequence positions from Ion values to a Trino row. + */ + @Test + public void testPositionalPathExtraction() + throws IOException + { + Map pathExtractions = Map.of( + "foo", "(0)", + "bar", "(1)"); + RowType rowType = RowType.rowType( + field("foo", INTEGER), + field("bar", VARCHAR)); + + assertValues( + rowType, + IonDecoderConfig.defaultConfig().withPathExtractors(pathExtractions), + "[13, baz] [17, qux]", + List.of(13, "baz"), + List.of(17, "qux")); + } + + @Test + public void testEncode() + throws IOException + { + List row1 = List.of(17, "something", true, new SqlVarbinary(new byte[] {(byte) 0xff}), List.of(1, 2, + 3), List.of(51, "baz"), ImmutableMap.builder() + .put("a", 2) + .put("b", 5) + .buildOrThrow(), 5e0, new SqlDecimal(BigInteger.valueOf(123400), 10, 2), (byte) -1, (short) 32767, + 15L, 12.0f, new SqlDate(toIntExact(LocalDate.of(2025, 1, 1).toEpochDay()))); + List row2 = List.of(31, "somebody", false, new SqlVarbinary(new byte[] {(byte) 0x01, (byte) 0xaa}), + List.of(7, 8, 9), List.of(67, "qux"), ImmutableMap.builder() + .put("foo", 12) + .put("bar", 50) + .buildOrThrow(), 5e0, new SqlDecimal(BigInteger.valueOf(123400), 10, 2), (byte) 0, (short) -1 + , 0L, 0.0f, new SqlDate(toIntExact(LocalDate.of(2025, 1, 1).toEpochDay()))); + String ionText = """ + { magic_num:17, some_text:"something", is_summer:true, byte_clob:{{/w==}}, sequencer:[1,2,3], struction:{ foo:51, bar:"baz"}, map: {a: 2, b: 5}, double_value: 5e0, decimal_value: 1234.00, tiny_int: -1, small_int: 32767, big_int: 15, real_num: 12e0, date: 2025-01-01T00:00:00.000Z } + { magic_num:31, some_text:"somebody", is_summer:false, byte_clob:{{Aao=}}, sequencer:[7,8,9], struction:{ foo:67, bar:"qux"}, map: {foo: 12, bar: 50}, double_value: 5e0, decimal_value: 1234.00, tiny_int: 0, small_int: -1, big_int: 0, real_num: 0e0, date: 2025-01-01T00:00:00.000Z } + """; + + Page page = toPage(TEST_COLUMNS, row1, row2); + assertIonEquivalence(TEST_COLUMNS, page, ionText); + } + + @Test + public void testEncodeTimestamp() + throws IOException + { + List timestampColumn = List.of(new Column("my_ts", TimestampType.TIMESTAMP_NANOS, 0)); + Page page = toPage(timestampColumn, List.of( + toSqlTimestamp(TimestampType.TIMESTAMP_NANOS, LocalDateTime.of(2024, 11, 23, 1, 23, 45, 666777888)))); + assertIonEquivalence(timestampColumn, page, "{ my_ts: 2024-11-23T01:23:45.666777888Z }"); + } + + @Test + public void testEncodeMixedCaseColumn() + throws IOException + { + List casedColumn = List.of( + new Column("TheAnswer", INTEGER, 0)); + + Page page = toPage(casedColumn, List.of(42)); + assertIonEquivalence(casedColumn, page, "{ TheAnswer: 42 }"); + } + + @Test + public void testEncodeWithNullField() + throws IOException + { + List row1 = Arrays.asList(null, null, null, null, null, null, null, null, null, null, null, null, + null, null); + String ionText = """ + {} + """; + + Page page = toPage(TEST_COLUMNS, row1); + assertIonEquivalence(TEST_COLUMNS, page, ionText); + } + + @Test + public void testEncodeWithNullNestedField() + throws IOException + { + List row1 = Arrays.asList(17, "something", true, new SqlVarbinary(new byte[] {(byte) 0xff}), + List.of(1, 2, 3), Arrays.asList(null, "baz"), ImmutableMap.builder() + .put("a", 2) + .put("b", 5) + .buildOrThrow(), 5e0, new SqlDecimal(BigInteger.valueOf(123400), 10, 2), (byte) -1, + (short) 32767, 15L, 
12.0f, new SqlDate(toIntExact(LocalDate.of(2025, 1, 1).toEpochDay()))); + List row2 = Arrays.asList(31, "somebody", null, new SqlVarbinary(new byte[] {(byte) 0x01, + (byte) 0xaa}), List.of(7, 8, 9), Arrays.asList(null, "qux"), ImmutableMap.builder() + .put("foo", 12) + .put("bar", 50) + .buildOrThrow(), 5e0, new SqlDecimal(BigInteger.valueOf(123400), 10, 2), (byte) 0, (short) -1, 0L, + 0.0f, new SqlDate(toIntExact(LocalDate.of(2025, 1, 1).toEpochDay()))); + String ionText = """ + { magic_num:17, some_text:"something", is_summer:true, byte_clob:{{/w==}}, sequencer:[1,2,3], struction:{bar:"baz"}, map: {a: 2, b: 5}, double_value: 5e0, decimal_value: 1234.00, tiny_int: -1, small_int: 32767, big_int: 15, real_num: 12e0, date: 2025-01-01T00:00:00.000Z } + { magic_num:31, some_text:"somebody", byte_clob:{{Aao=}}, sequencer:[7,8,9], struction:{bar:"qux"}, map: {foo: 12, bar: 50}, double_value: 5e0, decimal_value: 1234.00, tiny_int: 0, small_int: -1, big_int: 0, real_num: 0e0, date: 2025-01-01T00:00:00.000Z } + """; + + Page page = toPage(TEST_COLUMNS, row1, row2); + assertIonEquivalence(TEST_COLUMNS, page, ionText); + } + + private void assertValues(RowType rowType, String ionText, List... expected) + throws IOException + { + assertValues(rowType, IonDecoderConfig.defaultConfig(), ionText, expected); + } + + private void assertValues(RowType rowType, IonDecoderConfig config, String ionText, List... expected) + throws IOException + { + List fields = rowType.getFields(); + List columns = IntStream.range(0, fields.size()) + .boxed() + .map(i -> { + final RowType.Field field = fields.get(i); + return new Column(field.getName().get(), field.getType(), i); + }) + .toList(); + PageBuilder pageBuilder = new PageBuilder(expected.length, rowType.getFields().stream().map(RowType.Field::getType).toList()); + IonDecoder decoder = IonDecoderFactory.buildDecoder(columns, config, pageBuilder); + + try (IonReader ionReader = IonReaderBuilder.standard().build(ionText)) { + for (int i = 0; i < expected.length; i++) { + assertThat(ionReader.next()).isNotNull(); + pageBuilder.declarePosition(); + decoder.decode(ionReader); + } + assertThat(ionReader.next()).isNull(); + } + + for (int i = 0; i < expected.length; i++) { + List actual = readTrinoValues(columns, pageBuilder.build(), i); + assertColumnValuesEquals(columns, actual, expected[i]); + } + } + + /** + * Encodes the page as Ion and asserts its equivalence to ionText, per the Ion datamodel. + *
+ * This allows us to make assertions about how the data is encoded that may be equivalent + * in the trino datamodel but distinct per the Ion datamodel. Some examples: + * - absent fields vs null field values + * - Symbol vs String for text values + * - Timestamps with UTC vs unknown offset + */ + private void assertIonEquivalence(List columns, Page page, String ionText) + throws IOException + { + IonSystem system = IonSystemBuilder.standard().build(); + IonDatagram datagram = system.newDatagram(); + IonEncoder encoder = IonEncoderFactory.buildEncoder(columns); + IonWriter ionWriter = system.newWriter(datagram); + encoder.encode(ionWriter, page); + ionWriter.close(); + + IonDatagram expected = system.getLoader().load(ionText); + Assertions.assertEquals(datagram.size(), expected.size()); + for (int i = 0; i < expected.size(); i++) { + // IonValue.equals() is Ion model equivalence. + Assertions.assertEquals(expected.get(i), datagram.get(i)); + } + } +} diff --git a/plugin/trino-hive/pom.xml b/plugin/trino-hive/pom.xml index fdda2afcc447..60ca167c9f2f 100644 --- a/plugin/trino-hive/pom.xml +++ b/plugin/trino-hive/pom.xml @@ -14,6 +14,13 @@ Trino - Hive connector + + + com.amazon.ion + ion-java + ${dep.ion.version} + + com.amazonaws aws-java-sdk-core @@ -386,6 +393,13 @@ runtime + + com.amazon.ion + ion-hive3-serde + 1.2.0 + test + + io.airlift http-server diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 5b392f4bf14f..5ea6dedd8b02 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -111,6 +111,12 @@ public class HiveConfig private String orcLegacyTimeZone = TimeZone.getDefault().getID(); + // This is a feature flag for Ion native trino integration. + // Default value is false and requires explicitly setting it to true to enable Ion native trino integration. + // TODO: This should change to default as true in future once we have a complete implementation of Ion native + // trino integration supported. 
+ private boolean ionNativeTrinoEnabled; + private String parquetTimeZone = TimeZone.getDefault().getID(); private boolean useParquetColumnNames = true; @@ -714,6 +720,19 @@ public HiveConfig setOrcLegacyTimeZone(String orcLegacyTimeZone) return this; } + public boolean getIonNativeTrinoEnabled() + { + return ionNativeTrinoEnabled; + } + + @Config("hive.ion.nativetrino") + @ConfigDescription("Feature flag to enable Ion native trino integration") + public HiveConfig setIonNativeTrinoEnabled(boolean ionNativeTrinoEnabled) + { + this.ionNativeTrinoEnabled = ionNativeTrinoEnabled; + return this; + } + public DateTimeZone getParquetDateTimeZone() { TimeZone timeZone = TimeZone.getTimeZone(ZoneId.of(parquetTimeZone)); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index 253cd16e01b7..ce8d8feae5da 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -26,6 +26,8 @@ import io.trino.plugin.hive.fs.CachingDirectoryLister; import io.trino.plugin.hive.fs.DirectoryLister; import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory; +import io.trino.plugin.hive.ion.IonFileWriterFactory; +import io.trino.plugin.hive.ion.IonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -121,6 +123,7 @@ public void configure(Binder binder) pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON); pageSourceFactoryBinder.addBinding().to(AvroPageSourceFactory.class).in(Scopes.SINGLETON); + pageSourceFactoryBinder.addBinding().to(IonPageSourceFactory.class).in(Scopes.SINGLETON); Multibinder fileWriterFactoryBinder = newSetBinder(binder, HiveFileWriterFactory.class); binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON); @@ -136,6 +139,7 @@ public void configure(Binder binder) fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON); fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON); fileWriterFactoryBinder.addBinding().to(AvroFileWriterFactory.class).in(Scopes.SINGLETON); + fileWriterFactoryBinder.addBinding().to(IonFileWriterFactory.class).in(Scopes.SINGLETON); configBinder(binder).bindConfig(ParquetReaderConfig.class); configBinder(binder).bindConfig(ParquetWriterConfig.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java index b9036479cff0..db3c4a2b5ffa 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java @@ -35,6 +35,9 @@ import static io.trino.hive.formats.HiveClassNames.COLUMNAR_SERDE_CLASS; import static io.trino.hive.formats.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS; import static io.trino.hive.formats.HiveClassNames.HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS; +import static io.trino.hive.formats.HiveClassNames.ION_INPUT_FORMAT; +import static io.trino.hive.formats.HiveClassNames.ION_OUTPUT_FORMAT; +import static io.trino.hive.formats.HiveClassNames.ION_SERDE_CLASS; import static 
io.trino.hive.formats.HiveClassNames.JSON_SERDE_CLASS; import static io.trino.hive.formats.HiveClassNames.LAZY_BINARY_COLUMNAR_SERDE_CLASS; import static io.trino.hive.formats.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS; @@ -101,7 +104,11 @@ public enum HiveStorageFormat REGEX( REGEX_SERDE_CLASS, TEXT_INPUT_FORMAT_CLASS, - HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS); + HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS), + ION( + ION_SERDE_CLASS, + ION_INPUT_FORMAT, + ION_OUTPUT_FORMAT); private final String serde; private final String inputFormat; @@ -135,6 +142,7 @@ public boolean isSplittable(String path) return switch (this) { case ORC, PARQUET, AVRO, RCBINARY, RCTEXT, SEQUENCEFILE -> true; case JSON, OPENX_JSON, TEXTFILE, CSV, REGEX -> CompressionKind.forFile(path).isEmpty(); + case ION -> false; }; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index 1cdc3935d84d..15e722e71425 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -64,6 +64,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.hive.formats.HiveClassNames.HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS; +import static io.trino.hive.formats.HiveClassNames.ION_OUTPUT_FORMAT; import static io.trino.metastore.AcidOperation.CREATE_TABLE; import static io.trino.metastore.Partitions.makePartName; import static io.trino.plugin.hive.HiveCompressionCodecs.selectCompressionCodec; @@ -674,11 +675,13 @@ public static int getBucketFromFileName(String fileName) public static String getFileExtension(HiveCompressionCodec compression, StorageFormat format) { - // text format files must have the correct extension when compressed - return compression.getHiveCompressionKind() - .filter(_ -> format.getOutputFormat().equals(HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS)) - .map(CompressionKind::getFileExtension) - .orElse(""); + // text format files and ion format files must have the correct extension when compressed + return switch (format.getOutputFormat()) { + case ION_OUTPUT_FORMAT, HIVE_IGNORE_KEY_OUTPUT_FORMAT_CLASS -> compression.getHiveCompressionKind() + .map(CompressionKind::getFileExtension) + .orElse(""); + default -> ""; + }; } @VisibleForTesting diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java new file mode 100644 index 000000000000..1e2c4b1ff6d3 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriter.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.ion; + +import com.amazon.ion.IonWriter; +import com.google.common.io.CountingOutputStream; +import io.trino.hive.formats.compression.CompressionKind; +import io.trino.hive.formats.ion.IonEncoder; +import io.trino.hive.formats.ion.IonEncoderFactory; +import io.trino.hive.formats.line.Column; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.plugin.hive.FileWriter; +import io.trino.spi.Page; +import io.trino.spi.TrinoException; +import io.trino.spi.type.TypeManager; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Optional; +import java.util.function.LongSupplier; + +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; + +public class IonFileWriter + implements FileWriter +{ + private final AggregatedMemoryContext outputStreamMemoryContext; + private final Closeable rollbackAction; + private final IonEncoder pageEncoder; + private final IonWriter writer; + private final OutputStream outputStream; + private final LongSupplier bytesWritten; + + public IonFileWriter( + OutputStream outputStream, + AggregatedMemoryContext outputStreamMemoryContext, + Closeable rollbackAction, + TypeManager typeManager, + Optional compressionKind, + IonSerDeProperties.IonEncoding ionEncoding, + List columns) + throws IOException + { + this.outputStreamMemoryContext = outputStreamMemoryContext; + this.rollbackAction = rollbackAction; + this.pageEncoder = IonEncoderFactory.buildEncoder(columns); + CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + this.bytesWritten = countingOutputStream::getCount; + if (compressionKind.isPresent()) { + this.outputStream = compressionKind.get().createCodec() + .createStreamCompressor(countingOutputStream); + } + else { + this.outputStream = countingOutputStream; + } + this.writer = ionEncoding.createWriter(this.outputStream); + } + + @Override + public long getWrittenBytes() + { + return bytesWritten.getAsLong(); + } + + @Override + public long getMemoryUsage() + { + return outputStreamMemoryContext.getBytes(); + } + + @Override + public Closeable commit() + { + try { + writer.close(); + } + catch (Exception e) { + try { + rollbackAction.close(); + } + catch (Exception _) { + // ignore + } + throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write to Ion file", e); + } + return rollbackAction; + } + + @Override + public void rollback() + { + try (rollbackAction) { + writer.close(); + } + catch (IOException e) { + throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Ion file", e); + } + } + + @Override + public long getValidationCpuNanos() + { + return 0; + } + + @Override + public void appendRows(Page page) + { + try { + pageEncoder.encode(writer, page); + } + catch (IOException e) { + throw new TrinoException(HIVE_WRITER_DATA_ERROR, e); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java new file mode 100644 index 000000000000..7290be076b8e --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonFileWriterFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.ion; + +import com.google.inject.Inject; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.hive.formats.line.Column; +import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.metastore.StorageFormat; +import io.trino.plugin.hive.FileWriter; +import io.trino.plugin.hive.HiveCompressionCodec; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HiveFileWriterFactory; +import io.trino.plugin.hive.WriterKind; +import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.type.Type; +import io.trino.spi.type.TypeManager; + +import java.io.Closeable; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.IntStream; + +import static io.trino.hive.formats.HiveClassNames.ION_OUTPUT_FORMAT; +import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; +import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision; +import static io.trino.plugin.hive.util.HiveTypeUtil.getType; +import static io.trino.plugin.hive.util.HiveUtil.getColumnNames; +import static io.trino.plugin.hive.util.HiveUtil.getColumnTypes; + +public class IonFileWriterFactory + implements HiveFileWriterFactory +{ + private final TrinoFileSystemFactory fileSystemFactory; + private final TypeManager typeManager; + private final boolean nativeTrinoEnabled; + + @Inject + public IonFileWriterFactory( + TrinoFileSystemFactory fileSystemFactory, + TypeManager typeManager, + HiveConfig hiveConfig) + { + this.fileSystemFactory = fileSystemFactory; + this.typeManager = typeManager; + this.nativeTrinoEnabled = hiveConfig.getIonNativeTrinoEnabled(); + } + + @Override + public Optional createFileWriter( + Location location, + List inputColumnNames, + StorageFormat storageFormat, + HiveCompressionCodec compressionCodec, + Map schema, + ConnectorSession session, + OptionalInt bucketNumber, + AcidTransaction transaction, + boolean useAcidSchema, + WriterKind writerKind) + { + if (!nativeTrinoEnabled + || !ION_OUTPUT_FORMAT.equals(storageFormat.getOutputFormat()) + || IonSerDeProperties.hasUnsupportedProperty(schema)) { + return Optional.empty(); + } + + try { + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + TrinoOutputFile outputFile = fileSystem.newOutputFile(location); + AggregatedMemoryContext outputStreamMemoryContext = newSimpleAggregatedMemoryContext(); + + Closeable rollbackAction = () -> fileSystem.deleteFile(location); + + List fileColumnNames = getColumnNames(schema); + List fileColumnTypes = getColumnTypes(schema).stream() + .map(hiveType -> getType(hiveType, typeManager, getTimestampPrecision(session))) + .toList(); + + List columns = IntStream.range(0, fileColumnNames.size()) + .mapToObj(ordinal -> 
new Column(fileColumnNames.get(ordinal), fileColumnTypes.get(ordinal), ordinal)) + .toList(); + + return Optional.of(new IonFileWriter( + outputFile.create(outputStreamMemoryContext), + outputStreamMemoryContext, + rollbackAction, + typeManager, + compressionCodec.getHiveCompressionKind(), + IonSerDeProperties.getIonEncoding(schema), + columns)); + } + catch (Exception e) { + throw new TrinoException(HIVE_WRITER_OPEN_ERROR, "Error creating Ion Output", e); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java new file mode 100644 index 000000000000..6c1ab4cca7a0 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSource.java @@ -0,0 +1,125 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.ion; + +import com.amazon.ion.IonBufferConfiguration; +import com.amazon.ion.IonReader; +import com.amazon.ion.IonType; +import io.trino.hive.formats.ion.IonDecoder; +import io.trino.spi.Page; +import io.trino.spi.PageBuilder; +import io.trino.spi.connector.ConnectorPageSource; + +import java.io.IOException; +import java.util.OptionalLong; +import java.util.function.LongSupplier; + +import static io.airlift.slice.SizeOf.instanceSize; + +public class IonPageSource + implements ConnectorPageSource +{ + private static final int INSTANCE_SIZE = instanceSize(IonPageSource.class); + + private final IonReader ionReader; + private final LongSupplier counter; + private final PageBuilder pageBuilder; + private final IonDecoder decoder; + private long readTimeNanos; + private int completedPositions; + private boolean finished; + + public IonPageSource(IonReader ionReader, LongSupplier counter, IonDecoder decoder, PageBuilder pageBuilder) + { + this.ionReader = ionReader; + this.counter = counter; + this.decoder = decoder; + this.pageBuilder = pageBuilder; + this.completedPositions = 0; + this.readTimeNanos = 0; + } + + @Override + public long getCompletedBytes() + { + return counter.getAsLong(); + } + + @Override + public OptionalLong getCompletedPositions() + { + return OptionalLong.of(completedPositions); + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public boolean isFinished() + { + return finished; + } + + @Override + public Page getNextPage() + { + while (!pageBuilder.isFull()) { + if (!readNextValue()) { + finished = true; + break; + } + } + + Page page = pageBuilder.build(); + completedPositions += page.getPositionCount(); + pageBuilder.reset(); + return page; + } + + @Override + public long getMemoryUsage() + { + // we don't have the ability to ask an IonReader how many bytes it has buffered + // it will buffer as much as is needed for each top-level-value. 
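+ // as a rough heuristic, assume the reader retains about four times the default initial buffer size per value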
+ int assumedIonBufferSize = IonBufferConfiguration.DEFAULT.getInitialBufferSize() * 4; + return INSTANCE_SIZE + assumedIonBufferSize + pageBuilder.getRetainedSizeInBytes(); + } + + @Override + public void close() + throws IOException + { + ionReader.close(); + } + + private boolean readNextValue() + { + long start = System.nanoTime(); + final IonType type = ionReader.next(); + + if (type == null) { + readTimeNanos += System.nanoTime() - start; + return false; + } + + pageBuilder.declarePosition(); + decoder.decode(ionReader); + + readTimeNanos += System.nanoTime() - start; + return true; + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSourceFactory.java new file mode 100644 index 000000000000..7c5150eec1fd --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonPageSourceFactory.java @@ -0,0 +1,148 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.ion; + +import com.amazon.ion.IonReader; +import com.amazon.ion.system.IonReaderBuilder; +import com.google.common.io.CountingInputStream; +import com.google.inject.Inject; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; +import io.trino.hive.formats.compression.Codec; +import io.trino.hive.formats.compression.CompressionKind; +import io.trino.hive.formats.ion.IonDecoder; +import io.trino.hive.formats.ion.IonDecoderConfig; +import io.trino.hive.formats.ion.IonDecoderFactory; +import io.trino.hive.formats.line.Column; +import io.trino.plugin.hive.AcidInfo; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HivePageSourceFactory; +import io.trino.plugin.hive.ReaderColumns; +import io.trino.plugin.hive.ReaderPageSource; +import io.trino.plugin.hive.Schema; +import io.trino.plugin.hive.acid.AcidTransaction; +import io.trino.spi.PageBuilder; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.EmptyPageSource; +import io.trino.spi.predicate.TupleDomain; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.hive.formats.HiveClassNames.ION_SERDE_CLASS; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; +import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; +import static io.trino.plugin.hive.ReaderPageSource.noProjectionAdaptation; +import static io.trino.plugin.hive.util.HiveUtil.splitError; + +public class IonPageSourceFactory + implements HivePageSourceFactory +{ + private final TrinoFileSystemFactory 
trinoFileSystemFactory; + private final boolean nativeTrinoEnabled; + + @Inject + public IonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig hiveConfig) + { + this.trinoFileSystemFactory = trinoFileSystemFactory; + this.nativeTrinoEnabled = hiveConfig.getIonNativeTrinoEnabled(); + } + + @Override + public Optional createPageSource( + ConnectorSession session, + Location path, + long start, + long length, + long estimatedFileSize, + long lastModifiedTime, + Schema schema, + List columns, + TupleDomain effectivePredicate, + Optional acidInfo, + OptionalInt bucketNumber, + boolean originalFile, + AcidTransaction transaction) + { + if (!nativeTrinoEnabled + || !ION_SERDE_CLASS.equals(schema.serializationLibraryName()) + || IonSerDeProperties.hasUnsupportedProperty(schema.serdeProperties())) { + return Optional.empty(); + } + + checkArgument(acidInfo.isEmpty(), "Acid is not supported for Ion files"); + + // Skip empty inputs + if (length == 0) { + return Optional.of(noProjectionAdaptation(new EmptyPageSource())); + } + + if (start != 0) { + throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "Split start must be 0 for Ion files"); + } + + List projectedReaderColumns = columns; + Optional readerProjections = projectBaseColumns(columns); + + if (readerProjections.isPresent()) { + projectedReaderColumns = readerProjections.get().get().stream() + .map(HiveColumnHandle.class::cast) + .collect(toImmutableList()); + } + + TrinoFileSystem trinoFileSystem = trinoFileSystemFactory.create(session); + TrinoInputFile inputFile = trinoFileSystem.newInputFile(path, estimatedFileSize); + + // todo: optimization for small files that should just be read into memory + try { + Optional codec = CompressionKind.forFile(inputFile.location().fileName()) + .map(CompressionKind::createCodec); + + CountingInputStream countingInputStream = new CountingInputStream(inputFile.newStream()); + InputStream inputStream; + if (codec.isPresent()) { + inputStream = codec.get().createStreamDecompressor(countingInputStream); + } + else { + inputStream = countingInputStream; + } + + PageBuilder pageBuilder = new PageBuilder(projectedReaderColumns.stream() + .map(HiveColumnHandle::getType) + .toList()); + List decoderColumns = projectedReaderColumns.stream() + .map(hc -> new Column(hc.getName(), hc.getType(), hc.getBaseHiveColumnIndex())) + .toList(); + + IonDecoderConfig decoderConfig = IonSerDeProperties.decoderConfigFor(schema.serdeProperties()); + IonDecoder decoder = IonDecoderFactory.buildDecoder(decoderColumns, decoderConfig, pageBuilder); + IonReader ionReader = IonReaderBuilder.standard().build(inputStream); + IonPageSource pageSource = new IonPageSource(ionReader, countingInputStream::getCount, decoder, pageBuilder); + + return Optional.of(new ReaderPageSource(pageSource, readerProjections)); + } + catch (IOException e) { + throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, start, length), e); + } + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonSerDeProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonSerDeProperties.java new file mode 100644 index 000000000000..aac0d3735636 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ion/IonSerDeProperties.java @@ -0,0 +1,142 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.ion; + +import com.amazon.ion.IonWriter; +import com.amazon.ion.system.IonBinaryWriterBuilder; +import com.amazon.ion.system.IonTextWriterBuilder; +import com.google.common.collect.ImmutableMap; +import io.trino.hive.formats.ion.IonDecoderConfig; +import io.trino.spi.TrinoException; + +import java.io.OutputStream; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; + +public final class IonSerDeProperties +{ + // Reader properties + public static final String STRICT_PATH_TYPING_PROPERTY = "ion.path_extractor.strict"; + public static final String STRICT_PATH_TYPING_DEFAULT = "false"; + public static final String PATH_EXTRACTOR_PROPERTY = "ion.(\\w+).path_extractor"; + public static final String PATH_EXTRACTION_CASE_SENSITIVITY = "ion.path_extractor.case_sensitive"; + public static final String PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT = "false"; + private static final Pattern pathExtractorPattern = Pattern.compile(PATH_EXTRACTOR_PROPERTY); + + // unimplemented reader properties + public static final String FAIL_ON_OVERFLOW_PROPERTY = "ion.fail_on_overflow"; + public static final String FAIL_ON_OVERFLOW_PROPERTY_DEFAULT = "true"; + public static final String COLUMN_FAIL_ON_OVERFLOW_PROPERTY = "ion.\\w+.fail_on_overflow"; + public static final String IGNORE_MALFORMED = "ion.ignore_malformed"; + public static final String IGNORE_MALFORMED_DEFAULT = "false"; + + // Writer properties + public static final String ION_ENCODING_PROPERTY = "ion.encoding"; + public static final String TEXT_ENCODING = "text"; + public static final String BINARY_ENCODING = "binary"; + + // unimplemented writer properties + public static final String ION_TIMESTAMP_OFFSET_PROPERTY = "ion.timestamp.serialization_offset"; + public static final String ION_TIMESTAMP_OFFSET_DEFAULT = "Z"; + public static final String ION_SERIALIZE_NULL_AS_PROPERTY = "ion.serialize_null"; + public static final String ION_SERIALIZE_NULL_AS_DEFAULT = "OMIT"; + public static final String ION_SERIALIZE_AS_PROPERTY = "ion.\\w+.serialize_as"; + + private static final Pattern unsupportedPropertiesRegex = Pattern.compile( + ION_SERIALIZE_AS_PROPERTY + "|" + COLUMN_FAIL_ON_OVERFLOW_PROPERTY); + + private static final Map defaultOnlyProperties = Map.of( + // reader properties + FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT, + IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT, + + // writer properties + ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT, + ION_SERIALIZE_NULL_AS_PROPERTY, ION_SERIALIZE_NULL_AS_DEFAULT); + + private IonSerDeProperties() {} + + public static IonDecoderConfig decoderConfigFor(Map propertiesMap) + { + ImmutableMap.Builder extractionsBuilder = ImmutableMap.builder(); + + for (Map.Entry property : propertiesMap.entrySet()) { + Matcher matcher = pathExtractorPattern.matcher(property.getKey()); + if (matcher.matches()) { + extractionsBuilder.put(matcher.group(1), property.getValue()); + } + } + + Boolean strictTyping = 
Boolean.parseBoolean( + propertiesMap.getOrDefault(STRICT_PATH_TYPING_PROPERTY, STRICT_PATH_TYPING_DEFAULT)); + Boolean caseSensitive = Boolean.parseBoolean( + propertiesMap.getOrDefault(PATH_EXTRACTION_CASE_SENSITIVITY, PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT)); + + return new IonDecoderConfig(extractionsBuilder.buildOrThrow(), strictTyping, caseSensitive); + } + + public enum IonEncoding + { + BINARY { + @Override + public IonWriter createWriter(OutputStream outputStream) + { + return IonBinaryWriterBuilder.standard().build(outputStream); + } + }, + + TEXT { + @Override + public IonWriter createWriter(OutputStream outputStream) + { + return IonTextWriterBuilder.minimal().build(outputStream); + } + }; + + public abstract IonWriter createWriter(OutputStream outputStream); + } + + public static IonEncoding getIonEncoding(Map schema) + { + String encodingStr = schema.getOrDefault(ION_ENCODING_PROPERTY, BINARY_ENCODING); + return switch (encodingStr.toLowerCase(Locale.ROOT)) { + case TEXT_ENCODING -> IonEncoding.TEXT; + case BINARY_ENCODING -> IonEncoding.BINARY; + default -> throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, + "Unsupported Ion encoding format: " + encodingStr); + }; + } + + public static boolean hasUnsupportedProperty(Map properties) + { + return properties.entrySet().stream() + .anyMatch((entry) -> { + String key = entry.getKey(); + String value = entry.getValue(); + if (!key.startsWith("ion.")) { + return false; + } + + if (!defaultOnlyProperties.getOrDefault(key, value).equals(value)) { + return true; + } + + return unsupportedPropertiesRegex.matcher(key).matches(); + }); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 765e7625827c..49e7b4a5aecf 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -225,6 +225,11 @@ protected static QueryRunner createHiveQueryRunner(HiveQueryRunner.Builder bu // Make weighted split scheduling more conservative to avoid OOMs in test .addHiveProperty("hive.minimum-assigned-split-weight", "0.5") .addHiveProperty("hive.partition-projection-enabled", "true") + // This is needed for Ion native trino support as the implementation is in progress and can be + // backwards incompatible. It requires explicitly opting to use this feature. + // TODO: In future this property should change to `true` as default and then the following statement can + // be removed. + .addHiveProperty("hive.ion.nativetrino", "true") // This is needed for e2e scale writers test otherwise 50% threshold of // bufferSize won't get exceeded for scaling to happen. 
.addExtraProperty("task.max-local-exchange-buffer-size", "32MB") @@ -5685,6 +5690,7 @@ private boolean isMappingByName(HiveStorageFormat format) case PARQUET -> true; case AVRO -> true; case JSON -> true; + case ION -> true; case ORC -> false; case RCBINARY -> false; case RCTEXT -> false; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index 1d74a78334ed..a50c66f67f20 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -36,6 +36,8 @@ import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.avro.AvroFileWriterFactory; import io.trino.plugin.hive.avro.AvroPageSourceFactory; +import io.trino.plugin.hive.ion.IonFileWriterFactory; +import io.trino.plugin.hive.ion.IonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -57,6 +59,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.ParquetWriterConfig; import io.trino.plugin.hive.rcfile.RcFilePageSourceFactory; +import io.trino.plugin.hive.util.HiveTypeTranslator; import io.trino.spi.PageSorter; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; @@ -178,6 +181,9 @@ public static Set getDefaultHivePageSourceFactories(HdfsE public static Set getDefaultHivePageSourceFactories(TrinoFileSystemFactory fileSystemFactory, HiveConfig hiveConfig) { FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); + // set IonNativeTrino as true in hiveConfig for testing + // TODO: In future this flag should change to `true` as default and then the following statement can be removed. 
+ hiveConfig.setIonNativeTrinoEnabled(true); return ImmutableSet.builder() .add(new CsvPageSourceFactory(fileSystemFactory, hiveConfig)) .add(new JsonPageSourceFactory(fileSystemFactory, hiveConfig)) @@ -189,6 +195,7 @@ public static Set getDefaultHivePageSourceFactories(Trino .add(new RcFilePageSourceFactory(fileSystemFactory, hiveConfig)) .add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig)) .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig)) + .add(new IonPageSourceFactory(fileSystemFactory, hiveConfig)) .build(); } @@ -211,6 +218,7 @@ public static Set getDefaultHiveFileWriterFactories(HiveC .add(new RcFileFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, nodeVersion, hiveConfig)) .add(new OrcFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, nodeVersion, new FileFormatDataSourceStats(), new OrcWriterConfig())) .add(new ParquetFileWriterFactory(fileSystemFactory, nodeVersion, TESTING_TYPE_MANAGER, hiveConfig, new FileFormatDataSourceStats())) + .add(new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, hiveConfig)) .build(); } @@ -318,6 +326,62 @@ public static Object toNativeContainerValue(Type type, Object hiveValue) throw new IllegalArgumentException("Unsupported type: " + type); } + public static HiveColumnHandle toHiveBaseColumnHandle(String name, Type type, int ordinal) + { + return new HiveColumnHandle( + name, + ordinal, + HiveTypeTranslator.toHiveType(type), + type, + Optional.empty(), + HiveColumnHandle.ColumnType.REGULAR, + Optional.empty()); + } + + public static HiveColumnHandle projectedColumn(HiveColumnHandle baseColumn, String... path) + { + if (path.length == 0) { + throw new IllegalArgumentException("path must have at least one element"); + } + + final Type baseType = baseColumn.getBaseType(); + Type type = baseType; + ImmutableList.Builder derefBuilder = ImmutableList.builder(); + + for (String fieldName : path) { + if (type instanceof RowType rowType) { + List fields = rowType.getFields(); + type = null; + for (int pos = 0; pos < fields.size(); pos++) { + if (fields.get(pos).getName().get().equals(fieldName)) { + derefBuilder.add(pos); + type = fields.get(pos).getType(); + break; + } + } + if (type == null) { + throw new IllegalArgumentException(String.format("could not find field named: %s!", fieldName)); + } + } + else { + throw new IllegalArgumentException("cannot step into non-RowType!"); + } + } + + return new HiveColumnHandle( + baseColumn.getBaseColumnName(), + baseColumn.getBaseHiveColumnIndex(), + HiveTypeTranslator.toHiveType(baseType), + baseType, + Optional.of(new HiveColumnProjectionInfo( + derefBuilder.build(), + ImmutableList.copyOf(path), + HiveTypeTranslator.toHiveType(type), + type)), + baseColumn.getColumnType(), + baseColumn.getComment()); + } + private static UUID uuidFromBytes(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index d07f3a58b821..c02fe9725d5e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -78,6 +78,7 @@ public void testDefaults() .setParallelPartitionedBucketedWrites(true) .setTextMaxLineLength(DataSize.of(100, Unit.MEGABYTE)) .setOrcLegacyTimeZone(TimeZone.getDefault().getID()) + .setIonNativeTrinoEnabled(false) 
.setParquetTimeZone(TimeZone.getDefault().getID()) .setUseParquetColumnNames(true) .setRcfileTimeZone(TimeZone.getDefault().getID()) @@ -162,6 +163,7 @@ public void testExplicitPropertyMappings() .put("hive.max-partition-drops-per-query", "1000") .put("hive.text.max-line-length", "13MB") .put("hive.orc.time-zone", nonDefaultTimeZone().getID()) + .put("hive.ion.nativetrino", "true") .put("hive.parquet.time-zone", nonDefaultTimeZone().getID()) .put("hive.parquet.use-column-names", "false") .put("hive.rcfile.time-zone", nonDefaultTimeZone().getID()) @@ -245,6 +247,7 @@ public void testExplicitPropertyMappings() .setParallelPartitionedBucketedWrites(false) .setTextMaxLineLength(DataSize.of(13, Unit.MEGABYTE)) .setOrcLegacyTimeZone(nonDefaultTimeZone().getID()) + .setIonNativeTrinoEnabled(true) .setParquetTimeZone(nonDefaultTimeZone().getID()) .setUseParquetColumnNames(false) .setRcfileTimeZone(nonDefaultTimeZone().getID()) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index d4efe5fe6fc3..87e2e81e48bd 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -32,6 +32,8 @@ import io.trino.plugin.base.type.DecodedTimestamp; import io.trino.plugin.hive.avro.AvroFileWriterFactory; import io.trino.plugin.hive.avro.AvroPageSourceFactory; +import io.trino.plugin.hive.ion.IonFileWriterFactory; +import io.trino.plugin.hive.ion.IonPageSourceFactory; import io.trino.plugin.hive.line.CsvFileWriterFactory; import io.trino.plugin.hive.line.CsvPageSourceFactory; import io.trino.plugin.hive.line.JsonFileWriterFactory; @@ -141,6 +143,7 @@ import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; import static io.trino.plugin.hive.HiveStorageFormat.AVRO; import static io.trino.plugin.hive.HiveStorageFormat.CSV; +import static io.trino.plugin.hive.HiveStorageFormat.ION; import static io.trino.plugin.hive.HiveStorageFormat.JSON; import static io.trino.plugin.hive.HiveStorageFormat.OPENX_JSON; import static io.trino.plugin.hive.HiveStorageFormat.ORC; @@ -365,6 +368,29 @@ public void testOpenXJson(int rowCount, long fileSizePadding) .isReadableByPageSource(fileSystemFactory -> new OpenXJsonPageSourceFactory(fileSystemFactory, new HiveConfig())); } + @Test(dataProvider = "validRowAndFileSizePadding") + public void testIonWithBinaryEncoding(int rowCount, long fileSizePadding) + throws Exception + { + List testColumns = TEST_COLUMNS.stream() + // even though maps with text keys work with the native trino impl + // there is an error when testing against the hive serde + .filter(tc -> !(tc.type instanceof MapType)) + .collect(toList()); + + HiveConfig hiveConfig = new HiveConfig(); + // enable Ion native trino integration for testing while the implementation is in progress + // TODO: In future this flag should change to `true` as default and then the following statement can be removed. 
+ hiveConfig.setIonNativeTrinoEnabled(true); + + assertThatFileFormat(ION) + .withColumns(testColumns) + .withRowsCount(rowCount) + .withFileSizePadding(fileSizePadding) + .withFileWriterFactory(fileSystemFactory -> new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, hiveConfig)) + .isReadableByPageSource(fileSystemFactory -> new IonPageSourceFactory(fileSystemFactory, hiveConfig)); + } + @Test(dataProvider = "validRowAndFileSizePadding") public void testRcTextPageSource(int rowCount, long fileSizePadding) throws Exception diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java new file mode 100644 index 000000000000..a5f15a59a86d --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/ion/IonPageSourceSmokeTest.java @@ -0,0 +1,529 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive.ion; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.filesystem.memory.MemoryFileSystemFactory; +import io.trino.hive.formats.compression.CompressionKind; +import io.trino.metastore.HiveType; +import io.trino.plugin.hive.FileWriter; +import io.trino.plugin.hive.HiveColumnHandle; +import io.trino.plugin.hive.HiveCompressionCodec; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HivePageSourceProvider; +import io.trino.plugin.hive.Schema; +import io.trino.plugin.hive.WriterKind; +import io.trino.spi.Page; +import io.trino.spi.TrinoException; +import io.trino.spi.block.IntArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.ArrayType; +import io.trino.spi.type.RowType; +import io.trino.testing.MaterializedResult; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; +import static io.trino.plugin.hive.HiveStorageFormat.ION; +import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; 
+import static io.trino.plugin.hive.HiveTestUtils.projectedColumn; +import static io.trino.plugin.hive.HiveTestUtils.toHiveBaseColumnHandle; +import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; +import static io.trino.plugin.hive.ion.IonSerDeProperties.BINARY_ENCODING; +import static io.trino.plugin.hive.ion.IonSerDeProperties.FAIL_ON_OVERFLOW_PROPERTY; +import static io.trino.plugin.hive.ion.IonSerDeProperties.FAIL_ON_OVERFLOW_PROPERTY_DEFAULT; +import static io.trino.plugin.hive.ion.IonSerDeProperties.IGNORE_MALFORMED; +import static io.trino.plugin.hive.ion.IonSerDeProperties.IGNORE_MALFORMED_DEFAULT; +import static io.trino.plugin.hive.ion.IonSerDeProperties.ION_ENCODING_PROPERTY; +import static io.trino.plugin.hive.ion.IonSerDeProperties.ION_SERIALIZE_NULL_AS_DEFAULT; +import static io.trino.plugin.hive.ion.IonSerDeProperties.ION_SERIALIZE_NULL_AS_PROPERTY; +import static io.trino.plugin.hive.ion.IonSerDeProperties.ION_TIMESTAMP_OFFSET_DEFAULT; +import static io.trino.plugin.hive.ion.IonSerDeProperties.ION_TIMESTAMP_OFFSET_PROPERTY; +import static io.trino.plugin.hive.ion.IonSerDeProperties.STRICT_PATH_TYPING_PROPERTY; +import static io.trino.plugin.hive.ion.IonSerDeProperties.TEXT_ENCODING; +import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS; +import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RowType.field; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; +import static java.util.stream.Collectors.toList; + +/** + * Most basic test to verify that the PageSource-fu is wired up correctly. + */ +public class IonPageSourceSmokeTest +{ + // In the Ion binary format, a value stream always starts with a binary version marker. This helps distinguish Ion binary + // data from other formats, including the Ion text format.
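+ // The Ion 1.0 binary version marker is the byte sequence 0xE0 0x01 0x00 0xEA.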
+ private static final byte[] BINARY_VERSION_MARKER = {(byte) 0xE0, (byte) 0x01, (byte) 0x00, (byte) 0XEA}; + private static final String EXPECTED_TEXT = "{foo:3,bar:6}"; + + public static final String TEST_ION_LOCATION = "memory:///test.ion"; + public static final List FOO_BAR_COLUMNS = List.of( + toHiveBaseColumnHandle("foo", INTEGER, 0), + toHiveBaseColumnHandle("bar", VARCHAR, 1)); + + @Test + public void testReadTwoValues() + throws IOException + { + assertRowCount( + FOO_BAR_COLUMNS, + FOO_BAR_COLUMNS, + "{ foo: 31, bar: baz } { foo: 31, bar: \"baz\" }", + 2); + } + + @Test + public void testReadArray() + throws IOException + { + List tablesColumns = List.of( + toHiveBaseColumnHandle("my_seq", new ArrayType(BOOLEAN), 0)); + + assertRowCount( + tablesColumns, + tablesColumns, + "{ my_seq: ( true false ) } { my_seq: [false, false, true] }", + 2); + } + + @Test + public void testStrictAndLaxPathTyping() + throws IOException + { + TestFixture defaultFixture = new TestFixture(FOO_BAR_COLUMNS); + defaultFixture.assertRowCount("37 null.timestamp []", 3); + + TestFixture laxFixture = new TestFixture(FOO_BAR_COLUMNS); + laxFixture.withSerdeProperty(STRICT_PATH_TYPING_PROPERTY, "false"); + laxFixture.assertRowCount("37 null.timestamp []", 3); + + TestFixture strictFixture = new TestFixture(FOO_BAR_COLUMNS); + strictFixture.withSerdeProperty(STRICT_PATH_TYPING_PROPERTY, "true"); + + Assertions.assertThrows(TrinoException.class, () -> + strictFixture.assertRowCount("37 null.timestamp []", 3)); + } + + @Test + public void testPathExtraction() + throws IOException + { + TestFixture fixture = new TestFixture(List.of(toHiveBaseColumnHandle("bar", INTEGER, 0))) + .withSerdeProperty("ion.bar.path_extractor", "(foo bar)"); + + // these would result in errors if we tried to extract the bar field from the root instead of the nested bar + fixture.assertRowCount("{ foo: { bar: 17 }, bar: not_this_bar } { foo: { bar: 31 }, bar: not_this_bar }", 2); + } + + @Test + public void testCaseSensitive() + throws IOException + { + TestFixture fixture = new TestFixture(List.of(toHiveBaseColumnHandle("bar", INTEGER, 0))) + .withSerdeProperty("ion.path_extractor.case_sensitive", "true"); + + // this would result in errors if we tried to extract the BAR field + fixture.assertRowCount("{ BAR: should_be_skipped } { bar: 17 }", 2); + } + + @Test + public void testProjectedColumn() + throws IOException + { + final RowType spamType = RowType.rowType(field("nested_to_prune", INTEGER), field("eggs", INTEGER)); + List tableColumns = List.of( + toHiveBaseColumnHandle("spam", spamType, 0), + toHiveBaseColumnHandle("ham", BOOLEAN, 1)); + List projectedColumns = List.of( + projectedColumn(tableColumns.get(0), "eggs")); + + assertRowCount( + tableColumns, + projectedColumns, + // the data below reflects that "ham" is not decoded, that column is pruned + // "nested_to_prune" is decoded, however, because nested fields are not pruned, yet. 
+ // so this test will fail if you change that to something other than an int + "{ spam: { nested_to_prune: 31, eggs: 12 }, ham: exploding }", + 1); + } + + @Test + public void testPageSourceTelemetryUncompressed() + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS); + int bytes = fixture.writeIonTextFile("{ foo: 17, bar: baz } { foo: 31, bar: qux }"); + + ConnectorPageSource pageSource = fixture.getPageSource(); + + Assertions.assertNotNull(pageSource.getNextPage()); + Assertions.assertEquals(2, pageSource.getCompletedPositions().getAsLong()); + Assertions.assertEquals(bytes, pageSource.getCompletedBytes()); + Assertions.assertTrue(pageSource.getReadTimeNanos() > 0); + } + + @Test + public void testPageSourceTelemetryCompressed() + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS); + int bytes = fixture.writeZstdCompressedIonText("{ foo: 17, bar: baz } { foo: 31, bar: qux }"); + + ConnectorPageSource pageSource = fixture.getPageSource(); + + Assertions.assertNotNull(pageSource.getNextPage()); + Assertions.assertEquals(2, pageSource.getCompletedPositions().getAsLong()); + Assertions.assertEquals(bytes, pageSource.getCompletedBytes()); + Assertions.assertTrue(pageSource.getReadTimeNanos() > 0); + } + + @Test + public void testNativeTrinoDisabled() + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS) + .withNativeIonDisabled(); + + Assertions.assertTrue( + fixture.getOptionalFileWriter().isEmpty(), + "Expected empty file writer when native Trino is disabled"); + + fixture.writeIonTextFile(""); + Assertions.assertTrue( + fixture.getOptionalPageSource().isEmpty(), + "Expected empty page source when native Trino is disabled"); + } + + private static Stream<Map.Entry<String, String>> propertiesWithDefaults() + { + return Stream.of( + entry(FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT), + entry(IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT), + entry(ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT), + entry(ION_SERIALIZE_NULL_AS_PROPERTY, ION_SERIALIZE_NULL_AS_DEFAULT)); + } + + private static Stream<Map.Entry<String, String>> propertiesWithValues() + { + return Stream.of( + entry(FAIL_ON_OVERFLOW_PROPERTY, "false"), + entry(IGNORE_MALFORMED, "true"), + entry(ION_TIMESTAMP_OFFSET_PROPERTY, "01:00"), + entry(ION_SERIALIZE_NULL_AS_PROPERTY, "TYPED"), + // These entries represent column-specific properties that are not supported. + // The presence of any of these properties in the schema will result in an empty PageSource, + // regardless of their assigned values.
+ entry("ion.foo.fail_on_overflow", "property_value"), + entry("ion.foo.serialize_as", "property_value")); + } + + private static Map.Entry entry(String key, String value) + { + return new AbstractMap.SimpleEntry<>(key, value); + } + + @ParameterizedTest + @MethodSource("propertiesWithValues") + void testPropertiesWithValues(Map.Entry property) + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS) + .withSerdeProperty(property.getKey(), property.getValue()); + + Assertions.assertTrue( + fixture.getOptionalFileWriter().isEmpty(), + "Expected empty file writer when there are unsupported Serde properties"); + + fixture.writeIonTextFile("{ }"); + Assertions.assertTrue( + fixture.getOptionalPageSource().isEmpty(), + "Expected empty page source when there are unsupported Serde properties"); + } + + @ParameterizedTest + @MethodSource("propertiesWithDefaults") + void testPropertiesWithDefaults(Map.Entry property) + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS) + .withSerdeProperty(property.getKey(), property.getValue()); + + Assertions.assertTrue( + fixture.getOptionalFileWriter().isPresent(), + "Expected present file writer when there are unsupported Serde properties"); + + fixture.writeIonTextFile(""); + Assertions.assertTrue( + fixture.getOptionalPageSource().isPresent(), + "Expected present page source when there are unsupported Serde properties"); + } + + @Test + public void testTextEncoding() + throws IOException + { + List tableColumns = List.of( + toHiveBaseColumnHandle("foo", INTEGER, 0), + toHiveBaseColumnHandle("bar", INTEGER, 0)); + + assertEncoding(tableColumns, TEXT_ENCODING); + } + + @Test + public void testBinaryEncoding() + throws IOException + { + List tableColumns = List.of( + toHiveBaseColumnHandle("foo", INTEGER, 0), + toHiveBaseColumnHandle("bar", INTEGER, 0)); + + assertEncoding(tableColumns, BINARY_ENCODING); + } + + @Test + public void testBadEncodingName() + throws IOException + { + TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS) + .withSerdeProperty(ION_ENCODING_PROPERTY, "unknown_encoding_name"); + + Assertions.assertThrows(TrinoException.class, fixture::getFileWriter); + } + + private void assertEncoding(List tableColumns, + String encoding) + throws IOException + { + TestFixture fixture = new TestFixture(tableColumns) + .withSerdeProperty(ION_ENCODING_PROPERTY, encoding); + + writeTestData(fixture.getFileWriter()); + byte[] inputStreamBytes = fixture.getTrinoInputFile() + .newStream() + .readAllBytes(); + + if (encoding.equals(BINARY_ENCODING)) { + // Check if the first 4 bytes is binary version marker + Assertions.assertArrayEquals(Arrays.copyOfRange(inputStreamBytes, 0, 4), BINARY_VERSION_MARKER); + } + else { + Assertions.assertEquals(new String(inputStreamBytes, StandardCharsets.UTF_8), EXPECTED_TEXT); + } + } + + private void assertRowCount(List tableColumns, List projectedColumns, String ionText, int rowCount) + throws IOException + { + TestFixture fixture = new TestFixture(tableColumns, projectedColumns); + fixture.assertRowCount(ionText, rowCount); + } + + private static void writeTestData(FileWriter ionFileWriter) + { + ionFileWriter.appendRows(new Page( + RunLengthEncodedBlock.create(new IntArrayBlock(1, Optional.empty(), new int[] {3}), 1), + RunLengthEncodedBlock.create(new IntArrayBlock(1, Optional.empty(), new int[] {6}), 1))); + ionFileWriter.commit(); + } + + private static class TestFixture + { + private TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + 
private Location fileLocation = Location.of(TEST_ION_LOCATION); + private HiveConfig hiveConfig = new HiveConfig(); + private Map tableProperties = new HashMap<>(); + private List columns; + private List projections; + + private ConnectorSession session; + + TestFixture(List columns) + { + this(columns, columns); + } + + TestFixture(List columns, List projections) + { + this.columns = columns; + this.projections = projections; + tableProperties.put(LIST_COLUMNS, columns.stream() + .map(HiveColumnHandle::getName) + .collect(Collectors.joining(","))); + tableProperties.put(LIST_COLUMN_TYPES, columns.stream().map(HiveColumnHandle::getHiveType) + .map(HiveType::toString) + .collect(Collectors.joining(","))); + // the default at runtime is false, but most of our testing obviously assumes it is enabled. + hiveConfig.setIonNativeTrinoEnabled(true); + } + + TestFixture withNativeIonDisabled() + { + hiveConfig.setIonNativeTrinoEnabled(false); + return this; + } + + TestFixture withSerdeProperty(String key, String value) + { + tableProperties.put(key, value); + return this; + } + + Optional getOptionalPageSource() + throws IOException + { + IonPageSourceFactory pageSourceFactory = new IonPageSourceFactory(fileSystemFactory, hiveConfig); + + long length = fileSystemFactory.create(getSession()).newInputFile(fileLocation).length(); + long nowMillis = Instant.now().toEpochMilli(); + + List columnMappings = buildColumnMappings( + "", + ImmutableList.of(), + projections, + ImmutableList.of(), + ImmutableMap.of(), + fileLocation.toString(), + OptionalInt.empty(), + length, + nowMillis); + + return HivePageSourceProvider.createHivePageSource( + ImmutableSet.of(pageSourceFactory), + getSession(), + fileLocation, + OptionalInt.empty(), + 0, + length, + length, + nowMillis, + new Schema(ION.getSerde(), false, tableProperties), + TupleDomain.all(), + TESTING_TYPE_MANAGER, + Optional.empty(), + Optional.empty(), + Optional.empty(), + false, + NO_ACID_TRANSACTION, + columnMappings); + } + + ConnectorPageSource getPageSource() + throws IOException + { + return getOptionalPageSource().orElseThrow(); + } + + ConnectorSession getSession() + { + if (session == null) { + session = getHiveSession(hiveConfig); + } + return session; + } + + int writeIonTextFile(String ionText) + throws IOException + { + TrinoOutputFile outputFile = fileSystemFactory.create(getSession()).newOutputFile(fileLocation); + byte[] bytes = ionText.getBytes(StandardCharsets.UTF_8); + outputFile.createOrOverwrite(bytes); + + return bytes.length; + } + + int writeZstdCompressedIonText(String ionText) + throws IOException + { + fileLocation = Location.of(TEST_ION_LOCATION + ".zst"); + TrinoOutputFile outputFile = fileSystemFactory.create(getSession()).newOutputFile(fileLocation); + + Slice textSlice = Slices.wrappedBuffer(ionText.getBytes(StandardCharsets.UTF_8)); + Slice compressed = CompressionKind.ZSTD.createCodec().createValueCompressor().compress(textSlice); + outputFile.createOrOverwrite(compressed.getBytes()); + + return compressed.length(); + } + + Optional getOptionalFileWriter() + { + return new IonFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER, hiveConfig) + .createFileWriter( + fileLocation, + columns.stream().map(HiveColumnHandle::getName).collect(toList()), + ION.toStorageFormat(), + HiveCompressionCodec.NONE, + tableProperties, + getSession(), + OptionalInt.empty(), + NO_ACID_TRANSACTION, + false, + WriterKind.INSERT); + } + + FileWriter getFileWriter() + { + return getOptionalFileWriter().orElseThrow(); + } + + 
TrinoInputFile getTrinoInputFile() + { + return fileSystemFactory.create(getSession()) + .newInputFile(fileLocation); + } + + void assertRowCount(String ionText, int rowCount) + throws IOException + { + writeIonTextFile(ionText); + + try (ConnectorPageSource pageSource = getPageSource()) { + final MaterializedResult result = MaterializedResult.materializeSourceDataStream( + getSession(), + pageSource, + projections.stream().map(HiveColumnHandle::getType).toList()); + Assertions.assertEquals(rowCount, result.getRowCount()); + } + } + } +} diff --git a/pom.xml b/pom.xml index 37b24b583750..e2d9a6bdd8c9 100644 --- a/pom.xml +++ b/pom.xml @@ -201,6 +201,8 @@ 1.45.3 5.3.2 1.7.1 + 1.11.9 + 1.5.0 5.16.0 2.12.7 0.12.6 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveStorageFormats.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveStorageFormats.java index fdb07b5d4007..6d1ced716263 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveStorageFormats.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveStorageFormats.java @@ -307,6 +307,8 @@ public void verifyDataProviderCompleteness() .filter(format -> !"JSON".equals(format)) // OPENX is not supported in Hive by default .filter(format -> !"OPENX_JSON".equals(format)) + // Ion is not supported in Hive by default + .filter(format -> !"ION".equals(format)) .collect(toImmutableSet()); assertThat(ImmutableSet.copyOf(storageFormats()))