Skip to content

Commit

Permalink
Merge pull request #66 from ably/feature/ensure-room-attached-before-…
Browse files Browse the repository at this point in the history
…operation

[ECO-5137] Ensure room is ATTACHED before performing operation
  • Loading branch information
sacOO7 authored Nov 28, 2024
2 parents d178c6e + 8010bf4 commit 16ead0c
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 138 deletions.
5 changes: 5 additions & 0 deletions chat-android/src/main/java/com/ably/chat/ErrorCodes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ enum class ErrorCode(val code: Int) {
*/
RoomReleasedBeforeOperationCompleted(102_106),

/**
* Room is not in valid state to perform any realtime operation.
*/
RoomInInvalidState(102_107),

/**
* Cannot perform operation because the previous operation failed.
*/
Expand Down
16 changes: 8 additions & 8 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package com.ably.chat

import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.realtime.Channel
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -73,20 +72,21 @@ data class OccupancyEvent(
)

internal class DefaultOccupancy(
realtimeChannels: AblyRealtime.Channels,
private val chatApi: ChatApi,
private val roomId: String,
private val logger: Logger,
) : Occupancy, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : Occupancy, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName: String = "occupancy"

override val attachmentErrorCode: ErrorCode = ErrorCode.OccupancyAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.OccupancyDetachmentFailed

private val realtimeChannels = room.realtimeClient.channels

private val logger = room.roomLogger.withContext(tag = "Occupancy")

// (CHA-O1)
private val messagesChannelName = "$roomId::\$chat::\$chatMessages"
private val messagesChannelName = "${room.roomId}::\$chat::\$chatMessages"

override val channel: Channel = realtimeChannels.get(
messagesChannelName,
Expand Down Expand Up @@ -142,7 +142,7 @@ internal class DefaultOccupancy(
// (CHA-O3)
override suspend fun get(): OccupancyEvent {
logger.trace("Occupancy.get()")
return chatApi.getOccupancy(roomId)
return room.chatApi.getOccupancy(room.roomId)
}

override fun release() {
Expand Down
24 changes: 15 additions & 9 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.google.gson.JsonElement
import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.types.PresenceMessage
import io.ably.lib.realtime.Presence as PubSubPresence
import io.ably.lib.realtime.Presence.PresenceListener as PubSubPresenceListener

typealias PresenceData = JsonElement
Expand Down Expand Up @@ -134,19 +133,23 @@ data class PresenceEvent(
)

internal class DefaultPresence(
private val clientId: String,
override val channel: Channel,
private val presence: PubSubPresence,
private val logger: Logger,
) : Presence, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : Presence, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName = "presence"

override val attachmentErrorCode: ErrorCode = ErrorCode.PresenceAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.PresenceDetachmentFailed

override val channel: Channel = room.messages.channel

private val logger = room.roomLogger.withContext(tag = "Presence")

private val presence = channel.presence

override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
room.ensureAttached() // CHA-PR6c, CHA-PR6h
return presence.getCoroutine(waitForSync, clientId, connectionId).map { user ->
PresenceMember(
clientId = user.clientId,
Expand All @@ -160,15 +163,18 @@ internal class DefaultPresence(
override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(clientId = clientId).isNotEmpty()

override suspend fun enter(data: PresenceData?) {
presence.enterClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached() // CHA-PR3d, CHA-PR3h
presence.enterClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override suspend fun update(data: PresenceData?) {
presence.updateClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached() // CHA-PR10d, CHA-PR10h
presence.updateClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override suspend fun leave(data: PresenceData?) {
presence.leaveClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached()
presence.leaveClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override fun subscribe(listener: Presence.Listener): Subscription {
Expand Down
75 changes: 46 additions & 29 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package com.ably.chat

import io.ably.lib.types.ErrorInfo
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

/**
* Represents a chat room.
Expand Down Expand Up @@ -110,12 +112,12 @@ interface Room {
internal class DefaultRoom(
override val roomId: String,
override val options: RoomOptions,
private val realtimeClient: RealtimeClient,
chatApi: ChatApi,
clientId: String,
internal val realtimeClient: RealtimeClient,
internal val chatApi: ChatApi,
internal val clientId: String,
logger: Logger,
) : Room {
private val roomLogger = logger.withContext("Room", mapOf("roomId" to roomId))
internal val roomLogger = logger.withContext("Room", mapOf("roomId" to roomId))

/**
* RoomScope is a crucial part of the Room lifecycle. It manages sequential and atomic operations.
Expand Down Expand Up @@ -184,46 +186,25 @@ internal class DefaultRoom(
val roomFeatures = mutableListOf<ContributesToRoomLifecycle>(messages)

options.presence?.let {
val presenceContributor = DefaultPresence(
clientId = clientId,
channel = messages.channel,
presence = messages.channel.presence,
logger = roomLogger.withContext(tag = "Presence"),
)
val presenceContributor = DefaultPresence(room = this)
roomFeatures.add(presenceContributor)
_presence = presenceContributor
}

options.typing?.let {
val typingContributor = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
clientId = clientId,
options = options.typing,
logger = roomLogger.withContext(tag = "Typing"),
)
val typingContributor = DefaultTyping(room = this)
roomFeatures.add(typingContributor)
_typing = typingContributor
}

options.reactions?.let {
val reactionsContributor = DefaultRoomReactions(
roomId = roomId,
clientId = clientId,
realtimeChannels = realtimeClient.channels,
logger = roomLogger.withContext(tag = "Reactions"),
)
val reactionsContributor = DefaultRoomReactions(room = this)
roomFeatures.add(reactionsContributor)
_reactions = reactionsContributor
}

options.occupancy?.let {
val occupancyContributor = DefaultOccupancy(
roomId = roomId,
realtimeChannels = realtimeClient.channels,
chatApi = chatApi,
logger = roomLogger.withContext(tag = "Occupancy"),
)
val occupancyContributor = DefaultOccupancy(room = this)
roomFeatures.add(occupancyContributor)
_occupancy = occupancyContributor
}
Expand Down Expand Up @@ -253,4 +234,40 @@ internal class DefaultRoom(
internal suspend fun release() {
lifecycleManager.release()
}

/**
* Ensures that the room is attached before performing any realtime room operation.
* @throws roomInvalidStateException if room is not in ATTACHING/ATTACHED state.
* Spec: CHA-RL9
*/
internal suspend fun ensureAttached() {
// CHA-PR3d, CHA-PR10d, CHA-PR6c, CHA-PR6c
if (statusLifecycle.status == RoomStatus.Attached) {
return
}
if (statusLifecycle.status == RoomStatus.Attaching) { // CHA-RL9
val attachDeferred = CompletableDeferred<Unit>()
roomScope.launch {
when (statusLifecycle.status) {
RoomStatus.Attached -> attachDeferred.complete(Unit)
RoomStatus.Attaching -> statusLifecycle.onChangeOnce {
if (it.current == RoomStatus.Attached) {
attachDeferred.complete(Unit)
} else {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
else -> {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
}
attachDeferred.await()
return
}
// CHA-PR3h, CHA-PR10h, CHA-PR6h, CHA-T2g
throw roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.BadRequest)
}
}
19 changes: 9 additions & 10 deletions chat-android/src/main/java/com/ably/chat/RoomReactions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package com.ably.chat

import com.google.gson.JsonObject
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.MessageExtras
Expand Down Expand Up @@ -104,22 +103,21 @@ data class SendReactionParams(
)

internal class DefaultRoomReactions(
roomId: String,
private val clientId: String,
private val realtimeChannels: AblyRealtime.Channels,
private val logger: Logger,
) : RoomReactions, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : RoomReactions, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName = "reactions"

private val roomReactionsChannelName = "$roomId::\$chat::\$reactions"
private val roomReactionsChannelName = "${room.roomId}::\$chat::\$reactions"

override val channel: AblyRealtimeChannel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions())
override val channel: AblyRealtimeChannel = room.realtimeClient.channels.get(roomReactionsChannelName, ChatChannelOptions())

override val attachmentErrorCode: ErrorCode = ErrorCode.ReactionsAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.ReactionsDetachmentFailed

private val logger = room.roomLogger.withContext(tag = "Reactions")

// (CHA-ER3) Ephemeral room reactions are sent to Ably via the Realtime connection via a send method.
// (CHA-ER3a) Reactions are sent on the channel using a message in a particular format - see spec for format.
override suspend fun send(params: SendReactionParams) {
Expand All @@ -137,6 +135,7 @@ internal class DefaultRoomReactions(
)
}
}
room.ensureAttached() // TODO - This check might be removed in the future due to core spec change
channel.publishCoroutine(pubSubMessage)
}

Expand All @@ -154,7 +153,7 @@ internal class DefaultRoomReactions(
clientId = pubSubMessage.clientId,
metadata = data.get("metadata")?.toMap() ?: mapOf(),
headers = pubSubMessage.extras?.asJsonObject()?.get("headers")?.toMap() ?: mapOf(),
isSelf = pubSubMessage.clientId == clientId,
isSelf = pubSubMessage.clientId == room.clientId,
)
listener.onReaction(reaction)
}
Expand All @@ -163,6 +162,6 @@ internal class DefaultRoomReactions(
}

override fun release() {
realtimeChannels.release(channel.name)
room.realtimeClient.channels.release(channel.name)
}
}
2 changes: 2 additions & 0 deletions chat-android/src/main/java/com/ably/chat/RoomStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,12 @@ internal class RoomStatusEventEmitter(logger: Logger) : EventEmitter<RoomStatus,

internal class DefaultRoomLifecycle(logger: Logger) : InternalRoomLifecycle {

@Volatile
private var _status = RoomStatus.Initialized // CHA-RS3
override val status: RoomStatus
get() = _status

@Volatile
private var _error: ErrorInfo? = null
override val error: ErrorInfo?
get() = _error
Expand Down
25 changes: 13 additions & 12 deletions chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,28 +91,26 @@ interface Typing : EmitsDiscontinuities {
data class TypingEvent(val currentlyTyping: Set<String>)

internal class DefaultTyping(
roomId: String,
private val realtimeClient: RealtimeClient,
private val clientId: String,
private val options: TypingOptions?,
private val logger: Logger,
) : Typing, ContributesToRoomLifecycleImpl(logger) {
private val typingIndicatorsChannelName = "$roomId::\$chat::\$typingIndicators"
private val room: DefaultRoom,
) : Typing, ContributesToRoomLifecycleImpl(room.roomLogger) {
private val typingIndicatorsChannelName = "${room.roomId}::\$chat::\$typingIndicators"

override val featureName = "typing"

override val attachmentErrorCode: ErrorCode = ErrorCode.TypingAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.TypingDetachmentFailed

private val logger = room.roomLogger.withContext(tag = "Typing")

private val typingScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())

private val eventBus = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val channel: Channel = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())
override val channel: Channel = room.realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())

private var typingJob: Job? = null

Expand Down Expand Up @@ -155,6 +153,7 @@ internal class DefaultTyping(

override suspend fun get(): Set<String> {
logger.trace("DefaultTyping.get()")
room.ensureAttached() // CHA-T2c, CHA-T2g
return channel.presence.getCoroutine().map { it.clientId }.toSet()
}

Expand All @@ -169,7 +168,8 @@ internal class DefaultTyping(
startTypingTimer()
} else {
startTypingTimer()
channel.presence.enterClientCoroutine(clientId)
room.ensureAttached()
channel.presence.enterClientCoroutine(room.clientId)
}
}.join()
}
Expand All @@ -178,18 +178,19 @@ internal class DefaultTyping(
logger.trace("DefaultTyping.stop()")
typingScope.launch {
typingJob?.cancel()
channel.presence.leaveClientCoroutine(clientId)
room.ensureAttached()
channel.presence.leaveClientCoroutine(room.clientId)
}.join()
}

override fun release() {
presenceSubscription.unsubscribe()
typingScope.cancel()
realtimeClient.channels.release(channel.name)
room.realtimeClient.channels.release(channel.name)
}

private fun startTypingTimer() {
val timeout = options?.timeoutMs ?: throw AblyException.fromErrorInfo(
val timeout = room.options.typing?.timeoutMs ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Typing options hasn't been initialized",
ErrorCode.BadRequest.code,
Expand Down
Loading

0 comments on commit 16ead0c

Please sign in to comment.