Skip to content

Commit

Permalink
Merge pull request #957: [proxima-direct-core] check prefix of cached…
Browse files Browse the repository at this point in the history
… scanWildcard's offset
  • Loading branch information
je-ik authored Jan 27, 2025
2 parents f379cac + ede4966 commit c5f1306
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ protected StreamElement(
this.value = value;
this.deleteWildcard = deleteWildcard;

Preconditions.checkArgument(this.uuid.length() > 0, "UUID must not be empty");
Preconditions.checkArgument(!this.uuid.isEmpty(), "UUID must not be empty");
}

protected StreamElement(
Expand Down Expand Up @@ -362,7 +362,7 @@ public <T> Optional<T> getParsed() {
return (Optional<T>) Optional.ofNullable(parsed);
}

protected void setParsed(Object parsed) {
protected final void setParsed(Object parsed) {
this.parsed = parsed;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static class AttrLookup implements Serializable {

@SuppressWarnings("unchecked")
AttrLookup(AttributeFamilyProxyDescriptor proxy) {

this.familyName = proxy.getName();
this.attrs =
proxy.getAttributes().stream()
Expand Down Expand Up @@ -204,6 +203,8 @@ private static Optional<CachedView> getPartitionedCachedView(
Optional<OnlineAttributeWriter> maybeWriter =
getWriter(lookup, context, desc).map(AttributeWriterBase::online);
if (maybeReader.isPresent() && maybeWriter.isPresent()) {
// FIXME: this mapping is wrong!
// we must avoid caching the proxy, but cache the original instead
return Optional.of(
new LocalCachedPartitionedView(
desc.getTargetFamilyRead().getEntity(), maybeReader.get(), maybeWriter.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import cz.o2.proxima.direct.core.randomaccess.RawOffset;
import cz.o2.proxima.direct.core.view.TimeBoundedVersionedCache.Payload;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -319,6 +320,11 @@ private void scanWildcardPrefix(
int limit,
Consumer<KeyValue<?>> consumer) {

Preconditions.checkArgument(
offset.startsWith(prefix),
"Offset must start with prefix, got offset %s prefix %s",
offset,
prefix);
AtomicInteger missing = new AtomicInteger(limit);
cache.scan(
key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private void logPayloadUpdateIfNecessary(
String key, String attribute, long stamp, @Nullable Object value) {

if (log.isDebugEnabled()) {
AttributeDescriptor<Object> attrDesc = entity.findAttribute(attribute).orElse(null);
AttributeDescriptor<Object> attrDesc = entity.findAttribute(attribute, true).orElse(null);
if (attrDesc != null) {
log.debug(
"Caching attribute {} for key {} at {} with payload {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import cz.o2.proxima.core.repository.EntityDescriptor;
import cz.o2.proxima.core.repository.Repository;
import cz.o2.proxima.core.util.Optionals;
import cz.o2.proxima.core.util.Pair;
import cz.o2.proxima.direct.core.view.TimeBoundedVersionedCache.Payload;
import cz.o2.proxima.typesafe.config.ConfigFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
Expand All @@ -30,9 +32,7 @@
public class TimeBoundedVersionedCacheTest {

Repository repo = Repository.ofTest(ConfigFactory.load("test-reference.conf"));
EntityDescriptor entity =
repo.findEntity("gateway")
.orElseThrow(() -> new IllegalArgumentException("Missing entity gateway"));
EntityDescriptor entity = Optionals.get(repo.findEntity("gateway"));
long now = System.currentTimeMillis();

@Test
Expand All @@ -45,6 +45,18 @@ public void testCachePutGet() {
assertNull(cache.get("key", "attribute", now - 1));
}

@Test
public void testCacheProxyPutGet() {
entity = Optionals.get(repo.findEntity("proxied"));
TimeBoundedVersionedCache cache = new TimeBoundedVersionedCache(entity, 60_000L);
byte[] payload = "test".getBytes(StandardCharsets.UTF_8);
assertTrue(cache.put("key", "_e.test", now, 1L, false, payload));
assertEquals(Pair.of(now, new Payload(payload, 1L, true)), cache.get("key", "_e.test", now));
assertEquals(
Pair.of(now, new Payload(payload, 1L, true)), cache.get("key", "_e.test", now + 1));
assertNull(cache.get("key", "_e.test", now - 1));
}

@Test
public void testCachePutGetWithCorrectAttribute() {
TimeBoundedVersionedCache cache = new TimeBoundedVersionedCache(entity, 60_000L);
Expand Down

0 comments on commit c5f1306

Please sign in to comment.