diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt
index 2b98b69f9..ef2eee40e 100644
--- a/components/core/src/clp_s/CMakeLists.txt
+++ b/components/core/src/clp_s/CMakeLists.txt
@@ -1,2 +1,3 @@
+add_subdirectory(indexer)
 add_subdirectory(search/kql)
 add_subdirectory(search/sql)
diff --git a/components/core/src/clp_s/indexer/CMakeLists.txt b/components/core/src/clp_s/indexer/CMakeLists.txt
new file mode 100644
index 000000000..8c2738d99
--- /dev/null
+++ b/components/core/src/clp_s/indexer/CMakeLists.txt
@@ -0,0 +1,102 @@
+set(
+        INDEXER_SOURCES
+        ../../clp/aws/AwsAuthenticationSigner.cpp
+        ../../clp/aws/AwsAuthenticationSigner.hpp
+        ../../clp/BoundedReader.cpp
+        ../../clp/BoundedReader.hpp
+        ../../clp/CurlDownloadHandler.cpp
+        ../../clp/CurlDownloadHandler.hpp
+        ../../clp/CurlEasyHandle.hpp
+        ../../clp/CurlGlobalInstance.cpp
+        ../../clp/CurlGlobalInstance.hpp
+        ../../clp/CurlOperationFailed.hpp
+        ../../clp/CurlStringList.hpp
+        ../../clp/database_utils.cpp
+        ../../clp/database_utils.hpp
+        ../../clp/FileReader.cpp
+        ../../clp/FileReader.hpp
+        ../../clp/GlobalMetadataDBConfig.cpp
+        ../../clp/GlobalMetadataDBConfig.hpp
+        ../../clp/hash_utils.cpp
+        ../../clp/hash_utils.hpp
+        ../../clp/MySQLDB.cpp
+        ../../clp/MySQLDB.hpp
+        ../../clp/MySQLParamBindings.cpp
+        ../../clp/MySQLParamBindings.hpp
+        ../../clp/MySQLPreparedStatement.cpp
+        ../../clp/MySQLPreparedStatement.hpp
+        ../../clp/NetworkReader.cpp
+        ../../clp/NetworkReader.hpp
+        ../../clp/ReaderInterface.cpp
+        ../../clp/ReaderInterface.hpp
+        ../../clp/Thread.cpp
+        ../../clp/Thread.hpp
+        ../archive_constants.hpp
+        ../ArchiveReader.cpp
+        ../ArchiveReader.hpp
+        ../ArchiveReaderAdaptor.cpp
+        ../ArchiveReaderAdaptor.hpp
+        ../ColumnReader.cpp
+        ../ColumnReader.hpp
+        ../DictionaryReader.hpp
+        ../DictionaryEntry.cpp
+        ../DictionaryEntry.hpp
+        ../FileReader.cpp
+        ../FileReader.hpp
+        ../FileWriter.cpp
+        ../FileWriter.hpp
+        ../InputConfig.cpp
+        ../InputConfig.hpp
+        ../PackedStreamReader.cpp
+        ../PackedStreamReader.hpp
+        ../ReaderUtils.cpp
+        ../ReaderUtils.hpp
+        ../SchemaReader.cpp
+        ../SchemaReader.hpp
+        ../SchemaTree.cpp
+        ../SchemaTree.hpp
+        ../TimestampDictionaryReader.cpp
+        ../TimestampDictionaryReader.hpp
+        ../TimestampEntry.cpp
+        ../TimestampEntry.hpp
+        ../TimestampPattern.cpp
+        ../TimestampPattern.hpp
+        ../Utils.cpp
+        ../Utils.hpp
+        ../VariableDecoder.cpp
+        ../VariableDecoder.hpp
+        ../ZstdCompressor.cpp
+        ../ZstdCompressor.hpp
+        ../ZstdDecompressor.cpp
+        ../ZstdDecompressor.hpp
+        CommandLineArguments.cpp
+        CommandLineArguments.hpp
+        indexer.cpp
+        IndexManager.cpp
+        IndexManager.hpp
+        MySQLIndexStorage.cpp
+        MySQLIndexStorage.hpp
+)
+
+add_executable(indexer ${INDEXER_SOURCES})
+target_compile_features(indexer PRIVATE cxx_std_20)
+target_include_directories(indexer PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
+target_link_libraries(indexer
+        PRIVATE
+        absl::flat_hash_map
+        Boost::iostreams Boost::program_options Boost::url
+        ${CURL_LIBRARIES}
+        clp::string_utils
+        MariaDBClient::MariaDBClient
+        OpenSSL::Crypto
+        simdjson
+        spdlog::spdlog
+        yaml-cpp::yaml-cpp
+        ZStd::ZStd
+)
+# Put the built executable at the root of the build directory
+set_target_properties(
+        indexer
+        PROPERTIES
+        RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}"
+)
diff --git a/components/core/src/clp_s/indexer/CommandLineArguments.cpp b/components/core/src/clp_s/indexer/CommandLineArguments.cpp
new file mode 100644
index 000000000..488650a54
--- /dev/null
+++ b/components/core/src/clp_s/indexer/CommandLineArguments.cpp
@@ -0,0 +1,136 @@
+#include "CommandLineArguments.hpp"
+
+#include <iostream>
+#include <stdexcept>
+#include <string>
+#include <utility>
+
+#include <boost/program_options.hpp>
+#include <spdlog/spdlog.h>
+
+namespace po = boost::program_options;
+
+namespace clp_s::indexer {
+CommandLineArguments::ParsingResult
+CommandLineArguments::parse_arguments(int argc, char const** argv) {
+    // Print out basic usage if user doesn't specify any options
+    if (1 == argc) {
+        print_basic_usage();
+        return ParsingResult::Failure;
+    }
+
+    // Define general options
+    po::options_description general_options("General Options");
+    general_options.add_options()("help,h", "Print help");
+
+    // Define output options
+    po::options_description output_options("Output Options");
+    std::string metadata_db_config_file_path;
+    // clang-format off
+    output_options.add_options()(
+            "db-config-file",
+            po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")
+                ->default_value(metadata_db_config_file_path),
+            "Path to the YAML DB config file for metadata storage"
+    )(
+            "create-table",
+            po::bool_switch(&m_should_create_table),
+            "Create the table if it doesn't exist"
+    );
+    // clang-format on
+
+    // Define visible options
+    po::options_description visible_options;
+    visible_options.add(general_options);
+    visible_options.add(output_options);
+
+    std::string archive_path;
+    po::options_description positional_options;
+    // clang-format off
+    positional_options.add_options()(
+            "table-name",
+            po::value<std::string>(&m_table_name),
+            "Name of the table where fields from the archive will be indexed and stored"
+    )(
+            "archive-path",
+            po::value<std::string>(&archive_path),
+            "Path to an archive"
+    );
+    // clang-format on
+    po::positional_options_description positional_options_description;
+    positional_options_description.add("table-name", 1);
+    positional_options_description.add("archive-path", 1);
+
+    // Aggregate all options
+    po::options_description all_options;
+    all_options.add(general_options);
+    all_options.add(output_options);
+    all_options.add(positional_options);
+
+    // Parse options
+    try {
+        // Parse options specified on the command line
+        po::parsed_options parsed = po::command_line_parser(argc, argv)
+                                            .options(all_options)
+                                            .positional(positional_options_description)
+                                            .run();
+        po::variables_map parsed_command_line_options;
+        store(parsed, parsed_command_line_options);
+
+        notify(parsed_command_line_options);
+
+        // Handle --help
+        if (parsed_command_line_options.count("help")) {
+            if (argc > 2) {
+                SPDLOG_WARN("Ignoring all options besides --help.");
+            }
+
+            print_basic_usage();
+
+            std::cerr << visible_options << std::endl;
+            return ParsingResult::InfoCommand;
+        }
+
+        // Validate required parameters
+        if (m_table_name.empty()) {
+            throw std::invalid_argument("Table name not specified or empty.");
+        }
+        if (archive_path.empty()) {
+            throw std::invalid_argument("Archive path not specified or empty.");
+        }
+        m_archive_path = get_path_object_for_raw_path(archive_path);
+        if (false == metadata_db_config_file_path.empty()) {
+            clp::GlobalMetadataDBConfig metadata_db_config;
+            try {
+                metadata_db_config.parse_config_file(metadata_db_config_file_path);
+            } catch (std::exception const& e) {
+                SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
+                return ParsingResult::Failure;
+            }
+
+            if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
+                != metadata_db_config.get_metadata_db_type())
+            {
+                SPDLOG_ERROR(
+                        "Invalid metadata database type for {}; only supported type is MySQL.",
+                        m_program_name
+                );
+                return ParsingResult::Failure;
+            }
+
+            m_metadata_db_config = std::move(metadata_db_config);
+        }
+    } catch (std::exception const& e) {
+        SPDLOG_ERROR("{}", e.what());
+        print_basic_usage();
+        return ParsingResult::Failure;
+    }
+
+    return ParsingResult::Success;
+}
+
+void CommandLineArguments::print_basic_usage() const {
+    std::cerr << "Usage: " << get_program_name() << " [OPTIONS] TABLE_NAME ARCHIVE_PATH"
+              << std::endl;
+}
+} // namespace clp_s::indexer
diff --git a/components/core/src/clp_s/indexer/CommandLineArguments.hpp b/components/core/src/clp_s/indexer/CommandLineArguments.hpp
new file mode 100644
index 000000000..890157ebe
--- /dev/null
+++ b/components/core/src/clp_s/indexer/CommandLineArguments.hpp
@@ -0,0 +1,67 @@
+#ifndef CLP_S_INDEXER_COMMANDLINEARGUMENTS_HPP
+#define CLP_S_INDEXER_COMMANDLINEARGUMENTS_HPP
+
+#include <optional>
+#include <string>
+
+#include "../../clp/GlobalMetadataDBConfig.hpp"
+#include "../InputConfig.hpp"
+
+namespace clp_s::indexer {
+/**
+ * Class to parse command line arguments
+ */
+class CommandLineArguments {
+public:
+    // Types
+    enum class ParsingResult {
+        Success = 0,
+        InfoCommand,
+        Failure
+    };
+
+    // Constructors
+    explicit CommandLineArguments(std::string const& program_name) : m_program_name(program_name) {}
+
+    // Methods
+    /**
+     * Parses and validates the command line arguments
+     * @param argc
+     * @param argv
+     * @return ParsingResult::Success if the table name, archive path, and (when given) the DB
+     * config file were all accepted
+     * @return ParsingResult::InfoCommand if `--help` was handled
+     * @return ParsingResult::Failure if no arguments were given or parsing/validation failed
+     */
+    ParsingResult parse_arguments(int argc, char const* argv[]);
+
+    std::string const& get_program_name() const { return m_program_name; }
+
+    std::string const& get_table_name() const { return m_table_name; }
+
+    Path const& get_archive_path() const { return m_archive_path; }
+
+    std::optional<clp::GlobalMetadataDBConfig> const& get_db_config() const {
+        return m_metadata_db_config;
+    }
+
+    bool should_create_table() const { return m_should_create_table; }
+
+private:
+    // Methods
+    /**
+     * Prints the one-line usage string to stderr
+     */
+    void print_basic_usage() const;
+
+    // Variables
+    std::string m_program_name;
+    std::string m_table_name;
+    Path m_archive_path;
+
+    std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
+    bool m_should_create_table{false};
+};
+} // namespace clp_s::indexer
+
+#endif // CLP_S_INDEXER_COMMANDLINEARGUMENTS_HPP
diff --git a/components/core/src/clp_s/indexer/IndexManager.cpp b/components/core/src/clp_s/indexer/IndexManager.cpp
new file mode 100644
index 000000000..2e46915ed
--- /dev/null
+++ b/components/core/src/clp_s/indexer/IndexManager.cpp
@@ -0,0 +1,141 @@
+#include "IndexManager.hpp"
+
+#include <cctype>
+#include <cstdint>
+#include <cstdio>
+#include <stack>
+#include <string>
+#include <utility>
+
+#include "../archive_constants.hpp"
+
+namespace clp_s::indexer {
+IndexManager::IndexManager(
+        std::optional<clp::GlobalMetadataDBConfig> const& db_config,
+        bool should_create_table
+) {
+    if (db_config.has_value()) {
+        m_mysql_index_storage = std::make_unique<MySQLIndexStorage>(
+                db_config->get_metadata_db_host(),
+                db_config->get_metadata_db_port(),
+                db_config->get_metadata_db_username(),
+                db_config->get_metadata_db_password(),
+                db_config->get_metadata_db_name(),
+                db_config->get_metadata_table_prefix()
+        );
+        m_mysql_index_storage->open();
+        m_field_update_callback = [this](std::string& field_name, NodeType field_type) {
+            m_mysql_index_storage->add_field(field_name, field_type);
+        };
+        m_should_create_table = should_create_table;
+        m_output_type = OutputType::Database;
+    } else {
+        throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
+    }
+}
+
+IndexManager::~IndexManager() {
+    if (m_output_type == OutputType::Database) {
+        m_mysql_index_storage->close();
+    }
+}
+
+void IndexManager::update_metadata(std::string const& table_name, Path const& archive_path) {
+    m_mysql_index_storage->init(table_name, m_should_create_table);
+
+    // NOTE(review): the reader is never explicitly closed; confirm that `ArchiveReader` releases
+    // its resources on destruction.
+    ArchiveReader archive_reader;
+    archive_reader.open(archive_path, NetworkAuthOption{});
+
+    traverse_schema_tree_and_update_metadata(archive_reader.get_schema_tree());
+}
+
+std::string IndexManager::escape_key_name(std::string_view const key_name) {
+    std::string escaped_key_name;
+    escaped_key_name.reserve(key_name.size());
+    for (auto c : key_name) {
+        switch (c) {
+            case '\"':
+                escaped_key_name += "\\\"";
+                break;
+            case '\\':
+                escaped_key_name += "\\\\";
+                break;
+            case '\n':
+                escaped_key_name += "\\n";
+                break;
+            case '\t':
+                escaped_key_name += "\\t";
+                break;
+            case '\r':
+                escaped_key_name += "\\r";
+                break;
+            case '\b':
+                escaped_key_name += "\\b";
+                break;
+            case '\f':
+                escaped_key_name += "\\f";
+                break;
+            case '.':
+                escaped_key_name += "\\.";
+                break;
+            default:
+                // `std::isprint` has undefined behaviour for negative `char` values, so promote
+                // through `unsigned char` first.
+                if (std::isprint(static_cast<unsigned char>(c))) {
+                    escaped_key_name += c;
+                } else {
+                    char buffer[7];
+                    std::snprintf(
+                            buffer,
+                            sizeof(buffer),
+                            "\\u00%02x",
+                            static_cast<unsigned char>(c)
+                    );
+                    escaped_key_name += buffer;
+                }
+        }
+    }
+    return escaped_key_name;
+}
+
+void IndexManager::traverse_schema_tree_and_update_metadata(
+        std::shared_ptr<SchemaTree> const& schema_tree
+) {
+    if (nullptr == schema_tree) {
+        return;
+    }
+
+    std::string path_buffer;
+    // Stack of pairs of node_id and path_length
+    std::stack<std::pair<int32_t, uint64_t>> s;
+    s.emplace(schema_tree->get_object_subtree_node_id(), 0);
+
+    while (false == s.empty()) {
+        auto [node_id, path_length] = s.top();
+        s.pop();
+
+        auto const& node = schema_tree->get_node(node_id);
+        auto node_type = node.get_type();
+        // TODO: Add support for structured arrays
+        if (NodeType::StructuredArray == node_type) {
+            continue;
+        }
+        auto const& children_ids = node.get_children_ids();
+        path_buffer.resize(path_length);
+        if (false == path_buffer.empty()) {
+            path_buffer += ".";
+        }
+        path_buffer += escape_key_name(node.get_key_name());
+        if (children_ids.empty() && NodeType::Object != node_type && NodeType::Unknown != node_type)
+        {
+            m_field_update_callback(path_buffer, node_type);
+        }
+
+        for (auto child_id : children_ids) {
+            s.emplace(child_id, path_buffer.size());
+        }
+    }
+}
+} // namespace clp_s::indexer
diff --git a/components/core/src/clp_s/indexer/IndexManager.hpp b/components/core/src/clp_s/indexer/IndexManager.hpp
new file mode 100644
index 000000000..b9f412c0c
--- /dev/null
+++ b/components/core/src/clp_s/indexer/IndexManager.hpp
@@ -0,0 +1,93 @@
+#ifndef CLP_S_INDEXER_INDEXMANAGER_HPP
+#define CLP_S_INDEXER_INDEXMANAGER_HPP
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+
+#include "../../clp/GlobalMetadataDBConfig.hpp"
+#include "../ArchiveReader.hpp"
+#include "MySQLIndexStorage.hpp"
+
+namespace clp_s::indexer {
+/**
+ * This class updates field names (e.g., JSON full paths) and data types for a specified archive.
+ * It traverses the schema tree from root to leaf, concatenating escaped key names using the
+ * delimiter `.` at each level.
+ *
+ * Currently, only leaf nodes with primitive types are indexed. Object nodes and subtrees of
+ * structured arrays are not indexed. The results are stored in a database to enable efficient
+ * querying.
+ *
+ * Multiple archives related to the same topic can form a table that can be queried using a SQL
+ * query engine. When indexing, a table name must be specified. This table is then used by the SQL
+ * engine to resolve column metadata.
+ */
+class IndexManager {
+public:
+    // Types
+    class OperationFailed : public TraceableException {
+    public:
+        // Constructors
+        OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
+                : TraceableException(error_code, filename, line_number) {}
+    };
+
+    enum class OutputType : uint8_t {
+        Database
+    };
+
+    // Constructors
+    /**
+     * Opens a connection to the metadata database described by `db_config`
+     * @param db_config
+     * @param should_create_table Whether `update_metadata` may create the table if it's missing
+     * @throw IndexManager::OperationFailed if `db_config` is empty
+     */
+    IndexManager(
+            std::optional<clp::GlobalMetadataDBConfig> const& db_config = std::nullopt,
+            bool should_create_table = false
+    );
+
+    // Delete copy constructor and assignment operator since the field-update callback captures
+    // `this`
+    IndexManager(IndexManager const&) = delete;
+    IndexManager& operator=(IndexManager const&) = delete;
+
+    // Destructor
+    ~IndexManager();
+
+    // Methods
+    /**
+     * Updates the metadata for a given archive
+     * @param table_name
+     * @param archive_path
+     */
+    void update_metadata(std::string const& table_name, Path const& archive_path);
+
+private:
+    /**
+     * Escapes a key name so that it can be joined into a `.`-delimited path. Quotes, backslashes,
+     * and `.` are backslash-escaped; common control characters use their short escapes; any other
+     * non-printable byte is written as `\u00XX`.
+     * @param key_name
+     * @return the escaped key name
+     */
+    static std::string escape_key_name(std::string_view const key_name);
+
+    /**
+     * Traverses the schema tree and updates the metadata
+     * @param schema_tree
+     */
+    void traverse_schema_tree_and_update_metadata(std::shared_ptr<SchemaTree> const& schema_tree);
+
+    OutputType m_output_type{OutputType::Database};
+    std::shared_ptr<MySQLIndexStorage> m_mysql_index_storage;
+    bool m_should_create_table{false};
+    std::function<void(std::string&, NodeType)> m_field_update_callback;
+};
+} // namespace clp_s::indexer
+#endif // CLP_S_INDEXER_INDEXMANAGER_HPP
diff --git a/components/core/src/clp_s/indexer/MySQLIndexStorage.cpp b/components/core/src/clp_s/indexer/MySQLIndexStorage.cpp
new file mode 100644
index 000000000..144a116e4
--- /dev/null
+++ b/components/core/src/clp_s/indexer/MySQLIndexStorage.cpp
@@ -0,0 +1,112 @@
+#include "MySQLIndexStorage.hpp"
+
+#include <cstdint>
+#include <iterator>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <fmt/format.h>
+#include <spdlog/spdlog.h>
+
+#include "../../clp/database_utils.hpp"
+#include "../../clp/type_utils.hpp"
+
+namespace clp_s::indexer {
+namespace {
+enum class TableMetadataFieldIndexes : uint16_t {
+    Name = 0,
+    Type,
+    Length,
+};
+} // namespace
+
+void MySQLIndexStorage::open() {
+    if (m_is_open) {
+        throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
+    }
+
+    m_db.open(m_host, m_port, m_username, m_password, m_database_name);
+    m_is_open = true;
+}
+
+void MySQLIndexStorage::init(std::string const& table_name, bool should_create_table) {
+    if (false == m_is_open) {
+        throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
+    }
+
+    // NOTE(review): `table_name` is interpolated into the SQL text unescaped (identifiers can't be
+    // bound as parameters); callers must only pass trusted table names.
+    if (should_create_table) {
+        m_db.execute_query(fmt::format(
+                "CREATE TABLE IF NOT EXISTS {}{} ("
+                "name VARCHAR(512) NOT NULL, "
+                "type BIGINT NOT NULL,"
+                "PRIMARY KEY (name, type)"
+                ")",
+                m_table_prefix,
+                table_name
+        ));
+    }
+
+    // Drop any previous initialization first so that a failure below can't leave `m_is_init` set
+    // without a prepared statement.
+    m_is_init = false;
+    m_insert_field_statement.reset();
+
+    std::vector<std::string> table_metadata_field_names(
+            clp::enum_to_underlying_type(TableMetadataFieldIndexes::Length)
+    );
+    table_metadata_field_names[clp::enum_to_underlying_type(TableMetadataFieldIndexes::Name)]
+            = "name";
+    table_metadata_field_names[clp::enum_to_underlying_type(TableMetadataFieldIndexes::Type)]
+            = "type";
+    fmt::memory_buffer statement_buffer;
+    auto statement_buffer_ix = std::back_inserter(statement_buffer);
+
+    fmt::format_to(
+            statement_buffer_ix,
+            "INSERT IGNORE INTO {}{} ({}) VALUES ({})",
+            m_table_prefix,
+            table_name,
+            clp::get_field_names_sql(table_metadata_field_names),
+            clp::get_placeholders_sql(table_metadata_field_names.size())
+    );
+    SPDLOG_DEBUG("{:.{}}", statement_buffer.data(), statement_buffer.size());
+    m_insert_field_statement = std::make_unique<MySQLPreparedStatement>(
+            m_db.prepare_statement(statement_buffer.data(), statement_buffer.size())
+    );
+
+    m_is_init = true;
+}
+
+void MySQLIndexStorage::add_field(std::string const& field_name, NodeType field_type) {
+    if (false == m_is_init) {
+        throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
+    }
+
+    auto& statement_bindings = m_insert_field_statement->get_statement_bindings();
+    statement_bindings.bind_varchar(
+            clp::enum_to_underlying_type(TableMetadataFieldIndexes::Name),
+            field_name.c_str(),
+            field_name.length()
+    );
+
+    uint64_t field_type_value = static_cast<uint64_t>(field_type);
+    statement_bindings.bind_uint64(
+            clp::enum_to_underlying_type(TableMetadataFieldIndexes::Type),
+            field_type_value
+    );
+
+    if (false == m_insert_field_statement->execute()) {
+        throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
+    }
+}
+
+void MySQLIndexStorage::close() {
+    m_insert_field_statement.reset();
+    m_db.close();
+    m_is_open = false;
+    m_is_init = false;
+}
+} // namespace clp_s::indexer
diff --git a/components/core/src/clp_s/indexer/MySQLIndexStorage.hpp b/components/core/src/clp_s/indexer/MySQLIndexStorage.hpp
new file mode 100644
index 000000000..7374c1aa5
--- /dev/null
+++ b/components/core/src/clp_s/indexer/MySQLIndexStorage.hpp
@@ -0,0 +1,94 @@
+#ifndef CLP_S_INDEXER_MYSQLINDEXSTORAGE_HPP
+#define CLP_S_INDEXER_MYSQLINDEXSTORAGE_HPP
+
+#include <memory>
+#include <string>
+
+#include "../../clp/MySQLDB.hpp"
+#include "../../clp/MySQLPreparedStatement.hpp"
+#include "../SchemaTree.hpp"
+#include "../TraceableException.hpp"
+
+namespace clp_s::indexer {
+using clp::MySQLDB;
+using clp::MySQLPreparedStatement;
+
+/**
+ * Class representing a MySQL storage for column metadata (column names and types)
+ */
+class MySQLIndexStorage {
+public:
+    static constexpr char cColumnMetadataPrefix[] = "column_metadata_";
+
+    // Types
+    class OperationFailed : public TraceableException {
+    public:
+        // Constructors
+        OperationFailed(ErrorCode error_code, char const* const filename, int line_number)
+                : TraceableException(error_code, filename, line_number) {}
+    };
+
+    // Constructors
+    MySQLIndexStorage(
+            std::string const& host,
+            int port,
+            std::string const& username,
+            std::string const& password,
+            std::string const& database_name,
+            std::string const& table_prefix
+    )
+            : m_is_open(false),
+              m_is_init(false),
+              m_host(host),
+              m_port(port),
+              m_username(username),
+              m_password(password),
+              m_database_name(database_name),
+              m_table_prefix(table_prefix + cColumnMetadataPrefix) {}
+
+    // Methods
+    /**
+     * Opens the database connection
+     * @throw MySQLIndexStorage::OperationFailed if the connection is already open
+     */
+    void open();
+
+    /**
+     * Creates the table if it is required and prepares the insert statement
+     * @param table_name
+     * @param should_create_table
+     * @throw MySQLIndexStorage::OperationFailed if the connection isn't open
+     */
+    void init(std::string const& table_name, bool should_create_table);
+
+    /**
+     * Closes the database connection
+     */
+    void close();
+
+    /**
+     * Adds a field (column) to the table
+     * @param field_name
+     * @param field_type
+     * @throw MySQLIndexStorage::OperationFailed if init hasn't been called or the insert fails
+     */
+    void add_field(std::string const& field_name, NodeType field_type);
+
+private:
+    // Variables
+    bool m_is_open{};
+    bool m_is_init{};
+    std::string m_host;
+    int m_port{};
+    std::string m_username;
+    std::string m_password;
+    std::string m_database_name;
+    std::string m_table_prefix;
+
+    MySQLDB m_db;
+
+    std::unique_ptr<MySQLPreparedStatement> m_insert_field_statement;
+};
+} // namespace clp_s::indexer
+
+#endif // CLP_S_INDEXER_MYSQLINDEXSTORAGE_HPP
diff --git a/components/core/src/clp_s/indexer/indexer.cpp b/components/core/src/clp_s/indexer/indexer.cpp
new file mode 100644
index 000000000..f2d9867bc
--- /dev/null
+++ b/components/core/src/clp_s/indexer/indexer.cpp
@@ -0,0 +1,50 @@
+#include <exception>
+#include <string>
+
+#include <spdlog/sinks/stdout_sinks.h>
+#include <spdlog/spdlog.h>
+
+#include "../ReaderUtils.hpp"
+#include "CommandLineArguments.hpp"
+#include "IndexManager.hpp"
+
+using clp_s::indexer::CommandLineArguments;
+
+int main(int argc, char const* argv[]) {
+    try {
+        auto stderr_logger = spdlog::stderr_logger_st("stderr");
+        spdlog::set_default_logger(stderr_logger);
+        spdlog::set_pattern("%Y-%m-%dT%H:%M:%S.%e%z [%l] %v");
+    } catch (std::exception const&) {
+        // The logger couldn't be set up, so there is nowhere to report the failure
+        return 1;
+    }
+
+    CommandLineArguments command_line_arguments("metadata-uploader");
+    auto parsing_result = command_line_arguments.parse_arguments(argc, argv);
+    switch (parsing_result) {
+        case CommandLineArguments::ParsingResult::Failure:
+            return 1;
+        case CommandLineArguments::ParsingResult::InfoCommand:
+            return 0;
+        case CommandLineArguments::ParsingResult::Success:
+            // Continue processing
+            break;
+    }
+
+    try {
+        // Forward `--create-table`; otherwise the flag is parsed but never takes effect
+        clp_s::indexer::IndexManager index_manager(
+                command_line_arguments.get_db_config(),
+                command_line_arguments.should_create_table()
+        );
+        index_manager.update_metadata(
+                command_line_arguments.get_table_name(),
+                command_line_arguments.get_archive_path()
+        );
+    } catch (std::exception const& e) {
+        SPDLOG_ERROR("Failed to update metadata: {}", e.what());
+        return 1;
+    }
+    return 0;
+}