Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[No merge] Sync clustered cache state on passives with newly promoted active #2794

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public enum EhcacheMessageType {
// Passive replication messages
CHAIN_REPLICATION_OP,
CLEAR_INVALIDATION_COMPLETE,
INVALIDATION_COMPLETE;
INVALIDATION_COMPLETE,
RECONNECT_PASSIVE_REPLICATION_MESSAGE;

public static final EnumSet<EhcacheMessageType> LIFECYCLE_MESSAGES = of(VALIDATE, VALIDATE_SERVER_STORE, PREPARE_FOR_DESTROY);
public static boolean isLifecycleMessage(EhcacheMessageType value) {
Expand Down Expand Up @@ -92,4 +93,8 @@ public static boolean isTrackedOperationMessage(EhcacheMessageType value) {
public static boolean isPassiveReplicationMessage(EhcacheMessageType value) {
return PASSIVE_REPLICATION_MESSAGES.contains(value);
}

public static boolean isReconnectPassiveReplicationMessage(EhcacheMessageType type) {
return RECONNECT_PASSIVE_REPLICATION_MESSAGE.equals(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.ITERATOR_OPEN;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.LOCK;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.PUT_IF_ABSENT;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.RECONNECT_PASSIVE_REPLICATION_MESSAGE;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.REPLACE;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.UNLOCK;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.VALIDATE;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class BaseCodec {
.mapping(CHAIN_REPLICATION_OP, 61)
.mapping(CLEAR_INVALIDATION_COMPLETE, 63)
.mapping(INVALIDATION_COMPLETE, 64)
.mapping(RECONNECT_PASSIVE_REPLICATION_MESSAGE, 70)
.build();

public static final String RESPONSE_TYPE_FIELD_NAME = "opCode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpCodec;
import org.ehcache.clustered.server.internal.messages.EhcacheServerCodec;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessageCodec;
import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessageCodec;
import org.ehcache.clustered.server.management.Management;
import org.ehcache.clustered.server.state.EhcacheStateService;
import org.ehcache.clustered.server.state.config.EhcacheStateServiceConfig;
Expand All @@ -38,7 +39,6 @@
import org.terracotta.entity.EntityServerService;
import org.terracotta.entity.ExecutionStrategy;
import org.terracotta.entity.MessageCodec;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.entity.ServiceException;
import org.terracotta.entity.ServiceRegistry;
import org.terracotta.entity.SyncMessageCodec;
Expand Down Expand Up @@ -99,7 +99,10 @@ public ConcurrencyStrategy<EhcacheEntityMessage> getConcurrencyStrategy(byte[] c
public MessageCodec<EhcacheEntityMessage, EhcacheEntityResponse> getMessageCodec() {
EhcacheCodec ehcacheCodec = new EhcacheCodec(new ServerStoreOpCodec(),
new LifeCycleMessageCodec(CONFIG_CODEC), new StateRepositoryOpCodec(), new ResponseCodec());
return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec());
ReconnectPassiveReplicationMessageCodec reconnectPassiveReplicationMessageCodec =
new ReconnectPassiveReplicationMessageCodec();
return new EhcacheServerCodec(ehcacheCodec, new PassiveReplicationMessageCodec(),
reconnectPassiveReplicationMessageCodec);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
import org.ehcache.clustered.server.internal.messages.EhcacheSyncMessage;
import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessage;
import org.terracotta.entity.ExecutionStrategy;

/**
Expand Down Expand Up @@ -52,6 +53,8 @@ public Location getExecutionLocation(EhcacheEntityMessage message) {
return Location.PASSIVE;
} else if (message instanceof EhcacheSyncMessage) {
throw new AssertionError("Unexpected use of ExecutionStrategy for sync messages");
} else if (message instanceof ReconnectPassiveReplicationMessage) {
return getExecutionLocation(((ReconnectPassiveReplicationMessage)message).getRequest());
}
throw new AssertionError("Unknown message type: " + message.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import static java.nio.ByteBuffer.wrap;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isPassiveReplicationMessage;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isReconnectPassiveReplicationMessage;

/**
* EhcacheServerCodec
Expand All @@ -40,17 +41,24 @@ public class EhcacheServerCodec implements MessageCodec<EhcacheEntityMessage, Eh

private final EhcacheCodec clientCodec;
private final PassiveReplicationMessageCodec replicationCodec;
private final ReconnectPassiveReplicationMessageCodec reconnectPassiveReplicationMessageCodec;

public EhcacheServerCodec(EhcacheCodec clientCodec, PassiveReplicationMessageCodec replicationCodec) {
public EhcacheServerCodec(EhcacheCodec clientCodec, PassiveReplicationMessageCodec replicationCodec, ReconnectPassiveReplicationMessageCodec reconnectPassiveReplicationMessageCodec) {
this.clientCodec = clientCodec;
this.replicationCodec = replicationCodec;
this.reconnectPassiveReplicationMessageCodec = reconnectPassiveReplicationMessageCodec;
}

@Override
public byte[] encodeMessage(EhcacheEntityMessage message) {
if (message instanceof PassiveReplicationMessage) {
return replicationCodec.encode((PassiveReplicationMessage) message);
}

if (message instanceof ReconnectPassiveReplicationMessage) {
return this.reconnectPassiveReplicationMessageCodec.encode((ReconnectPassiveReplicationMessage)message, this);
}

return clientCodec.encodeMessage(message);
}

Expand All @@ -72,6 +80,11 @@ public EhcacheEntityMessage decodeMessage(byte[] payload) {
if (isPassiveReplicationMessage(messageType)) {
return replicationCodec.decode(messageType, byteBuffer);
}

if (isReconnectPassiveReplicationMessage(messageType)) {
return reconnectPassiveReplicationMessageCodec.decode(byteBuffer, this);
}

return clientCodec.decodeMessage(byteBuffer, messageType);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.ehcache.clustered.server.internal.messages;

import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;

public class ReconnectPassiveReplicationMessage extends EhcacheOperationMessage {
private final long sequenceId;
private final long clientSourceId;
private final long transactionId;
private final EhcacheEntityMessage request;
private final EhcacheEntityResponse response;

public ReconnectPassiveReplicationMessage(long sequenceId,
long clientSourceId,
long transactionId,
EhcacheEntityMessage request,
EhcacheEntityResponse response) {
this.sequenceId = sequenceId;
this.clientSourceId = clientSourceId;
this.transactionId = transactionId;
this.request = request;
this.response = response;
}

public long getSequenceId() {
return sequenceId;
}

public long getClientSourceId() {
return clientSourceId;
}

public long getTransactionId() {
return transactionId;
}

public EhcacheEntityMessage getRequest() {
return request;
}

public EhcacheEntityResponse getResponse() {
return response;
}

@Override
public EhcacheMessageType getMessageType() {
return EhcacheMessageType.RECONNECT_PASSIVE_REPLICATION_MESSAGE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.ehcache.clustered.server.internal.messages;

import org.ehcache.clustered.common.internal.messages.EhcacheCodec;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.runnel.Struct;
import org.terracotta.runnel.decoding.StructDecoder;
import org.terracotta.runnel.encoding.StructEncoder;

import java.nio.ByteBuffer;

import static org.terracotta.runnel.StructBuilder.newStructBuilder;

public class ReconnectPassiveReplicationMessageCodec {
private static final String SEQUENCE_ID = "sequence_id";
private static final String SOURCE_ID = "client_source_id";
private static final String TRANSACTION_ID = "transaction_id";
private static final String REQUEST = "request";
private static final String RESPONSE = "response";

private static final Struct RECONNECT_PASSIVE_REPLICATION_MESSAGE_STRUCT = newStructBuilder()
.int64(SEQUENCE_ID, 10)
.int64(SOURCE_ID, 20)
.int64(TRANSACTION_ID, 30)
.byteBuffer(REQUEST, 40)
.byteBuffer(RESPONSE, 50)
.build();

public byte[] encode(ReconnectPassiveReplicationMessage replicationMessage, EhcacheServerCodec ehcacheServerCodec) {
final StructEncoder<Void> encoder = RECONNECT_PASSIVE_REPLICATION_MESSAGE_STRUCT.encoder();
encoder.int64(SEQUENCE_ID, replicationMessage.getSequenceId());
encoder.int64(SOURCE_ID, replicationMessage.getClientSourceId());
encoder.int64(TRANSACTION_ID, replicationMessage.getTransactionId());
encoder.byteBuffer(REQUEST, ByteBuffer.wrap(ehcacheServerCodec.encodeMessage(replicationMessage.getRequest())));
try {
encoder.byteBuffer(RESPONSE, ByteBuffer.wrap(ehcacheServerCodec.encodeResponse(replicationMessage.getResponse())));
} catch (MessageCodecException e) {
throw new RuntimeException(e);
}

return encoder.encode().array();
}

public ReconnectPassiveReplicationMessage decode(ByteBuffer byteBuffer, EhcacheServerCodec ehcacheServerCodec) {
final StructDecoder<Void> decoder = RECONNECT_PASSIVE_REPLICATION_MESSAGE_STRUCT.decoder(byteBuffer);
if (decoder != null) {
Long sequenceId = decoder.int64(SEQUENCE_ID);
Long clientSourceId = decoder.int64(SOURCE_ID);
Long transactionId = decoder.int64(TRANSACTION_ID);
EhcacheEntityMessage request = ehcacheServerCodec.decodeMessage(decoder.byteBuffer(REQUEST).array());
EhcacheEntityResponse response = null;
try {
response = ehcacheServerCodec.decodeResponse(decoder.byteBuffer(RESPONSE).array());
} catch (MessageCodecException e) {
throw new RuntimeException(e);
}

return new ReconnectPassiveReplicationMessage(sequenceId, clientSourceId, transactionId, request, response);
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.ClearInvalidationCompleteMessage;
import org.ehcache.clustered.server.internal.messages.PassiveReplicationMessage.InvalidationCompleteMessage;
import org.ehcache.clustered.server.internal.messages.ReconnectPassiveReplicationMessage;
import org.ehcache.clustered.server.management.ClusterTierManagement;
import org.ehcache.clustered.server.offheap.InternalChain;
import org.ehcache.clustered.server.state.EhcacheStateContext;
Expand All @@ -67,6 +68,7 @@
import org.slf4j.LoggerFactory;
import org.terracotta.client.message.tracker.OOOMessageHandler;
import org.terracotta.client.message.tracker.OOOMessageHandlerConfiguration;
import org.terracotta.client.message.tracker.RecordedMessage;
import org.terracotta.entity.ActiveInvokeContext;
import org.terracotta.entity.ActiveServerEntity;
import org.terracotta.entity.BasicServiceConfiguration;
Expand All @@ -79,6 +81,7 @@
import org.terracotta.entity.InvokeContext;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.entity.PassiveSynchronizationChannel;
import org.terracotta.entity.ReconnectRejectedException;
import org.terracotta.entity.ServiceException;
import org.terracotta.entity.ServiceRegistry;
import org.terracotta.entity.StateDumpCollector;
Expand All @@ -105,6 +108,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
Expand All @@ -123,6 +127,7 @@
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.serverInvalidateHash;
import static org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.success;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isLifecycleMessage;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isReconnectPassiveReplicationMessage;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStateRepoOperationMessage;
import static org.ehcache.clustered.common.internal.messages.EhcacheMessageType.isStoreOperationMessage;
import static org.ehcache.clustered.server.ConcurrencyStrategies.DEFAULT_KEY;
Expand Down Expand Up @@ -331,6 +336,8 @@ private EhcacheEntityResponse invokeActiveInternal(InvokeContext context, Ehcach
return invokeLifeCycleOperation(context, (LifecycleMessage) message);
} else if (isStateRepoOperationMessage(messageType)) {
return invokeStateRepositoryOperation((StateRepositoryOpMessage) message);
} else if (isReconnectPassiveReplicationMessage(messageType)) {
return EhcacheEntityResponse.success();
}
}
}
Expand Down Expand Up @@ -761,22 +768,42 @@ public void notifyDestroyed(ClientSourceId sourceId) {

@Override
public ReconnectHandler startReconnect() {
return (clientDescriptor, bytes) -> {
if (inflightInvalidations == null) {
throw new AssertionError("Load existing was not invoked before handleReconnect");
}
return new ReconnectHandler() {
@Override
public void handleReconnect(ClientDescriptor clientDescriptor, byte[] extendedReconnectData) throws ReconnectRejectedException {
if (inflightInvalidations == null) {
throw new AssertionError("Load existing was not invoked before handleReconnect");
}

ClusterTierReconnectMessage reconnectMessage = reconnectMessageCodec.decode(bytes);
ServerSideServerStore serverStore = stateService.getStore(storeIdentifier);
addInflightInvalidationsForStrongCache(clientDescriptor, reconnectMessage, serverStore);
lockManager.createLockStateAfterFailover(clientDescriptor, reconnectMessage.getLocksHeld());
if (reconnectMessage.isEventsEnabled()) {
addEventListener(clientDescriptor, serverStore);
}
ClusterTierReconnectMessage reconnectMessage = reconnectMessageCodec.decode(extendedReconnectData);
ServerSideServerStore serverStore = stateService.getStore(storeIdentifier);
addInflightInvalidationsForStrongCache(clientDescriptor, reconnectMessage, serverStore);
lockManager.createLockStateAfterFailover(clientDescriptor, reconnectMessage.getLocksHeld());
if (reconnectMessage.isEventsEnabled()) {
addEventListener(clientDescriptor, serverStore);
}

LOGGER.info("Client '{}' successfully reconnected to newly promoted ACTIVE after failover.", clientDescriptor);
LOGGER.info("Client '{}' successfully reconnected to newly promoted ACTIVE after failover.", clientDescriptor);

connectedClients.put(clientDescriptor, Boolean.TRUE);
connectedClients.put(clientDescriptor, Boolean.TRUE);
}

@Override
public void close() {
List<RecordedMessage<EhcacheEntityMessage, EhcacheEntityResponse>> recordedMessages = messageHandler.getRecordedMessages()
.collect(Collectors.toList());
recordedMessages.forEach(r -> {
try {
entityMessenger.messageSelf(new ReconnectPassiveReplicationMessage(
r.getSequenceId(),
r.getClientSourceId().toLong(),
r.getTransactionId(),
r.getRequest(),
r.getResponse()));
} catch (MessageCodecException e) {
throw new RuntimeException(e); }
});
}
};
}

Expand Down
Loading