Skip to content

Commit

Permalink
fix: handle schema retrieval for datafusion api (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan authored Nov 20, 2024
1 parent 0effb24 commit 2d94b74
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions crates/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::thread;

use arrow_schema::SchemaRef;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::datasource::listing::PartitionedFile;
Expand Down Expand Up @@ -86,7 +86,8 @@ impl TableProvider for HudiDataSource {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { table.get_schema().await })
});
SchemaRef::from(handle.join().unwrap().unwrap())
let result = handle.join().unwrap().unwrap_or_else(|_| Schema::empty());
SchemaRef::from(result)
}

fn table_type(&self) -> TableType {
Expand Down Expand Up @@ -212,7 +213,7 @@ mod tests {

use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
V6ComplexkeygenHivestyle, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields,
V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable,
V6TimebasedkeygenNonhivestyle,
};
Expand All @@ -231,6 +232,16 @@ mod tests {
assert_eq!(hudi.get_input_partitions(), 0)
}

// Checks that a provider built on the `V6Empty` test-table fixture still returns a
// schema, and that this schema contains no fields.
#[tokio::test]
async fn test_get_empty_schema_from_empty_table() {
// Build the provider from the fixture's path with no extra options; construction
// is expected to succeed, hence the `unwrap()`.
let table_provider =
HudiDataSource::new_with_options(V6Empty.path().as_str(), empty_options())
.await
.unwrap();
// `schema()` is called synchronously (no `.await`) from inside this async test.
let schema = table_provider.schema();
// An empty field list is the expected result — presumably because the fixture
// has no data to derive a schema from; confirm against the fixture definition.
assert!(schema.fields().is_empty());
}

async fn register_test_table_with_session<I, K, V>(
test_table: &TestTable,
options: I,
Expand Down Expand Up @@ -345,7 +356,7 @@ mod tests {
}

#[tokio::test]
async fn datafusion_read_hudi_table() {
async fn test_datafusion_read_hudi_table() {
for (test_table, use_sql, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, true, 2),
(V6Nonpartitioned, true, 1),
Expand Down Expand Up @@ -386,7 +397,7 @@ mod tests {
}

#[tokio::test]
async fn datafusion_read_hudi_table_with_replacecommits() {
async fn test_datafusion_read_hudi_table_with_replacecommits() {
for (test_table, use_sql, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
{
Expand Down

0 comments on commit 2d94b74

Please sign in to comment.