From b8c7113753227a3d81c2307f56217b86b132ce95 Mon Sep 17 00:00:00 2001 From: ikolomi Date: Tue, 2 Jul 2024 00:02:47 +0300 Subject: [PATCH] Fix cand --- glide-core/src/rotating_buffer.rs | 9 +++++++++ glide-core/src/socket_listener.rs | 13 ++++++++++--- python/python/tests/test_async_client.py | 2 +- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/glide-core/src/rotating_buffer.rs b/glide-core/src/rotating_buffer.rs index cbd32313ed..2f40b95129 100644 --- a/glide-core/src/rotating_buffer.rs +++ b/glide-core/src/rotating_buffer.rs @@ -26,10 +26,14 @@ impl RotatingBuffer { let mut results: Vec = vec![]; let mut prev_position = 0; let buffer_len = buffer.len(); + //println!("------------- get_requests() buffer_len == {:?}", buffer_len); while prev_position < buffer_len { if let Some((request_len, bytes_read)) = u32::decode_var(&buffer[prev_position..]) { + //println!("------------- get_requests() request_len == {:?}", request_len); let start_pos = prev_position + bytes_read; if (start_pos + request_len as usize) > buffer_len { + // how its even possible? + //println!("------------- get_requests() start_pos + request_len == {:?}", start_pos + request_len as usize); break; } else { match T::parse_from_tokio_bytes( @@ -51,6 +55,11 @@ impl RotatingBuffer { } if prev_position != buffer.len() { + // save back the unconsumed bytes + println!( + "------------- get_requests() saving from {:?}", + prev_position + ); self.backing_buffer .extend_from_slice(&buffer[prev_position..]); } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index a2a333f103..d7a7ff3eb9 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -110,9 +110,16 @@ impl UnixStreamListener { return ReadSocketClosed.into(); } Ok(_) => { - return match self.rotating_buffer.get_requests() { - Ok(requests) => ReceivedValues(requests), - Err(err) => UnhandledError(err.into()).into(), + match self.rotating_buffer.get_requests() { + Ok(requests) => { + //println!("------------- requests.size() == {:?}", requests.len()); + if !requests.is_empty() { + // continue to read from socket + return ReceivedValues(requests); + } + continue; + } + Err(err) => return UnhandledError(err.into()).into(), }; } Err(ref e) diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index eb7fd7a912..8c94d50a8e 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -4122,7 +4122,7 @@ async def test_zrevrank(self, redis_client: TGlideClient): await redis_client.zrevrank(non_existing_key, "non_existing_member") is None ) - if not check_if_server_version_lt(redis_client, "7.2.0"): + if not await check_if_server_version_lt(redis_client, "7.2.0"): assert await redis_client.zrevrank_withscore(key, "one") == [2, 1.0] assert ( await redis_client.zrevrank_withscore(key, "non_existing_member")