From 132254cb7cfe80b3efd1b69a56cadfd0681e3c6a Mon Sep 17 00:00:00 2001 From: brido4125 Date: Mon, 28 Oct 2024 14:48:51 +0900 Subject: [PATCH] FIX: Send flush op to master node only. --- src/main/java/net/spy/memcached/ArcusClient.java | 3 ++- .../memcached/ArcusReplKetamaNodeLocator.java | 10 ++++++---- .../java/net/spy/memcached/MemcachedClient.java | 16 +++++++++++++++- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index 395f6401a..f4c7c4abc 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -1902,7 +1902,8 @@ public OperationFuture flush(final String prefix) { @Override public OperationFuture flush(final String prefix, final int delay) { - Collection nodes = getAllNodes(); + Collection nodes = getFlushNodes(); + final BroadcastFuture rv = new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size()); final Map opsMap = new HashMap<>(); diff --git a/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java b/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java index 55b0bfd2d..f8568a42f 100644 --- a/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java +++ b/src/main/java/net/spy/memcached/ArcusReplKetamaNodeLocator.java @@ -32,6 +32,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -41,7 +42,7 @@ public final class ArcusReplKetamaNodeLocator extends SpyObject implements NodeLocator { private final TreeMap> ketamaGroups; - private final HashMap allGroups; + private final ConcurrentHashMap allGroups; private final Collection allNodes; /* ENABLE_MIGRATION if */ @@ -67,7 +68,7 @@ public ArcusReplKetamaNodeLocator(List nodes) { super(); allNodes = nodes; ketamaGroups = new TreeMap<>(); - allGroups = new HashMap<>(); + allGroups = new ConcurrentHashMap<>(); // create all memcached replica group for (MemcachedNode node : nodes) { @@ -103,7 +104,7 @@ public ArcusReplKetamaNodeLocator(List nodes) { } private ArcusReplKetamaNodeLocator(TreeMap> kg, - HashMap ag, + ConcurrentHashMap ag, Collection an) { super(); ketamaGroups = kg; @@ -208,7 +209,8 @@ public NodeLocator getReadonlyCopy() { lock.lock(); try { TreeMap> ketamaCopy = new TreeMap<>(); - HashMap groupsCopy = new HashMap<>(allGroups.size()); + ConcurrentHashMap groupsCopy + = new ConcurrentHashMap<>(allGroups.size()); Collection nodesCopy = new ArrayList<>(allNodes.size()); // Rewrite the values a copy of the map diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index ac076ad6b..e2c09c010 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -39,6 +39,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; @@ -1880,7 +1881,8 @@ public void complete() { * is too full to accept any more requests */ public Future flush(final int delay) { - Collection nodes = getAllNodes(); + Collection nodes = getFlushNodes(); + final BroadcastFuture rv = new BroadcastFuture<>(operationTimeout, Boolean.TRUE, nodes.size()); final Map opsMap = new HashMap<>(); @@ -2148,6 +2150,18 @@ protected Collection getAllNodes() { return conn.getLocator().getAll(); } + protected Collection getFlushNodes() { + /* ENABLE_REPLICATION if */ + if (conn.getArcusReplEnabled()) { + return ((ArcusReplKetamaNodeLocator) getNodeLocator()).getAllGroups().values() + .stream() + .map(MemcachedReplicaGroup::getMasterNode) + .collect(Collectors.toList()); + } + /* ENABLE_REPLICATION end */ + return conn.getLocator().getAll(); + } + /** * Turn the list of keys into groups of keys. * All keys in a group belong to the same memcached server.