Skip to content

Commit

Permalink
Fix consume regression (#741)
Browse files Browse the repository at this point in the history
* fix regression in consume fetch

* cargo fmt

* update VERSION
  • Loading branch information
sehz authored Feb 3, 2021
1 parent 2b05012 commit 5791b88
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 50 deletions.
72 changes: 46 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.0-alpha.2
0.7.0-alpha.3
3 changes: 3 additions & 0 deletions src/auth/src/x509/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ impl X509Authenticator {
.await
.map_err(|err| match err {
fluvio_socket::FlvSocketError::IoError { source } => source,
fluvio_socket::FlvSocketError::SocketClosed => {
IoError::new(IoErrorKind::BrokenPipe, "connection closed")
}
fluvio_socket::FlvSocketError::SendFileError { .. } => {
panic!("shoud not be doing zero copy here")
}
Expand Down
13 changes: 3 additions & 10 deletions src/controlplane-metadata/src/topic/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,20 +722,13 @@ pub mod test {
let result = topic_spec.encode(&mut dest, 0);
assert!(result.is_ok());
let expected_dest = [
0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x13, 0x89, 0x00, 0x00, 0x13,
0x8a,
];

/*
let expected_dest = [
0, 2, 0, 0, 0, 0, 4, 0, 0,
0x00,
0x02, // partition cnt
0x00, // type
0x00, 0x00, 0x00, 0x01, // partition cnt
0x00, 0x00, 0x00, 0x00, // partition id
0x00, 0x00, 0x00, 0x02, // replica cnt
0x00, 0x00, 0x13, 0x89, // spu id: 5001
0x00, 0x00, 0x13, 0x8a, // spu id: 5002
];*/
];
assert_eq!(dest, expected_dest);

// test encode
Expand Down
2 changes: 1 addition & 1 deletion src/dataplane-protocol/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ mod test {

let decoded_record = batch.records.get(0).unwrap();
println!("record crc: {}", batch.header.crc);
assert_eq!(batch.header.crc, 1910360147);
assert_eq!(batch.header.crc, 2908671645);
let b = decoded_record.value.as_ref();
assert_eq!(b, b"test");

Expand Down
8 changes: 5 additions & 3 deletions src/dataplane-protocol/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,17 @@ mod test {

#[test]
fn test_decode_encode_record() -> Result<(), IoError> {
/*
* Below is how you generate the vec<u8> for the `data` varible below.
/* Below is how you generate the vec<u8> for the `data` varible below.
let mut record = DefaultRecord::from(String::from("dog"));
record.preamble.set_offset_delta(1);
let mut out = vec![];
record.encode(&mut out, 0)?;
println!("ENCODED: {:#x?}", out);
*/
let data: Vec<u8> = vec![0x12, 0x0, 0x0, 0x2, 0x0, 0x6, 0x64, 0x6f, 0x67, 0x0];

let data = [
0x1e, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x64, 0x6f, 0x67, 0x0,
];

let record = DefaultRecord::decode_from(&mut Cursor::new(&data), 0)?;
assert_eq!(record.as_bytes(0)?.len(), data.len());
Expand Down
18 changes: 9 additions & 9 deletions src/storage/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
(active_segment.find_offset_position(20).await?).expect("offset exists");
assert_eq!(offset_position.get_base_offset(), 20);
assert_eq!(offset_position.get_pos(), 0); //
assert_eq!(offset_position.len(), 55);
assert_eq!(offset_position.len(), 64);
assert!((active_segment.find_offset_position(30).await?).is_none());
Ok(())
}
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
(active_segment.find_offset_position(20).await?).expect("offset exists");
assert_eq!(offset_position.get_base_offset(), 20);
assert_eq!(offset_position.get_pos(), 0); //
assert_eq!(offset_position.len(), 82);
assert_eq!(offset_position.len(), 109);
assert!((active_segment.find_offset_position(30).await?).is_none());

Ok(())
Expand All @@ -516,10 +516,10 @@ mod tests {

assert_eq!(seg_sink.get_end_offset(), 46);

assert_eq!(seg_sink.get_log_pos(), 228); // each takes 91 bytes
assert_eq!(seg_sink.get_log_pos(), 273); // each takes 91 bytes

let index = seg_sink.get_index();
assert_eq!(index[0].to_be(), (2, 76));
assert_eq!(index[0].to_be(), (2, 91));

let bytes = read_bytes_from_file(&test_dir.join(TEST2_FILE_NAME))?;
debug!("read {} bytes", bytes.len());
Expand All @@ -536,16 +536,16 @@ mod tests {
let offset_pos1 = seg_sink.find_offset_position(40).await?.expect("pos");
assert_eq!(offset_pos1.get_base_offset(), 40);
assert_eq!(offset_pos1.get_pos(), 0);
assert_eq!(offset_pos1.len(), 64);
assert_eq!(offset_pos1.len(), 79);
let offset_pos2 = seg_sink.find_offset_position(42).await?.expect("pos");
assert_eq!(offset_pos2.get_base_offset(), 42);
assert_eq!(offset_pos2.get_pos(), 76);
assert_eq!(offset_pos2.len(), 64);
assert_eq!(offset_pos2.get_pos(), 91);
assert_eq!(offset_pos2.len(), 79);

let offset_pos3 = seg_sink.find_offset_position(44).await?.expect("pos");
assert_eq!(offset_pos3.get_base_offset(), 44);
assert_eq!(offset_pos3.get_pos(), 152);
assert_eq!(offset_pos3.len(), 64);
assert_eq!(offset_pos3.get_pos(), 182);
assert_eq!(offset_pos3.len(), 79);

// test whether you can send batch with non zero base offset
let mut next_batch = create_batch();
Expand Down

0 comments on commit 5791b88

Please sign in to comment.