From 495d493cf8ee6bb405535cf129f60dcfe80c41a1 Mon Sep 17 00:00:00 2001 From: "Venkata Sairam, Madduri" Date: Thu, 2 Jul 2020 17:45:12 +0530 Subject: [PATCH] Sync clustered cache state on passives with newly promoted active --- .../internal/messages/EhcacheMessageType.java | 7 +- .../common/internal/messages/BaseCodec.java | 2 + ...ClusterTierManagerServerEntityService.java | 7 +- .../server/EhcacheExecutionStrategy.java | 3 + .../internal/messages/EhcacheServerCodec.java | 15 ++++- .../ReconnectPassiveReplicationMessage.java | 51 +++++++++++++++ ...connectPassiveReplicationMessageCodec.java | 64 +++++++++++++++++++ .../server/store/ClusterTierActiveEntity.java | 53 +++++++++++---- .../store/ClusterTierPassiveEntity.java | 40 +++++++++++- .../store/ClusterTierServerEntityService.java | 3 +- .../messages/EhcacheServerCodecTest.java | 5 +- 11 files changed, 230 insertions(+), 20 deletions(-) create mode 100644 clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/ReconnectPassiveReplicationMessage.java create mode 100644 clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/ReconnectPassiveReplicationMessageCodec.java diff --git a/clustered/common-api/src/main/java/org/ehcache/clustered/common/internal/messages/EhcacheMessageType.java b/clustered/common-api/src/main/java/org/ehcache/clustered/common/internal/messages/EhcacheMessageType.java index ab1b47ef26..b69d1ce0da 100644 --- a/clustered/common-api/src/main/java/org/ehcache/clustered/common/internal/messages/EhcacheMessageType.java +++ b/clustered/common-api/src/main/java/org/ehcache/clustered/common/internal/messages/EhcacheMessageType.java @@ -54,7 +54,8 @@ public enum EhcacheMessageType { // Passive replication messages CHAIN_REPLICATION_OP, CLEAR_INVALIDATION_COMPLETE, - INVALIDATION_COMPLETE; + INVALIDATION_COMPLETE, + RECONNECT_PASSIVE_REPLICATION_MESSAGE; public static final EnumSet LIFECYCLE_MESSAGES = of(VALIDATE, VALIDATE_SERVER_STORE, PREPARE_FOR_DESTROY); public static boolean isLifecycleMessage(EhcacheMessageType value) { @@ -92,4 +93,8 @@ public static boolean isTrackedOperationMessage(EhcacheMessageType value) { public static boolean isPassiveReplicationMessage(EhcacheMessageType value) { return PASSIVE_REPLICATION_MESSAGES.contains(value); } + + public static boolean isReconnectPassiveReplicationMessage(EhcacheMessageType type) { + return RECONNECT_PASSIVE_REPLICATION_MESSAGE.equals(type); + } } diff --git a/clustered/common/src/main/java/org/ehcache/clustered/common/internal/messages/BaseCodec.java b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/messages/BaseCodec.java index d7f8affffe..4f40b9e6dd 100644 --- a/clustered/common/src/main/java/org/ehcache/clustered/common/internal/messages/BaseCodec.java +++ b/clustered/common/src/main/java/org/ehcache/clustered/common/internal/messages/BaseCodec.java @@ -35,6 +35,7 @@ import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.ITERATOR_OPEN; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.LOCK; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.PUT_IF_ABSENT; +import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.RECONNECT_PASSIVE_REPLICATION_MESSAGE; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.REPLACE; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.UNLOCK; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.VALIDATE; @@ -85,6 +86,7 @@ public class BaseCodec { .mapping(CHAIN_REPLICATION_OP, 61) .mapping(CLEAR_INVALIDATION_COMPLETE, 63) .mapping(INVALIDATION_COMPLETE, 64) + .mapping(RECONNECT_PASSIVE_REPLICATION_MESSAGE, 70) .build(); public static final String RESPONSE_TYPE_FIELD_NAME = "opCode"; diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/ClusterTierManagerServerEntityService.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/ClusterTierManagerServerEntityService.java index 92717bdfa3..285bc9150f 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/ClusterTierManagerServerEntityService.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/ClusterTierManagerServerEntityService.java @@ -29,6 +29,7 @@ import org.ehcache.clustered.common.internal.messages.StateRepositoryOpCodec; import org.ehcache.clustered.server.internal.messages.EhcacheServerCodec; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessageCodec; +import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessageCodec; import org.ehcache.clustered.server.management.Management; import org.ehcache.clustered.server.state.EhcacheStateService; import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig; @@ -38,7 +39,6 @@ import org.terracotta.entity.EntityServerService; import org.terracotta.entity.ExecutionStrategy; import org.terracotta.entity.MessageCodec; -import org.terracotta.entity.MessageCodecException; import org.terracotta.entity.ServiceException; import org.terracotta.entity.ServiceRegistry; import org.terracotta.entity.SyncMessageCodec; @@ -99,7 +99,10 @@ public ConcurrencyStrategy getConcurrencyStrategy(byte[] c public MessageCodec getMessageCodec() { EhcacheCodec ehcacheCodec = new EhcacheCodec(new ServerStoreOpCodec(), new LifeCycleMessageCodec(CONFIG_CODEC), new StateRepositoryOpCodec(), new ResponseCodec()); - return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec()); + ReconnectPassiveReplicationMessageCodec reconnectPassiveReplicationMessageCodec = + new ReconnectPassiveReplicationMessageCodec(); + return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec(), + reconnectPassiveReplicationMessageCodec); } @Override diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/EhcacheExecutionStrategy.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/EhcacheExecutionStrategy.java index c5cba5f45c..a7667c568c 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/EhcacheExecutionStrategy.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/EhcacheExecutionStrategy.java @@ -22,6 +22,7 @@ import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage; import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage; import org.ehcache.clustered.server.internal.messages.EhcacheSyncMessage; +import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessage; import org.terracotta.entity.ExecutionStrategy; /** @@ -52,6 +53,8 @@ public Location getExecutionLocation(EhcacheEntityMessage message) { return Location.PASSIVE; } else if (message instanceof EhcacheSyncMessage) { throw new AssertionError("Unexpected use of ExecutionStrategy for sync messages"); + } else if (message instanceof ReconnectPassiveReplicationMessage) { + return getExecutionLocation(((ReconnectPassiveReplicationMessage)message).getRequest()); } throw new AssertionError("Unknown message type: " + message.getClass()); } diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodec.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodec.java index 46ce38330b..4019a9b681 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodec.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodec.java @@ -30,6 +30,7 @@ import static java.nio.ByteBuffer.wrap; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isPassiveReplicationMessage; +import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isReconnectPassiveReplicationMessage; /** * EhcacheServerCodec @@ -40,10 +41,12 @@ public class EhcacheServerCodec implements MessageCodec encoder = RECONNECT_PASSIVE_REPLICATION_MESSAGE_STRUCT.encoder(); + encoder.int64(SEQUENCE_ID, replicationMessage.getSequenceId()); + encoder.int64(SOURCE_ID, replicationMessage.getClientSourceId()); + encoder.int64(TRANSACTION_ID, replicationMessage.getTransactionId()); + encoder.byteBuffer(REQUEST, ByteBuffer.wrap(ehcacheServerCodec.encodeMessage(replicationMessage.getRequest()))); + try { + encoder.byteBuffer(RESPONSE, ByteBuffer.wrap(ehcacheServerCodec.encodeResponse(replicationMessage.getResponse()))); + } catch (MessageCodecException e) { + throw new RuntimeException(e); + } + + return encoder.encode().array(); + } + + public ReconnectPassiveReplicationMessage decode(ByteBuffer byteBuffer, EhcacheServerCodec ehcacheServerCodec) { + final StructDecoder decoder = RECONNECT_PASSIVE_REPLICATION_MESSAGE_STRUCT.decoder(byteBuffer); + if (decoder != null) { + Long sequenceId = decoder.int64(SEQUENCE_ID); + Long clientSourceId = decoder.int64(SOURCE_ID); + Long transactionId = decoder.int64(TRANSACTION_ID); + EhcacheEntityMessage request = ehcacheServerCodec.decodeMessage(decoder.byteBuffer(REQUEST).array()); + EhcacheEntityResponse response = null; + try { + response = ehcacheServerCodec.decodeResponse(decoder.byteBuffer(RESPONSE).array()); + } catch (MessageCodecException e) { + throw new RuntimeException(e); + } + + return new ReconnectPassiveReplicationMessage(sequenceId, clientSourceId, transactionId, request, response); + } + + return null; + } +} diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java index 183f17d7d5..54b1acc398 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierActiveEntity.java @@ -57,6 +57,7 @@ import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ClearInvalidationCompleteMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.InvalidationCompleteMessage; +import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessage; import org.ehcache.clustered.server.management.ClusterTierManagement; import org.ehcache.clustered.server.offheap.InternalChain; import org.ehcache.clustered.server.state.EhcacheStateContext; @@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory; import org.terracotta.client.message.tracker.OOOMessageHandler; import org.terracotta.client.message.tracker.OOOMessageHandlerConfiguration; +import org.terracotta.client.message.tracker.RecordedMessage; import org.terracotta.entity.ActiveInvokeContext; import org.terracotta.entity.ActiveServerEntity; import org.terracotta.entity.BasicServiceConfiguration; @@ -79,6 +81,7 @@ import org.terracotta.entity.InvokeContext; import org.terracotta.entity.MessageCodecException; import org.terracotta.entity.PassiveSynchronizationChannel; +import org.terracotta.entity.ReconnectRejectedException; import org.terracotta.entity.ServiceException; import org.terracotta.entity.ServiceRegistry; import org.terracotta.entity.StateDumpCollector; @@ -105,6 +108,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toMap; @@ -123,6 +127,7 @@ import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.serverInvalidateHash; import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.success; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isLifecycleMessage; +import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isReconnectPassiveReplicationMessage; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStateRepoOperationMessage; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStoreOperationMessage; import static org.ehcache.clustered.server.ConcurrencyStrategies.DEFAULT_KEY; @@ -331,6 +336,8 @@ private EhcacheEntityResponse invokeActiveInternal(InvokeContext context, Ehcach return invokeLifeCycleOperation(context, (LifecycleMessage) message); } else if (isStateRepoOperationMessage(messageType)) { return invokeStateRepositoryOperation((StateRepositoryOpMessage) message); + } else if (isReconnectPassiveReplicationMessage(messageType)) { + return EhcacheEntityResponse.success(); } } } @@ -761,22 +768,42 @@ public void notifyDestroyed(ClientSourceId sourceId) { @Override public ReconnectHandler startReconnect() { - return (clientDescriptor, bytes) -> { - if (inflightInvalidations == null) { - throw new AssertionError("Load existing was not invoked before handleReconnect"); - } + return new ReconnectHandler() { + @Override + public void handleReconnect(ClientDescriptor clientDescriptor, byte[] extendedReconnectData) throws ReconnectRejectedException { + if (inflightInvalidations == null) { + throw new AssertionError("Load existing was not invoked before handleReconnect"); + } - ClusterTierReconnectMessage reconnectMessage = reconnectMessageCodec.decode(bytes); - ServerSideServerStore serverStore = stateService.getStore(storeIdentifier); - addInflightInvalidationsForStrongCache(clientDescriptor, reconnectMessage, serverStore); - lockManager.createLockStateAfterFailover(clientDescriptor, reconnectMessage.getLocksHeld()); - if (reconnectMessage.isEventsEnabled()) { - addEventListener(clientDescriptor, serverStore); - } + ClusterTierReconnectMessage reconnectMessage = reconnectMessageCodec.decode(extendedReconnectData); + ServerSideServerStore serverStore = stateService.getStore(storeIdentifier); + addInflightInvalidationsForStrongCache(clientDescriptor, reconnectMessage, serverStore); + lockManager.createLockStateAfterFailover(clientDescriptor, reconnectMessage.getLocksHeld()); + if (reconnectMessage.isEventsEnabled()) { + addEventListener(clientDescriptor, serverStore); + } - LOGGER.info("Client '{}' successfully reconnected to newly promoted ACTIVE after failover.", clientDescriptor); + LOGGER.info("Client '{}' successfully reconnected to newly promoted ACTIVE after failover.", clientDescriptor); - connectedClients.put(clientDescriptor, Boolean.TRUE); + connectedClients.put(clientDescriptor, Boolean.TRUE); + } + + @Override + public void close() { + List> recordedMessages = messageHandler.getRecordedMessages() + .collect(Collectors.toList()); + recordedMessages.forEach(r -> { + try { + entityMessenger.messageSelf(new ReconnectPassiveReplicationMessage( + r.getSequenceId(), + r.getClientSourceId().toLong(), + r.getTransactionId(), + r.getRequest(), + r.getResponse())); + } catch (MessageCodecException e) { + throw new RuntimeException(e); } + }); + } }; } diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierPassiveEntity.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierPassiveEntity.java index 2c38cdbdfe..0ee8a13572 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierPassiveEntity.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierPassiveEntity.java @@ -36,6 +36,7 @@ import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.InvalidationCompleteMessage; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ChainReplicationMessage; +import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessage; import org.ehcache.clustered.server.management.ClusterTierManagement; import org.ehcache.clustered.server.state.EhcacheStateContext; import org.ehcache.clustered.server.state.EhcacheStateService; @@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory; import org.terracotta.client.message.tracker.OOOMessageHandler; import org.terracotta.client.message.tracker.OOOMessageHandlerConfiguration; +import org.terracotta.client.message.tracker.RecordedMessage; import org.terracotta.entity.ClientSourceId; import org.terracotta.entity.ConfigurationException; import org.terracotta.entity.EntityUserException; @@ -54,11 +56,13 @@ import org.terracotta.entity.StateDumpCollector; import org.terracotta.offheapstore.exceptions.OversizeMappingException; +import java.util.List; import java.util.concurrent.TimeoutException; import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.getResponse; import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.success; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isPassiveReplicationMessage; +import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isReconnectPassiveReplicationMessage; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStateRepoOperationMessage; import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStoreOperationMessage; import static org.ehcache.clustered.server.ConcurrencyStrategies.clusterTierConcurrency; @@ -153,6 +157,38 @@ public ClientSourceId makeClientSourceId(long l) { return context.makeClientSourceId(l); } + @Override + public int getConcurrencyKey() { + return context.getConcurrencyKey(); + } + }; + } else if (message instanceof ReconnectPassiveReplicationMessage) { + realContext = new InvokeContext() { + @Override + public ClientSourceId getClientSource() { + return context.makeClientSourceId(((ReconnectPassiveReplicationMessage) message).getClientSourceId()); + } + + @Override + public long getCurrentTransactionId() { + return ((ReconnectPassiveReplicationMessage) message).getTransactionId(); + } + + @Override + public long getOldestTransactionId() { + return ((ReconnectPassiveReplicationMessage)message).getTransactionId(); + } + + @Override + public boolean isValidClientInformation() { + return true; + } + + @Override + public ClientSourceId makeClientSourceId(long l) { + return context.makeClientSourceId(l); + } + @Override public int getConcurrencyKey() { return context.getConcurrencyKey(); @@ -173,7 +209,9 @@ private EhcacheEntityResponse invokePassiveInternal(InvokeContext context, Ehcac } else if (isStateRepoOperationMessage(messageType)) { return stateService.getStateRepositoryManager().invoke((StateRepositoryOpMessage) message); } else if (isPassiveReplicationMessage(messageType)) { - return invokeRetirementMessages((PassiveReplicationMessage) message); + return invokeRetirementMessages((PassiveReplicationMessage)message); + } else if (isReconnectPassiveReplicationMessage(messageType)) { + return invokePassiveInternal(context, ((ReconnectPassiveReplicationMessage)message).getRequest()); } else { throw new AssertionError("Unsupported EhcacheOperationMessage: " + operationMessage.getMessageType()); } diff --git a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierServerEntityService.java b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierServerEntityService.java index 0e6fff599c..8ecfced5cc 100644 --- a/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierServerEntityService.java +++ b/clustered/server/entity/src/main/java/org/ehcache/clustered/server/store/ClusterTierServerEntityService.java @@ -32,6 +32,7 @@ import org.ehcache.clustered.server.internal.messages.EhcacheServerCodec; import org.ehcache.clustered.server.internal.messages.EhcacheSyncMessageCodec; import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessageCodec; +import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessageCodec; import org.terracotta.entity.CommonServerEntity; import org.terracotta.entity.ConcurrencyStrategy; import org.terracotta.entity.ConfigurationException; @@ -86,7 +87,7 @@ public ConcurrencyStrategy getConcurrencyStrategy(byte[] c public MessageCodec getMessageCodec() { EhcacheCodec ehcacheCodec = new EhcacheCodec(new ServerStoreOpCodec(), new LifeCycleMessageCodec(CONFIG_CODEC), new StateRepositoryOpCodec(), new ResponseCodec()); - return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec()); + return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec(), new ReconnectPassiveReplicationMessageCodec()); } @Override diff --git a/clustered/server/entity/src/test/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodecTest.java b/clustered/server/entity/src/test/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodecTest.java index 9f0bb360f5..32c30f3ec6 100644 --- a/clustered/server/entity/src/test/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodecTest.java +++ b/clustered/server/entity/src/test/java/org/ehcache/clustered/server/internal/messages/EhcacheServerCodecTest.java @@ -44,12 +44,15 @@ public class EhcacheServerCodecTest { @Mock private PassiveReplicationMessageCodec replicationCodec; + @Mock + private ReconnectPassiveReplicationMessageCodec reconnectPassiveReplicationMessageCodec; + private EhcacheServerCodec serverCodec; @Before public void setUp() { initMocks(this); - serverCodec = new EhcacheServerCodec(clientCodec, replicationCodec); + serverCodec = new EhcacheServerCodec(clientCodec, replicationCodec, reconnectPassiveReplicationMessageCodec); } @Test