-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(clp): Add the write path for single-file archives. #646
base: main
Are you sure you want to change the base?
Changes from 19 commits
a5ef64f
615fafd
b84c2da
d1ec9fe
869f1b3
d84c002
f4136f8
6bbf12e
5c75147
c1f12df
0ab6e0e
d4ed4f6
82b9802
393049b
5428403
7e261f7
0bd9b27
d207630
c2fd37d
cdafb8d
dcadf04
265b4e4
c7361c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -243,7 +243,9 @@ void FileCompressor::parse_and_encode_with_heuristic( | |
|
||
// Parse content from file | ||
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just thinking, would it be cleaner if we add a new method with name like "should_split" to archive_writer, and embed this if logic into the method. Now the same if statements have been duplicated at multiple places, which is inefficient and error prone since one change requires you to update multiple places There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will wait on making this change until I get feedback from kirk. I may want to remove no split functionality even though it is done in private branch. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good |
||
&& false == archive_writer.get_use_single_file_archive()) | ||
{ | ||
split_file_and_archive( | ||
archive_user_config, | ||
path_for_compression, | ||
|
@@ -337,7 +339,9 @@ bool FileCompressor::try_compressing_as_archive( | |
parent_directories.emplace(file_parent_path); | ||
} | ||
|
||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts | ||
&& false == archive_writer.get_use_single_file_archive()) | ||
{ | ||
split_archive(archive_user_config, archive_writer); | ||
} | ||
|
||
|
@@ -537,7 +541,9 @@ std::error_code FileCompressor::compress_ir_stream_by_encoding( | |
} | ||
|
||
// Split archive/encoded file if necessary before writing the new event | ||
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) { | ||
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts | ||
&& false == archive.get_use_single_file_archive()) | ||
{ | ||
split_file_and_archive( | ||
archive_user_config, | ||
path, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,6 +107,7 @@ bool compress( | |
archive_user_config.global_metadata_db = global_metadata_db.get(); | ||
archive_user_config.print_archive_stats_progress | ||
= command_line_args.print_archive_stats_progress(); | ||
archive_user_config.use_single_file_archive = command_line_args.single_file_archive(); | ||
|
||
// Open Archive | ||
streaming_archive::writer::Archive archive_writer; | ||
|
@@ -135,7 +136,9 @@ bool compress( | |
); | ||
} | ||
for (auto it = files_to_compress.cbegin(); it != files_to_compress.cend(); ++it) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TBD There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto for this |
||
&& false == archive_writer.get_use_single_file_archive()) | ||
{ | ||
split_archive(archive_user_config, archive_writer); | ||
} | ||
if (false | ||
|
@@ -163,7 +166,9 @@ bool compress( | |
file_group_id_comparator); | ||
// Compress grouped files | ||
for (auto const& file_to_compress : grouped_files_to_compress) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) { | ||
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries | ||
&& false == archive_writer.get_use_single_file_archive()) | ||
{ | ||
split_archive(archive_user_config, archive_writer); | ||
} | ||
if (false | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,11 +4,15 @@ | |
#include <cstdint> | ||
|
||
#include "../Defs.h" | ||
#include "../ffi/encoding_methods.hpp" | ||
#include "../FileReader.hpp" | ||
#include "../FileWriter.hpp" | ||
#include "Constants.hpp" | ||
|
||
namespace clp::streaming_archive { | ||
|
||
static constexpr std::string_view cCompressionTypeZstd = "ZSTD"; | ||
|
||
/** | ||
* A class to encapsulate metadata directly relating to an archive. | ||
*/ | ||
|
@@ -79,6 +83,18 @@ class ArchiveMetadata { | |
|
||
[[nodiscard]] auto get_end_timestamp() const { return m_end_timestamp; } | ||
|
||
[[nodiscard]] auto get_variable_encoding_methods_version() const -> std::string const& { | ||
return m_variable_encoding_methods_version; | ||
} | ||
|
||
[[nodiscard]] auto get_variables_schema_version() const -> std::string const& { | ||
return m_variables_schema_version; | ||
} | ||
|
||
[[nodiscard]] auto get_compression_type() const -> std::string const& { | ||
return m_compression_type; | ||
} | ||
|
||
/** | ||
* Expands the archive's time range based to encompass the given time range | ||
* @param begin_timestamp | ||
|
@@ -102,6 +118,12 @@ class ArchiveMetadata { | |
// The size of the archive | ||
uint64_t m_compressed_size{0}; | ||
uint64_t m_dynamic_compressed_size{0}; | ||
// TODO: The following fields are used in single-file archive; however, they are not | ||
// currently part of multi-file archive metadata. Modifying multi-file archive metadata | ||
// disk format is potentially a breaking change and not currently required. | ||
std::string m_variable_encoding_methods_version{ffi::cVariableEncodingMethodsVersion}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess a high level question is. how are those constants related to multi-file-archive? Technically, since they are not used by anything else, they can also be directly defined under struct "single_archive_metadata"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally these fields are written to multi-file archive metadata. However, since this is actually changing multi-file archives, I didn't want to make the change until we here from @kirkrodrigues. If there is no plan to add these to the multi-file metadata, perhaps we can move them into single file archive files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I didn't fully answer your question, but I asked kirk about these fields earlier and supposedly they are also relevant to multi file archive |
||
std::string m_variables_schema_version{ffi::cVariablesSchemaVersion}; | ||
std::string m_compression_type{cCompressionTypeZstd}; | ||
}; | ||
} // namespace clp::streaming_archive | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP | ||
#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP | ||
|
||
#include <cstdint> | ||
#include <string> | ||
|
||
#include "../../Defs.h" | ||
#include "../Constants.hpp" | ||
#include "msgpack.hpp" | ||
|
||
namespace clp::streaming_archive::single_file_archive { | ||
|
||
using single_file_archive_format_version_t = uint32_t; | ||
|
||
// Single file archive version. | ||
constexpr uint8_t cArchiveMajorVersion{0}; | ||
constexpr uint8_t cArchiveMinorVersion{1}; | ||
constexpr uint16_t cArchivePatchVersion{1}; | ||
constexpr single_file_archive_format_version_t cArchiveVersion{ | ||
cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion | ||
}; | ||
|
||
static constexpr size_t cNumMagicNumberChars{4}; | ||
static constexpr std::array<uint8_t, cNumMagicNumberChars> | ||
cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'}; | ||
static constexpr std::string_view cUnstructuredSfaExtension{".clp"}; | ||
static constexpr size_t cFileSizeWarningThreshold{100L * 1024 * 1024}; | ||
|
||
static constexpr size_t cNumStaticFiles{5}; | ||
constexpr std::array<char const*, cNumStaticFiles> cStaticArchiveFileNames{ | ||
cMetadataDBFileName, | ||
cLogTypeDictFilename, | ||
cLogTypeSegmentIndexFilename, | ||
cVarDictFilename, | ||
cVarSegmentIndexFilename | ||
}; | ||
|
||
static constexpr size_t cNumUnused{6}; | ||
|
||
struct __attribute__((packed)) SingleFileArchiveHeader { | ||
std::array<uint8_t, cNumMagicNumberChars> magic; | ||
single_file_archive_format_version_t version; | ||
uint64_t metadata_size; | ||
std::array<uint64_t, cNumUnused> unused; | ||
}; | ||
|
||
struct FileInfo { | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
std::string n; | ||
uint64_t o; | ||
MSGPACK_DEFINE_MAP(n, o); | ||
}; | ||
|
||
struct MultiFileArchiveMetadata { | ||
davemarco marked this conversation as resolved.
Show resolved
Hide resolved
|
||
archive_format_version_t archive_format_version; | ||
std::string variable_encoding_methods_version; | ||
std::string variables_schema_version; | ||
std::string compression_type; | ||
std::string creator_id; | ||
epochtime_t begin_timestamp; | ||
epochtime_t end_timestamp; | ||
uint64_t uncompressed_size; | ||
uint64_t compressed_size; | ||
MSGPACK_DEFINE_MAP( | ||
archive_format_version, | ||
variable_encoding_methods_version, | ||
variables_schema_version, | ||
compression_type, | ||
creator_id, | ||
begin_timestamp, | ||
end_timestamp, | ||
uncompressed_size, | ||
compressed_size | ||
); | ||
}; | ||
|
||
struct SingleFileArchiveMetadata { | ||
std::vector<FileInfo> archive_files; | ||
MultiFileArchiveMetadata archive_metadata; | ||
uint64_t num_segments; | ||
MSGPACK_DEFINE_MAP(archive_files, archive_metadata, num_segments); | ||
}; | ||
} // namespace clp::streaming_archive::single_file_archive | ||
|
||
#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Missing test coverage for the new single-file archive implementation
The new single-file archive files have been properly added to the CMake configuration, but I don't see any corresponding test files for this functionality. While there are several test files in the codebase:
test-clp_s-end_to_end.cpp
Recommended actions:
writer.cpp
🔗 Analysis chain
LGTM! The new single-file archive source files are properly integrated.
The files are correctly placed in a dedicated directory and follow the project's naming conventions.
Let's verify the test coverage for these new files:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 175
Script:
Length of output: 1270
Script:
Length of output: 1828