diff --git a/src/protocol/record.rs b/src/protocol/record.rs index 09832103..9f4a95f3 100644 --- a/src/protocol/record.rs +++ b/src/protocol/record.rs @@ -630,7 +630,8 @@ where // There are "normal" compression libs, and there is Java // See https://github.com/edenhill/librdkafka/blob/2b76b65212e5efda213961d5f84e565038036270/src/rdkafka_msgset_reader.c#L307-L318 let output = if input.starts_with(JAVA_MAGIC) { - let mut cursor = Cursor::new(&input[JAVA_MAGIC.len()..]); + let cursor_content = &input[JAVA_MAGIC.len()..]; + let mut cursor = Cursor::new(cursor_content); let mut buf_version = [0u8; 4]; cursor.read_exact(&mut buf_version)?; @@ -653,6 +654,11 @@ where let mut buf_chunk_length = [0u8; 4]; cursor.read_exact(&mut buf_chunk_length)?; let chunk_length = u32::from_be_bytes(buf_chunk_length) as usize; + let bytes_left = cursor_content.len() - (cursor.position() as usize); + if chunk_length > bytes_left { + // do NOT try to allocate massive buffer for `chunk_data` but instead fail early + return Err(ReadError::Malformed(format!("Java-specific Snappy-compressed data has illegal chunk length, got {chunk_length} bytes but only {bytes_left} bytes are left.").into())); + } let mut chunk_data = vec![0u8; chunk_length]; cursor.read_exact(&mut chunk_data)?; @@ -1312,6 +1318,26 @@ mod tests { assert_eq!(actual2, expected); } + #[test] + fn test_decode_java_specific_oom() { + // Found by the fuzzer, this should return an error instead of OOM. + let data = [ + 0x0a, 0x0a, 0x83, 0x00, 0xd4, 0x00, 0x00, 0x22, 0x00, 0x4b, 0x08, 0xd2, 0x22, 0xfb, + 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x9b, 0x00, 0x9b, 0x0a, 0x40, + 0x00, 0x00, 0x4b, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0xd3, 0x82, 0x53, + 0x4e, 0x41, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x03, 0x01, 0x00, 0x00, 0xfc, 0x00, 0x09, 0x09, 0x09, 0x09, 0x09, + 0x09, 0x50, 0x50, 0x59, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x30, 0x00, 0x57, 0x00, 0x00, 0x80, + 0x00, 0x00, 0x00, 0x00, 0xb0, 0x9b, 0x00, + ] + .to_vec(); + + let err = RecordBatchBody::read(&mut Cursor::new(data)).unwrap_err(); + assert_matches!(err, ReadError::Malformed(_)); + assert_eq!(err.to_string(), "Malformed data: Java-specific Snappy-compressed data has illegal chunk length, got 4227860745 bytes but only 38 bytes are left."); + } + #[test] fn test_carefully_decompress_snappy_empty_input() { let err = carefully_decompress_snappy(&[], 1).unwrap_err();