From 542c8c4fb7ff29d0acae935a87f3fce73e2a3a35 Mon Sep 17 00:00:00 2001 From: brido4125 Date: Wed, 8 Jan 2025 16:22:35 +0900 Subject: [PATCH] FIX: Handling abnormal swithover from zk. --- .../spy/memcached/MemcachedConnection.java | 23 ++++++++++++++++++- .../spy/memcached/MemcachedReplicaGroup.java | 13 +++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index b8cd743d9..bd734ebe6 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -485,13 +485,34 @@ private void updateReplConnections(List addrs) throws IOExcep // move operation slave -> master. taskList.add(new MoveOperationTask( oldSlaveNode, oldMasterNode, false)); + // clear the masterCandidate if the removed slave is the masterCandidate. + if (oldGroup.getMasterCandidate() == oldSlaveNode) { + oldGroup.clearMasterCandidate(); + } } } } else if (oldSlaveAddrs.contains(newMasterAddr)) { if (newSlaveAddrs.contains(oldMasterAddr)) { // Switchover - if (oldGroup.getMasterCandidate() != null) { + MemcachedNode oldMasterCandidate = oldGroup.getMasterCandidate(); + if (oldMasterCandidate != null) { + ArcusReplNodeAddress masterFromZk = (ArcusReplNodeAddress) oldGroup + .getSlaveNodeBy(newMasterAddr.getIPPort()).getSocketAddress(); changeRoleGroups.add(oldGroup); + if (!masterFromZk.isSameAddress( + ((ArcusReplNodeAddress) oldMasterCandidate.getSocketAddress()))) { + /** + * Moves ops from oldMasterCandidate set by cache server to newMasterCandidate. + * Handling the below case. + * old group : [oldMaster, oldSlave1, oldSlave2] + * old group after switchover response : + * [oldMaster, oldSlave1-masterCandidate, oldSlave2] + * new group from zk cache list: [slave1, X, newMaster] + */ + oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort()); + taskList.add(new MoveOperationTask( + oldMasterCandidate, oldGroup.getMasterCandidate(), false)); + } } else { // ZK event occurs before cache server response. oldGroup.setMasterCandidateByAddr(newMasterAddr.getIPPort()); diff --git a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java index a6dedc4d4..c4428bbcb 100644 --- a/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java +++ b/src/main/java/net/spy/memcached/MemcachedReplicaGroup.java @@ -86,6 +86,10 @@ public void setMasterCandidate() { } } + public void clearMasterCandidate() { + this.masterCandidate = null; + } + public void setMasterCandidateByAddr(String address) { for (MemcachedNode node : this.getSlaveNodes()) { if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) { @@ -181,5 +185,14 @@ private MemcachedNode getNextActiveSlaveNodeNoRotate() { public static String getGroupNameFromNode(final MemcachedNode node) { return ((ArcusReplNodeAddress) node.getSocketAddress()).getGroupName(); } + + public MemcachedNode getSlaveNodeBy(String address) { + for (MemcachedNode node : this.getSlaveNodes()) { + if (address.equals(((ArcusReplNodeAddress) node.getSocketAddress()).getIPPort())) { + return node; + } + } + return null; + } } /* ENABLE_REPLICATION end */