Skip to content

Commit

Permalink
Don't crash on malformed messages received with crypto enabled
Browse files Browse the repository at this point in the history
Deliver original message content with error flag.
  • Loading branch information
wkal-pubnub committed Nov 15, 2023
1 parent 0ea6eac commit 892eba9
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.pubnub.api.PNConfiguration;
import com.pubnub.api.PubNub;
import com.pubnub.api.PubNubError;
import com.pubnub.api.PubNubException;
import com.pubnub.api.builder.PubNubErrorBuilder;
import com.pubnub.api.callbacks.SubscribeCallback;
import com.pubnub.api.crypto.CryptoModule;
import com.pubnub.api.enums.PNOperationType;
import com.pubnub.api.integration.util.BaseIntegrationTest;
import com.pubnub.api.models.consumer.PNStatus;
Expand All @@ -20,6 +24,7 @@
import com.pubnub.api.models.consumer.pubsub.message_actions.PNMessageActionResult;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
Expand All @@ -35,6 +40,8 @@
import static com.pubnub.api.integration.util.Utils.randomChannel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;

public class PublishIntegrationTests extends BaseIntegrationTest {

Expand Down Expand Up @@ -147,6 +154,7 @@ public void testReceiveMessage() {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus status) {
if (status.getOperation() == PNOperationType.PNSubscribeOperation) {
Expand Down Expand Up @@ -204,6 +212,99 @@ public void messageAction(@NotNull PubNub pubnub, @NotNull PNMessageActionResult
Awaitility.await().atMost(Durations.TEN_SECONDS).untilTrue(success);
}

@Test
public void testReceiveUnencryptedMessageWithCryptoDoesntCrash() {
final AtomicInteger success = new AtomicInteger(0);
final String expectedChannel = randomChannel();
final JsonObject messagePayload = generateMessage(pubNub);

final PubNub sender = getPubNub();
PNConfiguration config = getBasicPnConfiguration();
config.setCryptoModule(CryptoModule.createAesCbcCryptoModule("test", false));
final PubNub observer = getPubNub(config);

observer.addListener(new SubscribeCallback() {
@Override
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus status) {
if (status.getOperation() == PNOperationType.PNSubscribeOperation) {
assert status.getAffectedChannels() != null;
if (status.getAffectedChannels().contains(expectedChannel)) {
// send an unencrypted message first to try to crash the SubscribeMessageProcessor
sender.publish()
.message(messagePayload)
.channel(expectedChannel)
.async((result, status1) -> {
assertFalse(status1.isError());

// then verify if the subscribe loop is still working by sending an encrypted message
observer.publish()
.message(messagePayload)
.channel(expectedChannel)
.async((result2, status2) -> assertFalse(status2.isError()));
});

}
}
}

@Override
public void message(@NotNull PubNub pubnub, @NotNull PNMessageResult message) {
if (success.get() == 0) {
assertEquals(expectedChannel, message.getChannel());
assertEquals(sender.getConfiguration().getUserId().getValue(), message.getPublisher());
assertEquals(PubNubErrorBuilder.PNERROBJ_PNERR_CRYPTO_IS_CONFIGURED_BUT_MESSAGE_IS_NOT_ENCRYPTED, message.getError());
assertEquals(messagePayload, message.getMessage());
success.incrementAndGet();
} else if (success.get() == 1) {
assertEquals(expectedChannel, message.getChannel());
assertEquals(observer.getConfiguration().getUserId().getValue(), message.getPublisher());
assertEquals(messagePayload, message.getMessage());
assertNull(message.getError());
success.incrementAndGet();
}
}

@Override
public void presence(@NotNull PubNub pubnub, @NotNull PNPresenceEventResult presence) {

}

@Override
public void signal(@NotNull PubNub pubNub, @NotNull PNSignalResult pnSignalResult) {

}

@Override
public void uuid(@NotNull final PubNub pubnub, @NotNull final PNUUIDMetadataResult pnUUIDMetadataResult) {

}

@Override
public void channel(@NotNull final PubNub pubnub, @NotNull final PNChannelMetadataResult pnChannelMetadataResult) {

}

@Override
public void membership(@NotNull PubNub pubNub, @NotNull PNMembershipResult pnMembershipResult) {

}

@Override
public void messageAction(@NotNull PubNub pubnub, @NotNull PNMessageActionResult pnActionResult) {

}
});

subscribeToChannel(observer, expectedChannel);

Awaitility.await().atMost(Durations.TEN_SECONDS).untilAtomic(success, Matchers.greaterThanOrEqualTo(2));
}

@Test
public void testOrgJsonObject_Get_History() throws PubNubException, JSONException {
final String channel = random();
Expand Down Expand Up @@ -272,6 +373,7 @@ public void testOrgJsonObject_Get_Receive() throws PubNubException {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus pnStatus) {

Expand Down Expand Up @@ -346,6 +448,7 @@ public void testOrgJsonObject_Post_Receive() throws PubNubException {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus pnStatus) {

Expand Down Expand Up @@ -484,6 +587,7 @@ public void testOrgJsonArray_Get_Receive() throws PubNubException {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus pnStatus) {

Expand Down Expand Up @@ -563,6 +667,7 @@ public void testOrgJsonArray_Post_Receive() throws PubNubException {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus pnStatus) {

Expand Down Expand Up @@ -651,6 +756,7 @@ public void testOrgJson_Combo() throws PubNubException, JSONException {
public void file(@NotNull PubNub pubnub, @NotNull PNFileEventResult pnFileEventResult) {

}

@Override
public void status(@NotNull PubNub pubnub, @NotNull PNStatus pnStatus) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@

import com.google.gson.JsonElement;

import com.pubnub.api.PubNubError;
import lombok.Getter;
import lombok.ToString;
import org.jetbrains.annotations.Nullable;

@Getter
@ToString(callSuper = true)
public class PNMessageResult extends MessageResult {

@Nullable
private final PubNubError error;

public PNMessageResult(BasePubSubResult basePubSubResult, JsonElement message) {
this(basePubSubResult, message, null);
}

public PNMessageResult(BasePubSubResult basePubSubResult, JsonElement message, @Nullable PubNubError error) {
super(basePubSubResult, message);
this.error = error;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.google.gson.JsonObject;
import com.pubnub.api.PNConfiguration;
import com.pubnub.api.PubNub;
import com.pubnub.api.PubNubError;
import com.pubnub.api.PubNubException;
import com.pubnub.api.PubNubUtil;
import com.pubnub.api.builder.PubNubErrorBuilder;
import com.pubnub.api.crypto.CryptoModule;
import com.pubnub.api.crypto.CryptoModuleKt;
import com.pubnub.api.managers.DuplicationManager;
Expand Down Expand Up @@ -110,7 +112,18 @@ PNEvent processIncomingPayload(SubscribeMessage message) throws PubNubException

return pnPresenceEventResult;
} else {
JsonElement extractedMessage = processMessage(message);
JsonElement extractedMessage;
PubNubError error = null;
try {
extractedMessage = processMessage(message);
} catch (PubNubException e) {
if (e.getPubnubError() == PubNubErrorBuilder.PNERROBJ_PNERR_CRYPTO_IS_CONFIGURED_BUT_MESSAGE_IS_NOT_ENCRYPTED) {
extractedMessage = message.getPayload();
error = e.getPubnubError();
} else {
throw e;
}
}

if (extractedMessage == null) {
log.debug("unable to parse payload on #processIncomingMessages");
Expand All @@ -129,9 +142,9 @@ PNEvent processIncomingPayload(SubscribeMessage message) throws PubNubException
.build();

if (message.getType() == null) {
return new PNMessageResult(result, extractedMessage);
return new PNMessageResult(result, extractedMessage, error);
} else if (message.getType() == TYPE_MESSAGE) {
return new PNMessageResult(result, extractedMessage);
return new PNMessageResult(result, extractedMessage, error);
} else if (message.getType() == typeSignal) {
return new PNSignalResult(result, extractedMessage);
} else if (message.getType() == typeObject) {
Expand Down Expand Up @@ -215,14 +228,22 @@ private JsonElement processMessage(SubscribeMessage subscribeMessage) throws Pub
String outputText;
JsonElement outputObject;

if (mapper.isJsonObject(input) && mapper.hasField(input, PN_OTHER)) {
inputText = mapper.elementToString(input, PN_OTHER);
if (mapper.isJsonObject(input)) {
if (mapper.hasField(input, PN_OTHER)) {
inputText = mapper.elementToString(input, PN_OTHER);
} else {
throw logAndGetDecryptionException();
}
} else {
inputText = mapper.elementToString(input);
}

outputText = CryptoModuleKt.decryptString(cryptoModule, inputText);
outputObject = mapper.fromJson(outputText, JsonElement.class);
try {
outputText = CryptoModuleKt.decryptString(cryptoModule, inputText);
outputObject = mapper.fromJson(outputText, JsonElement.class);
} catch (Exception e) {
throw logAndGetDecryptionException();
}

// inject the decoded response into the payload
if (mapper.isJsonObject(input) && mapper.hasField(input, PN_OTHER)) {
Expand All @@ -234,6 +255,12 @@ private JsonElement processMessage(SubscribeMessage subscribeMessage) throws Pub
return outputObject;
}

private PubNubException logAndGetDecryptionException() {
PubNubError error = PubNubErrorBuilder.PNERROBJ_PNERR_CRYPTO_IS_CONFIGURED_BUT_MESSAGE_IS_NOT_ENCRYPTED;
log.warn(error.getMessage());
return new PubNubException(error.getMessage(), error, null, null, 0, null, null);
}

@SuppressWarnings("RegExpRedundantEscape")
private final String formatFriendlyGetFileUrl = "%s" + FilesService.GET_FILE_URL.replaceAll("\\{.*?\\}", "%s");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ private void takeMessage() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.trace("take message interrupted", e);
} catch (Exception e) { // don't crash the thread on malformed messages
log.warn("Unexpected message processing error", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void beforeEach() throws IOException, PubNubException {

@After
public void afterEach() {
pubnub.destroy();
pubnub.forceDestroy();
pubnub = null;
wireMockRule.stop();
}
Expand Down
Loading

0 comments on commit 892eba9

Please sign in to comment.