Skip to content

Commit

Permalink
Make COPY TO align with CREATE EXTERNAL TABLE (#9604)
Browse files Browse the repository at this point in the history
  • Loading branch information
metesynnada authored Mar 18, 2024
1 parent e53eb03 commit b137f60
Show file tree
Hide file tree
Showing 26 changed files with 598 additions and 398 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
&state,
table_url.scheme(),
url,
state.default_table_options(),
&state.default_table_options(),
)
.await?;
state.runtime_env().register_object_store(url, store);
Expand Down
6 changes: 3 additions & 3 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ mod tests {
)
})?;
for location in locations {
let sql = format!("copy (values (1,2)) to '{}';", location);
let sql = format!("copy (values (1,2)) to '{}' STORED AS PARQUET;", location);
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
//Should not fail
Expand All @@ -438,8 +438,8 @@ mod tests {
let location = "s3://bucket/path/file.parquet";

// Missing region, use object_store defaults
let sql = format!("COPY (values (1,2)) TO '{location}'
(format parquet, 'aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS PARQUET
OPTIONS ('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')");
copy_to_table_test(location, &sql).await?;

Ok(())
Expand Down
221 changes: 166 additions & 55 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1109,87 +1109,171 @@ macro_rules! extensions_options {
}
}

/// Per-format configuration for reading and writing tables, plus pluggable extensions.
///
/// One option set is kept for each supported file format (CSV, Parquet, JSON). An optional
/// `current_format` selects which of them `format.`-prefixed configuration keys refer to.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
/// Options for CSV files, such as the field delimiter and the escape character.
pub csv: CsvOptions,

/// Options for Parquet files: a `global` option set (seeded from the session's
/// `execution.parquet` settings) and per-column settings in `column_specific_options`.
pub parquet: TableParquetOptions,

/// Options for JSON files.
pub json: JsonOptions,

/// The file format currently selected for this table, if any.
///
/// Set through `set_file_format`. Keys prefixed with `format.` are dispatched to the
/// option set of this format; while it is `None`, `ConfigField::set` returns a
/// configuration error asking for a format to be specified.
pub current_format: Option<FileType>,

/// Optional extensions that extend or customize these table options.
/// Extensions are registered using [`Extensions::insert`]; their entries are reported
/// together with the built-in ones.
pub extensions: Extensions,
}

impl ConfigField for TableOptions {
/// Visits configuration settings for the current file format, or all formats if none is selected.
///
/// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
/// If a format is selected, it visits only the settings relevant to that format. Otherwise,
/// it visits all available format settings.
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.visit(v, "format", ""),
FileType::CSV => self.csv.visit(v, "format", ""),
FileType::JSON => self.json.visit(v, "format", ""),
_ => {}
}
} else {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
}
}

/// Sets a configuration value for a specific key within `TableOptions`.
///
/// This method delegates setting configuration values to the specific file format configurations,
/// based on the current format selected. If no format is selected, it returns an error.
///
/// # Parameters
///
/// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
/// for CSV format.
/// * `value`: The value to set for the specified configuration key.
///
/// # Returns
///
/// A result indicating success or an error if the key is not recognized, if a format is not specified,
/// or if setting the configuration value fails for the specific format.
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match key {
"csv" => self.csv.set(rem, value),
"parquet" => self.parquet.set(rem, value),
"json" => self.json.set(rem, value),
"format" => match format {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.set(rem, value),
FileType::CSV => self.csv.set(rem, value),
FileType::JSON => self.json.set(rem, value),
_ => {
_config_err!("Config value \"{key}\" is not supported on {}", format)
}
},
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
}

impl TableOptions {
/// Creates a new [`ConfigOptions`] with default values
/// Constructs a new instance of `TableOptions` with default settings.
///
/// # Returns
///
/// A new `TableOptions` instance with default configuration values.
pub fn new() -> Self {
Self::default()
}

/// Selects the file format that this table's options apply to.
///
/// Any previously selected format is replaced.
///
/// # Parameters
///
/// * `format`: The file format to select (e.g., CSV, Parquet).
pub fn set_file_format(&mut self, format: FileType) {
    let selected = Some(format);
    self.current_format = selected;
}

/// Creates a new `TableOptions` instance initialized with settings from a given session config.
///
/// Starts from `TableOptions::default()` and applies the session configuration through
/// `combine_with_session_config`, so the returned options carry the session's
/// `execution.parquet` settings as their global Parquet options.
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
///
/// # Returns
///
/// A new `TableOptions` instance with settings applied from the session config.
pub fn default_from_session_config(config: &ConfigOptions) -> Self {
    // `combine_with_session_config` takes `&self` and returns a modified copy rather than
    // mutating in place. Its return value must be propagated; discarding it would hand
    // back plain defaults and silently drop the session settings.
    TableOptions::default().combine_with_session_config(config)
}

/// Returns a copy of these `TableOptions` with the session config's settings applied.
///
/// `self` is not modified. The returned copy has its global Parquet options replaced by a
/// clone of `config.execution.parquet`; everything else is cloned from `self` unchanged.
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
///
/// # Returns
///
/// A new `TableOptions` instance with updated settings from the session config.
pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
    let session_parquet = config.execution.parquet.clone();
    let mut combined = self.clone();
    combined.parquet.global = session_parquet;
    combined
}

/// Sets the extensions for this `TableOptions` instance.
///
/// # Parameters
///
/// * `extensions`: The `Extensions` instance to set.
///
/// # Returns
///
/// A new `TableOptions` instance with the specified extensions applied.
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}

/// Set a configuration option
/// Sets a specific configuration option.
///
/// # Parameters
///
/// * `key`: The configuration key (e.g., "format.delimiter").
/// * `value`: The value to set for the specified key.
///
/// # Returns
///
/// A result indicating success or failure in setting the configuration option.
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (prefix, _) = key.split_once('.').ok_or_else(|| {
DataFusionError::Configuration(format!(
"could not find config namespace for key \"{key}\""
))
})?;

if prefix == "csv" || prefix == "json" || prefix == "parquet" {
if let Some(format) = &self.current_format {
match format {
FileType::CSV if prefix != "csv" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for CSV format"
)))
}
#[cfg(feature = "parquet")]
FileType::PARQUET if prefix != "parquet" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for PARQUET format"
)))
}
FileType::JSON if prefix != "json" => {
return Err(DataFusionError::Configuration(format!(
"Key \"{key}\" is not applicable for JSON format"
)))
}
_ => {}
}
}
if prefix == "format" {
return ConfigField::set(self, key, value);
}

Expand All @@ -1202,6 +1286,15 @@ impl TableOptions {
e.0.set(key, value)
}

/// Initializes a new `TableOptions` from a hash map of string settings.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
let mut ret = Self::default();
for (k, v) in settings {
Expand All @@ -1211,6 +1304,15 @@ impl TableOptions {
Ok(ret)
}

/// Modifies the current `TableOptions` instance with settings from a hash map.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result indicating success or failure in applying the settings.
pub fn alter_with_string_hash_map(
&mut self,
settings: &HashMap<String, String>,
Expand All @@ -1221,7 +1323,11 @@ impl TableOptions {
Ok(())
}

/// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
/// Retrieves all configuration entries from this `TableOptions`.
///
/// # Returns
///
/// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);

Expand Down Expand Up @@ -1249,9 +1355,7 @@ impl TableOptions {
}

let mut v = Visitor(vec![]);
self.visit(&mut v, "csv", "");
self.visit(&mut v, "json", "");
self.visit(&mut v, "parquet", "");
self.visit(&mut v, "format", "");

v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
Expand Down Expand Up @@ -1556,6 +1660,7 @@ mod tests {
use crate::config::{
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
};
use crate::FileType;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -1609,12 +1714,13 @@ mod tests {
}

#[test]
fn alter_kafka_config() {
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set("parquet.write_batch_size", "10").unwrap();
assert_eq!(table_config.parquet.global.write_batch_size, 10);
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
Expand All @@ -1626,38 +1732,43 @@ mod tests {
);
}

#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}

#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set("csv.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("csv.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("csv.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}

#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config
.set("parquet.bloom_filter_enabled::col1", "true")
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "parquet.bloom_filter_enabled::col1"))
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}
}
Loading

0 comments on commit b137f60

Please sign in to comment.