Skip to content

Commit

Permalink
Add tests to verify symmetric usage of scan cursor id.
Browse files Browse the repository at this point in the history
See: #2796
  • Loading branch information
christophstrobl committed Dec 13, 2023
1 parent 6e0f30b commit 432d3d9
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ protected ScanIteration<byte[]> doScan(long cursorId, ScanOptions options) {
}

if (type != null) {
result = connection.getJedis().scan(Long.toUnsignedString(cursorId).getBytes(), params, type);
result = connection.getJedis().scan(JedisConverters.toBytes(Long.toUnsignedString(cursorId)), params, type);
} else {
result = connection.getJedis().scan(Long.toUnsignedString(cursorId).getBytes(), params);
result = connection.getJedis().scan(JedisConverters.toBytes(Long.toUnsignedString(cursorId)), params);
}

return new ScanIteration<>(Long.parseUnsignedLong(result.getCursor()), result.getResult());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,22 @@

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.mockito.ArgumentCaptor;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.AbstractConnectionUnitTestBase;
import org.springframework.data.redis.connection.RedisServerCommands.ShutdownOption;
import org.springframework.data.redis.connection.zset.Tuple;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.KeyScanOptions;
import org.springframework.data.redis.core.ScanOptions;

/**
Expand Down Expand Up @@ -179,6 +182,23 @@ public void scanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
verify(jedisSpy, times(1)).disconnect();
}

@Test // GH-2796
void scanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
doReturn(new ScanResult<>(cursorId, List.of("spring".getBytes()))).when(jedisSpy).scan(any(byte[].class),
any(ScanParams.class));

Cursor<byte[]> cursor = connection.scan(KeyScanOptions.NONE);
cursor.next(); // initial value
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
verify(jedisSpy, times(2)).scan(captor.capture(), any(ScanParams.class));
assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId);
}

@Test // DATAREDIS-531
public void sScanShouldKeepTheConnectionOpen() {

Expand All @@ -202,6 +222,23 @@ public void sScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
verify(jedisSpy, times(1)).disconnect();
}

@Test // GH-2796
void sScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
doReturn(new ScanResult<>(cursorId, List.of("spring".getBytes()))).when(jedisSpy).sscan(any(byte[].class),
any(byte[].class), any(ScanParams.class));

Cursor<byte[]> cursor = connection.setCommands().sScan("spring".getBytes(), ScanOptions.NONE);
cursor.next(); // initial value
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
verify(jedisSpy, times(2)).sscan(any(byte[].class), captor.capture(), any(ScanParams.class));
assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId);
}

@Test // DATAREDIS-531
public void zScanShouldKeepTheConnectionOpen() {

Expand All @@ -225,6 +262,23 @@ public void zScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
verify(jedisSpy, times(1)).disconnect();
}

@Test // GH-2796
void zScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
doReturn(new ScanResult<>(cursorId, List.of(new redis.clients.jedis.resps.Tuple("spring", 1D)))).when(jedisSpy).zscan(any(byte[].class),
any(byte[].class), any(ScanParams.class));

Cursor<Tuple> cursor = connection.zSetCommands().zScan("spring".getBytes(), ScanOptions.NONE);
cursor.next(); // initial value
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
verify(jedisSpy, times(2)).zscan(any(byte[].class), captor.capture(), any(ScanParams.class));
assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId);
}

@Test // DATAREDIS-531
public void hScanShouldKeepTheConnectionOpen() {

Expand All @@ -248,6 +302,23 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() throws IOException
verify(jedisSpy, times(1)).disconnect();
}

@Test // GH-2796
void hScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
doReturn(new ScanResult<>(cursorId, List.of(Map.entry("spring".getBytes(), "data".getBytes())))).when(jedisSpy).hscan(any(byte[].class),
any(byte[].class), any(ScanParams.class));

Cursor<Entry<byte[], byte[]>> cursor = connection.hashCommands().hScan("spring".getBytes(), ScanOptions.NONE);
cursor.next(); // initial value
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
verify(jedisSpy, times(2)).hscan(any(byte[].class), captor.capture(), any(ScanParams.class));
assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId);
}

@Test // DATAREDIS-714
void doesNotSelectDbWhenCurrentDbMatchesDesiredOne() {

Expand Down Expand Up @@ -369,6 +440,29 @@ public void hScanShouldCloseTheConnectionWhenCursorIsClosed() {
.isThrownBy(() -> super.hScanShouldCloseTheConnectionWhenCursorIsClosed());
}

@Test
@Disabled("scan not supported in pipeline")
void scanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void sScanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void zScanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void hScanShouldOperateUponUnsigned64BitCursorId() {

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,26 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.ScanOutput;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandType;

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
Expand All @@ -48,6 +50,9 @@
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.zset.Tuple;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.KeyScanOptions;
import org.springframework.test.util.ReflectionTestUtils;

/**
Expand Down Expand Up @@ -247,6 +252,146 @@ void xaddShouldHonorNoMkStream() {
assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true);
}

@Test // GH-2796
void scanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
KeyScanCursor<byte[]> sc = new KeyScanCursor<>() {
@Override
public List<byte[]> getKeys() {
return List.of("spring".getBytes());
}
};
sc.setCursor(cursorId);
sc.setFinished(false);

Command<byte[], byte[], KeyScanCursor<byte[]>> command = new Command<>(new LettuceConnection.CustomCommandType("SCAN"),
new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) {
@Override
protected void setOutput(ByteBuffer bytes) {

}
});
AsyncCommand<byte[], byte[], KeyScanCursor<byte[]>> future = new AsyncCommand<>(command);
future.complete();

when(asyncCommandsMock.scan(any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future);

Cursor<byte[]> cursor = connection.scan(KeyScanOptions.NONE);
cursor.next(); //initial
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
ArgumentCaptor<ScanCursor> captor = ArgumentCaptor.forClass(ScanCursor.class);
verify(asyncCommandsMock, times(2)).scan(captor.capture(), any(ScanArgs.class));
assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId);
}

@Test // GH-2796
void sScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ValueScanCursor<byte[]> sc = new ValueScanCursor<>() {
@Override
public List<byte[]> getValues() {
return List.of("spring".getBytes());
}
};
sc.setCursor(cursorId);
sc.setFinished(false);

Command<byte[], byte[], ValueScanCursor<byte[]>> command = new Command<>(new LettuceConnection.CustomCommandType("SSCAN"),
new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) {
@Override
protected void setOutput(ByteBuffer bytes) {

}
});
AsyncCommand<byte[], byte[], ValueScanCursor<byte[]>> future = new AsyncCommand<>(command);
future.complete();

when(asyncCommandsMock.sscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future);

Cursor<byte[]> cursor = connection.setCommands().sScan("key".getBytes(), KeyScanOptions.NONE);
cursor.next(); //initial
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
ArgumentCaptor<ScanCursor> captor = ArgumentCaptor.forClass(ScanCursor.class);
verify(asyncCommandsMock, times(2)).sscan(any(byte[].class), captor.capture(), any(ScanArgs.class));
assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId);
}

@Test // GH-2796
void zScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
ScoredValueScanCursor<byte[]> sc = new ScoredValueScanCursor<>() {
@Override
public List<ScoredValue<byte[]>> getValues() {
return List.of(ScoredValue.just(10D, "spring".getBytes()));
}
};
sc.setCursor(cursorId);
sc.setFinished(false);

Command<byte[], byte[], ScoredValueScanCursor<byte[]>> command = new Command<>(new LettuceConnection.CustomCommandType("ZSCAN"),
new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) {
@Override
protected void setOutput(ByteBuffer bytes) {

}
});
AsyncCommand<byte[], byte[], ScoredValueScanCursor<byte[]>> future = new AsyncCommand<>(command);
future.complete();

when(asyncCommandsMock.zscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future);

Cursor<Tuple> cursor = connection.zSetCommands().zScan("key".getBytes(), KeyScanOptions.NONE);
cursor.next(); //initial
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
ArgumentCaptor<ScanCursor> captor = ArgumentCaptor.forClass(ScanCursor.class);
verify(asyncCommandsMock, times(2)).zscan(any(byte[].class), captor.capture(), any(ScanArgs.class));
assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId);
}

@Test // GH-2796
void hScanShouldOperateUponUnsigned64BitCursorId() {

String cursorId = "9286422431637962824";
MapScanCursor<byte[], byte[]> sc = new MapScanCursor<>() {
@Override
public Map<byte[], byte[]> getMap() {
return Map.of("spring".getBytes(), "data".getBytes());
}
};
sc.setCursor(cursorId);
sc.setFinished(false);

Command<byte[], byte[], MapScanCursor<byte[], byte[]>> command = new Command<>(new LettuceConnection.CustomCommandType("HSCAN"),
new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) {
@Override
protected void setOutput(ByteBuffer bytes) {

}
});
AsyncCommand<byte[], byte[], MapScanCursor<byte[], byte[]>> future = new AsyncCommand<>(command);
future.complete();

when(asyncCommandsMock.hscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future);

Cursor<Entry<byte[], byte[]>> cursor = connection.hashCommands().hScan("key".getBytes(), KeyScanOptions.NONE);
cursor.next(); //initial
assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId));

cursor.next(); // fetch next
ArgumentCaptor<ScanCursor> captor = ArgumentCaptor.forClass(ScanCursor.class);
verify(asyncCommandsMock, times(2)).hscan(any(byte[].class), captor.capture(), any(ScanArgs.class));
assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId);
}

}

public static class LettucePipelineConnectionUnitTests extends BasicUnitTests {
Expand Down Expand Up @@ -304,5 +449,29 @@ public void getClientNameShouldSendRequestCorrectly() {
connection.getClientName();
verify(asyncCommandsMock).clientGetname();
}

@Test
@Disabled("scan not supported in pipeline")
void scanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void sScanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void zScanShouldOperateUponUnsigned64BitCursorId() {

}

@Test
@Disabled("scan not supported in pipeline")
void hScanShouldOperateUponUnsigned64BitCursorId() {

}
}
}

0 comments on commit 432d3d9

Please sign in to comment.