Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part 4: read_table.c uses transform in ffi #614

Draft
wants to merge 28 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6e0b1c9
also extract partitionValues
nicklan Dec 9, 2024
c53b7de
checkpoint
nicklan Dec 11, 2024
9ac7173
Merge branch 'main' into transform-expr
nicklan Dec 12, 2024
f75b2e3
hey, it kinda works
nicklan Dec 12, 2024
4d1d4f7
Merge branch 'main' into transform-expr
nicklan Dec 17, 2024
c8cc84b
undo change to ColumnType, will go a different direction
nicklan Dec 17, 2024
29ded0e
use TransformExpr
nicklan Dec 18, 2024
9d4688c
cleanup
nicklan Dec 18, 2024
631f403
Merge branch 'main' into transform-expr
nicklan Dec 18, 2024
f791167
optional transform
nicklan Dec 18, 2024
b7268e5
add initial tests
nicklan Dec 18, 2024
da5a9e8
adjust comments
nicklan Dec 18, 2024
e3fdfaa
fix comment
nicklan Dec 18, 2024
e9a8d1c
oops, fix ffi
nicklan Dec 19, 2024
b773614
cleanup examples
nicklan Dec 19, 2024
ebcb42d
Actually use ExpressionRef
nicklan Dec 19, 2024
3a38785
Merge branch 'main' into transform-expr
nicklan Dec 19, 2024
58ad2a3
remove unused try_from
nicklan Dec 19, 2024
3d040f7
need transform if column mapping is enabled
nicklan Dec 19, 2024
d39322c
checkpoint
nicklan Dec 19, 2024
3cb4746
pass through transforms in kernel
nicklan Dec 19, 2024
d23c4d0
pass through in ffi (missing final bit)
nicklan Dec 19, 2024
ff5d5fe
fmt cleanup
nicklan Dec 19, 2024
fad4b45
use transform in execute
nicklan Dec 19, 2024
d0e7d9b
multi-threaded reader uses transform
nicklan Dec 19, 2024
a740ffc
read.rs uses transform
nicklan Dec 19, 2024
6d72b75
remove transform_to_logical :)
nicklan Dec 19, 2024
b961220
use transform in read_table
nicklan Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/examples/read-table/arrow.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static GArrowRecordBatch* add_partition_columns(
char* col = partition_cols->cols[i];
guint pos = cols + i;
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
print_diag(
" Adding partition column '%s' with value '%s' at column %u\n",
col,
Expand Down
7 changes: 4 additions & 3 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void print_partition_info(struct EngineContext* context, const CStringMap* parti
for (uintptr_t i = 0; i < context->partition_cols->len; i++) {
char* col = context->partition_cols->cols[i];
KernelStringSlice key = { col, strlen(col) };
char* partition_val = get_from_map(partition_values, key, allocate_string);
char* partition_val = get_from_string_map(partition_values, key, allocate_string);
if (partition_val) {
print_diag(" partition '%s' here: %s\n", col, partition_val);
free(partition_val);
Expand Down Expand Up @@ -87,14 +87,15 @@ void scan_row_callback(
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec)
KernelBoolSlice selection_vec,
const CTransformMap* transforms)
{
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, engine_context, scan_row_callback);
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down
41 changes: 36 additions & 5 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::schema::Schema;
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error};
use delta_kernel::{DeltaResult, Error, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
use tracing::debug;
use url::Url;
Expand Down Expand Up @@ -211,6 +211,7 @@ pub unsafe extern "C" fn kernel_scan_data_next(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransformMap,
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
Expand All @@ -224,15 +225,17 @@ fn kernel_scan_data_next_impl(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransformMap,
),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec)) = data.next().transpose()? {
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
let transform_map = CTransformMap { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -281,7 +284,7 @@ pub struct CStringMap {
/// # Safety
///
/// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`]
pub unsafe extern "C" fn get_from_map(
pub unsafe extern "C" fn get_from_string_map(
map: &CStringMap,
key: KernelStringSlice,
allocate_fn: AllocateStringFn,
Expand All @@ -293,6 +296,32 @@ pub unsafe extern "C" fn get_from_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}


pub struct CTransformMap {
transforms: HashMap<usize, ExpressionRef>,
}

// #[no_mangle]
// /// allow probing into a CStringMap. If the specified key is in the map, kernel will call
// /// allocate_fn with the value associated with the key and return the value returned from that
// /// function. If the key is not in the map, this will return NULL
// ///
// /// # Safety
// ///
// /// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`]
// pub unsafe extern "C" fn get_from_transform_map(
// map: &CTransformMap,
// key: usize,
// allocate_fn: AllocateStringFn,
// ) -> NullableCvoid {
// // TODO: Return ExternResult to caller instead of panicking?
// let string_key = unsafe { TryFromStringSlice::try_from_slice(&key) };
// map.values
// .get(string_key.unwrap())
// .and_then(|v| allocate_fn(kernel_string_slice!(v)))
// }


/// Get a selection vector out of a [`DvInfo`] struct
///
/// # Safety
Expand Down Expand Up @@ -355,6 +384,7 @@ fn rust_callback(
size: i64,
kernel_stats: Option<delta_kernel::scan::state::Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let partition_map = CStringMap {
Expand Down Expand Up @@ -388,6 +418,7 @@ struct ContextWrapper {
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransformMap,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
Expand All @@ -398,5 +429,5 @@ pub unsafe extern "C" fn visit_scan_data(
callback,
};
// TODO: return ExternResult to caller instead of panicking?
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder: get to this TODO!

visit_scan_files(data, selection_vec, context_wrapper, rust_callback).unwrap();
visit_scan_files(data, selection_vec, &transforms.transforms, context_wrapper, rust_callback).unwrap();
}
7 changes: 5 additions & 2 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::expressions::ColumnName;
use delta_kernel::scan::state::{DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::schema::{ColumnNamesAndTypes, DataType};
use delta_kernel::{DeltaResult, Error, Table};
use delta_kernel::{DeltaResult, Error, ExpressionRef, Table};

use std::collections::HashMap;
use std::process::ExitCode;
Expand Down Expand Up @@ -163,6 +163,7 @@ fn print_scan_file(
size: i64,
stats: Option<Stats>,
dv_info: DvInfo,
transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let num_record_str = if let Some(s) = stats {
Expand All @@ -176,6 +177,7 @@ fn print_scan_file(
Size (bytes):\t{size}\n \
Num Records:\t{num_record_str}\n \
Has DV?:\t{}\n \
TransformExpr:\t {transform:?}\n \
Part Vals:\t{partition_values:?}",
dv_info.has_vector()
);
Expand Down Expand Up @@ -209,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector) = res?;
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
Expand Down
6 changes: 4 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::transform_to_logical;
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table};
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

use clap::{Parser, ValueEnum};
use url::Url;
Expand Down Expand Up @@ -111,6 +111,7 @@ fn send_scan_file(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transform: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
) {
let scan_file = ScanFile {
Expand Down Expand Up @@ -210,10 +211,11 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector) = res?;
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
Expand Down
4 changes: 3 additions & 1 deletion kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ pub trait TypedGetData<'a, T> {
fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<T>>;
fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult<T> {
let val = self.get_opt(row_index, field_name)?;
val.ok_or_else(|| Error::MissingData(format!("Data missing for field {field_name}")))
val.ok_or_else(|| {
Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
})
}
}

Expand Down
Loading