Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp): Add the write path for single-file archives. #646

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ set(
../streaming_archive/reader/Segment.hpp
../streaming_archive/reader/SegmentManager.cpp
../streaming_archive/reader/SegmentManager.hpp
../streaming_archive/single_file_archive/Defs.hpp
../streaming_archive/single_file_archive/writer.cpp
../streaming_archive/single_file_archive/writer.hpp
../streaming_archive/writer/Archive.cpp
../streaming_archive/writer/Archive.hpp
../streaming_archive/writer/File.cpp
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
->default_value(m_schema_file_path),
"Path to a schema file. If not specified, heuristics are used to determine "
"dictionary variables. See README-Schema.md for details."
)(
"single-file-archive",
po::bool_switch(&m_single_file_archive),
"Output archive as a single-file"
davemarco marked this conversation as resolved.
Show resolved Hide resolved
);

po::options_description all_compression_options;
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
explicit CommandLineArguments(std::string const& program_name)
: CommandLineArgumentsBase(program_name),
m_show_progress(false),
m_single_file_archive(false),
m_sort_input_files(true),
m_print_archive_stats_progress(false),
m_target_segment_uncompressed_size(1L * 1024 * 1024 * 1024),
Expand All @@ -45,6 +46,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

bool show_progress() const { return m_show_progress; }

bool get_use_single_file_archive() const { return m_single_file_archive; }
davemarco marked this conversation as resolved.
Show resolved Hide resolved

bool sort_input_files() const { return m_sort_input_files; }

bool print_archive_stats_progress() const { return m_print_archive_stats_progress; }
Expand Down Expand Up @@ -92,6 +95,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
bool m_single_file_archive;
bool m_print_archive_stats_progress;
size_t m_target_encoded_file_size;
size_t m_target_segment_uncompressed_size;
Expand Down
9 changes: 6 additions & 3 deletions components/core/src/clp/clp/FileCompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ void FileCompressor::parse_and_encode_with_heuristic(

// Parse content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking, would it be cleaner if we add a new method with name like "should_split" to archive_writer, and embed this if logic into the method.

Now the same if statements have been duplicated at multiple places, which is inefficient and error prone since one change requires you to update multiple places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will wait on making this change until I get feedback from kirk. I may want to remove no split functionality even though it is done in private branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

&& false == archive_writer.get_use_single_file_archive()) {
split_file_and_archive(
archive_user_config,
path_for_compression,
Expand Down Expand Up @@ -337,7 +338,8 @@ bool FileCompressor::try_compressing_as_archive(
parent_directories.emplace(file_parent_path);
}

if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive_writer.get_use_single_file_archive()) {
split_archive(archive_user_config, archive_writer);
}

Expand Down Expand Up @@ -537,7 +539,8 @@ std::error_code FileCompressor::compress_ir_stream_by_encoding(
}

// Split archive/encoded file if necessary before writing the new event
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive.get_use_single_file_archive()) {
split_file_and_archive(
archive_user_config,
path,
Expand Down
7 changes: 5 additions & 2 deletions components/core/src/clp/clp/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ bool compress(
archive_user_config.global_metadata_db = global_metadata_db.get();
archive_user_config.print_archive_stats_progress
= command_line_args.print_archive_stats_progress();
archive_user_config.use_single_file_archive = command_line_args.get_use_single_file_archive();

// Open Archive
streaming_archive::writer::Archive archive_writer;
Expand Down Expand Up @@ -135,7 +136,8 @@ bool compress(
);
}
for (auto it = files_to_compress.cbegin(); it != files_to_compress.cend(); ++it) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for this

&& false == archive_writer.get_use_single_file_archive()) {
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down Expand Up @@ -163,7 +165,8 @@ bool compress(
file_group_id_comparator);
// Compress grouped files
for (auto const& file_to_compress : grouped_files_to_compress) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
&& false == archive_writer.get_use_single_file_archive()) {
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down
16 changes: 16 additions & 0 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"
#include "../ffi/encoding_methods.hpp"

namespace clp::streaming_archive {

static constexpr std::string_view cCompressionTypeZstd = "ZSTD";

/**
* A class to encapsulate metadata directly relating to an archive.
*/
Expand Down Expand Up @@ -79,6 +83,12 @@ class ArchiveMetadata {

[[nodiscard]] auto get_end_timestamp() const { return m_end_timestamp; }

[[nodiscard]] auto get_variable_encoding_methods_version() const -> std::string const& { return m_variable_encoding_methods_version; }

[[nodiscard]] auto get_variables_schema_version() const -> std::string const& { return m_variables_schema_version; }

[[nodiscard]] auto get_compression_type() const -> std::string const& { return m_compression_type; }

/**
* Expands the archive's time range based to encompass the given time range
* @param begin_timestamp
Expand All @@ -102,6 +112,12 @@ class ArchiveMetadata {
// The size of the archive
uint64_t m_compressed_size{0};
uint64_t m_dynamic_compressed_size{0};
// TODO: The following fields are used in single-file archive; however, they are not
// currently part of multi-file archive metadata. Modifying multi-file archive metadata
// disk format is potentially a breaking change and not currently required.
std::string m_variable_encoding_methods_version{ffi::cVariableEncodingMethodsVersion};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess a high level question is. how are those constants related to multi-file-archive?

Technically, since they are not used by anything else, they can also be directly defined under struct "single_archive_metadata"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these fields are written to multi-file archive metadata. However, since this is actually changing multi-file archives, I didn't want to make the change until we here from @kirkrodrigues. If there is no plan to add these to the multi-file metadata, perhaps we can move them into single file archive files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I didn't fully answer your question, but I asked kirk about these fields earlier and supposedly they are also relevant to multi file archive

std::string m_variables_schema_version{ffi::cVariablesSchemaVersion};
std::string m_compression_type{cCompressionTypeZstd};
};
} // namespace clp::streaming_archive

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP

#include <cstdint>
#include <string>

#include "../clp/Defs.h"
#include "../Constants.hpp"
#include "msgpack.hpp"

namespace clp::streaming_archive::single_file_archive {

using single_file_archive_format_version_t = uint32_t;

// Single file archive version.
constexpr uint8_t cArchiveMajorVersion{0};
constexpr uint8_t cArchiveMinorVersion{1};
constexpr uint16_t cArchivePatchVersion{1};
constexpr single_file_archive_format_version_t cArchiveVersion{
cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion
};

static constexpr size_t cNumMagicNumberChars{4};
static constexpr std::array<uint8_t, cNumMagicNumberChars> cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'};
static constexpr std::string_view cUnstructuredSfaExtension{".clp"};
static constexpr size_t cFileSizeWarningThreshold{100L * 1024 * 1024};

static constexpr size_t cNumStaticFiles{5};
constexpr std::array<const char*, cNumStaticFiles> cStaticArchiveFileNames{
cMetadataDBFileName,
cLogTypeDictFilename,
cLogTypeSegmentIndexFilename,
cVarDictFilename,
cVarSegmentIndexFilename
};

static constexpr size_t cNumUnused{6};
struct __attribute__((packed)) SingleFileArchiveHeader {
std::array<uint8_t, cNumMagicNumberChars> magic;
single_file_archive_format_version_t version;
uint64_t metadata_size;
std::array<uint64_t, cNumUnused> unused;
};

struct FileInfo {
davemarco marked this conversation as resolved.
Show resolved Hide resolved
std::string n;
uint64_t o;
MSGPACK_DEFINE_MAP(n, o);
};

struct MultiFileArchiveMetadata {
davemarco marked this conversation as resolved.
Show resolved Hide resolved
archive_format_version_t archive_format_version;
std::string variable_encoding_methods_version;
std::string variables_schema_version;
std::string compression_type;
std::string creator_id;
epochtime_t begin_timestamp;
epochtime_t end_timestamp;
uint64_t uncompressed_size;
uint64_t compressed_size;
MSGPACK_DEFINE_MAP(
archive_format_version,
variable_encoding_methods_version,
variables_schema_version,
compression_type,
creator_id,
begin_timestamp,
end_timestamp,
uncompressed_size,
compressed_size
);
};

struct SingleFileArchiveMetadata {
std::vector<FileInfo> archive_files;
MultiFileArchiveMetadata archive_metadata;
uint64_t num_segments;
MSGPACK_DEFINE_MAP(archive_files, archive_metadata, num_segments);
};
} // namespace clp::streaming_archive::single_file_archive

#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
Loading
Loading