From cd4e11a76985d32e19700ef5398b339a92306584 Mon Sep 17 00:00:00 2001 From: lmangani Date: Wed, 1 Jan 2025 17:00:34 +0000 Subject: [PATCH] error handler --- src/clickhouse_scan.rs | 575 ++++++++++++++++++----------------------- src/lib.rs | 113 ++++---- 2 files changed, 321 insertions(+), 367 deletions(-) diff --git a/src/clickhouse_scan.rs b/src/clickhouse_scan.rs index ec8bf93..3ad4d98 100644 --- a/src/clickhouse_scan.rs +++ b/src/clickhouse_scan.rs @@ -1,12 +1,12 @@ -use std::{error::Error, sync::Arc}; +use clickhouse_rs::{types::SqlType, Pool}; use duckdb::{ - core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId, Inserter}, + core::{DataChunkHandle, Inserter, LogicalTypeHandle, LogicalTypeId}, vtab::{BindInfo, Free, FunctionInfo, InitInfo, VTab}, Connection, Result, }; -use clickhouse_rs::{Pool, types::SqlType}; -use tokio::runtime::Runtime; use std::ptr; +use std::{error::Error, sync::Arc}; +use tokio::runtime::Runtime; #[repr(C)] struct ClickHouseScanBindData { @@ -20,16 +20,13 @@ struct ClickHouseScanBindData { impl Drop for ClickHouseScanBindData { fn drop(&mut self) { - self.column_names.clear(); - self.column_types.clear(); + // No explicit clearing needed, let Rust handle the cleanup } } impl Free for ClickHouseScanBindData { fn free(&mut self) { - // Explicitly clear vectors to ensure proper cleanup - self.column_names.clear(); - self.column_types.clear(); + // No explicit clearing needed, drop will handle cleanup } } @@ -61,7 +58,6 @@ impl Free for ClickHouseScanInitData { } fn map_clickhouse_type(sql_type: SqlType) -> LogicalTypeId { - // println!("Mapping SQL type: {:?}", sql_type); // Debug print match sql_type { SqlType::Int8 | SqlType::Int16 | SqlType::Int32 => LogicalTypeId::Integer, SqlType::Int64 => LogicalTypeId::Bigint, @@ -73,8 +69,7 @@ fn map_clickhouse_type(sql_type: SqlType) -> LogicalTypeId { SqlType::Date => LogicalTypeId::Date, SqlType::DateTime(_) => LogicalTypeId::Timestamp, SqlType::Bool => LogicalTypeId::Boolean, - // Default to Integer for numeric literals - _ => LogicalTypeId::Integer + _ => LogicalTypeId::Integer, } } @@ -84,235 +79,192 @@ impl VTab for ClickHouseScanVTab { type InitData = ClickHouseScanInitData; type BindData = ClickHouseScanBindData; - // patch -unsafe fn bind(bind: &BindInfo, data: *mut Self::BindData) -> Result<(), Box> { - if data.is_null() { - return Err("Invalid bind data pointer".into()); - } - - let query = bind.get_parameter(0).to_string(); - let url = bind.get_named_parameter("url") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_URL") - .unwrap_or_else(|_| "tcp://localhost:9000".to_string())); - let user = bind.get_named_parameter("user") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_USER") - .unwrap_or_else(|_| "default".to_string())); - let password = bind.get_named_parameter("password") - .map(|v| v.to_string()) - .unwrap_or_else(|| std::env::var("CLICKHOUSE_PASSWORD") - .unwrap_or_default()); - - // println!("Parameters - URL: {}, User: {}, Query: {}", url, user, query); - - let runtime = Arc::new(Runtime::new()?); - - let result = runtime.block_on(async { - let pool = Pool::new(url.clone()); - let mut client = pool.get_handle().await?; - let block = client.query(&query).fetch_all().await?; - - let columns = block.columns(); - let mut names = Vec::new(); - let mut types = Vec::new(); - - for col in columns { - names.push(col.name().to_string()); - types.push(map_clickhouse_type(col.sql_type())); + unsafe fn bind(bind: &BindInfo, data: *mut Self::BindData) -> Result<(), Box> { + if data.is_null() { + return Err("Invalid bind data pointer".into()); } - Ok::<(Vec, Vec), Box>((names, types)) - })?; - - let (names, types) = result; - - // Create a new vector by recreating LogicalTypeId values - let types_for_iteration: Vec = types.iter().map(|type_id| { - match type_id { - LogicalTypeId::Integer => LogicalTypeId::Integer, - LogicalTypeId::Bigint => LogicalTypeId::Bigint, - LogicalTypeId::UInteger => LogicalTypeId::UInteger, - LogicalTypeId::UBigint => LogicalTypeId::UBigint, - LogicalTypeId::Float => LogicalTypeId::Float, - LogicalTypeId::Double => LogicalTypeId::Double, - LogicalTypeId::Varchar => LogicalTypeId::Varchar, - LogicalTypeId::Date => LogicalTypeId::Date, - LogicalTypeId::Timestamp => LogicalTypeId::Timestamp, - LogicalTypeId::Boolean => LogicalTypeId::Boolean, - _ => LogicalTypeId::Varchar, + let query = bind.get_parameter(0).to_string(); + let url = bind + .get_named_parameter("url") + .map(|v| v.to_string()) + .unwrap_or_else(|| { + std::env::var("CLICKHOUSE_URL") + .unwrap_or_else(|_| "tcp://localhost:9000".to_string()) + }); + let user = bind + .get_named_parameter("user") + .map(|v| v.to_string()) + .unwrap_or_else(|| { + std::env::var("CLICKHOUSE_USER").unwrap_or_else(|_| "default".to_string()) + }); + let password = bind + .get_named_parameter("password") + .map(|v| v.to_string()) + .unwrap_or_else(|| std::env::var("CLICKHOUSE_PASSWORD").unwrap_or_default()); + + let runtime = + Arc::new(Runtime::new().map_err(|e| format!("Failed to create runtime: {}", e))?); + + let result = runtime.block_on(async { + let pool = Pool::new(url.clone()); + let mut client = pool.get_handle().await?; + let block = client.query(&query).fetch_all().await?; + + let columns = block.columns(); + let mut names = Vec::new(); + let mut types = Vec::new(); + + for col in columns { + names.push(col.name().to_string()); + types.push(map_clickhouse_type(col.sql_type())); + } + + Ok::<(Vec, Vec), Box>((names, types)) + })?; + + let (names, types) = result; + + for (name, type_id) in names.iter().zip(types.iter()) { + let logical_type = match type_id { + LogicalTypeId::Integer => LogicalTypeId::Integer, + LogicalTypeId::Bigint => LogicalTypeId::Bigint, + LogicalTypeId::UInteger => LogicalTypeId::UInteger, + LogicalTypeId::UBigint => LogicalTypeId::UBigint, + LogicalTypeId::Float => LogicalTypeId::Float, + LogicalTypeId::Double => LogicalTypeId::Double, + LogicalTypeId::Varchar => LogicalTypeId::Varchar, + LogicalTypeId::Date => LogicalTypeId::Date, + LogicalTypeId::Timestamp => LogicalTypeId::Timestamp, + LogicalTypeId::Boolean => LogicalTypeId::Boolean, + _ => LogicalTypeId::Varchar, + }; + let type_handle = LogicalTypeHandle::from(logical_type); + bind.add_result_column(name, type_handle); } - }).collect(); - - // Create bind data - let bind_data = ClickHouseScanBindData { - url, - user, - password, - query, - column_names: names.clone(), - column_types: types, - }; - - // Add result columns before storing the bind data - for (name, type_id) in names.iter().zip(types_for_iteration.iter()) { - let type_handle = LogicalTypeHandle::from(match type_id { - LogicalTypeId::Integer => LogicalTypeId::Integer, - LogicalTypeId::Bigint => LogicalTypeId::Bigint, - LogicalTypeId::UInteger => LogicalTypeId::UInteger, - LogicalTypeId::UBigint => LogicalTypeId::UBigint, - LogicalTypeId::Float => LogicalTypeId::Float, - LogicalTypeId::Double => LogicalTypeId::Double, - LogicalTypeId::Varchar => LogicalTypeId::Varchar, - LogicalTypeId::Date => LogicalTypeId::Date, - LogicalTypeId::Timestamp => LogicalTypeId::Timestamp, - LogicalTypeId::Boolean => LogicalTypeId::Boolean, - _ => LogicalTypeId::Varchar, - }); - bind.add_result_column(name, type_handle); - } - // Store the bind data after adding columns - unsafe { - ptr::write(data, bind_data); - } + let bind_data = ClickHouseScanBindData { + url, + user, + password, + query, + column_names: names, + column_types: types, + }; - Ok(()) -} + unsafe { + ptr::write(data, bind_data); + } -unsafe fn init(info: &InitInfo, data: *mut Self::InitData) -> Result<(), Box> { - if data.is_null() { - return Err("Invalid init data pointer".into()); + Ok(()) } - let bind_data = info.get_bind_data::(); - if bind_data.is_null() { - return Err("Invalid bind data".into()); - } + unsafe fn init(info: &InitInfo, data: *mut Self::InitData) -> Result<(), Box> { + if data.is_null() { + return Err("Invalid init data pointer".into()); + } - let runtime = Arc::new(Runtime::new()?); + let bind_data = info.get_bind_data::(); + if bind_data.is_null() { + return Err("Invalid bind data".into()); + } - let result = runtime.block_on(async { - let pool = Pool::new((*bind_data).url.clone()); - let mut client = pool.get_handle().await?; - let block = client.query(&(*bind_data).query).fetch_all().await?; + let runtime = + Arc::new(Runtime::new().map_err(|e| format!("Failed to create runtime: {}", e))?); - let columns = block.columns(); - let mut data: Vec> = Vec::new(); + let result = runtime.block_on(async { + let pool = Pool::new((*bind_data).url.clone()); + let mut client = pool.get_handle().await?; + let block = client.query(&(*bind_data).query).fetch_all().await?; - for _ in columns { - data.push(Vec::new()); - } + let columns = block.columns(); + let mut data: Vec> = Vec::new(); - let mut row_count = 0; - for row in block.rows() { - for (col_idx, col) in columns.iter().enumerate() { - let value = match col.sql_type() { - SqlType::UInt8 => { - match row.get::(col.name()) { + for _ in columns { + data.push(Vec::new()); + } + + let mut row_count = 0; + for row in block.rows() { + for (col_idx, col) in columns.iter().enumerate() { + let value = match col.sql_type() { + SqlType::UInt8 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::UInt16 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::UInt16 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::UInt32 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::UInt32 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::UInt64 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::UInt64 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::Int8 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::Int8 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::Int16 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::Int16 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::Int32 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::Int32 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::Int64 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::Int64 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0".to_string() - } - }, - SqlType::Float32 => { - match row.get::(col.name()) { + Err(_) => "0".to_string(), + }, + SqlType::Float32 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0.0".to_string() - } - }, - SqlType::Float64 => { - match row.get::(col.name()) { + Err(_) => "0.0".to_string(), + }, + SqlType::Float64 => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "0.0".to_string() - } - }, - SqlType::String | SqlType::FixedString(_) => { - match row.get::(col.name()) { - Ok(val) => val, - Err(_) => String::new() + Err(_) => "0.0".to_string(), + }, + SqlType::String | SqlType::FixedString(_) => { + match row.get::(col.name()) { + Ok(val) => val, + Err(_) => String::new(), + } } - }, - SqlType::Bool => { - match row.get::(col.name()) { + SqlType::Bool => match row.get::(col.name()) { Ok(val) => val.to_string(), - Err(_) => "false".to_string() - } - }, - SqlType::Date => { - match row.get::(col.name()) { + Err(_) => "false".to_string(), + }, + SqlType::Date => match row.get::(col.name()) { Ok(val) => val, - Err(_) => "1970-01-01".to_string() - } - }, - SqlType::DateTime(_) => { - match row.get::(col.name()) { + Err(_) => "1970-01-01".to_string(), + }, + SqlType::DateTime(_) => match row.get::(col.name()) { Ok(val) => val, - Err(_) => "1970-01-01 00:00:00".to_string() - } - }, - _ => { - match row.get::(col.name()) { + Err(_) => "1970-01-01 00:00:00".to_string(), + }, + _ => match row.get::(col.name()) { Ok(val) => val, - Err(_) => "0".to_string() - } - } - }; - data[col_idx].push(value); + Err(_) => "0".to_string(), + }, + }; + data[col_idx].push(value); + } + row_count += 1; } - row_count += 1; - } - Ok::<(Vec>, usize), Box>((data, row_count)) - })?; + Ok::<(Vec>, usize), Box>((data, row_count)) + })?; - let (block_data, total_rows) = result; + let (block_data, total_rows) = result; - // Create new vectors by mapping over references - let column_types = unsafe { - (*bind_data).column_types.iter().map(|type_id| { - match type_id { + let column_types = (*bind_data) + .column_types + .iter() + .map(|type_id| match type_id { LogicalTypeId::Integer => LogicalTypeId::Integer, LogicalTypeId::Bigint => LogicalTypeId::Bigint, LogicalTypeId::UInteger => LogicalTypeId::UInteger, @@ -324,134 +276,115 @@ unsafe fn init(info: &InitInfo, data: *mut Self::InitData) -> Result<(), Box LogicalTypeId::Timestamp, LogicalTypeId::Boolean => LogicalTypeId::Boolean, _ => LogicalTypeId::Varchar, - } - }).collect::>() - }; - - let column_names = unsafe { (*bind_data).column_names.clone() }; - - // Create init data using ptr::write - unsafe { - ptr::write(data, ClickHouseScanInitData { - runtime: Some(runtime), - block_data: Some(block_data), - column_types, - column_names, - current_row: 0, - total_rows, - done: false, - }); - } - - Ok(()) -} - - // end patch + }) + .collect::>(); + + let column_names = (*bind_data).column_names.clone(); + + unsafe { + ptr::write( + data, + ClickHouseScanInitData { + runtime: Some(runtime), + block_data: Some(block_data), + column_types, + column_names, + current_row: 0, + total_rows, + done: false, + }, + ); + } - unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box> { - let init_data = func.get_init_data::(); - - if init_data.is_null() { - return Err("Invalid init data pointer".into()); + Ok(()) } - unsafe { - if (*init_data).done || (*init_data).current_row >= (*init_data).total_rows { - output.set_len(0); - (*init_data).done = true; - return Ok(()); + unsafe fn func( + func: &FunctionInfo, + output: &mut DataChunkHandle, + ) -> Result<(), Box> { + let init_data = func.get_init_data::(); + + if init_data.is_null() { + return Err("Invalid init data pointer".into()); } - let block_data = match (*init_data).block_data.as_ref() { - Some(data) => data, - None => return Err("Block data is not available".into()), - }; + unsafe { + if (*init_data).done || (*init_data).current_row >= (*init_data).total_rows { + output.set_len(0); + (*init_data).done = true; + return Ok(()); + } - let column_types = &(*init_data).column_types; - - let batch_size = 1024.min((*init_data).total_rows - (*init_data).current_row); - - for col_idx in 0..column_types.len() { - let mut vector = output.flat_vector(col_idx); - let type_id = &column_types[col_idx]; - - match type_id { - LogicalTypeId::Integer | LogicalTypeId::UInteger => { - let slice = vector.as_mut_slice::(); - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - let val_str = &block_data[col_idx][row_idx]; - // println!("Parsing value: {}", val_str); // Debug print - - // Try parsing with different number bases - let val = if let Ok(v) = val_str.parse::() { - v - } else if let Ok(v) = val_str.parse::() { - v as i32 - } else if let Ok(v) = i32::from_str_radix(val_str.trim(), 10) { - v - } else { - println!("Failed to parse: {}", val_str); // Debug print - 0 - }; - slice[row_offset] = val; - } - }, - LogicalTypeId::UInteger => { - let slice = vector.as_mut_slice::(); - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - // Try parsing as different unsigned integer types - let val = if let Ok(v) = block_data[col_idx][row_idx].parse::() { - v as i32 - } else if let Ok(v) = block_data[col_idx][row_idx].parse::() { - v as i32 - } else if let Ok(v) = block_data[col_idx][row_idx].parse::() { - v as i32 - } else { - 0 - }; - slice[row_offset] = val; - } - }, - LogicalTypeId::Bigint => { - let slice = vector.as_mut_slice::(); - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - if let Ok(val) = block_data[col_idx][row_idx].parse::() { + let block_data = match (*init_data).block_data.as_ref() { + Some(data) => data, + None => return Err("Block data is not available".into()), + }; + + let column_types = &(*init_data).column_types; + + let batch_size = 1024.min((*init_data).total_rows - (*init_data).current_row); + + for col_idx in 0..column_types.len() { + let mut vector = output.flat_vector(col_idx); + let type_id = &column_types[col_idx]; + + match type_id { + LogicalTypeId::Integer | LogicalTypeId::UInteger => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + let val_str = &block_data[col_idx][row_idx]; + + let val = if let Ok(v) = val_str.parse::() { + v + } else if let Ok(v) = val_str.parse::() { + v as i32 + } else if let Ok(v) = i32::from_str_radix(val_str.trim(), 10) { + v + } else { + 0 + }; slice[row_offset] = val; - } else { - slice[row_offset] = 0; } } - }, - LogicalTypeId::UBigint => { - let slice = vector.as_mut_slice::(); - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - if let Ok(val) = block_data[col_idx][row_idx].parse::() { - slice[row_offset] = val as i64; - } else { - slice[row_offset] = 0; + LogicalTypeId::Bigint => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + if let Ok(val) = block_data[col_idx][row_idx].parse::() { + slice[row_offset] = val; + } else { + slice[row_offset] = 0; + } } } - }, - _ => { - for row_offset in 0..batch_size { - let row_idx = (*init_data).current_row + row_offset; - let val = block_data[col_idx][row_idx].as_str(); - Inserter::insert(&mut vector, row_offset, val); + LogicalTypeId::UBigint => { + let slice = vector.as_mut_slice::(); + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + if let Ok(val) = block_data[col_idx][row_idx].parse::() { + slice[row_offset] = val as i64; + } else { + slice[row_offset] = 0; + } + } + } + _ => { + for row_offset in 0..batch_size { + let row_idx = (*init_data).current_row + row_offset; + let val = block_data[col_idx][row_idx].as_str(); + Inserter::insert(&mut vector, row_offset, val); + } } } } - } - (*init_data).current_row += batch_size; - output.set_len(batch_size); - } - Ok(()) + (*init_data).current_row += batch_size; + output.set_len(batch_size); + } + Ok(()) } - // end func fn parameters() -> Option> { Some(vec![LogicalTypeHandle::from(LogicalTypeId::Varchar)]) diff --git a/src/lib.rs b/src/lib.rs index 2519733..baf1b6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,17 @@ -use std::{error::Error, ffi::{c_char, CStr, CString}, fs::File, io::{self, Read, BufReader, Seek}}; +use byteorder::{LittleEndian, ReadBytesExt}; use duckdb::{ - core::{DataChunkHandle, LogicalTypeHandle, LogicalTypeId, Inserter}, + core::{DataChunkHandle, Inserter, LogicalTypeHandle, LogicalTypeId}, vtab::{BindInfo, Free, FunctionInfo, InitInfo, VTab}, Connection, Result, }; use duckdb_loadable_macros::duckdb_entrypoint_c_api; use libduckdb_sys as ffi; -use byteorder::{ReadBytesExt, LittleEndian}; +use std::{ + error::Error, + ffi::{c_char, CStr, CString}, + fs::File, + io::{self, BufReader, Read, Seek}, +}; mod clickhouse_scan; @@ -95,28 +100,25 @@ fn read_string(reader: &mut impl Read) -> io::Result { fn parse_enum_values(params: &str) -> Option { let inner = params.trim_matches(|c| c == '(' || c == ')').trim(); - + if inner.is_empty() { return None; } - + let mut values = Vec::new(); for pair in inner.split(',') { let parts: Vec<&str> = pair.split('=').collect(); if parts.len() != 2 { continue; } - - let name = parts[0] - .trim() - .trim_matches('\'') - .to_string(); - + + let name = parts[0].trim().trim_matches('\'').to_string(); + if let Ok(value) = parts[1].trim().parse::() { values.push(EnumValue { name, value }); } } - + if values.is_empty() { None } else { @@ -154,32 +156,37 @@ fn parse_column_type(type_str: &str) -> (ColumnType, Option) { } else { ColumnType::Unsupported("Invalid Enum8".to_string()) } - }, + } other => ColumnType::Unsupported(other.to_string()), }; (column_type, params) } -fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) -> io::Result> { +fn read_column_data( + reader: &mut impl Read, + column_type: &ColumnType, + rows: u64, +) -> io::Result> { let mut data = Vec::with_capacity(rows as usize); for _ in 0..rows { let value = match column_type { ColumnType::UInt64 => { let val = reader.read_u64::()?; ColumnData::UInt64(val) - }, + } ColumnType::String => ColumnData::String(read_string(reader)?), ColumnType::UInt8 => ColumnData::UInt8(reader.read_u8()?), ColumnType::Enum8(enum_type) => { let val = reader.read_u8()?; - let enum_str = enum_type.values + let enum_str = enum_type + .values .iter() .find(|ev| ev.value == val as i8) .map(|ev| ev.name.clone()) .unwrap_or_else(|| format!("Unknown({})", val)); ColumnData::Enum8(enum_str) - }, + } ColumnType::Int => ColumnData::Int(reader.read_i32::()?), ColumnType::Unsupported(type_name) => { ColumnData::String(format!("", type_name)) @@ -202,19 +209,22 @@ fn read_var_u64(reader: &mut impl Read) -> io::Result { return Ok(x); } } - - Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid VarUInt")) + + Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid VarUInt", + )) } fn skip_block_header(reader: &mut BufReader) -> io::Result<()> { let mut marker = [0u8; 4]; reader.read_exact(&mut marker)?; - + for _ in 0..2 { let str_len = reader.read_u8()? as u64; reader.seek_relative(str_len as i64)?; } - + Ok(()) } @@ -225,10 +235,15 @@ fn read_native_format(reader: &mut BufReader) -> io::Result> { for _ in 0..num_columns { let name = read_string(reader)?; - let type_str = read_string(reader)?; + let type_str = read_string(reader)?; let (column_type, type_params) = parse_column_type(&type_str); let data = read_column_data(reader, &column_type, num_rows)?; - columns.push(Column { name, type_: column_type, type_params, data }); + columns.push(Column { + name, + type_: column_type, + type_params, + data, + }); } loop { @@ -266,57 +281,64 @@ impl VTab for ClickHouseVTab { unsafe fn bind(bind: &BindInfo, data: *mut ClickHouseBindData) -> Result<(), Box> { let filepath = bind.get_parameter(0).to_string(); - + let file = File::open(&filepath)?; let mut reader = BufReader::with_capacity(64 * 1024, file); let columns = read_native_format(&mut reader)?; - + for column in &columns { let logical_type = match &column.type_ { ColumnType::String => LogicalTypeId::Varchar, ColumnType::UInt8 => LogicalTypeId::Integer, ColumnType::UInt64 => LogicalTypeId::Integer, ColumnType::Int => LogicalTypeId::Integer, - ColumnType::Enum8(_) => LogicalTypeId::Varchar, // Store enums as strings + ColumnType::Enum8(_) => LogicalTypeId::Varchar, // Store enums as strings ColumnType::Unsupported(_) => LogicalTypeId::Varchar, }; bind.add_result_column(&column.name, LogicalTypeHandle::from(logical_type)); } - + let filepath_cstring = CString::new(filepath)?; let raw_ptr = filepath_cstring.as_ptr() as *mut c_char; - + unsafe { (*data).filepath = raw_ptr; (*data)._filepath_holder = Some(filepath_cstring); } - + Ok(()) } unsafe fn init(info: &InitInfo, data: *mut ClickHouseInitData) -> Result<(), Box> { let bind_data = info.get_bind_data::(); let filepath = unsafe { CStr::from_ptr((*bind_data).filepath).to_str()? }; - + let file = File::open(filepath)?; let mut reader = BufReader::with_capacity(64 * 1024, file); - + let read_result = read_native_format(&mut reader)?; - let total_rows = if read_result.is_empty() { 0 } else { read_result[0].data.len() }; - + let total_rows = if read_result.is_empty() { + 0 + } else { + read_result[0].data.len() + }; + unsafe { std::ptr::write(&mut (*data).columns, read_result); (*data).current_row = 0; (*data).total_rows = total_rows; (*data).done = false; } - + Ok(()) } - unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box> { + unsafe fn func( + func: &FunctionInfo, + output: &mut DataChunkHandle, + ) -> Result<(), Box> { let init_data = func.get_init_data::(); - + unsafe { if (*init_data).done || (*init_data).current_row >= (*init_data).total_rows { output.set_len(0); @@ -325,7 +347,7 @@ impl VTab for ClickHouseVTab { } let batch_size = 1024.min((*init_data).total_rows - (*init_data).current_row); - + for col_idx in 0..(*init_data).columns.len() { let column = &(*init_data).columns[col_idx]; let mut vector = output.flat_vector(col_idx); @@ -336,23 +358,22 @@ impl VTab for ClickHouseVTab { let data_idx = (*init_data).current_row + row; match &column.data[data_idx] { ColumnData::String(s) => { - let cleaned = s.replace('\0', "") - .replace('\u{FFFD}', ""); + let cleaned = s.replace('\0', "").replace('\u{FFFD}', ""); vector.insert(row, cleaned.as_str()) - }, + } _ => vector.insert(row, ""), } } - }, + } ColumnType::UInt8 => { let slice = vector.as_mut_slice::(); for row in 0..batch_size { - let data_idx = (*init_data).current_row + row; + let data_idx = (*init_data).current_row + row; if let ColumnData::UInt8(v) = column.data[data_idx] { slice[row] = v as i32; } } - }, + } ColumnType::Enum8(_) => { for row in 0..batch_size { let data_idx = (*init_data).current_row + row; @@ -360,7 +381,7 @@ impl VTab for ClickHouseVTab { vector.insert(row, s.as_str()); } } - }, + } ColumnType::UInt64 => { let slice = vector.as_mut_slice::(); @@ -370,7 +391,7 @@ impl VTab for ClickHouseVTab { slice[row] = v as i32; } } - }, + } ColumnType::Int => { let slice = vector.as_mut_slice::(); for row in 0..batch_size { @@ -379,7 +400,7 @@ impl VTab for ClickHouseVTab { slice[row] = v; } } - }, + } } } (*init_data).current_row += batch_size;