Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-5137] Ensure room is ATTACHED before performing operation #66

Merged
merged 2 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions chat-android/src/main/java/com/ably/chat/ErrorCodes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ enum class ErrorCode(val code: Int) {
*/
RoomReleasedBeforeOperationCompleted(102_106),

/**
* Room is not in valid state to perform any realtime operation.
*/
RoomInInvalidState(102_107),

/**
* Cannot perform operation because the previous operation failed.
*/
Expand Down
16 changes: 8 additions & 8 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package com.ably.chat

import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.realtime.Channel
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -73,20 +72,21 @@ data class OccupancyEvent(
)

internal class DefaultOccupancy(
realtimeChannels: AblyRealtime.Channels,
private val chatApi: ChatApi,
private val roomId: String,
private val logger: Logger,
) : Occupancy, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : Occupancy, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName: String = "occupancy"

override val attachmentErrorCode: ErrorCode = ErrorCode.OccupancyAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.OccupancyDetachmentFailed

private val realtimeChannels = room.realtimeClient.channels

private val logger = room.roomLogger.withContext(tag = "Occupancy")

// (CHA-O1)
private val messagesChannelName = "$roomId::\$chat::\$chatMessages"
private val messagesChannelName = "${room.roomId}::\$chat::\$chatMessages"

override val channel: Channel = realtimeChannels.get(
messagesChannelName,
Expand Down Expand Up @@ -142,7 +142,7 @@ internal class DefaultOccupancy(
// (CHA-O3)
override suspend fun get(): OccupancyEvent {
logger.trace("Occupancy.get()")
return chatApi.getOccupancy(roomId)
return room.chatApi.getOccupancy(room.roomId)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}

override fun release() {
Expand Down
24 changes: 15 additions & 9 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.google.gson.JsonElement
import com.google.gson.JsonObject
import io.ably.lib.realtime.Channel
import io.ably.lib.types.PresenceMessage
import io.ably.lib.realtime.Presence as PubSubPresence
import io.ably.lib.realtime.Presence.PresenceListener as PubSubPresenceListener

typealias PresenceData = JsonElement
Expand Down Expand Up @@ -134,19 +133,23 @@ data class PresenceEvent(
)

internal class DefaultPresence(
private val clientId: String,
override val channel: Channel,
private val presence: PubSubPresence,
private val logger: Logger,
) : Presence, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : Presence, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName = "presence"

override val attachmentErrorCode: ErrorCode = ErrorCode.PresenceAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.PresenceDetachmentFailed

override val channel: Channel = room.messages.channel

private val logger = room.roomLogger.withContext(tag = "Presence")

private val presence = channel.presence

override suspend fun get(waitForSync: Boolean, clientId: String?, connectionId: String?): List<PresenceMember> {
room.ensureAttached() // CHA-PR6c, CHA-PR6h
return presence.getCoroutine(waitForSync, clientId, connectionId).map { user ->
PresenceMember(
clientId = user.clientId,
Expand All @@ -160,15 +163,18 @@ internal class DefaultPresence(
override suspend fun isUserPresent(clientId: String): Boolean = presence.getCoroutine(clientId = clientId).isNotEmpty()

override suspend fun enter(data: PresenceData?) {
presence.enterClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached() // CHA-PR3d, CHA-PR3h
presence.enterClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override suspend fun update(data: PresenceData?) {
presence.updateClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached() // CHA-PR10d, CHA-PR10h
presence.updateClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override suspend fun leave(data: PresenceData?) {
presence.leaveClientCoroutine(clientId, wrapInUserCustomData(data))
room.ensureAttached()
presence.leaveClientCoroutine(room.clientId, wrapInUserCustomData(data))
}

override fun subscribe(listener: Presence.Listener): Subscription {
Expand Down
75 changes: 46 additions & 29 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
package com.ably.chat

import io.ably.lib.types.ErrorInfo
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch

/**
* Represents a chat room.
Expand Down Expand Up @@ -110,12 +112,12 @@ interface Room {
internal class DefaultRoom(
override val roomId: String,
override val options: RoomOptions,
private val realtimeClient: RealtimeClient,
chatApi: ChatApi,
clientId: String,
internal val realtimeClient: RealtimeClient,
internal val chatApi: ChatApi,
internal val clientId: String,
logger: Logger,
) : Room {
private val roomLogger = logger.withContext("Room", mapOf("roomId" to roomId))
internal val roomLogger = logger.withContext("Room", mapOf("roomId" to roomId))

/**
* RoomScope is a crucial part of the Room lifecycle. It manages sequential and atomic operations.
Expand Down Expand Up @@ -184,46 +186,25 @@ internal class DefaultRoom(
val roomFeatures = mutableListOf<ContributesToRoomLifecycle>(messages)

options.presence?.let {
val presenceContributor = DefaultPresence(
clientId = clientId,
channel = messages.channel,
presence = messages.channel.presence,
logger = roomLogger.withContext(tag = "Presence"),
)
val presenceContributor = DefaultPresence(room = this)
roomFeatures.add(presenceContributor)
_presence = presenceContributor
}

options.typing?.let {
val typingContributor = DefaultTyping(
roomId = roomId,
realtimeClient = realtimeClient,
clientId = clientId,
options = options.typing,
logger = roomLogger.withContext(tag = "Typing"),
)
val typingContributor = DefaultTyping(room = this)
roomFeatures.add(typingContributor)
_typing = typingContributor
}

options.reactions?.let {
val reactionsContributor = DefaultRoomReactions(
roomId = roomId,
clientId = clientId,
realtimeChannels = realtimeClient.channels,
logger = roomLogger.withContext(tag = "Reactions"),
)
val reactionsContributor = DefaultRoomReactions(room = this)
roomFeatures.add(reactionsContributor)
_reactions = reactionsContributor
}

options.occupancy?.let {
val occupancyContributor = DefaultOccupancy(
roomId = roomId,
realtimeChannels = realtimeClient.channels,
chatApi = chatApi,
logger = roomLogger.withContext(tag = "Occupancy"),
)
val occupancyContributor = DefaultOccupancy(room = this)
roomFeatures.add(occupancyContributor)
_occupancy = occupancyContributor
}
Expand Down Expand Up @@ -253,4 +234,40 @@ internal class DefaultRoom(
internal suspend fun release() {
lifecycleManager.release()
}

/**
* Ensures that the room is attached before performing any realtime room operation.
* @throws roomInvalidStateException if room is not in ATTACHING/ATTACHED state.
* Spec: CHA-RL9
*/
internal suspend fun ensureAttached() {
// CHA-PR3d, CHA-PR10d, CHA-PR6c, CHA-PR6c
if (statusLifecycle.status == RoomStatus.Attached) {
return
}
if (statusLifecycle.status == RoomStatus.Attaching) { // CHA-RL9
val attachDeferred = CompletableDeferred<Unit>()
roomScope.launch {
when (statusLifecycle.status) {
RoomStatus.Attached -> attachDeferred.complete(Unit)
RoomStatus.Attaching -> statusLifecycle.onChangeOnce {
if (it.current == RoomStatus.Attached) {
attachDeferred.complete(Unit)
} else {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
else -> {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
}
attachDeferred.await()
return
}
// CHA-PR3h, CHA-PR10h, CHA-PR6h, CHA-T2g
throw roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.BadRequest)
}
}
19 changes: 9 additions & 10 deletions chat-android/src/main/java/com/ably/chat/RoomReactions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package com.ably.chat

import com.google.gson.JsonObject
import io.ably.lib.realtime.AblyRealtime
import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.MessageExtras
Expand Down Expand Up @@ -104,22 +103,21 @@ data class SendReactionParams(
)

internal class DefaultRoomReactions(
roomId: String,
private val clientId: String,
private val realtimeChannels: AblyRealtime.Channels,
private val logger: Logger,
) : RoomReactions, ContributesToRoomLifecycleImpl(logger) {
private val room: DefaultRoom,
) : RoomReactions, ContributesToRoomLifecycleImpl(room.roomLogger) {

override val featureName = "reactions"

private val roomReactionsChannelName = "$roomId::\$chat::\$reactions"
private val roomReactionsChannelName = "${room.roomId}::\$chat::\$reactions"

override val channel: AblyRealtimeChannel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions())
override val channel: AblyRealtimeChannel = room.realtimeClient.channels.get(roomReactionsChannelName, ChatChannelOptions())

override val attachmentErrorCode: ErrorCode = ErrorCode.ReactionsAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.ReactionsDetachmentFailed

private val logger = room.roomLogger.withContext(tag = "Reactions")

// (CHA-ER3) Ephemeral room reactions are sent to Ably via the Realtime connection via a send method.
// (CHA-ER3a) Reactions are sent on the channel using a message in a particular format - see spec for format.
override suspend fun send(params: SendReactionParams) {
Expand All @@ -137,6 +135,7 @@ internal class DefaultRoomReactions(
)
}
}
room.ensureAttached() // TODO - This check might be removed in the future due to core spec change
channel.publishCoroutine(pubSubMessage)
}

Expand All @@ -154,7 +153,7 @@ internal class DefaultRoomReactions(
clientId = pubSubMessage.clientId,
metadata = data.get("metadata")?.toMap() ?: mapOf(),
headers = pubSubMessage.extras?.asJsonObject()?.get("headers")?.toMap() ?: mapOf(),
isSelf = pubSubMessage.clientId == clientId,
isSelf = pubSubMessage.clientId == room.clientId,
)
listener.onReaction(reaction)
}
Expand All @@ -163,6 +162,6 @@ internal class DefaultRoomReactions(
}

override fun release() {
realtimeChannels.release(channel.name)
room.realtimeClient.channels.release(channel.name)
}
}
2 changes: 2 additions & 0 deletions chat-android/src/main/java/com/ably/chat/RoomStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,12 @@ internal class RoomStatusEventEmitter(logger: Logger) : EventEmitter<RoomStatus,

internal class DefaultRoomLifecycle(logger: Logger) : InternalRoomLifecycle {

@Volatile
private var _status = RoomStatus.Initialized // CHA-RS3
override val status: RoomStatus
get() = _status

@Volatile
private var _error: ErrorInfo? = null
override val error: ErrorInfo?
get() = _error
Expand Down
25 changes: 13 additions & 12 deletions chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,28 +91,26 @@ interface Typing : EmitsDiscontinuities {
data class TypingEvent(val currentlyTyping: Set<String>)

internal class DefaultTyping(
roomId: String,
private val realtimeClient: RealtimeClient,
private val clientId: String,
private val options: TypingOptions?,
private val logger: Logger,
) : Typing, ContributesToRoomLifecycleImpl(logger) {
private val typingIndicatorsChannelName = "$roomId::\$chat::\$typingIndicators"
private val room: DefaultRoom,
) : Typing, ContributesToRoomLifecycleImpl(room.roomLogger) {
private val typingIndicatorsChannelName = "${room.roomId}::\$chat::\$typingIndicators"

override val featureName = "typing"

override val attachmentErrorCode: ErrorCode = ErrorCode.TypingAttachmentFailed

override val detachmentErrorCode: ErrorCode = ErrorCode.TypingDetachmentFailed

private val logger = room.roomLogger.withContext(tag = "Typing")

private val typingScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())

private val eventBus = MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)

override val channel: Channel = realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())
override val channel: Channel = room.realtimeClient.channels.get(typingIndicatorsChannelName, ChatChannelOptions())

private var typingJob: Job? = null

Expand Down Expand Up @@ -155,6 +153,7 @@ internal class DefaultTyping(

override suspend fun get(): Set<String> {
logger.trace("DefaultTyping.get()")
room.ensureAttached() // CHA-T2c, CHA-T2g
return channel.presence.getCoroutine().map { it.clientId }.toSet()
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand All @@ -169,7 +168,8 @@ internal class DefaultTyping(
startTypingTimer()
} else {
startTypingTimer()
channel.presence.enterClientCoroutine(clientId)
room.ensureAttached()
channel.presence.enterClientCoroutine(room.clientId)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}
}.join()
}
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -178,18 +178,19 @@ internal class DefaultTyping(
logger.trace("DefaultTyping.stop()")
typingScope.launch {
typingJob?.cancel()
channel.presence.leaveClientCoroutine(clientId)
room.ensureAttached()
channel.presence.leaveClientCoroutine(room.clientId)
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}.join()
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved
}

override fun release() {
presenceSubscription.unsubscribe()
typingScope.cancel()
realtimeClient.channels.release(channel.name)
room.realtimeClient.channels.release(channel.name)
}

private fun startTypingTimer() {
val timeout = options?.timeoutMs ?: throw AblyException.fromErrorInfo(
val timeout = room.options.typing?.timeoutMs ?: throw AblyException.fromErrorInfo(
ErrorInfo(
"Typing options hasn't been initialized",
ErrorCode.BadRequest.code,
Expand Down
Loading