Skip to content

Commit

Permalink
CLEANUP: Refactored `MemcachedConnection.switchoverMemcachedReplGroup…
Browse files Browse the repository at this point in the history
…()` method.
  • Loading branch information
uhm0311 authored and jhpark816 committed Dec 19, 2024
1 parent ae6878e commit e35d37a
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions src/main/java/net/spy/memcached/MemcachedConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private void updateReplConnections(List<InetSocketAddress> addrs) throws IOExcep

if (oldGroup.isDelayedSwitchover()) {
delayedSwitchoverGroups.remove(oldGroup);
switchoverMemcachedReplGroup(oldGroup.getMasterNode(), true);
switchoverMemcachedReplGroup(oldGroup, true);
}

MemcachedNode oldMasterNode = oldGroup.getMasterNode();
Expand Down Expand Up @@ -587,26 +587,29 @@ private Set<ArcusReplNodeAddress> getSlaveAddrsFromGroupAddrs(
/* ENABLE_REPLICATION end */

/* ENABLE_REPLICATION if */
private void switchoverMemcachedReplGroup(MemcachedNode node, boolean cancelNonIdempotent) {
MemcachedReplicaGroup group = node.getReplicaGroup();
private void switchoverMemcachedReplGroup(MemcachedReplicaGroup group,
boolean cancelNonIdempotent) {

if (group.getMasterCandidate() == null) {
getLogger().warn("Delay switchover because invalid group state : " + group);
return;
}

MemcachedNode oldMaster = group.getMasterNode();

/* must keep the following execution order when switchover
* - first moveOperations
* - second, queueReconnect
*
* because moves all operations
*/
if (group.getMasterNode() != null && group.getMasterCandidate() != null) {
if (((ArcusReplNodeAddress) node.getSocketAddress()).isMaster()) {
((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group);
}
node.moveOperations(group.getMasterNode(), cancelNonIdempotent);
addedQueue.offer(group.getMasterNode());
queueReconnect(node, ReconnDelay.IMMEDIATE,
"Discarded all pending reading state operation to move operations.");
} else {
getLogger().warn("Delay switchover because invalid group state : " + group);
}
((ArcusReplKetamaNodeLocator) locator).switchoverReplGroup(group);
MemcachedNode newMaster = group.getMasterNode();

oldMaster.moveOperations(newMaster, cancelNonIdempotent);
addedQueue.offer(newMaster);
queueReconnect(oldMaster, ReconnDelay.IMMEDIATE,
"Discarded all pending reading state operation to move operations.");
}
/* ENABLE_REPLICATION end */

Expand Down Expand Up @@ -1015,8 +1018,9 @@ private void handleReads(MemcachedNode qa)
/* ENABLE_REPLICATION if */
if (currentOp != null && currentOp.getState() == OperationState.MOVING) {
((Buffer) rbuf).clear();
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, false);
MemcachedReplicaGroup group = qa.getReplicaGroup();
delayedSwitchoverGroups.remove(group);
switchoverMemcachedReplGroup(group, false);
break;
}
/* ENABLE_REPLICATION end */
Expand All @@ -1031,10 +1035,10 @@ private void handleReads(MemcachedNode qa)
/* ENABLE_REPLICATION if */
if (arcusReplEnabled) {
if (currentOp == null) { // readQ is empty
if (qa.getReplicaGroup().isDelayedSwitchover() &&
qa.getReplicaGroup().masterNode == qa) {
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, false);
MemcachedReplicaGroup group = qa.getReplicaGroup();
if (group.isDelayedSwitchover() && group.getMasterNode() == qa) {
delayedSwitchoverGroups.remove(group);
switchoverMemcachedReplGroup(group, false);
}
}
}
Expand Down Expand Up @@ -1179,10 +1183,10 @@ private void queueReconnect(MemcachedNode qa, ReconnDelay type, String cause) {

/* ENABLE_REPLICATION if */
if (arcusReplEnabled) {
if (qa.getReplicaGroup().isDelayedSwitchover() &&
qa.getReplicaGroup().getMasterNode() == qa) {
delayedSwitchoverGroups.remove(qa.getReplicaGroup());
switchoverMemcachedReplGroup(qa, true);
MemcachedReplicaGroup group = qa.getReplicaGroup();
if (group.isDelayedSwitchover() && group.getMasterNode() == qa) {
delayedSwitchoverGroups.remove(group);
switchoverMemcachedReplGroup(group, true);
return;
}
}
Expand Down Expand Up @@ -1800,12 +1804,15 @@ public void switchover() {
Iterator<Entry<Long, MemcachedReplicaGroup>> iterator = groups.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Long, MemcachedReplicaGroup> entry = iterator.next();
if (now < entry.getKey()) {
long switchoverTime = entry.getKey();
MemcachedReplicaGroup group = entry.getValue();

if (now < switchoverTime) {
return;
} else {
iterator.remove();
entry.getValue().setDelayedSwitchover(false);
switchoverMemcachedReplGroup(entry.getValue().getMasterNode(), true);
group.setDelayedSwitchover(false);
switchoverMemcachedReplGroup(group, true);
}
}
}
Expand Down

0 comments on commit e35d37a

Please sign in to comment.