From ea75c39b5893cbc7c3192f37e4cd23d181f6df2e Mon Sep 17 00:00:00 2001 From: Chris Eager Date: Tue, 12 Nov 2024 12:23:54 -0600 Subject: [PATCH] Use MRM shared data views --- .../textsecuregcm/WhisperServerService.java | 2 +- .../dynamic/DynamicConfiguration.java | 8 -- .../dynamic/DynamicMessagesConfiguration.java | 13 -- .../controllers/MessageController.java | 3 - .../textsecuregcm/storage/MessagesCache.java | 45 +------ .../workers/CommandDependencies.java | 2 +- .../MessagePersisterIntegrationTest.java | 2 +- .../storage/MessagePersisterTest.java | 2 +- .../storage/MessagesCacheTest.java | 111 +++--------------- .../WebSocketConnectionIntegrationTest.java | 2 +- 10 files changed, 24 insertions(+), 166 deletions(-) delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessagesConfiguration.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 45bb6ef4f..c414810ac 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -603,7 +603,7 @@ public void run(WhisperServerConfiguration config, Environment environment) thro DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); MessagesCache messagesCache = new MessagesCache(messagesCluster, messageDeliveryScheduler, - messageDeletionAsyncExecutor, clock, dynamicConfigurationManager); + messageDeletionAsyncExecutor, clock); ClientReleaseManager clientReleaseManager = new ClientReleaseManager(clientReleases, recurringJobExecutor, config.getClientReleaseConfiguration().refreshInterval(), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java index 4a7df9ec1..a9f0f4e25 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicConfiguration.java @@ -68,10 +68,6 @@ public class DynamicConfiguration { @Valid DynamicMetricsConfiguration metricsConfiguration = new DynamicMetricsConfiguration(false); - @JsonProperty - @Valid - DynamicMessagesConfiguration messagesConfiguration = new DynamicMessagesConfiguration(); - @JsonProperty @Valid List svrStatusCodesToIgnoreForAccountDeletion = Collections.emptyList(); @@ -130,10 +126,6 @@ public DynamicMetricsConfiguration getMetricsConfiguration() { return metricsConfiguration; } - public DynamicMessagesConfiguration getMessagesConfiguration() { - return messagesConfiguration; - } - public List getSvrStatusCodesToIgnoreForAccountDeletion() { return svrStatusCodesToIgnoreForAccountDeletion; } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessagesConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessagesConfiguration.java deleted file mode 100644 index ece407844..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/configuration/dynamic/DynamicMessagesConfiguration.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright 2024 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.configuration.dynamic; - -public record DynamicMessagesConfiguration(boolean useSharedMrmData) { - - public DynamicMessagesConfiguration() { - this(false); - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java index 859c7354e..148678d35 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/MessageController.java @@ -952,9 +952,6 @@ private void sendCommonPayloadMessage(Account destinationAccount, .setDestinationServiceId(serviceIdentifier.toServiceIdentifierString()) .setSharedMrmKey(ByteString.copyFrom(sharedMrmKey)); - // mrm views phase 3: always set content - messageBuilder.setContent(ByteString.copyFrom(payload)); - messageSender.sendMessage(destinationAccount, destinationDevice, messageBuilder.build(), online); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index eee9450ed..a612e3c21 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -36,10 +36,7 @@ import org.signal.libsignal.protocol.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; -import org.whispersystems.textsecuregcm.experiment.Experiment; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; import org.whispersystems.textsecuregcm.metrics.MetricsUtil; import org.whispersystems.textsecuregcm.redis.FaultTolerantRedisClusterClient; @@ -116,8 +113,6 @@ public class MessagesCache { // messageDeletionExecutorService wrapped into a reactor Scheduler private final Scheduler messageDeletionScheduler; - private final DynamicConfigurationManager dynamicConfigurationManager; - private final MessagesCacheInsertScript insertScript; private final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript; private final MessagesCacheRemoveByGuidScript removeByGuidScript; @@ -137,10 +132,8 @@ public class MessagesCache { private final Counter staleEphemeralMessagesCounter = Metrics.counter( name(MessagesCache.class, "staleEphemeralMessages")); private final Counter mrmContentRetrievedCounter = Metrics.counter(name(MessagesCache.class, "mrmViewRetrieved")); - private final String MRM_RETRIEVAL_ERROR_COUNTER_NAME = "mrmRetrievalError"; + private final String MRM_RETRIEVAL_ERROR_COUNTER_NAME = name(MessagesCache.class, "mrmRetrievalError"); private final String EPHEMERAL_TAG_NAME = "ephemeral"; - private final Counter mrmPhaseTwoMissingContentCounter = Metrics.counter( - name(MessagesCache.class, "mrmPhaseTwoMissingContent")); private final Counter skippedStaleEphemeralMrmCounter = Metrics.counter( name(MessagesCache.class, "skippedStaleEphemeralMrm")); private final Counter sharedMrmDataKeyRemovedCounter = Metrics.counter( @@ -149,8 +142,6 @@ public class MessagesCache { static final String NEXT_SLOT_TO_PERSIST_KEY = "user_queue_persist_slot"; private static final byte[] LOCK_VALUE = "1".getBytes(StandardCharsets.UTF_8); - private static final String MRM_VIEWS_EXPERIMENT_NAME = "mrmViews"; - @VisibleForTesting static final Duration MAX_EPHEMERAL_MESSAGE_DELAY = Duration.ofSeconds(10); @@ -164,8 +155,7 @@ public class MessagesCache { public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, - final Clock clock, - final DynamicConfigurationManager dynamicConfigurationManager) + final Clock clock) throws IOException { this( @@ -173,7 +163,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, messageDeliveryScheduler, messageDeletionExecutorService, clock, - dynamicConfigurationManager, new MessagesCacheInsertScript(redisCluster), new MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript(redisCluster), new MessagesCacheGetItemsScript(redisCluster), @@ -189,7 +178,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, MessagesCache(final FaultTolerantRedisClusterClient redisCluster, final Scheduler messageDeliveryScheduler, final ExecutorService messageDeletionExecutorService, final Clock clock, - final DynamicConfigurationManager dynamicConfigurationManager, final MessagesCacheInsertScript insertScript, final MessagesCacheInsertSharedMultiRecipientPayloadAndViewsScript insertMrmScript, final MessagesCacheGetItemsScript getItemsScript, final MessagesCacheRemoveByGuidScript removeByGuidScript, @@ -205,8 +193,6 @@ public MessagesCache(final FaultTolerantRedisClusterClient redisCluster, this.messageDeletionExecutorService = messageDeletionExecutorService; this.messageDeletionScheduler = Schedulers.fromExecutorService(messageDeletionExecutorService, "messageDeletion"); - this.dynamicConfigurationManager = dynamicConfigurationManager; - this.insertScript = insertScript; this.insertMrmScript = insertMrmScript; this.removeByGuidScript = removeByGuidScript; @@ -371,8 +357,6 @@ Flux getAllMessages(final UUID destinationUuid, final by messageMono = Mono.just(message.toBuilder().clearSharedMrmKey().build()); skippedStaleEphemeralMrmCounter.increment(); } else { - // mrm views phase 3: fetch shared MRM data -- internally depends on dynamic config that - // enables using it (the stored messages still always have `content` set upstream) messageMono = getMessageWithSharedMrmData(message, destinationDevice); } @@ -393,27 +377,18 @@ Flux getAllMessages(final UUID destinationUuid, final by /** * Returns the given message with its shared MRM data. - * - * @see DynamicMessagesConfiguration#useSharedMrmData() */ private Mono getMessageWithSharedMrmData(final MessageProtos.Envelope mrmMessage, final byte destinationDevice) { assert mrmMessage.hasSharedMrmKey(); - // mrm views phase 2: messages have content - if (!mrmMessage.hasContent()) { - mrmPhaseTwoMissingContentCounter.increment(); - } - - final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME); - final byte[] key = mrmMessage.getSharedMrmKey().toByteArray(); final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey( // the message might be addressed to the account's PNI, so use the service ID from the envelope ServiceIdentifier.valueOf(mrmMessage.getDestinationServiceId()), destinationDevice); - final Mono messageFromRedisMono = Mono.from(redisCluster.withBinaryClusterReactive( + return Mono.from(redisCluster.withBinaryClusterReactive( conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey) .collectList() .publishOn(messageDeliveryScheduler))) @@ -444,18 +419,6 @@ private Mono getMessageWithSharedMrmData(final MessagePr return Mono.empty(); }) .share(); - - if (mrmMessage.hasContent()) { - experiment.compareMonoResult(mrmMessage.toBuilder().clearSharedMrmKey().build(), messageFromRedisMono); - } - - if (dynamicConfigurationManager.getConfiguration().getMessagesConfiguration().useSharedMrmData() - || !mrmMessage.hasContent()) { - return messageFromRedisMono; - } - - // if fetching or using shared data is disabled, fallback to just() with the existing message - return Mono.just(mrmMessage.toBuilder().clearSharedMrmKey().build()); } /** @@ -529,8 +492,6 @@ List getMessagesToPersist(final UUID accountUuid, final .concatMap(message -> { final Mono messageMono; if (message.hasSharedMrmKey()) { - // mrm views phase 2: fetch shared MRM data -- internally depends on dynamic config that - // enables fetching and using it (the stored messages still always have `content` set upstream) messageMono = getMessageWithSharedMrmData(message, destinationDevice); } else { messageMono = Mono.just(message); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 9d207dd67..df2c3e155 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -212,7 +212,7 @@ static CommandDependencies build( storageServiceExecutor, storageServiceRetryExecutor, configuration.getSecureStorageServiceConfiguration()); DisconnectionRequestManager disconnectionRequestManager = new DisconnectionRequestManager(pubsubClient, disconnectionRequestListenerExecutor); MessagesCache messagesCache = new MessagesCache(messagesCluster, - messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC(), dynamicConfigurationManager); + messageDeliveryScheduler, messageDeletionExecutor, Clock.systemUTC()); ProfilesManager profilesManager = new ProfilesManager(profiles, cacheCluster); ReportMessageDynamoDb reportMessageDynamoDb = new ReportMessageDynamoDb(dynamoDbClient, configuration.getDynamoDbTables().getReportMessage().getTableName(), diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index bd2af221b..79c8cfc95 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -81,7 +81,7 @@ void setUp() throws Exception { final AccountsManager accountsManager = mock(AccountsManager.class); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC(), dynamicConfigurationManager); + messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC()); messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messageDeletionExecutorService); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 56c32ffd7..95816a773 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -99,7 +99,7 @@ void setUp() throws Exception { resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); + messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, dynamicConfigurationManager, PERSIST_DELAY, 1); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java index 6c0f2a8d9..54d9b4a73 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagesCacheTest.java @@ -64,12 +64,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.junitpioneer.jupiter.cartesian.CartesianTest; import org.reactivestreams.Publisher; import org.signal.libsignal.protocol.SealedSenderMultiRecipientMessage; -import org.signal.libsignal.protocol.ServiceId; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; -import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicMessagesConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; import org.whispersystems.textsecuregcm.identity.AciServiceIdentifier; import org.whispersystems.textsecuregcm.identity.ServiceIdentifier; @@ -98,26 +94,17 @@ class WithRealCluster { private Scheduler messageDeliveryScheduler; private MessagesCache messagesCache; - private DynamicConfigurationManager dynamicConfigurationManager; - private DynamicConfiguration dynamicConfiguration; - private static final UUID DESTINATION_UUID = UUID.randomUUID(); private static final byte DESTINATION_DEVICE_ID = 7; @BeforeEach void setUp() throws Exception { - dynamicConfiguration = mock(DynamicConfiguration.class); - when(dynamicConfiguration.getMessagesConfiguration()).thenReturn( - new DynamicMessagesConfiguration(true)); - dynamicConfigurationManager = mock(DynamicConfigurationManager.class); - when(dynamicConfigurationManager.getConfiguration()).thenReturn(dynamicConfiguration); - sharedExecutorService = Executors.newSingleThreadExecutor(); resubscribeRetryExecutorService = Executors.newSingleThreadScheduledExecutor(); messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); + messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); } @AfterEach @@ -321,7 +308,7 @@ void testGetMessagesPublisher(final boolean expectStale) throws Exception { } final MessagesCache messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - messageDeliveryScheduler, sharedExecutorService, cacheClock, dynamicConfigurationManager); + messageDeliveryScheduler, sharedExecutorService, cacheClock); final List actualMessages = Flux.from( messagesCache.get(DESTINATION_UUID, DESTINATION_DEVICE_ID)) @@ -429,12 +416,9 @@ public void testGetQueuesToPersist(final boolean sealedSender) { assertEquals(DESTINATION_DEVICE_ID, MessagesCache.getDeviceIdFromQueueName(queues.getFirst())); } - @CartesianTest - void testMultiRecipientMessage(@CartesianTest.Values(booleans = {true, false}) final boolean sharedMrmKeyPresent, - @CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) { - - when(dynamicConfiguration.getMessagesConfiguration()) - .thenReturn(new DynamicMessagesConfiguration(useSharedMrmData)); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testMultiRecipientMessage(final boolean sharedMrmKeyPresent) { final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID()); final byte deviceId = 1; @@ -453,10 +437,8 @@ void testMultiRecipientMessage(@CartesianTest.Values(booleans = {true, false}) f .toBuilder() // clear some things added by the helper .clearServerGuid() - // mrm views phase 2: messages have content - .setContent( - ByteString.copyFrom(mrm.messageForRecipient(mrm.getRecipients().get(destinationServiceId.toLibsignal())))) .setSharedMrmKey(ByteString.copyFrom(sharedMrmDataKey)) + .clearContent() .build(); messagesCache.insert(guid, destinationServiceId.uuid(), deviceId, message); @@ -464,7 +446,7 @@ void testMultiRecipientMessage(@CartesianTest.Values(booleans = {true, false}) f .withBinaryCluster(conn -> conn.sync().exists(sharedMrmDataKey))); final List messages = get(destinationServiceId.uuid(), deviceId, 1); - if (useSharedMrmData && !sharedMrmKeyPresent) { + if (!sharedMrmKeyPresent) { assertTrue(messages.isEmpty()); } else { @@ -495,65 +477,7 @@ void testMultiRecipientMessage(@CartesianTest.Values(booleans = {true, false}) f @ParameterizedTest @ValueSource(booleans = {true, false}) - void testMultiRecipientMessagePhase2MissingContentSafeguard(final boolean useSharedMrmData) { - - when(dynamicConfiguration.getMessagesConfiguration()) - .thenReturn(new DynamicMessagesConfiguration(useSharedMrmData)); - - final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(UUID.randomUUID()); - final byte deviceId = 1; - - final SealedSenderMultiRecipientMessage mrm = generateRandomMrmMessage(destinationServiceId, deviceId); - - final byte[] sharedMrmDataKey = messagesCache.insertSharedMultiRecipientMessagePayload(mrm); - - final UUID guid = UUID.randomUUID(); - final MessageProtos.Envelope message = generateRandomMessage(guid, destinationServiceId, true) - .toBuilder() - // clear some things added by the helper - .clearServerGuid() - // mrm views phase 2: there is a safeguard against missing content, even if the dynamic configuration - // is to not fetch or use shared MRM data - .clearContent() - .setSharedMrmKey(ByteString.copyFrom(sharedMrmDataKey)) - .build(); - messagesCache.insert(guid, destinationServiceId.uuid(), deviceId, message); - - assertEquals(1, (long) REDIS_CLUSTER_EXTENSION.getRedisCluster() - .withBinaryCluster(conn -> conn.sync().exists(sharedMrmDataKey))); - - final List messages = get(destinationServiceId.uuid(), deviceId, 1); - - assertEquals(1, messages.size()); - assertEquals(guid, UUID.fromString(messages.getFirst().getServerGuid())); - assertFalse(messages.getFirst().hasSharedMrmKey()); - final SealedSenderMultiRecipientMessage.Recipient recipient = mrm.getRecipients() - .get(destinationServiceId.toLibsignal()); - assertArrayEquals(mrm.messageForRecipient(recipient), messages.getFirst().getContent().toByteArray()); - - final Optional removedMessage = messagesCache.remove(destinationServiceId.uuid(), deviceId, guid) - .join(); - - assertTrue(removedMessage.isPresent()); - assertEquals(guid, UUID.fromString(removedMessage.get().serverGuid().toString())); - assertTrue(get(destinationServiceId.uuid(), deviceId, 1).isEmpty()); - - // updating the shared MRM data is purely async, so we just wait for it - assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { - boolean exists; - do { - exists = 1 == REDIS_CLUSTER_EXTENSION.getRedisCluster() - .withBinaryCluster(conn -> conn.sync().exists(sharedMrmDataKey)); - } while (exists); - }, "Shared MRM data should be deleted asynchronously"); - } - - @CartesianTest - void testGetMessagesToPersist(@CartesianTest.Values(booleans = {true, false}) final boolean sharedMrmKeyPresent, - @CartesianTest.Values(booleans = {true, false}) final boolean useSharedMrmData) { - - when(dynamicConfiguration.getMessagesConfiguration()) - .thenReturn(new DynamicMessagesConfiguration(useSharedMrmData)); + void testGetMessagesToPersist(final boolean sharedMrmKeyPresent) { final UUID destinationUuid = UUID.randomUUID(); final ServiceIdentifier destinationServiceId = new AciServiceIdentifier(destinationUuid); @@ -578,24 +502,22 @@ void testGetMessagesToPersist(@CartesianTest.Values(booleans = {true, false}) fi final MessageProtos.Envelope mrmMessage = generateRandomMessage(mrmMessageGuid, destinationServiceId, true) .toBuilder() // clear some things added by the helper - .clearServerGuid() - // mrm views phase 2: messages have content - .setContent( - ByteString.copyFrom(mrm.messageForRecipient(mrm.getRecipients().get(new ServiceId.Aci(destinationUuid))))) + .clearContent() .setSharedMrmKey(ByteString.copyFrom(sharedMrmDataKey)) .build(); messagesCache.insert(mrmMessageGuid, destinationUuid, deviceId, mrmMessage); final List messages = messagesCache.getMessagesToPersist(destinationUuid, deviceId, 100); - if (useSharedMrmData && !sharedMrmKeyPresent) { + if (!sharedMrmKeyPresent) { assertEquals(1, messages.size()); } else { assertEquals(2, messages.size()); - assertEquals(mrmMessage.toBuilder(). - clearSharedMrmKey(). - setServerGuid(mrmMessageGuid.toString()) + assertEquals(mrmMessage.toBuilder() + .clearSharedMrmKey() + .setContent(ByteString.copyFrom( + mrm.messageForRecipient(mrm.getRecipients().get(destinationServiceId.toLibsignal())))) .build(), messages.getLast()); } @@ -636,7 +558,7 @@ void setup() throws Exception { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(mockCluster, messageDeliveryScheduler, - Executors.newSingleThreadExecutor(), Clock.systemUTC(), mock(DynamicConfigurationManager.class)); + Executors.newSingleThreadExecutor(), Clock.systemUTC()); } @AfterEach @@ -822,8 +744,7 @@ private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, } private MessageProtos.Envelope generateRandomMessage(final UUID messageGuid, - final ServiceIdentifier destinationServiceId, final boolean sealedSender, - final long timestamp) { + final ServiceIdentifier destinationServiceId, final boolean sealedSender, final long timestamp) { final MessageProtos.Envelope.Builder envelopeBuilder = MessageProtos.Envelope.newBuilder() .setClientTimestamp(timestamp) .setServerTimestamp(timestamp) diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java index 576a4cbed..98b89746d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/websocket/WebSocketConnectionIntegrationTest.java @@ -99,7 +99,7 @@ void setUp() throws Exception { dynamicConfigurationManager = mock(DynamicConfigurationManager.class); when(dynamicConfigurationManager.getConfiguration()).thenReturn(new DynamicConfiguration()); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), - messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); + messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC()); messagesDynamoDb = new MessagesDynamoDb(DYNAMO_DB_EXTENSION.getDynamoDbClient(), DYNAMO_DB_EXTENSION.getDynamoDbAsyncClient(), Tables.MESSAGES.tableName(), Duration.ofDays(7), sharedExecutorService);