From 0c657e5bbce7f23df8c311d40e4bb13c8c26610a Mon Sep 17 00:00:00 2001 From: ashokborra Date: Sat, 16 Dec 2023 11:49:44 +0530 Subject: [PATCH] Add getSchema() to SourceClient interface --- .../java/io/onetable/spi/extractor/SourceClient.java | 9 +++++++++ .../java/io/onetable/delta/DeltaSourceClient.java | 5 +++++ core/src/main/java/io/onetable/hudi/HudiClient.java | 6 ++++++ .../java/io/onetable/iceberg/IcebergSourceClient.java | 7 +++++++ .../io/onetable/iceberg/TestIcebergSourceClient.java | 11 +++++++++++ 5 files changed, 38 insertions(+) diff --git a/api/src/main/java/io/onetable/spi/extractor/SourceClient.java b/api/src/main/java/io/onetable/spi/extractor/SourceClient.java index 355e1410f..69e1c5545 100644 --- a/api/src/main/java/io/onetable/spi/extractor/SourceClient.java +++ b/api/src/main/java/io/onetable/spi/extractor/SourceClient.java @@ -23,6 +23,7 @@ import io.onetable.model.*; import io.onetable.model.CommitsBacklog; +import io.onetable.model.schema.OneSchema; import io.onetable.model.schema.SchemaCatalog; /** @@ -48,6 +49,14 @@ public interface SourceClient extends Closeable { */ SchemaCatalog getSchemaCatalog(OneTable table, COMMIT commit); + /** + * Extracts the {@link OneSchema} as of the latest state. + * + * @param table the current state of the table + * @return + */ + OneSchema getSchema(OneTable table); + /** * Extracts the {@link OneSnapshot} as of latest state. * diff --git a/core/src/main/java/io/onetable/delta/DeltaSourceClient.java b/core/src/main/java/io/onetable/delta/DeltaSourceClient.java index 795a6babc..61136d9ed 100644 --- a/core/src/main/java/io/onetable/delta/DeltaSourceClient.java +++ b/core/src/main/java/io/onetable/delta/DeltaSourceClient.java @@ -90,6 +90,11 @@ public SchemaCatalog getSchemaCatalog(OneTable table, Long version) { return SchemaCatalog.builder().schemas(schemas).build(); } + @Override + public OneSchema getSchema(OneTable table) { + return table.getReadSchema(); + } + @Override public OneSnapshot getCurrentSnapshot() { DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath); diff --git a/core/src/main/java/io/onetable/hudi/HudiClient.java b/core/src/main/java/io/onetable/hudi/HudiClient.java index a6acabfa6..1e99aa2aa 100644 --- a/core/src/main/java/io/onetable/hudi/HudiClient.java +++ b/core/src/main/java/io/onetable/hudi/HudiClient.java @@ -49,6 +49,7 @@ import io.onetable.exception.OneIOException; import io.onetable.model.*; import io.onetable.model.CommitsBacklog; +import io.onetable.model.schema.OneSchema; import io.onetable.model.schema.SchemaCatalog; import io.onetable.spi.extractor.SourceClient; @@ -82,6 +83,11 @@ public SchemaCatalog getSchemaCatalog(OneTable table, HoodieInstant commit) { return HudiSchemaCatalogExtractor.catalogWithTableSchema(table); } + @Override + public OneSchema getSchema(OneTable table) { + return table.getReadSchema(); + } + @Override public OneSnapshot getCurrentSnapshot() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); diff --git a/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java b/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java index 6a2dacfab..24a19e867 100644 --- a/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java +++ b/core/src/main/java/io/onetable/iceberg/IcebergSourceClient.java @@ -129,6 +129,13 @@ public SchemaCatalog getSchemaCatalog(OneTable table, Snapshot snapshot) { return SchemaCatalog.builder().schemas(catalog).build(); } + @Override + public OneSchema getSchema(OneTable table) { + Table iceTable = getSourceTable(); + IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance(); + return schemaExtractor.fromIceberg(iceTable.schema()); + } + @Override public OneSnapshot getCurrentSnapshot() { Table iceTable = getSourceTable(); diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java b/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java index babf9f5e5..bcb887f23 100644 --- a/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java +++ b/core/src/test/java/io/onetable/iceberg/TestIcebergSourceClient.java @@ -309,6 +309,17 @@ public void testGetCurrentCommitState(@TempDir Path workingDir) throws IOExcepti // validatePendingCommits(catalogSales, snapshot1, snapshot2, snapshot3b, snapshot4); } + @Test + public void getSchema(@TempDir Path workingDir) throws IOException { + Table catalogSales = createTestTableWithData(workingDir.toString()); + PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales); + IcebergSourceClient client = clientProvider.getSourceClientInstance(sourceTableConfig); + IcebergSourceClient spyClient = spy(client); + OneSchema schema = spyClient.getSchema(null); + Assertions.assertNotNull(schema); + validateSchema(schema, catalogSales.schema()); + } + private void validatePendingCommits(Table table, Snapshot lastSync, Snapshot... snapshots) { InstantsForIncrementalSync instant = InstantsForIncrementalSync.builder()