From ede4966308fd0cc4d791d2279102cd541abac73a Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 27 Jan 2025 10:00:55 +0100 Subject: [PATCH] [proxima-direct-core] check prefix of cached scanWildcard's offset --- .../o2/proxima/core/storage/StreamElement.java | 4 ++-- .../DirectAttributeFamilyProxyDescriptor.java | 3 ++- .../core/view/LocalCachedPartitionedView.java | 6 ++++++ .../core/view/TimeBoundedVersionedCache.java | 2 +- .../view/TimeBoundedVersionedCacheTest.java | 18 +++++++++++++++--- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/cz/o2/proxima/core/storage/StreamElement.java b/core/src/main/java/cz/o2/proxima/core/storage/StreamElement.java index e8785c4f8..37422dea2 100644 --- a/core/src/main/java/cz/o2/proxima/core/storage/StreamElement.java +++ b/core/src/main/java/cz/o2/proxima/core/storage/StreamElement.java @@ -269,7 +269,7 @@ protected StreamElement( this.value = value; this.deleteWildcard = deleteWildcard; - Preconditions.checkArgument(this.uuid.length() > 0, "UUID must not be empty"); + Preconditions.checkArgument(!this.uuid.isEmpty(), "UUID must not be empty"); } protected StreamElement( @@ -362,7 +362,7 @@ public Optional getParsed() { return (Optional) Optional.ofNullable(parsed); } - protected void setParsed(Object parsed) { + protected final void setParsed(Object parsed) { this.parsed = parsed; } diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyProxyDescriptor.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyProxyDescriptor.java index bc08c4582..be3d93136 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyProxyDescriptor.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/DirectAttributeFamilyProxyDescriptor.java @@ -76,7 +76,6 @@ static class AttrLookup implements Serializable { @SuppressWarnings("unchecked") AttrLookup(AttributeFamilyProxyDescriptor proxy) { - this.familyName = proxy.getName(); this.attrs = proxy.getAttributes().stream() @@ -204,6 +203,8 @@ private static Optional getPartitionedCachedView( Optional maybeWriter = getWriter(lookup, context, desc).map(AttributeWriterBase::online); if (maybeReader.isPresent() && maybeWriter.isPresent()) { + // FIXME: this mapping is wrong! + // we must avoid caching the proxy, but cache the original instead return Optional.of( new LocalCachedPartitionedView( desc.getTargetFamilyRead().getEntity(), maybeReader.get(), maybeWriter.get())); diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java index 16952d5ee..77cc1d48f 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/LocalCachedPartitionedView.java @@ -35,6 +35,7 @@ import cz.o2.proxima.direct.core.randomaccess.RawOffset; import cz.o2.proxima.direct.core.view.TimeBoundedVersionedCache.Payload; import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting; +import cz.o2.proxima.internal.com.google.common.base.Preconditions; import java.net.URI; import java.time.Duration; import java.util.Collection; @@ -319,6 +320,11 @@ private void scanWildcardPrefix( int limit, Consumer> consumer) { + Preconditions.checkArgument( + offset.startsWith(prefix), + "Offset must start with prefix, got offset %s prefix %s", + offset, + prefix); AtomicInteger missing = new AtomicInteger(limit); cache.scan( key, diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java index d73e13b81..9d53dc769 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCache.java @@ -233,7 +233,7 @@ private void logPayloadUpdateIfNecessary( String key, String attribute, long stamp, @Nullable Object value) { if (log.isDebugEnabled()) { - AttributeDescriptor attrDesc = entity.findAttribute(attribute).orElse(null); + AttributeDescriptor attrDesc = entity.findAttribute(attribute, true).orElse(null); if (attrDesc != null) { log.debug( "Caching attribute {} for key {} at {} with payload {}", diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCacheTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCacheTest.java index 312fcdeae..08d02caf4 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCacheTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/view/TimeBoundedVersionedCacheTest.java @@ -19,9 +19,11 @@ import cz.o2.proxima.core.repository.EntityDescriptor; import cz.o2.proxima.core.repository.Repository; +import cz.o2.proxima.core.util.Optionals; import cz.o2.proxima.core.util.Pair; import cz.o2.proxima.direct.core.view.TimeBoundedVersionedCache.Payload; import cz.o2.proxima.typesafe.config.ConfigFactory; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import org.junit.Test; @@ -30,9 +32,7 @@ public class TimeBoundedVersionedCacheTest { Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf")); - EntityDescriptor entity = - repo.findEntity("gateway") - .orElseThrow(() -> new IllegalArgumentException("Missing entity gateway")); + EntityDescriptor entity = Optionals.get(repo.findEntity("gateway")); long now = System.currentTimeMillis(); @Test @@ -45,6 +45,18 @@ public void testCachePutGet() { assertNull(cache.get("key", "attribute", now - 1)); } + @Test + public void testCacheProxyPutGet() { + entity = Optionals.get(repo.findEntity("proxied")); + TimeBoundedVersionedCache cache = new TimeBoundedVersionedCache(entity, 60_000L); + byte[] payload = "test".getBytes(StandardCharsets.UTF_8); + assertTrue(cache.put("key", "_e.test", now, 1L, false, payload)); + assertEquals(Pair.of(now, new Payload(payload, 1L, true)), cache.get("key", "_e.test", now)); + assertEquals( + Pair.of(now, new Payload(payload, 1L, true)), cache.get("key", "_e.test", now + 1)); + assertNull(cache.get("key", "_e.test", now - 1)); + } + @Test public void testCachePutGetWithCorrectAttribute() { TimeBoundedVersionedCache cache = new TimeBoundedVersionedCache(entity, 60_000L);