Skip to content

Commit

Permalink
Updated: gelf-message-id construction v3
Browse files Browse the repository at this point in the history
Signed-off-by: Bharat Jain <[email protected]>
  • Loading branch information
BharatKJain committed Oct 5, 2024
1 parent b45a96a commit 4553311
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
5 changes: 2 additions & 3 deletions tremor-interceptor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ anyhow = { version = "1.0", default-features = true }
thiserror = { version = "1.0", default-features = false }

#gelf
rand = { version = "0.8", optional = true, default-features = false, features = ["std_rng"] }
sha2 = { version="0.10.8", optional = true, default-feature = false, features = ["std"]}
rand = { version = "0.8", optional = true, default-features = false, features = [] }

# Compression
brotli = { version = "6", optional = true, default-features = false, features = [
Expand Down Expand Up @@ -62,7 +61,7 @@ proptest = "1.5"
[features]
default = ["base64", "compression", "gelf", "length-prefix"]
length-prefix = ["dep:bytes"]
gelf = ["dep:rand", "dep:sha2"]
gelf = ["dep:rand"]
base64 = []
compression = [
"dep:brotli",
Expand Down
24 changes: 12 additions & 12 deletions tremor-interceptor/src/postprocessor/gelf_chunking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@
use super::Postprocessor;
use std::time::{SystemTime, UNIX_EPOCH};
use sha2::{Sha256, Digest};
use std::thread;
use std::hash::{Hash, Hasher, DefaultHasher};

#[derive(Clone)]
pub struct Gelf {
auto_increment_id: u64,
chunk_size: usize,
}

impl Default for Gelf {
fn default() -> Self {
Self {
auto_increment_id: 0,

Check warning on line 31 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L31

Added line #L31 was not covered by tests
chunk_size: 8192,
}
}
Expand All @@ -42,7 +43,7 @@ enum Error {
ChunkCount(usize),
}

fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{
fn generate_message_id(epoch_timestamp: u64, auto_increment_id: u64) -> u64{

Check warning on line 46 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L46

Added line #L46 was not covered by tests

/*
* REFERENCE(/Explaination) TAKEN FROM: https://github.com/osiegmar/logback-gelf/blob/master/src/main/java/de/siegmar/logbackgelf/MessageIdSupplier.java#L61
Expand Down Expand Up @@ -73,13 +74,8 @@ fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{
* Then we can spend the rest on a random number.
*/

const BITS_13: u64 = 0b1_1111_1111_1111;

let mut sha_hasher = Sha256::new();
sha_hasher.update(data);
let data_sha_hash = sha_hasher.finalize();
let data_sha_hash_u64 = u64::from_be_bytes(data_sha_hash[0..8].try_into().expect("slice with incorrect length"));

const BITS_13: u64 = 0b1_1111_1111_1111;

let current_thread = thread::current();
let thread_id = current_thread.id();
Expand All @@ -88,7 +84,7 @@ fn generate_message_id(epoch_timestamp: u64, data: &[u8]) -> u64{
thread_id.hash(&mut hasher);
let thread_id_u64 = hasher.finish();

return (epoch_timestamp & BITS_13) | (data_sha_hash_u64 & !BITS_13) | (thread_id_u64 & BITS_13)
(epoch_timestamp & BITS_13) | (auto_increment_id & !BITS_13) | (thread_id_u64 & BITS_13)
}

Check warning on line 88 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L80-L88

Added lines #L80 - L88 were not covered by tests

impl Gelf {
Expand All @@ -103,7 +99,9 @@ impl Gelf {
return Err(Error::ChunkCount(n));
};

let id = generate_message_id(epoch_timestamp,data);
let gelf_message_id = generate_message_id(epoch_timestamp,self.auto_increment_id);

self.auto_increment_id+=1;

Check warning on line 105 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L101-L105

Added lines #L101 - L105 were not covered by tests
Ok(chunks
.enumerate()
Expand All @@ -114,7 +112,7 @@ impl Gelf {
// magic number
buf.append(&mut vec![0x1e, 0x0f]);
// gelf package id
buf.append(&mut id.to_be_bytes().to_vec());
buf.append(&mut gelf_message_id.to_be_bytes().to_vec());

Check warning on line 115 in tremor-interceptor/src/postprocessor/gelf_chunking.rs

View check run for this annotation

Codecov / codecov/patch

tremor-interceptor/src/postprocessor/gelf_chunking.rs#L115

Added line #L115 was not covered by tests
// sequence number
buf.push(i as u8);
// sequence count
Expand Down Expand Up @@ -182,6 +180,7 @@ mod test {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
auto_increment_id: 0,
chunk_size: 20,
};

Expand All @@ -208,11 +207,12 @@ mod test {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
];
let mut encoder = super::Gelf {
auto_increment_id: 0,
chunk_size: 20
};

let encoded_gelf = encoder.encode_gelf(&input_data, 0)?;
let expected_message_id = generate_message_id(0, &input_data);
let expected_message_id = generate_message_id(0, 0);
assert_eq!(u64::from_be_bytes(encoded_gelf[1][2..10].try_into().expect("slice with incorrect length")), expected_message_id);

// print!("\nLength of encoding message: {}",encoded_gelf[1].len());
Expand Down

0 comments on commit 4553311

Please sign in to comment.