From 672ae9558307787342f3377f032aeec045fd5322 Mon Sep 17 00:00:00 2001 From: andyfeng Date: Thu, 2 Jan 2025 21:26:46 +0800 Subject: [PATCH] Rename reader config to file scan info (#4673) * Rename reader config to file scan info * Run clang-format --------- Co-authored-by: CI Bot --- extension/delta/src/function/delta_scan.cpp | 2 +- .../delta/src/include/function/delta_scan.h | 4 +- .../iceberg/src/function/iceberg_bindfunc.cpp | 4 +- .../functions/table_functions/json_scan.cpp | 26 ++++----- scripts/headers.txt | 2 +- src/binder/bind/bind_file_scan.cpp | 16 +++--- src/binder/bind/copy/bind_copy_from.cpp | 5 +- src/binder/binder.cpp | 5 +- src/common/copier_config/reader_config.cpp | 3 +- src/function/table/bind_data.cpp | 2 +- src/function/table/scan_functions.cpp | 2 +- src/include/binder/binder.h | 4 +- src/include/binder/bound_export_database.h | 6 +- .../{reader_config.h => file_scan_info.h} | 10 ++-- src/include/function/table/bind_data.h | 16 +++--- src/include/function/table/bind_input.h | 4 +- src/include/function/table/scan_functions.h | 12 ++-- .../operator/simple/logical_export_db.h | 12 ++-- .../persistent/reader/csv/base_csv_reader.h | 5 +- .../reader/csv/parallel_csv_reader.h | 2 +- .../persistent/reader/csv/serial_csv_reader.h | 2 +- .../persistent/reader/npy/npy_reader.h | 2 +- .../reader/parquet/parquet_reader.h | 2 +- .../processor/operator/simple/export_db.h | 6 +- .../persistent/reader/csv/base_csv_reader.cpp | 4 +- .../reader/csv/parallel_csv_reader.cpp | 45 ++++++++------- .../reader/csv/serial_csv_reader.cpp | 55 ++++++++++--------- .../persistent/reader/npy/npy_reader.cpp | 37 +++++++------ .../reader/parquet/parquet_reader.cpp | 40 +++++++------- src/processor/operator/simple/export_db.cpp | 4 +- .../src_cpp/include/pandas/pandas_scan.h | 10 ++-- .../python_api/src_cpp/pandas/pandas_scan.cpp | 3 +- .../src_cpp/pyarrow/pyarrow_scan.cpp | 2 +- 33 files changed, 184 insertions(+), 170 deletions(-) rename src/include/common/copier_config/{reader_config.h => file_scan_info.h} (86%) diff --git a/extension/delta/src/function/delta_scan.cpp b/extension/delta/src/function/delta_scan.cpp index 33e2d6812df..da40f37bee4 100644 --- a/extension/delta/src/function/delta_scan.cpp +++ b/extension/delta/src/function/delta_scan.cpp @@ -30,7 +30,7 @@ static std::unique_ptr bindFunc(main::ClientContext* context, KU_ASSERT(returnTypes.size() == returnColumnNames.size()); auto columns = input->binder->createVariables(returnColumnNames, returnTypes); return std::make_unique(std::move(query), connector, - duckdb_extension::DuckDBResultConverter{returnTypes}, columns, ReaderConfig{}, context); + duckdb_extension::DuckDBResultConverter{returnTypes}, columns, FileScanInfo{}, context); } struct DeltaScanSharedState final : BaseScanSharedState { diff --git a/extension/delta/src/include/function/delta_scan.h b/extension/delta/src/include/function/delta_scan.h index f17bcf379b8..203dfd51823 100644 --- a/extension/delta/src/include/function/delta_scan.h +++ b/extension/delta/src/include/function/delta_scan.h @@ -25,8 +25,8 @@ struct DeltaScanBindData final : function::ScanBindData { DeltaScanBindData(std::string query, std::shared_ptr connector, duckdb_extension::DuckDBResultConverter converter, binder::expression_vector columns, - common::ReaderConfig config, main::ClientContext* ctx) - : ScanBindData{std::move(columns), std::move(config), ctx}, query{std::move(query)}, + common::FileScanInfo fileScanInfo, main::ClientContext* ctx) + : ScanBindData{std::move(columns), std::move(fileScanInfo), ctx}, query{std::move(query)}, connector{std::move(connector)}, converter{std::move(converter)} {} std::unique_ptr copy() const override { diff --git a/extension/iceberg/src/function/iceberg_bindfunc.cpp b/extension/iceberg/src/function/iceberg_bindfunc.cpp index 4c89bb31946..6bf14238612 100644 --- a/extension/iceberg/src/function/iceberg_bindfunc.cpp +++ b/extension/iceberg/src/function/iceberg_bindfunc.cpp @@ -29,7 +29,7 @@ static std::string generateQueryOptions(const TableFuncBindInput* input, }; if (functionName == "ICEBERG_SCAN") { auto scanInput = input->extraInput->constPtrCast(); - appendOptions(scanInput->config.options); + appendOptions(scanInput->fileScanInfo.options); } else { appendOptions(input->optionalParams); } @@ -72,7 +72,7 @@ std::unique_ptr bindFuncHelper(main::ClientContext* context, KU_ASSERT(returnTypes.size() == returnColumnNames.size()); auto columns = input->binder->createVariables(returnColumnNames, returnTypes); return std::make_unique(std::move(query), connector, - duckdb_extension::DuckDBResultConverter{returnTypes}, columns, ReaderConfig{}, context); + duckdb_extension::DuckDBResultConverter{returnTypes}, columns, FileScanInfo{}, context); } } // namespace iceberg_extension } // namespace kuzu diff --git a/extension/json/src/functions/table_functions/json_scan.cpp b/extension/json/src/functions/table_functions/json_scan.cpp index 9b0de98006e..25aaeafb0ba 100644 --- a/extension/json/src/functions/table_functions/json_scan.cpp +++ b/extension/json/src/functions/table_functions/json_scan.cpp @@ -686,9 +686,9 @@ struct JsonScanBindData : public ScanBindData { JsonScanFormat format; JsonScanBindData(binder::expression_vector columns, column_id_t numWarningDataColumns, - ReaderConfig config, main::ClientContext* ctx, case_insensitive_map_t colNameToIdx, - JsonScanFormat format) - : ScanBindData(columns, std::move(config), ctx, numWarningDataColumns, 0), + FileScanInfo fileScanInfo, main::ClientContext* ctx, + case_insensitive_map_t colNameToIdx, JsonScanFormat format) + : ScanBindData(columns, std::move(fileScanInfo), ctx, numWarningDataColumns, 0), colNameToIdx{std::move(colNameToIdx)}, format{format} {} uint64_t getFieldIdx(const std::string& fieldName) const; @@ -772,7 +772,7 @@ static std::unique_ptr bindFunc(main::ClientContext* context, auto scanInput = ku_dynamic_cast(input->extraInput.get()); std::vector columnTypes; std::vector columnNames; - JsonScanConfig scanConfig(scanInput->config.options); + JsonScanConfig scanConfig(scanInput->fileScanInfo.options); case_insensitive_map_t colNameToIdx; if (!scanInput->expectedColumnNames.empty() || !scanConfig.autoDetect) { if (scanInput->expectedColumnNames.empty()) { @@ -789,16 +789,16 @@ static std::unique_ptr bindFunc(main::ClientContext* context, if (scanConfig.format == JsonScanFormat::AUTO_DETECT) { JSONScanSharedState sharedState(*context, - scanInput->config.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), scanConfig.format, - 0); + scanInput->fileScanInfo.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), + scanConfig.format, 0); JSONScanLocalState localState(*context->getMemoryManager(), sharedState, context); localState.readNext(); scanConfig.format = sharedState.jsonReader->getFormat(); } } else { - scanConfig.format = - autoDetect(context, scanInput->config.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), - scanConfig, columnTypes, columnNames, colNameToIdx); + scanConfig.format = autoDetect(context, + scanInput->fileScanInfo.getFilePath(JsonExtension::JSON_SCAN_FILE_IDX), scanConfig, + columnTypes, columnNames, colNameToIdx); } scanInput->tableFunction->canParallelFunc = [scanConfig]() { return scanConfig.format == JsonScanFormat::NEWLINE_DELIMITED; @@ -806,8 +806,8 @@ static std::unique_ptr bindFunc(main::ClientContext* context, auto columns = input->binder->createVariables(columnNames, columnTypes); - const bool ignoreErrors = scanInput->config.getOption(CopyConstants::IGNORE_ERRORS_OPTION_NAME, - CopyConstants::DEFAULT_IGNORE_ERRORS); + const bool ignoreErrors = scanInput->fileScanInfo.getOption( + CopyConstants::IGNORE_ERRORS_OPTION_NAME, CopyConstants::DEFAULT_IGNORE_ERRORS); std::vector warningColumnNames; std::vector warningColumnTypes; @@ -825,7 +825,7 @@ static std::unique_ptr bindFunc(main::ClientContext* context, columns.push_back(column); } return std::make_unique(columns, numWarningDataColumns, - scanInput->config.copy(), context, std::move(colNameToIdx), scanConfig.format); + scanInput->fileScanInfo.copy(), context, std::move(colNameToIdx), scanConfig.format); } static decltype(auto) getWarningDataVectors(const DataChunk& chunk, column_id_t numWarningColumns) { @@ -871,7 +871,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) static std::unique_ptr initSharedState(const TableFunctionInitInput& input) { auto jsonBindData = input.bindData->constPtrCast(); return std::make_unique(*jsonBindData->context, - jsonBindData->config.filePaths[0], jsonBindData->format, 0); + jsonBindData->fileScanInfo.filePaths[0], jsonBindData->format, 0); } static std::unique_ptr initLocalState(const TableFunctionInitInput& input, diff --git a/scripts/headers.txt b/scripts/headers.txt index 36873186cc4..ff139314329 100644 --- a/scripts/headers.txt +++ b/scripts/headers.txt @@ -8,7 +8,7 @@ src/include/common/case_insensitive_map.h src/include/common/cast.h src/include/common/constants.h src/include/common/copier_config/csv_reader_config.h -src/include/common/copier_config/reader_config.h +src/include/common/copier_config/file_scan_info.h src/include/common/copy_constructors.h src/include/common/data_chunk/data_chunk.h src/include/common/data_chunk/data_chunk_state.h diff --git a/src/binder/bind/bind_file_scan.cpp b/src/binder/bind/bind_file_scan.cpp index 651cfcca50e..b55d0497534 100644 --- a/src/binder/bind/bind_file_scan.cpp +++ b/src/binder/bind/bind_file_scan.cpp @@ -103,10 +103,10 @@ std::unique_ptr Binder::bindFileScanSource(const BaseScanSo auto filePaths = bindFilePaths(fileSource->filePaths); auto parsingOptions = bindParsingOptions(options); FileTypeInfo fileTypeInfo; - if (parsingOptions.contains(ReaderConfig::FILE_FORMAT_OPTION_NAME)) { - auto fileFormat = parsingOptions.at(ReaderConfig::FILE_FORMAT_OPTION_NAME).toString(); + if (parsingOptions.contains(FileScanInfo::FILE_FORMAT_OPTION_NAME)) { + auto fileFormat = parsingOptions.at(FileScanInfo::FILE_FORMAT_OPTION_NAME).toString(); fileTypeInfo = FileTypeInfo{FileTypeUtils::fromString(fileFormat), fileFormat}; - parsingOptions.erase(ReaderConfig::FILE_FORMAT_OPTION_NAME); + parsingOptions.erase(FileScanInfo::FILE_FORMAT_OPTION_NAME); } else { fileTypeInfo = bindFileTypeInfo(filePaths); } @@ -122,14 +122,14 @@ std::unique_ptr Binder::bindFileScanSource(const BaseScanSo } } // Bind file configuration - auto config = std::make_unique(std::move(fileTypeInfo), filePaths); - config->options = std::move(parsingOptions); - auto func = getScanFunction(config->fileTypeInfo, *config); + auto fileScanInfo = std::make_unique(std::move(fileTypeInfo), filePaths); + fileScanInfo->options = std::move(parsingOptions); + auto func = getScanFunction(fileScanInfo->fileTypeInfo, *fileScanInfo); // Bind table function auto bindInput = TableFuncBindInput(); bindInput.addLiteralParam(Value::createValue(filePaths[0])); auto extraInput = std::make_unique(); - extraInput->config = config->copy(); + extraInput->fileScanInfo = fileScanInfo->copy(); extraInput->expectedColumnNames = columnNames; extraInput->expectedColumnTypes = LogicalType::copy(columnTypes); extraInput->tableFunction = &func; @@ -186,7 +186,7 @@ std::unique_ptr Binder::bindObjectScanSource(const BaseScan if (replacementData != nullptr) { // Replace as python object func = replacementData->func; auto replaceExtraInput = std::make_unique(); - replaceExtraInput->config.options = bindParsingOptions(options); + replaceExtraInput->fileScanInfo.options = bindParsingOptions(options); replacementData->bindInput.extraInput = std::move(replaceExtraInput); replacementData->bindInput.binder = this; bindData = func.bindFunc(clientContext, &replacementData->bindInput); diff --git a/src/binder/bind/copy/bind_copy_from.cpp b/src/binder/bind/copy/bind_copy_from.cpp index 2e08b6f777a..2d023366ac0 100644 --- a/src/binder/bind/copy/bind_copy_from.cpp +++ b/src/binder/bind/copy/bind_copy_from.cpp @@ -86,9 +86,10 @@ std::unique_ptr Binder::bindCopyNodeFrom(const Statement& statem if (boundSource->type == ScanSourceType::FILE) { auto& source = boundSource->constCast(); auto bindData = source.info.bindData->constPtrCast(); - if (copyStatement.byColumn() && bindData->config.fileTypeInfo.fileType != FileType::NPY) { + if (copyStatement.byColumn() && + bindData->fileScanInfo.fileTypeInfo.fileType != FileType::NPY) { throw BinderException(stringFormat("Copy by column with {} file type is not supported.", - bindData->config.fileTypeInfo.fileTypeStr)); + bindData->fileScanInfo.fileTypeInfo.fileTypeStr)); } } expression_vector columns; diff --git a/src/binder/binder.cpp b/src/binder/binder.cpp index 0fc6614e880..b57e8441380 100644 --- a/src/binder/binder.cpp +++ b/src/binder/binder.cpp @@ -227,7 +227,8 @@ void Binder::restoreScope(BinderScope prevScope) { scope = std::move(prevScope); } -function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const ReaderConfig& config) { +function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, + const FileScanInfo& fileScanInfo) { function::Function* func = nullptr; std::vector inputTypes; inputTypes.push_back(LogicalType::STRING()); @@ -242,7 +243,7 @@ function::TableFunction Binder::getScanFunction(FileTypeInfo typeInfo, const Rea NpyScanFunction::name, inputTypes, functions); } break; case FileType::CSV: { - auto csvConfig = CSVReaderConfig::construct(config.options); + auto csvConfig = CSVReaderConfig::construct(fileScanInfo.options); func = function::BuiltInFunctionsUtils::matchFunction(clientContext->getTx(), csvConfig.parallel ? ParallelCSVScan::name : SerialCSVScan::name, inputTypes, functions); diff --git a/src/common/copier_config/reader_config.cpp b/src/common/copier_config/reader_config.cpp index 6012cb3be91..586a31a2c79 100644 --- a/src/common/copier_config/reader_config.cpp +++ b/src/common/copier_config/reader_config.cpp @@ -1,6 +1,5 @@ -#include "common/copier_config/reader_config.h" - #include "common/assert.h" +#include "common/copier_config/file_scan_info.h" #include "common/string_utils.h" namespace kuzu { diff --git a/src/function/table/bind_data.cpp b/src/function/table/bind_data.cpp index 458f3186a65..f1af03cac6c 100644 --- a/src/function/table/bind_data.cpp +++ b/src/function/table/bind_data.cpp @@ -21,7 +21,7 @@ bool TableFuncBindData::getIgnoreErrorsOption() const { } bool ScanBindData::getIgnoreErrorsOption() const { - return config.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME, + return fileScanInfo.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME, common::CopyConstants::DEFAULT_IGNORE_ERRORS); } diff --git a/src/function/table/scan_functions.cpp b/src/function/table/scan_functions.cpp index c0d11710cbb..810e13d7684 100644 --- a/src/function/table/scan_functions.cpp +++ b/src/function/table/scan_functions.cpp @@ -5,7 +5,7 @@ namespace function { std::pair ScanSharedState::getNext() { std::lock_guard guard{lock}; - if (fileIdx >= readerConfig.getNumFiles()) { + if (fileIdx >= fileScanInfo.getNumFiles()) { return {UINT64_MAX, UINT64_MAX}; } return {fileIdx, blockIdx++}; diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index 7dbe38d6721..9c405fbfc3c 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -5,7 +5,7 @@ #include "binder/query/bound_regular_query.h" #include "binder/query/query_graph.h" #include "catalog/catalog_entry/table_catalog_entry.h" -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "common/enums/table_type.h" #include "parser/ddl/parsed_property_definition.h" #include "parser/query/graph_pattern/pattern_element.h" @@ -302,7 +302,7 @@ class Binder { void restoreScope(BinderScope prevScope); function::TableFunction getScanFunction(common::FileTypeInfo typeInfo, - const common::ReaderConfig& config); + const common::FileScanInfo& fileScanInfo); ExpressionBinder* getExpressionBinder() { return &expressionBinder; } diff --git a/src/include/binder/bound_export_database.h b/src/include/binder/bound_export_database.h index 3e0ace21a4b..bd12d24ad8a 100644 --- a/src/include/binder/bound_export_database.h +++ b/src/include/binder/bound_export_database.h @@ -2,7 +2,7 @@ #include "binder/binder.h" #include "binder/bound_statement.h" #include "binder/query/bound_regular_query.h" -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" namespace kuzu { namespace binder { @@ -34,12 +34,12 @@ class BoundExportDatabase final : public BoundStatement { common::case_insensitive_map_t getExportOptions() const { return boundFileInfo.options; } - const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; } + const common::FileScanInfo* getBoundFileInfo() const { return &boundFileInfo; } const std::vector* getExportData() const { return &exportData; } private: std::vector exportData; - common::ReaderConfig boundFileInfo; + common::FileScanInfo boundFileInfo; }; } // namespace binder diff --git a/src/include/common/copier_config/reader_config.h b/src/include/common/copier_config/file_scan_info.h similarity index 86% rename from src/include/common/copier_config/reader_config.h rename to src/include/common/copier_config/file_scan_info.h index 005d2a87b37..f158e3f2a45 100644 --- a/src/include/common/copier_config/reader_config.h +++ b/src/include/common/copier_config/file_scan_info.h @@ -28,17 +28,17 @@ struct FileTypeUtils { static FileType fromString(std::string fileType); }; -struct ReaderConfig { +struct FileScanInfo { static constexpr const char* FILE_FORMAT_OPTION_NAME = "FILE_FORMAT"; FileTypeInfo fileTypeInfo; std::vector filePaths; case_insensitive_map_t options; - ReaderConfig() : fileTypeInfo{FileType::UNKNOWN, ""} {} - ReaderConfig(FileTypeInfo fileTypeInfo, std::vector filePaths) + FileScanInfo() : fileTypeInfo{FileType::UNKNOWN, ""} {} + FileScanInfo(FileTypeInfo fileTypeInfo, std::vector filePaths) : fileTypeInfo{std::move(fileTypeInfo)}, filePaths{std::move(filePaths)} {} - EXPLICIT_COPY_DEFAULT_MOVE(ReaderConfig); + EXPLICIT_COPY_DEFAULT_MOVE(FileScanInfo); uint32_t getNumFiles() const { return filePaths.size(); } std::string getFilePath(idx_t fileIdx) const { @@ -57,7 +57,7 @@ struct ReaderConfig { } private: - ReaderConfig(const ReaderConfig& other) + FileScanInfo(const FileScanInfo& other) : fileTypeInfo{other.fileTypeInfo}, filePaths{other.filePaths}, options{other.options} {} }; diff --git a/src/include/function/table/bind_data.h b/src/include/function/table/bind_data.h index 87afaed09eb..d9032622735 100644 --- a/src/include/function/table/bind_data.h +++ b/src/include/function/table/bind_data.h @@ -1,6 +1,6 @@ #pragma once -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "common/types/types.h" #include "main/client_context.h" #include "storage/predicate/column_predicate.h" @@ -63,19 +63,21 @@ struct KUZU_API TableFuncBindData { }; struct KUZU_API ScanBindData : public TableFuncBindData { - common::ReaderConfig config; + common::FileScanInfo fileScanInfo; main::ClientContext* context; - ScanBindData(binder::expression_vector columns, common::ReaderConfig config, + ScanBindData(binder::expression_vector columns, common::FileScanInfo fileScanInfo, main::ClientContext* context) - : TableFuncBindData{std::move(columns)}, config{std::move(config)}, context{context} {} - ScanBindData(binder::expression_vector columns, common::ReaderConfig config, + : TableFuncBindData{std::move(columns)}, fileScanInfo{std::move(fileScanInfo)}, + context{context} {} + ScanBindData(binder::expression_vector columns, common::FileScanInfo fileScanInfo, main::ClientContext* context, common::column_id_t numWarningDataColumns, common::row_idx_t estCardinality) : TableFuncBindData{std::move(columns), numWarningDataColumns, estCardinality}, - config{std::move(config)}, context{context} {} + fileScanInfo{std::move(fileScanInfo)}, context{context} {} ScanBindData(const ScanBindData& other) - : TableFuncBindData{other}, config{other.config.copy()}, context{other.context} {} + : TableFuncBindData{other}, fileScanInfo{other.fileScanInfo.copy()}, + context{other.context} {} bool getIgnoreErrorsOption() const override; diff --git a/src/include/function/table/bind_input.h b/src/include/function/table/bind_input.h index d52caa39f37..f43a86f6906 100644 --- a/src/include/function/table/bind_input.h +++ b/src/include/function/table/bind_input.h @@ -4,7 +4,7 @@ #include "binder/expression/expression.h" #include "common/case_insensitive_map.h" -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "common/types/value/value.h" namespace kuzu { @@ -52,7 +52,7 @@ struct KUZU_API TableFuncBindInput { }; struct KUZU_API ExtraScanTableFuncBindInput : ExtraTableFuncBindInput { - common::ReaderConfig config; + common::FileScanInfo fileScanInfo; std::vector expectedColumnNames; std::vector expectedColumnTypes; function::TableFunction* tableFunction = nullptr; diff --git a/src/include/function/table/scan_functions.h b/src/include/function/table/scan_functions.h index cfd6ac143e6..9e8d711a1ec 100644 --- a/src/include/function/table/scan_functions.h +++ b/src/include/function/table/scan_functions.h @@ -2,7 +2,7 @@ #include -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "function/table_functions.h" namespace kuzu { @@ -27,12 +27,12 @@ struct BaseScanSharedStateWithNumRows : public BaseScanSharedState { }; struct ScanSharedState : public BaseScanSharedStateWithNumRows { - const common::ReaderConfig readerConfig; + const common::FileScanInfo fileScanInfo; uint64_t fileIdx; uint64_t blockIdx; - ScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows) - : BaseScanSharedStateWithNumRows{numRows}, readerConfig{std::move(readerConfig)}, + ScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows) + : BaseScanSharedStateWithNumRows{numRows}, fileScanInfo{std::move(fileScanInfo)}, fileIdx{0}, blockIdx{0} {} std::pair getNext(); @@ -43,9 +43,9 @@ struct ScanFileSharedState : public ScanSharedState { uint64_t totalSize; // TODO(Mattias): I think we should unify the design on how we calculate the // progress bar for scanning. Can we simply rely on a numRowsScaned stored // in the TableFuncSharedState to determine the progress. - ScanFileSharedState(common::ReaderConfig readerConfig, uint64_t numRows, + ScanFileSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context) - : ScanSharedState{std::move(readerConfig), numRows}, context{context}, totalSize{0} {} + : ScanSharedState{std::move(fileScanInfo), numRows}, context{context}, totalSize{0} {} }; } // namespace function diff --git a/src/include/planner/operator/simple/logical_export_db.h b/src/include/planner/operator/simple/logical_export_db.h index 9fafa7cf8ff..0f455e37eab 100644 --- a/src/include/planner/operator/simple/logical_export_db.h +++ b/src/include/planner/operator/simple/logical_export_db.h @@ -1,18 +1,20 @@ #pragma once #include "common/copier_config/csv_reader_config.h" -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "logical_simple.h" namespace kuzu { namespace planner { class LogicalExportDatabase final : public LogicalSimple { + static constexpr LogicalOperatorType type_ = LogicalOperatorType::EXPORT_DATABASE; + public: - explicit LogicalExportDatabase(common::ReaderConfig boundFileInfo, + LogicalExportDatabase(common::FileScanInfo boundFileInfo, std::shared_ptr outputExpression, const std::vector>& plans) - : LogicalSimple{LogicalOperatorType::EXPORT_DATABASE, plans, std::move(outputExpression)}, + : LogicalSimple{type_, plans, std::move(outputExpression)}, boundFileInfo{std::move(boundFileInfo)} {} std::string getFilePath() const { return boundFileInfo.filePaths[0]; } @@ -21,7 +23,7 @@ class LogicalExportDatabase final : public LogicalSimple { auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo.options); return csvConfig.option.copy(); } - const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; } + const common::FileScanInfo* getBoundFileInfo() const { return &boundFileInfo; } std::string getExpressionsForPrinting() const override { return std::string{}; } std::unique_ptr copy() override { @@ -30,7 +32,7 @@ class LogicalExportDatabase final : public LogicalSimple { } private: - common::ReaderConfig boundFileInfo; + common::FileScanInfo boundFileInfo; }; } // namespace planner diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h index ec301c575bf..ac331cc04c4 100644 --- a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -12,7 +12,7 @@ namespace kuzu { namespace common { -struct ReaderConfig; +struct FileScanInfo; } namespace main { class ClientContext; @@ -68,7 +68,8 @@ class BaseCSVReader { std::string reconstructLine(uint64_t startPosition, uint64_t endPosition); static common::column_id_t appendWarningDataColumns(std::vector& resultColumnNames, - std::vector& resultColumnTypes, const common::ReaderConfig& config); + std::vector& resultColumnTypes, + const common::FileScanInfo& fileScanInfo); static PopulatedCopyFromError basePopulateErrorFunc(CopyFromFileError error, const SharedFileErrorHandler* sharedErrorHandler, BaseCSVReader* reader, diff --git a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h index c8bac4989fd..dbdb95cd981 100644 --- a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h @@ -47,7 +47,7 @@ struct ParallelCSVScanSharedState final : public function::ScanFileSharedState { std::vector errorHandlers; populate_func_t populateErrorFunc; - ParallelCSVScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, + ParallelCSVScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, common::CSVOption csvOption, CSVColumnInfo columnInfo); void setFileComplete(uint64_t completedFileIdx); diff --git a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h index 9fef2fb33f4..d0ff2fdb3f8 100644 --- a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h @@ -42,7 +42,7 @@ struct SerialCSVScanSharedState final : public function::ScanFileSharedState { uint64_t queryID; populate_func_t populateErrorFunc; - SerialCSVScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, + SerialCSVScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, common::CSVOption csvOption, CSVColumnInfo columnInfo, uint64_t queryID); diff --git a/src/include/processor/operator/persistent/reader/npy/npy_reader.h b/src/include/processor/operator/persistent/reader/npy/npy_reader.h index c2a76f85aeb..9681ecff157 100644 --- a/src/include/processor/operator/persistent/reader/npy/npy_reader.h +++ b/src/include/processor/operator/persistent/reader/npy/npy_reader.h @@ -56,7 +56,7 @@ class NpyMultiFileReader { }; struct NpyScanSharedState final : public function::ScanSharedState { - explicit NpyScanSharedState(const common::ReaderConfig readerConfig, uint64_t numRows); + explicit NpyScanSharedState(const common::FileScanInfo fileScanInfo, uint64_t numRows); std::unique_ptr npyMultiFileReader; }; diff --git a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h index 04dcebe4e8d..9e0f627d917 100644 --- a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h +++ b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h @@ -92,7 +92,7 @@ class ParquetReader { }; struct ParquetScanSharedState final : function::ScanFileSharedState { - explicit ParquetScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows, + explicit ParquetScanSharedState(common::FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, std::vector columnSkips); std::vector> readers; diff --git a/src/include/processor/operator/simple/export_db.h b/src/include/processor/operator/simple/export_db.h index b0f0b6e09ad..ab28bf70f3d 100644 --- a/src/include/processor/operator/simple/export_db.h +++ b/src/include/processor/operator/simple/export_db.h @@ -1,6 +1,6 @@ #pragma once -#include "common/copier_config/reader_config.h" +#include "common/copier_config/file_scan_info.h" #include "processor/operator/simple/simple.h" namespace kuzu { @@ -28,7 +28,7 @@ class ExportDB final : public Simple { static constexpr PhysicalOperatorType type_ = PhysicalOperatorType::EXPORT_DATABASE; public: - ExportDB(common::ReaderConfig boundFileInfo, const DataPos& outputPos, uint32_t id, + ExportDB(common::FileScanInfo boundFileInfo, const DataPos& outputPos, uint32_t id, std::unique_ptr printInfo) : Simple{type_, outputPos, id, std::move(printInfo)}, boundFileInfo{std::move(boundFileInfo)} {} @@ -44,7 +44,7 @@ class ExportDB final : public Simple { } private: - common::ReaderConfig boundFileInfo; + common::FileScanInfo boundFileInfo; }; } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index aa9290ac39b..f73afa5e3b9 100644 --- a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -518,8 +518,8 @@ uint64_t BaseCSVReader::parseCSV(Driver& driver) { } column_id_t BaseCSVReader::appendWarningDataColumns(std::vector& resultColumnNames, - std::vector& resultColumnTypes, const common::ReaderConfig& config) { - const bool ignoreErrors = config.getOption(CopyConstants::IGNORE_ERRORS_OPTION_NAME, + std::vector& resultColumnTypes, const common::FileScanInfo& fileScanInfo) { + const bool ignoreErrors = fileScanInfo.getOption(CopyConstants::IGNORE_ERRORS_OPTION_NAME, CopyConstants::DEFAULT_IGNORE_ERRORS); column_id_t numWarningDataColumns = 0; if (ignoreErrors) { diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index 9c29d14125d..4fc7d258c35 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -119,12 +119,12 @@ bool ParallelCSVReader::finishedBlock() const { return getFileOffset() > (currentBlockIdx + 1) * CopyConstants::PARALLEL_BLOCK_SIZE; } -ParallelCSVScanSharedState::ParallelCSVScanSharedState(ReaderConfig readerConfig, uint64_t numRows, +ParallelCSVScanSharedState::ParallelCSVScanSharedState(FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, CSVOption csvOption, CSVColumnInfo columnInfo) - : ScanFileSharedState{std::move(readerConfig), numRows, context}, + : ScanFileSharedState{std::move(fileScanInfo), numRows, context}, csvOption{std::move(csvOption)}, columnInfo{std::move(columnInfo)}, numBlocksReadByFiles{0} { - errorHandlers.reserve(this->readerConfig.getNumFiles()); - for (idx_t i = 0; i < this->readerConfig.getNumFiles(); ++i) { + errorHandlers.reserve(this->fileScanInfo.getNumFiles()); + for (idx_t i = 0; i < this->fileScanInfo.getNumFiles(); ++i) { errorHandlers.emplace_back(i, &lock); } populateErrorFunc = constructPopulateFunc(); @@ -134,21 +134,21 @@ ParallelCSVScanSharedState::ParallelCSVScanSharedState(ReaderConfig readerConfig } populate_func_t ParallelCSVScanSharedState::constructPopulateFunc() { - const auto numFiles = readerConfig.getNumFiles(); + const auto numFiles = fileScanInfo.getNumFiles(); auto localErrorHandlers = std::vector>(numFiles); auto readers = std::vector>(numFiles); for (idx_t i = 0; i < numFiles; ++i) { // If we run into errors while reconstructing lines they should be unrecoverable localErrorHandlers[i] = std::make_shared(&errorHandlers[i], false, context); - readers[i] = std::make_shared(readerConfig.filePaths[i], i, + readers[i] = std::make_shared(fileScanInfo.filePaths[i], i, csvOption.copy(), columnInfo.copy(), context, localErrorHandlers[i].get()); } return [this, movedErrorHandlers = std::move(localErrorHandlers), movedReaders = std::move(readers)](CopyFromFileError error, idx_t fileIdx) -> PopulatedCopyFromError { return BaseCSVReader::basePopulateErrorFunc(std::move(error), &errorHandlers[fileIdx], - movedReaders[fileIdx].get(), readerConfig.getFilePath(fileIdx)); + movedReaders[fileIdx].get(), fileScanInfo.getFilePath(fileIdx)); }; } @@ -188,7 +188,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) std::make_unique(&sharedState->errorHandlers[fileIdx], sharedState->csvOption.ignoreErrors, sharedState->context, true); localState->reader = - std::make_unique(sharedState->readerConfig.filePaths[fileIdx], + std::make_unique(sharedState->fileScanInfo.filePaths[fileIdx], fileIdx, sharedState->csvOption.copy(), sharedState->columnInfo.copy(), sharedState->context, localState->errorHandler.get()); } @@ -221,7 +221,7 @@ static std::unique_ptr bindFunc(main::ClientContext* context, bool detectedHeader = false; DialectOption detectedDialect; - auto csvOption = CSVReaderConfig::construct(scanInput->config.options).option; + auto csvOption = CSVReaderConfig::construct(scanInput->fileScanInfo.options).option; detectedDialect.doDialectDetection = csvOption.autoDetection; std::vector detectedColumnNames; @@ -238,40 +238,43 @@ static std::unique_ptr bindFunc(main::ClientContext* context, std::string quote(1, detectedDialect.quoteChar); std::string delim(1, detectedDialect.delimiter); std::string escape(1, detectedDialect.escapeChar); - scanInput->config.options.insert_or_assign("ESCAPE", Value(LogicalType::STRING(), escape)); - scanInput->config.options.insert_or_assign("QUOTE", Value(LogicalType::STRING(), quote)); - scanInput->config.options.insert_or_assign("DELIM", Value(LogicalType::STRING(), delim)); + scanInput->fileScanInfo.options.insert_or_assign("ESCAPE", + Value(LogicalType::STRING(), escape)); + scanInput->fileScanInfo.options.insert_or_assign("QUOTE", + Value(LogicalType::STRING(), quote)); + scanInput->fileScanInfo.options.insert_or_assign("DELIM", + Value(LogicalType::STRING(), delim)); } if (!csvOption.setHeader && csvOption.autoDetection && detectedHeader) { - scanInput->config.options.insert_or_assign("HEADER", Value(detectedHeader)); + scanInput->fileScanInfo.options.insert_or_assign("HEADER", Value(detectedHeader)); } auto resultColumns = input->binder->createVariables(resultColumnNames, resultColumnTypes); std::vector warningColumnNames; std::vector warningColumnTypes; const column_id_t numWarningDataColumns = BaseCSVReader::appendWarningDataColumns( - warningColumnNames, warningColumnTypes, scanInput->config); + warningColumnNames, warningColumnTypes, scanInput->fileScanInfo); auto warningColumns = input->binder->createInvisibleVariables(warningColumnNames, warningColumnTypes); for (auto& column : warningColumns) { resultColumns.push_back(column); } - return std::make_unique(std::move(resultColumns), scanInput->config.copy(), + return std::make_unique(std::move(resultColumns), scanInput->fileScanInfo.copy(), context, numWarningDataColumns, 0 /* estCardinality */); } static std::unique_ptr initSharedState(const TableFunctionInitInput& input) { auto bindData = input.bindData->constPtrCast(); - auto csvOption = CSVReaderConfig::construct(bindData->config.options).option; + auto csvOption = CSVReaderConfig::construct(bindData->fileScanInfo.options).option; row_idx_t numRows = 0; auto columnInfo = CSVColumnInfo(bindData->getNumColumns() - bindData->numWarningDataColumns, bindData->getColumnSkips(), bindData->numWarningDataColumns); - auto sharedState = std::make_unique(bindData->config.copy(), + auto sharedState = std::make_unique(bindData->fileScanInfo.copy(), numRows, bindData->context, csvOption.copy(), columnInfo.copy()); - for (idx_t i = 0; i < sharedState->readerConfig.getNumFiles(); ++i) { - auto filePath = sharedState->readerConfig.filePaths[i]; + for (idx_t i = 0; i < sharedState->fileScanInfo.getNumFiles(); ++i) { + auto filePath = sharedState->fileScanInfo.filePaths[i]; auto reader = std::make_unique(filePath, i, csvOption.copy(), columnInfo.copy(), bindData->context, nullptr); sharedState->totalSize += reader->getFileSize(); @@ -290,7 +293,7 @@ static std::unique_ptr initLocalState(const TableFunctionIn static double progressFunc(TableFuncSharedState* sharedState) { auto state = sharedState->ptrCast(); - if (state->fileIdx >= state->readerConfig.getNumFiles()) { + if (state->fileIdx >= state->fileScanInfo.getNumFiles()) { return 1.0; } if (state->totalSize == 0) { @@ -306,7 +309,7 @@ static double progressFunc(TableFuncSharedState* sharedState) { static void finalizeFunc(const ExecutionContext* ctx, TableFuncSharedState* sharedState) { auto state = ku_dynamic_cast(sharedState); - for (idx_t i = 0; i < state->readerConfig.getNumFiles(); ++i) { + for (idx_t i = 0; i < state->fileScanInfo.getNumFiles(); ++i) { state->errorHandlers[i].throwCachedErrorsIfNeeded(); } ctx->clientContext->getWarningContextUnsafe().populateWarnings(ctx->queryID, diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index c5652a553af..b3fbd8fc118 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -21,7 +21,7 @@ SerialCSVReader::SerialCSVReader(const std::string& filePath, idx_t fileIdx, CSV std::vector> SerialCSVReader::sniffCSV( DialectOption& detectedDialect, bool& detectedHeader) { - auto csvOption = CSVReaderConfig::construct(bindInput->config.options).option; + auto csvOption = CSVReaderConfig::construct(bindInput->fileScanInfo.options).option; readBOM(); if (detectedDialect.doDialectDetection) { @@ -78,9 +78,9 @@ uint64_t SerialCSVReader::parseBlock(block_idx_t blockIdx, DataChunk& resultChun return numRowsRead; } -SerialCSVScanSharedState::SerialCSVScanSharedState(ReaderConfig readerConfig, uint64_t numRows, +SerialCSVScanSharedState::SerialCSVScanSharedState(FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, CSVOption csvOption, CSVColumnInfo columnInfo, uint64_t queryID) - : ScanFileSharedState{std::move(readerConfig), numRows, context}, + : ScanFileSharedState{std::move(fileScanInfo), numRows, context}, csvOption{std::move(csvOption)}, columnInfo{std::move(columnInfo)}, totalReadSizeByFile{0}, queryID(queryID), populateErrorFunc(constructPopulateFunc()) { initReader(context); @@ -89,14 +89,14 @@ SerialCSVScanSharedState::SerialCSVScanSharedState(ReaderConfig readerConfig, ui populate_func_t SerialCSVScanSharedState::constructPopulateFunc() const { return [this](CopyFromFileError error, idx_t fileIdx) -> PopulatedCopyFromError { return BaseCSVReader::basePopulateErrorFunc(std::move(error), sharedErrorHandler.get(), - reader.get(), readerConfig.getFilePath(fileIdx)); + reader.get(), fileScanInfo.getFilePath(fileIdx)); }; } void SerialCSVScanSharedState::read(DataChunk& outputChunk) { std::lock_guard mtx{lock}; do { - if (fileIdx >= readerConfig.getNumFiles()) { + if (fileIdx >= fileScanInfo.getNumFiles()) { return; } uint64_t numRows = reader->parseBlock(reader->getFileOffset() == 0 ? 0 : 1, outputChunk); @@ -122,12 +122,12 @@ void SerialCSVScanSharedState::finalizeReader(main::ClientContext* context) cons } void SerialCSVScanSharedState::initReader(main::ClientContext* context) { - if (fileIdx < readerConfig.getNumFiles()) { + if (fileIdx < fileScanInfo.getNumFiles()) { sharedErrorHandler = std::make_unique(fileIdx, nullptr, populateErrorFunc); localErrorHandler = std::make_unique(sharedErrorHandler.get(), csvOption.ignoreErrors, context); - reader = std::make_unique(readerConfig.filePaths[fileIdx], fileIdx, + reader = std::make_unique(fileScanInfo.filePaths[fileIdx], fileIdx, csvOption.copy(), columnInfo.copy(), context, localErrorHandler.get()); } } @@ -141,20 +141,20 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) static void bindColumnsFromFile(const ExtraScanTableFuncBindInput* bindInput, uint32_t fileIdx, std::vector& columnNames, std::vector& columnTypes, DialectOption& detectedDialect, bool& detectedHeader, main::ClientContext* context) { - auto csvOption = CSVReaderConfig::construct(bindInput->config.options).option; + auto csvOption = CSVReaderConfig::construct(bindInput->fileScanInfo.options).option; auto columnInfo = CSVColumnInfo(bindInput->expectedColumnNames.size() /* numColumns */, {} /* columnSkips */, {} /*warningDataColumns*/); SharedFileErrorHandler sharedErrorHandler{fileIdx, nullptr}; // We don't want to cache CSV errors encountered during sniffing, they will be re-encountered // when actually parsing LocalFileErrorHandler errorHandler{&sharedErrorHandler, csvOption.ignoreErrors, context, false}; - auto csvReader = SerialCSVReader(bindInput->config.filePaths[fileIdx], fileIdx, + auto csvReader = SerialCSVReader(bindInput->fileScanInfo.filePaths[fileIdx], fileIdx, csvOption.copy(), columnInfo.copy(), context, &errorHandler, bindInput); sharedErrorHandler.setPopulateErrorFunc( [&sharedErrorHandler, &csvReader, bindInput](CopyFromFileError error, idx_t fileIdx) -> PopulatedCopyFromError { return BaseCSVReader::basePopulateErrorFunc(std::move(error), &sharedErrorHandler, - &csvReader, bindInput->config.filePaths[fileIdx]); + &csvReader, bindInput->fileScanInfo.filePaths[fileIdx]); }); auto sniffedColumns = csvReader.sniffCSV(detectedDialect, detectedHeader); sharedErrorHandler.throwCachedErrorsIfNeeded(); @@ -167,10 +167,10 @@ static void bindColumnsFromFile(const ExtraScanTableFuncBindInput* bindInput, ui void SerialCSVScan::bindColumns(const ExtraScanTableFuncBindInput* bindInput, std::vector& columnNames, std::vector& columnTypes, DialectOption& detectedDialect, bool& detectedHeader, main::ClientContext* context) { - KU_ASSERT(bindInput->config.getNumFiles() > 0); + KU_ASSERT(bindInput->fileScanInfo.getNumFiles() > 0); bindColumnsFromFile(bindInput, 0, columnNames, columnTypes, detectedDialect, detectedHeader, context); - for (auto i = 1u; i < bindInput->config.getNumFiles(); ++i) { + for (auto i = 1u; i < bindInput->fileScanInfo.getNumFiles(); ++i) { std::vector tmpColumnNames; std::vector tmpColumnTypes; bindColumnsFromFile(bindInput, i, tmpColumnNames, tmpColumnTypes, detectedDialect, @@ -183,14 +183,14 @@ static std::unique_ptr bindFunc(main::ClientContext* context, const TableFuncBindInput* input) { auto scanInput = ku_dynamic_cast(input->extraInput.get()); if (scanInput->expectedColumnTypes.size() > 0) { - scanInput->config.options.insert_or_assign("SAMPLE_SIZE", + scanInput->fileScanInfo.options.insert_or_assign("SAMPLE_SIZE", Value((int64_t)0)); // only scan headers } bool detectedHeader = false; DialectOption detectedDialect; - auto csvOption = CSVReaderConfig::construct(scanInput->config.options).option; + auto csvOption = CSVReaderConfig::construct(scanInput->fileScanInfo.options).option; detectedDialect.doDialectDetection = csvOption.autoDetection; std::vector detectedColumnNames; @@ -207,39 +207,42 @@ static std::unique_ptr bindFunc(main::ClientContext* context, std::string quote(1, detectedDialect.quoteChar); std::string delim(1, detectedDialect.delimiter); std::string escape(1, detectedDialect.escapeChar); - scanInput->config.options.insert_or_assign("ESCAPE", Value(LogicalType::STRING(), escape)); - scanInput->config.options.insert_or_assign("QUOTE", Value(LogicalType::STRING(), quote)); - scanInput->config.options.insert_or_assign("DELIM", Value(LogicalType::STRING(), delim)); + scanInput->fileScanInfo.options.insert_or_assign("ESCAPE", + Value(LogicalType::STRING(), escape)); + scanInput->fileScanInfo.options.insert_or_assign("QUOTE", + Value(LogicalType::STRING(), quote)); + scanInput->fileScanInfo.options.insert_or_assign("DELIM", + Value(LogicalType::STRING(), delim)); } if (!csvOption.setHeader && csvOption.autoDetection && detectedHeader) { - scanInput->config.options.insert_or_assign("HEADER", Value(detectedHeader)); + scanInput->fileScanInfo.options.insert_or_assign("HEADER", Value(detectedHeader)); } auto resultColumns = input->binder->createVariables(resultColumnNames, resultColumnTypes); std::vector warningColumnNames; std::vector warningColumnTypes; const column_id_t numWarningDataColumns = BaseCSVReader::appendWarningDataColumns( - warningColumnNames, warningColumnTypes, scanInput->config); + warningColumnNames, warningColumnTypes, scanInput->fileScanInfo); auto warningColumns = input->binder->createInvisibleVariables(warningColumnNames, warningColumnTypes); for (auto& column : warningColumns) { resultColumns.push_back(column); } - return std::make_unique(std::move(resultColumns), scanInput->config.copy(), + return std::make_unique(std::move(resultColumns), scanInput->fileScanInfo.copy(), context, numWarningDataColumns, 0 /* estCardinality */); } static std::unique_ptr initSharedState(const TableFunctionInitInput& input) { auto bindData = input.bindData->constPtrCast(); - auto csvOption = CSVReaderConfig::construct(bindData->config.options).option; + auto csvOption = CSVReaderConfig::construct(bindData->fileScanInfo.options).option; row_idx_t numRows = 0; auto columnInfo = CSVColumnInfo(bindData->getNumColumns() - bindData->numWarningDataColumns, bindData->getColumnSkips(), bindData->numWarningDataColumns); - auto sharedState = std::make_unique(bindData->config.copy(), numRows, - bindData->context, csvOption.copy(), columnInfo.copy(), input.queryID); - for (idx_t i = 0; i < sharedState->readerConfig.filePaths.size(); ++i) { - const auto& filePath = sharedState->readerConfig.filePaths[i]; + auto sharedState = std::make_unique(bindData->fileScanInfo.copy(), + numRows, bindData->context, csvOption.copy(), columnInfo.copy(), input.queryID); + for (idx_t i = 0; i < sharedState->fileScanInfo.filePaths.size(); ++i) { + const auto& filePath = sharedState->fileScanInfo.filePaths[i]; auto reader = std::make_unique(filePath, i, csvOption.copy(), columnInfo.copy(), sharedState->context, nullptr); sharedState->totalSize += reader->getFileSize(); @@ -261,7 +264,7 @@ static double progressFunc(TableFuncSharedState* sharedState) { auto state = ku_dynamic_cast(sharedState); if (state->totalSize == 0) { return 0.0; - } else if (state->fileIdx >= state->readerConfig.getNumFiles()) { + } else if (state->fileIdx >= state->fileScanInfo.getNumFiles()) { return 1.0; } uint64_t totalReadSize = state->totalReadSizeByFile + state->reader->getFileOffset(); diff --git a/src/processor/operator/persistent/reader/npy/npy_reader.cpp b/src/processor/operator/persistent/reader/npy/npy_reader.cpp index cf3e0b92d9a..f87939c316d 100644 --- a/src/processor/operator/persistent/reader/npy/npy_reader.cpp +++ b/src/processor/operator/persistent/reader/npy/npy_reader.cpp @@ -252,9 +252,9 @@ void NpyMultiFileReader::readBlock(block_idx_t blockIdx, DataChunk& dataChunkToR } } -NpyScanSharedState::NpyScanSharedState(ReaderConfig readerConfig, uint64_t numRows) - : ScanSharedState{std::move(readerConfig), numRows} { - npyMultiFileReader = std::make_unique(this->readerConfig.filePaths); +NpyScanSharedState::NpyScanSharedState(FileScanInfo fileScanInfo, uint64_t numRows) + : ScanSharedState{std::move(fileScanInfo), numRows} { + npyMultiFileReader = std::make_unique(this->fileScanInfo.filePaths); } static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) { @@ -273,23 +273,23 @@ static LogicalType bindColumnType(const NpyReader& reader) { return LogicalType::ARRAY(LogicalType(reader.getType()), reader.getNumElementsPerRow()); } -static void bindColumns(const ReaderConfig& readerConfig, uint32_t fileIdx, +static void bindColumns(const FileScanInfo& fileScanInfo, uint32_t fileIdx, std::vector& columnNames, std::vector& columnTypes) { - auto reader = NpyReader(readerConfig.filePaths[fileIdx]); // TODO: double check + auto reader = NpyReader(fileScanInfo.filePaths[fileIdx]); // TODO: double check auto columnName = std::string("column" + std::to_string(fileIdx)); auto columnType = bindColumnType(reader); columnNames.push_back(columnName); columnTypes.push_back(std::move(columnType)); } -static void bindColumns(const ReaderConfig& readerConfig, std::vector& columnNames, +static void bindColumns(const FileScanInfo& fileScanInfo, std::vector& columnNames, std::vector& columnTypes) { - KU_ASSERT(readerConfig.getNumFiles() > 0); - bindColumns(readerConfig, 0, columnNames, columnTypes); - for (auto i = 1u; i < readerConfig.getNumFiles(); ++i) { + KU_ASSERT(fileScanInfo.getNumFiles() > 0); + bindColumns(fileScanInfo, 0, columnNames, columnTypes); + for (auto i = 1u; i < fileScanInfo.getNumFiles(); ++i) { std::vector tmpColumnNames; std::vector tmpColumnTypes; - bindColumns(readerConfig, i, tmpColumnNames, tmpColumnTypes); + bindColumns(fileScanInfo, i, tmpColumnNames, tmpColumnTypes); ReaderBindUtils::validateNumColumns(1, tmpColumnTypes.size()); columnNames.push_back(tmpColumnNames[0]); columnTypes.push_back(std::move(tmpColumnTypes[0])); @@ -299,19 +299,19 @@ static void bindColumns(const ReaderConfig& readerConfig, std::vector bindFunc(main::ClientContext* context, const TableFuncBindInput* input) { auto scanInput = ku_dynamic_cast(input->extraInput.get()); - if (scanInput->config.options.size() > 1 || - (scanInput->config.options.size() == 1 && - !scanInput->config.options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME))) { + if (scanInput->fileScanInfo.options.size() > 1 || + (scanInput->fileScanInfo.options.size() == 1 && + !scanInput->fileScanInfo.options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME))) { throw BinderException{"Copy from numpy cannot have options other than IGNORE_ERRORS."}; } std::vector detectedColumnNames; std::vector detectedColumnTypes; - bindColumns(scanInput->config, detectedColumnNames, detectedColumnTypes); + bindColumns(scanInput->fileScanInfo, detectedColumnNames, detectedColumnTypes); std::vector resultColumnNames; std::vector resultColumnTypes; ReaderBindUtils::resolveColumns(scanInput->expectedColumnNames, detectedColumnNames, resultColumnNames, scanInput->expectedColumnTypes, detectedColumnTypes, resultColumnTypes); - auto config = scanInput->config.copy(); + auto config = scanInput->fileScanInfo.copy(); KU_ASSERT(!config.filePaths.empty() && config.getNumFiles() == resultColumnNames.size()); row_idx_t numRows = 0; for (auto i = 0u; i < config.getNumFiles(); i++) { @@ -322,14 +322,15 @@ static std::unique_ptr bindFunc(main::ClientContext* context, reader->validate(resultColumnTypes[i], numRows); } auto columns = input->binder->createVariables(resultColumnNames, resultColumnTypes); - return std::make_unique(columns, scanInput->config.copy(), context, + return std::make_unique(columns, scanInput->fileScanInfo.copy(), context, 0 /* numWarningColumns*/, numRows); } static std::unique_ptr initSharedState(const TableFunctionInitInput& input) { auto bindData = input.bindData->constPtrCast(); - auto reader = make_unique(bindData->config.filePaths[0]); - return std::make_unique(bindData->config.copy(), reader->getNumRows()); + auto reader = make_unique(bindData->fileScanInfo.filePaths[0]); + return std::make_unique(bindData->fileScanInfo.copy(), + reader->getNumRows()); } static void finalizeFunc(const ExecutionContext* ctx, TableFuncSharedState*) { diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index fe1be813c14..b85116fe33e 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -574,15 +574,15 @@ uint64_t ParquetReader::getGroupOffset(ParquetReaderScanState& state) { return minOffset; } -ParquetScanSharedState::ParquetScanSharedState(ReaderConfig readerConfig, uint64_t numRows, +ParquetScanSharedState::ParquetScanSharedState(FileScanInfo fileScanInfo, uint64_t numRows, main::ClientContext* context, std::vector columnSkips) - : ScanFileSharedState{std::move(readerConfig), numRows, context}, columnSkips{columnSkips} { - readers.push_back(std::make_unique(this->readerConfig.filePaths[fileIdx], + : ScanFileSharedState{std::move(fileScanInfo), numRows, context}, columnSkips{columnSkips} { + readers.push_back(std::make_unique(this->fileScanInfo.filePaths[fileIdx], columnSkips, context)); totalRowsGroups = 0; - for (auto i = fileIdx; i < this->readerConfig.getNumFiles(); i++) { + for (auto i = fileIdx; i < this->fileScanInfo.getNumFiles(); i++) { auto reader = - std::make_unique(this->readerConfig.filePaths[i], columnSkips, context); + std::make_unique(this->fileScanInfo.filePaths[i], columnSkips, context); totalRowsGroups += reader->getNumRowsGroups(); } numBlocksReadByFiles = 0; @@ -592,7 +592,7 @@ static bool parquetSharedStateNext(ParquetScanLocalState& localState, ParquetScanSharedState& sharedState) { std::lock_guard mtx{sharedState.lock}; while (true) { - if (sharedState.fileIdx >= sharedState.readerConfig.getNumFiles()) { + if (sharedState.fileIdx >= sharedState.fileScanInfo.getNumFiles()) { return false; } if (sharedState.blockIdx < sharedState.readers[sharedState.fileIdx]->getNumRowsGroups()) { @@ -606,11 +606,11 @@ static bool parquetSharedStateNext(ParquetScanLocalState& localState, sharedState.readers[sharedState.fileIdx]->getNumRowsGroups(); sharedState.blockIdx = 0; sharedState.fileIdx++; - if (sharedState.fileIdx >= sharedState.readerConfig.getNumFiles()) { + if (sharedState.fileIdx >= sharedState.fileScanInfo.getNumFiles()) { return false; } sharedState.readers.push_back(std::make_unique( - sharedState.readerConfig.filePaths[sharedState.fileIdx], sharedState.columnSkips, + sharedState.fileScanInfo.filePaths[sharedState.fileIdx], sharedState.columnSkips, sharedState.context)); continue; } @@ -638,7 +638,7 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output) static void bindColumns(const ExtraScanTableFuncBindInput* bindInput, uint32_t fileIdx, std::vector& columnNames, std::vector& columnTypes, main::ClientContext* context) { - auto reader = ParquetReader(bindInput->config.filePaths[fileIdx], {}, context); + auto reader = ParquetReader(bindInput->fileScanInfo.filePaths[fileIdx], {}, context); auto state = std::make_unique(); reader.initializeScan(*state, std::vector{}, context->getVFSUnsafe()); for (auto i = 0u; i < reader.getNumColumns(); ++i) { @@ -650,9 +650,9 @@ static void bindColumns(const ExtraScanTableFuncBindInput* bindInput, uint32_t f static void bindColumns(const ExtraScanTableFuncBindInput* bindInput, std::vector& columnNames, std::vector& columnTypes, main::ClientContext* context) { - KU_ASSERT(bindInput->config.getNumFiles() > 0); + KU_ASSERT(bindInput->fileScanInfo.getNumFiles() > 0); bindColumns(bindInput, 0 /* fileIdx */, columnNames, columnTypes, context); - for (auto i = 1u; i < bindInput->config.getNumFiles(); ++i) { + for (auto i = 1u; i < bindInput->fileScanInfo.getNumFiles(); ++i) { std::vector tmpColumnNames; std::vector tmpColumnTypes; bindColumns(bindInput, i, tmpColumnNames, tmpColumnTypes, context); @@ -663,7 +663,7 @@ static void bindColumns(const ExtraScanTableFuncBindInput* bindInput, static row_idx_t getNumRows(const ScanBindData* bindData) { row_idx_t numRows = 0; - for (const auto& path : bindData->config.filePaths) { + for (const auto& path : bindData->fileScanInfo.filePaths) { auto reader = std::make_unique(path, bindData->getColumnSkips(), bindData->context); numRows += reader->getMetadata()->num_rows; @@ -674,9 +674,9 @@ static row_idx_t getNumRows(const ScanBindData* bindData) { static std::unique_ptr bindFunc(main::ClientContext* context, const TableFuncBindInput* input) { auto scanInput = ku_dynamic_cast(input->extraInput.get()); - if (scanInput->config.options.size() > 1 || - (scanInput->config.options.size() == 1 && - !scanInput->config.options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME))) { + if (scanInput->fileScanInfo.options.size() > 1 || + (scanInput->fileScanInfo.options.size() == 1 && + !scanInput->fileScanInfo.options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME))) { throw BinderException{"Copy from Parquet cannot have options other than IGNORE_ERRORS."}; } std::vector detectedColumnNames; @@ -689,16 +689,16 @@ static std::unique_ptr bindFunc(main::ClientContext* context, } auto resultColumns = input->binder->createVariables(detectedColumnNames, detectedColumnTypes); - auto bindData = - std::make_unique(std::move(resultColumns), scanInput->config.copy(), context); + auto bindData = std::make_unique(std::move(resultColumns), + scanInput->fileScanInfo.copy(), context); bindData->cardinality = getNumRows(bindData.get()); return bindData; } static std::unique_ptr initSharedState(const TableFunctionInitInput& input) { auto bindData = input.bindData->constPtrCast(); - return std::make_unique(bindData->config.copy(), getNumRows(bindData), - bindData->context, bindData->getColumnSkips()); + return std::make_unique(bindData->fileScanInfo.copy(), + getNumRows(bindData), bindData->context, bindData->getColumnSkips()); } static std::unique_ptr initLocalState(const TableFunctionInitInput&, @@ -713,7 +713,7 @@ static std::unique_ptr initLocalState(const TableFunctionIn static double progressFunc(TableFuncSharedState* sharedState) { auto state = sharedState->ptrCast(); - if (state->fileIdx >= state->readerConfig.getNumFiles()) { + if (state->fileIdx >= state->fileScanInfo.getNumFiles()) { return 1.0; } if (state->totalRowsGroups == 0) { diff --git a/src/processor/operator/simple/export_db.cpp b/src/processor/operator/simple/export_db.cpp index 8f8d833f385..53230809e82 100644 --- a/src/processor/operator/simple/export_db.cpp +++ b/src/processor/operator/simple/export_db.cpp @@ -53,7 +53,7 @@ static void writeStringStreamToFile(ClientContext* context, const std::string& s } static void writeCopyStatement(stringstream& ss, const TableCatalogEntry* entry, - const ReaderConfig* boundFileInfo) { + const FileScanInfo* boundFileInfo) { auto fileTypeStr = boundFileInfo->fileTypeInfo.fileTypeStr; StringUtils::toLower(fileTypeStr); const auto csvConfig = CSVReaderConfig::construct(boundFileInfo->options); @@ -94,7 +94,7 @@ std::string getSchemaCypher(ClientContext* clientContext, Transaction* tx) { } std::string getCopyCypher(const Catalog* catalog, Transaction* tx, - const ReaderConfig* boundFileInfo) { + const FileScanInfo* boundFileInfo) { stringstream ss; for (const auto& nodeTableEntry : catalog->getNodeTableEntries(tx)) { writeCopyStatement(ss, nodeTableEntry, boundFileInfo); diff --git a/tools/python_api/src_cpp/include/pandas/pandas_scan.h b/tools/python_api/src_cpp/include/pandas/pandas_scan.h index 6dbf88391b6..5e3ddb684a6 100644 --- a/tools/python_api/src_cpp/include/pandas/pandas_scan.h +++ b/tools/python_api/src_cpp/include/pandas/pandas_scan.h @@ -31,13 +31,13 @@ struct PandasScanFunction { struct PandasScanFunctionData : public function::TableFuncBindData { py::handle df; std::vector> columnBindData; - common::ReaderConfig config; + common::FileScanInfo fileScanInfo; PandasScanFunctionData(binder::expression_vector columns, py::handle df, uint64_t numRows, std::vector> columnBindData, - common::ReaderConfig config) + common::FileScanInfo fileScanInfo) : TableFuncBindData{std::move(columns), 0 /* numWarningDataColumns */, numRows}, df{df}, - columnBindData{std::move(columnBindData)}, config(std::move(config)) {} + columnBindData{std::move(columnBindData)}, fileScanInfo(std::move(fileScanInfo)) {} ~PandasScanFunctionData() override { py::gil_scoped_acquire acquire; @@ -45,7 +45,7 @@ struct PandasScanFunctionData : public function::TableFuncBindData { } bool getIgnoreErrorsOption() const override { - return config.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME, + return fileScanInfo.getOption(common::CopyConstants::IGNORE_ERRORS_OPTION_NAME, common::CopyConstants::DEFAULT_IGNORE_ERRORS); } @@ -61,7 +61,7 @@ struct PandasScanFunctionData : public function::TableFuncBindData { for (const auto& i : other.columnBindData) { columnBindData.push_back(i->copy()); } - config = other.config.copy(); + fileScanInfo = other.fileScanInfo.copy(); } }; diff --git a/tools/python_api/src_cpp/pandas/pandas_scan.cpp b/tools/python_api/src_cpp/pandas/pandas_scan.cpp index 2bcafca8e77..59d9577fffd 100644 --- a/tools/python_api/src_cpp/pandas/pandas_scan.cpp +++ b/tools/python_api/src_cpp/pandas/pandas_scan.cpp @@ -32,7 +32,8 @@ std::unique_ptr bindFunc(ClientContext* /*context*/, auto getFunc = df.attr("__getitem__"); auto numRows = py::len(getFunc(columns[0])); auto returnColumns = input->binder->createVariables(names, returnTypes); - auto scanConfig = input->extraInput->constPtrCast()->config.copy(); + auto scanConfig = + input->extraInput->constPtrCast()->fileScanInfo.copy(); return std::make_unique(std::move(returnColumns), df, numRows, std::move(columnBindData), std::move(scanConfig)); } diff --git a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp index 3154f075e6e..ec1cbb660af 100644 --- a/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp +++ b/tools/python_api/src_cpp/pyarrow/pyarrow_scan.cpp @@ -73,7 +73,7 @@ static std::unique_ptr bindFunc(ClientContext*, } auto numRows = py::len(table); auto schema = Pyarrow::bind(table, returnTypes, names); - auto config = PyArrowScanConfig(scanInput->config.options); + auto config = PyArrowScanConfig(scanInput->fileScanInfo.options); // The following python operations are zero copy as defined in pyarrow docs. if (config.skipNum != 0) { table = table.attr("slice")(config.skipNum);