Skip to content

Commit

Permalink
Use MRM shared data views
Browse files Browse the repository at this point in the history
  • Loading branch information
eager-signal committed Nov 19, 2024
1 parent 085f013 commit ea75c39
Show file tree
Hide file tree
Showing 10 changed files with 24 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor);
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler,
messageDeletionAsyncExecutor, clock, dynamicConfigurationManager);
messageDeletionAsyncExecutor, clock);
ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases,
recurringJobExecutor,
config.getClientReleaseConfiguration().refreshInterval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ public class DynamicConfiguration {
@Valid
DynamicMetricsConfiguration metricsConfiguration = new DynamicMetricsConfiguration(false);

@JsonProperty
@Valid
DynamicMessagesConfiguration messagesConfiguration = new DynamicMessagesConfiguration();

@JsonProperty
@Valid
List<String> svrStatusCodesToIgnoreForAccountDeletion = Collections.emptyList();
Expand Down Expand Up @@ -130,10 +126,6 @@ public DynamicMetricsConfiguration getMetricsConfiguration() {
return metricsConfiguration;
}

public DynamicMessagesConfiguration getMessagesConfiguration() {
return messagesConfiguration;
}

public List<String> getSvrStatusCodesToIgnoreForAccountDeletion() {
return svrStatusCodesToIgnoreForAccountDeletion;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -952,9 +952,6 @@ private void sendCommonPayloadMessage(Account destinationAccount,
.setDestinationServiceId(serviceIdentifier.toServiceIdentifierString())
.setSharedMrmKey(ByteString.copyFrom(sharedMrmKey));

// mrm views phase 3: always set content
messageBuilder.setContent(ByteString.copyFrom(payload));

messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@
import org.signal.libsignal.protocol.ServiceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration;
import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.experiment.Experiment;
import org.whispersystems.textsecuregcm.identity.ServiceIdentifier;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient;
Expand Down Expand Up @@ -116,8 +113,6 @@ public class MessagesCache {
// messageDeletionExecutorService wrapped into a reactor Scheduler
private final Scheduler messageDeletionScheduler;

private final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager;

private final MessagesCacheInsertScript insertScript;
private final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript;
private final MessagesCacheRemoveByGuidScript removeByGuidScript;
Expand All @@ -137,10 +132,8 @@ public class MessagesCache {
private final Counter staleEphemeralMessagesCounter = Metrics.counter(
name(MessagesCache.class, "staleEphemeralMessages"));
private final Counter mrmContentRetrievedCounter = Metrics.counter(name(MessagesCache.class, "mrmViewRetrieved"));
private final String MRM_RETRIEVAL_ERROR_COUNTER_NAME = "mrmRetrievalError";
private final String MRM_RETRIEVAL_ERROR_COUNTER_NAME = name(MessagesCache.class, "mrmRetrievalError");
private final String EPHEMERAL_TAG_NAME = "ephemeral";
private final Counter mrmPhaseTwoMissingContentCounter = Metrics.counter(
name(MessagesCache.class, "mrmPhaseTwoMissingContent"));
private final Counter skippedStaleEphemeralMrmCounter = Metrics.counter(
name(MessagesCache.class, "skippedStaleEphemeralMrm"));
private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter(
Expand All @@ -149,8 +142,6 @@ public class MessagesCache {
static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot";
private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8);

private static final String MRM_VIEWS_EXPERIMENT_NAME = "mrmViews";

@VisibleForTesting
static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10);

Expand All @@ -164,16 +155,14 @@ public class MessagesCache {
public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService,
final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager)
final Clock clock)
throws IOException {

this(
redisCluster,
messageDeliveryScheduler,
messageDeletionExecutorService,
clock,
dynamicConfigurationManager,
new MessagesCacheInsertScript(redisCluster),
new MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(redisCluster),
new MessagesCacheGetItemsScript(redisCluster),
Expand All @@ -189,7 +178,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
final Scheduler messageDeliveryScheduler,
final ExecutorService messageDeletionExecutorService, final Clock clock,
final DynamicConfigurationManager<DynamicConfiguration> dynamicConfigurationManager,
final MessagesCacheInsertScript insertScript,
final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript,
final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript,
Expand All @@ -205,8 +193,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster,
this.messageDeletionExecutorService = messageDeletionExecutorService;
this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion");

this.dynamicConfigurationManager = dynamicConfigurationManager;

this.insertScript = insertScript;
this.insertMrmScript = insertMrmScript;
this.removeByGuidScript = removeByGuidScript;
Expand Down Expand Up @@ -371,8 +357,6 @@ Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final by
messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build());
skippedStaleEphemeralMrmCounter.increment();
} else {
// mrm views phase 3: fetch shared MRM data -- internally depends on dynamic config that
// enables using it (the stored messages still always have `content` set upstream)
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
}

Expand All @@ -393,27 +377,18 @@ Flux<MessageProtos.Envelope> getAllMessages(final UUID destinationUuid, final by

/**
* Returns the given message with its shared MRM data.
*
* @see DynamicMessagesConfiguration#useSharedMrmData()
*/
private Mono<MessageProtos.Envelope> getMessageWithSharedMrmData(final MessageProtos.Envelope mrmMessage,
final byte destinationDevice) {

assert mrmMessage.hasSharedMrmKey();

// mrm views phase 2: messages have content
if (!mrmMessage.hasContent()) {
mrmPhaseTwoMissingContentCounter.increment();
}

final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);

final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(
// the message might be addressed to the account's PNI, so use the service ID from the envelope
ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice);

final Mono<MessageProtos.Envelope> messageFromRedisMono = Mono.from(redisCluster.withBinaryClusterReactive(
return Mono.from(redisCluster.withBinaryClusterReactive(
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
.collectList()
.publishOn(messageDeliveryScheduler)))
Expand Down Expand Up @@ -444,18 +419,6 @@ private Mono<MessageProtos.Envelope> getMessageWithSharedMrmData(final MessagePr
return Mono.empty();
})
.share();

if (mrmMessage.hasContent()) {
experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), messageFromRedisMono);
}

if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().useSharedMrmData()
|| !mrmMessage.hasContent()) {
return messageFromRedisMono;
}

// if fetching or using shared data is disabled, fallback to just() with the existing message
return Mono.just(mrmMessage.toBuilder().clearSharedMrmKey().build());
}

/**
Expand Down Expand Up @@ -529,8 +492,6 @@ List<MessageProtos.Envelope> getMessagesToPersist(final UUID accountUuid, final
.concatMap(message -> {
final Mono<MessageProtos.Envelope> messageMono;
if (message.hasSharedMrmKey()) {
// mrm views phase 2: fetch shared MRM data -- internally depends on dynamic config that
// enables fetching and using it (the stored messages still always have `content` set upstream)
messageMono = getMessageWithSharedMrmData(message, destinationDevice);
} else {
messageMono = Mono.just(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static CommandDependencies build(
storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration());
DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor);
MessagesCache messagesCache = new MessagesCache(messagesCluster,
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager);
messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC());
ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster);
ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient,
configuration.getDynamoDbTables().getReportMessage().getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void setUp() throws Exception {
final AccountsManager accountsManager = mock(AccountsManager.class);

messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC());
messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class),
messageDeletionExecutorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void setUp() throws Exception {
resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor();
messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery");
messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(),
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager);
messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC());
messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager,
dynamicConfigurationManager, PERSIST_DELAY, 1);

Expand Down
Loading

0 comments on commit ea75c39

Please sign in to comment.