Skip to content

Commit

Permalink
Core: Add internal Avro reader (#11108)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue authored Oct 7, 2024
1 parent 3220fad commit f0e4fd2
Show file tree
Hide file tree
Showing 6 changed files with 483 additions and 70 deletions.
26 changes: 15 additions & 11 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.io.DatumReader;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
Expand Down Expand Up @@ -65,16 +67,16 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
"record_count");

protected enum FileType {
DATA_FILES(GenericDataFile.class.getName()),
DELETE_FILES(GenericDeleteFile.class.getName());
DATA_FILES(GenericDataFile.class),
DELETE_FILES(GenericDeleteFile.class);

private final String fileClass;
private final Class<? extends StructLike> fileClass;

FileType(String fileClass) {
FileType(Class<? extends StructLike> fileClass) {
this.fileClass = fileClass;
}

private String fileClass() {
private Class<? extends StructLike> fileClass() {
return fileClass;
}
}
Expand Down Expand Up @@ -261,12 +263,7 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
AvroIterable<ManifestEntry<F>> reader =
Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.rename("manifest_entry", GenericManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
.rename("r102", PartitionData.class.getName())
.rename("data_file", content.fileClass())
.rename("r2", content.fileClass())
.classLoader(GenericManifestEntry.class.getClassLoader())
.createResolvingReader(this::newReader)
.reuseContainers()
.build();

Expand All @@ -279,6 +276,13 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
}
}

/**
 * Builds the Avro {@link DatumReader} used to materialize manifest entries.
 *
 * <p>Creates an {@code InternalReader} for the given read schema and registers the concrete
 * classes to instantiate for specific field IDs: the root record is read as
 * {@link GenericManifestEntry}, the {@code data_file} field (ManifestEntry.DATA_FILE_ID) as the
 * content-specific file class ({@code GenericDataFile} or {@code GenericDeleteFile}, per the
 * {@code content} FileType of this reader), and the {@code partition} field
 * (DataFile.PARTITION_ID) as {@link PartitionData}.
 *
 * @param schema the projected read schema for manifest entries
 * @return a configured reader; the wildcard return matches the createResolvingReader callback
 */
private DatumReader<?> newReader(Schema schema) {
  return InternalReader.create(schema)
      .setRootType(GenericManifestEntry.class)
      .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
      .setCustomType(DataFile.PARTITION_ID, PartitionData.class);
}

CloseableIterable<ManifestEntry<F>> liveEntries() {
return entries(true /* only live entries */);
}
Expand Down
64 changes: 6 additions & 58 deletions core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
Expand All @@ -43,7 +40,7 @@ public class GenericAvroReader<T>
private final Types.StructType expectedType;
private ClassLoader loader = Thread.currentThread().getContextClassLoader();
private Map<String, String> renames = ImmutableMap.of();
private final Map<Integer, ?> idToConstant = ImmutableMap.of();
private final Map<Integer, Object> idToConstant = ImmutableMap.of();
private Schema fileSchema = null;
private ValueReader<T> reader = null;

Expand Down Expand Up @@ -111,48 +108,13 @@ private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {

@Override
public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
Types.StructType expected = partner != null ? partner.asStructType() : null;
Map<Integer, Integer> idToPos = idToPos(expected);

List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
List<Schema.Field> fileFields = record.getFields();
for (int pos = 0; pos < fileFields.size(); pos += 1) {
Schema.Field field = fileFields.get(pos);
ValueReader<?> fieldReader = fieldResults.get(pos);
Integer fieldId = AvroSchemaUtil.fieldId(field);
Integer projectionPos = idToPos.remove(fieldId);

Object constant = idToConstant.get(fieldId);
if (projectionPos != null && constant != null) {
readPlan.add(
Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
} else {
readPlan.add(Pair.of(projectionPos, fieldReader));
}
if (partner == null) {
return ValueReaders.skipStruct(fieldResults);
}

// handle any expected columns that are not in the data file
for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
int fieldId = idAndPos.getKey();
int pos = idAndPos.getValue();

Object constant = idToConstant.get(fieldId);
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (field.initialDefault() != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault())));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
} else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
readPlan.add(Pair.of(pos, ValueReaders.positions()));
} else if (field.isOptional()) {
readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
}
Types.StructType expected = partner.asStructType();
List<Pair<Integer, ValueReader<?>>> readPlan =
ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);

return recordReader(readPlan, avroSchemas.get(partner), record.getFullName());
}
Expand Down Expand Up @@ -266,19 +228,5 @@ public ValueReader<?> primitive(Type partner, Schema primitive) {
throw new IllegalArgumentException("Unsupported type: " + primitive);
}
}

private Map<Integer, Integer> idToPos(Types.StructType struct) {
Map<Integer, Integer> idToPos = Maps.newHashMap();

if (struct != null) {
List<Types.NestedField> fields = struct.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
Types.NestedField field = fields.get(pos);
idToPos.put(field.fieldId(), pos);
}
}

return idToPos;
}
}
}
Loading

0 comments on commit f0e4fd2

Please sign in to comment.