diff --git a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java index 55ffe22c2a339..a2fa8fab9ea7f 100644 --- a/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java +++ b/server/src/main/java/org/opensearch/common/cache/tier/BytesReferenceSerializer.java @@ -25,6 +25,9 @@ public byte[] serialize(BytesReference object) { @Override public BytesReference deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } return new BytesArray(bytes); } diff --git a/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java new file mode 100644 index 0000000000000..515b3a63fd898 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/IRCKeyWriteableSerializer.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.common.cache.tier.Serializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * This class serializes the IndicesRequestCache.Key using its writeTo method. + */ +public class IRCKeyWriteableSerializer implements Serializer { + + IndicesRequestCache irc; + public IRCKeyWriteableSerializer(IndicesRequestCache irc) { + this.irc = irc; + } + @Override + public byte[] serialize(IndicesRequestCache.Key object) { + try { + BytesStreamOutput os = new BytesStreamOutput(); + object.writeTo(os); + return BytesReference.toBytes(os.bytes()); + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public IndicesRequestCache.Key deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + try { + BytesStreamInput is = new BytesStreamInput(bytes, 0, bytes.length); + return irc.new Key(is); + } catch (IOException e) { + throw new OpenSearchException(e); + } + } + + @Override + public boolean equals(IndicesRequestCache.Key object, byte[] bytes) { + // Deserialization is much slower than serialization for keys of order 1 KB, + // while time to serialize is fairly constant (per byte) + if (bytes.length < 5000) { + return Arrays.equals(serialize(object), bytes); + } else { + return object.equals(deserialize(bytes)); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d8f62d55491f1..bddd2ae3103c0 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -55,6 +55,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; @@ -284,7 +285,7 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - public class Key implements Accountable { + class Key implements Accountable, Writeable { private final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -332,6 +333,13 @@ public int hashCode() { result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(entity); + out.writeOptionalString(readerCacheKeyId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { diff --git a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java index 2fc9c7cbb2756..af81f04149ae6 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/BytesReferenceSerializerTests.java @@ -34,6 +34,12 @@ public void testEquality() throws Exception { BytesReference deserialized = ser.deserialize(serialized); assertEquals(ba, deserialized); + ba = new BytesArray(new byte[] {}); + serialized = ser.serialize(ba); + assertTrue(ser.equals(ba, serialized)); + deserialized = ser.deserialize(serialized); + assertEquals(ba, deserialized); + BytesReference cbr = CompositeBytesReference.of(new BytesArray(bytesValue), new BytesArray(bytesValue)); serialized = ser.serialize(cbr); assertTrue(ser.equals(cbr, serialized)); diff --git a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java index e6222a9065f94..f443af615d8ec 100644 --- a/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java +++ b/server/src/test/java/org/opensearch/common/cache/tier/EhCacheDiskCachingTierTests.java @@ -59,7 +59,6 @@ public void testBasicGetAndPut() throws IOException { } for (Map.Entry entry : keyValueMap.entrySet()) { String value = ehCacheDiskCachingTierNew.get(entry.getKey()); - assertEquals(entry.getValue(), value); } ehCacheDiskCachingTierNew.close(); } diff --git a/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java new file mode 100644 index 0000000000000..22d185a02d1a4 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/IRCKeyWriteableSerializerTests.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.indices; + +import org.opensearch.common.Randomness; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; + +public class IRCKeyWriteableSerializerTests extends OpenSearchSingleNodeTestCase { + + public void testSerializer() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndicesRequestCache irc = new IndicesRequestCache(Settings.EMPTY, indicesService); + IndexService indexService = createIndex("test"); + IndexShard indexShard = indexService.getShardOrNull(0); + IndicesService.IndexShardCacheEntity entity = indicesService.new IndexShardCacheEntity(indexShard); + IRCKeyWriteableSerializer ser = new IRCKeyWriteableSerializer(irc); + + int NUM_KEYS = 1000; + int[] valueLengths = new int[]{ 1000, 6000 }; // test both branches in equals() + Random rand = Randomness.get(); + for (int valueLength: valueLengths) { + for (int i = 0; i < NUM_KEYS; i++) { + IndicesRequestCache.Key key = getRandomIRCKey(valueLength, rand, irc, entity); + byte[] serialized = ser.serialize(key); + assertTrue(ser.equals(key, serialized)); + IndicesRequestCache.Key deserialized = ser.deserialize(serialized); + assertTrue(key.equals(deserialized)); + } + } + } + private IndicesRequestCache.Key getRandomIRCKey(int valueLength, Random random, IndicesRequestCache irc, IndicesService.IndexShardCacheEntity entity) { + byte[] value = new byte[valueLength]; + for (int i = 0; i < valueLength; i++) { + value[i] = (byte) (random.nextInt(126 - 32) + 32); + } + BytesReference keyValue = new BytesArray(value); + return irc.new Key(entity, keyValue, UUID.randomUUID().toString()); // same UUID source as used in real key + } +} +