Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHA-RL3][ECO-5011] Room RELEASE with retry #53

14 changes: 7 additions & 7 deletions chat-android/src/main/java/com/ably/chat/Messages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,12 @@ internal class DefaultMessagesSubscription(

internal class DefaultMessages(
private val roomId: String,
realtimeChannels: AblyRealtime.Channels,
private val realtimeChannels: AblyRealtime.Channels,
private val chatApi: ChatApi,
) : Messages, ContributesToRoomLifecycleImpl(), ResolvedContributor {

override val featureName: String = "messages"

private var listeners: Map<Messages.Listener, DeferredValue<String>> = emptyMap()

private var channelStateListener: ChannelStateListener
Expand All @@ -239,8 +241,6 @@ internal class DefaultMessages(
*/
private val messagesChannelName = "$roomId::\$chat::\$chatMessages"

override val featureName: String = "messages"

override val channel = realtimeChannels.get(messagesChannelName, ChatChannelOptions())

override val contributor: ContributesToRoomLifecycle = this
Expand Down Expand Up @@ -303,10 +303,6 @@ internal class DefaultMessages(

override suspend fun send(params: SendMessageParams): Message = chatApi.sendMessage(roomId, params)

fun release() {
channel.off(channelStateListener)
}

/**
* Associate deferred channel serial value with the current channel's serial
*
Expand Down Expand Up @@ -371,6 +367,10 @@ internal class DefaultMessages(
}
}
}

override fun release() {
realtimeChannels.release(channel.name)
}
Comment on lines +371 to +373
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider enhancing release() implementation robustness

While the implementation correctly releases the channel, consider these improvements for better resource cleanup:

  1. Add error handling for the release operation
  2. Clean up existing listeners and state
  3. Consider making the operation idempotent

Here's a suggested implementation:

 override fun release() {
+    synchronized(lock) {
+        try {
+            realtimeChannels.release(channel.name)
+            listeners.clear()
+            channel.off(channelStateListener)
+        } catch (e: AblyException) {
+            // Log error but don't rethrow as we want to ensure cleanup continues
+            // logger.error("Error during channel release", e)
+        }
+    }
 }

Committable suggestion skipped: line range outside the PR's diff.

}

/**
Expand Down
4 changes: 4 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Occupancy.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,8 @@ internal class DefaultOccupancy(
override suspend fun get(): OccupancyEvent {
TODO("Not yet implemented")
}

override fun release() {
// No need to do anything, since it uses same channel as messages
}
}
4 changes: 4 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,8 @@ internal class DefaultPresence(
override fun subscribe(listener: Presence.Listener): Subscription {
TODO("Not yet implemented")
}

override fun release() {
// No need to do anything, since it uses same channel as messages
}
}
9 changes: 7 additions & 2 deletions chat-android/src/main/java/com/ably/chat/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ interface Room {
* Detaches from the room to stop receiving events in realtime.
*/
suspend fun detach()

/**
* Releases the room, underlying channels are removed from the core SDK to prevent leakage.
*/
suspend fun release()
}

internal class DefaultRoom(
Expand Down Expand Up @@ -199,7 +204,7 @@ internal class DefaultRoom(
reactions.channel.detachCoroutine()
}

fun release() {
messages.release()
override suspend fun release() {
_lifecycleManager?.release()
Comment on lines +207 to +208
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add null safety and state validation.

The current implementation has potential issues with null safety and lacks state validation.

Consider implementing proper null checks and state validation:

 override suspend fun release() {
+    val lifecycleManager = _lifecycleManager ?: throw ErrorInfo(
+        "Room not properly initialized",
+        40000
+    )
+    
+    if (status == RoomStatus.Released) {
+        throw ErrorInfo("Room already released", 40001)
+    }
+    
-    _lifecycleManager?.release()
+    lifecycleManager.release()
 }

Committable suggestion skipped: line range outside the PR's diff.

}
}
101 changes: 100 additions & 1 deletion chat-android/src/main/java/com/ably/chat/RoomLifecycleManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ interface ContributesToRoomLifecycle : EmitsDiscontinuities, HandlesDiscontinuit
* @returns The error that should be used when the feature fails to detach.
*/
val detachmentErrorCode: ErrorCodes

/**
* Underlying Realtime feature channel is removed from the core SDK to prevent leakage.
*/
fun release()
Comment on lines +43 to +47
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Clarify documentation and provide default implementation for release()

Adding the release() method to the ContributesToRoomLifecycle interface may impact existing implementations since all subclasses must now implement this method. To prevent breaking changes, consider providing a default implementation in the interface. Additionally, the documentation for release() could be clearer. Suggest rephrasing it to better describe the method's purpose.

Apply this diff to improve the documentation and provide a default implementation:

 /**
- * Underlying Realtime feature channel is removed from the core SDK to prevent leakage.
+ * Releases resources held by the underlying Realtime feature channel to prevent resource leakage.
  */
-fun release()
+fun release() {
+   // Default implementation (if applicable)
+}

Committable suggestion skipped: line range outside the PR's diff.

}

abstract class ContributesToRoomLifecycleImpl : ContributesToRoomLifecycle {
Expand Down Expand Up @@ -175,6 +180,11 @@ class RoomLifecycleManager(
*/
private val _firstAttachesCompleted = mutableMapOf<ResolvedContributor, Boolean>()

/**
* Are we in the process of releasing the room?
*/
private var _releaseInProgress = false

Comment on lines +183 to +187
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure thread safety of _releaseInProgress

The _releaseInProgress flag may be accessed concurrently by multiple coroutines, leading to potential race conditions. To prevent this, consider using an AtomicBoolean or other thread-safe mechanisms to manage this state.

Apply this diff to use an AtomicBoolean:

+import java.util.concurrent.atomic.AtomicBoolean

 /**
  * Indicates whether a release operation is currently in progress.
  */
-private var _releaseInProgress = false
+private val _releaseInProgress = AtomicBoolean(false)

And update usages accordingly:

- if (_releaseInProgress) {
+ if (_releaseInProgress.get()) {

- _releaseInProgress = true
+ _releaseInProgress.set(true)

- _releaseInProgress = false
+ _releaseInProgress.set(false)

Committable suggestion skipped: line range outside the PR's diff.

/**
* Retry duration in milliseconds, used by internal doRetry and runDownChannelsOnFailedAttach methods
*/
Expand Down Expand Up @@ -589,6 +599,95 @@ class RoomLifecycleManager(
* state (e.g. attached), release will throw exception.
*/
internal suspend fun release() {
// TODO("Need to impl. room release")
val deferredRelease = atomicCoroutineScope.async(LifecycleOperationPrecedence.Release.priority) { // CHA-RL2i
// If we're already released, this is a no-op
if (_statusLifecycle.status === RoomStatus.Released) {
return@async
}

// If we're already detached, then we can transition to released immediately
if (_statusLifecycle.status === RoomStatus.Detached ||
_statusLifecycle.status === RoomStatus.Initialized
) {
_statusLifecycle.setStatus(RoomStatus.Released)
return@async
}

// If we're in the process of releasing, we should wait for it to complete
if (_releaseInProgress) {
return@async listenToRoomRelease()
}

// We force the room status to be releasing
clearAllTransientDetachTimeouts()
_operationInProgress = true
_releaseInProgress = true
_statusLifecycle.setStatus(RoomStatus.Releasing)

// Do the release until it completes
return@async releaseChannels()
}
deferredRelease.await()
}
Comment on lines +602 to +631
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Reset _releaseInProgress in case of exceptions

In the release() method, if releaseChannels() throws an exception, _releaseInProgress remains true, potentially causing future release attempts to hang or behave incorrectly. To ensure proper resetting of the flag, update the code to reset _releaseInProgress in a finally block.

Apply this diff to adjust the code:

 internal suspend fun release() {
     val deferredRelease = atomicCoroutineScope.async(LifecycleOperationPrecedence.Release.priority) { // CHA-RL2i
         // Existing checks...
         // We force the room status to be releasing
         clearAllTransientDetachTimeouts()
         _operationInProgress = true
         _releaseInProgress = true
         _statusLifecycle.setStatus(RoomStatus.Releasing)
+        try {
             // Do the release until it completes
             return@async releaseChannels()
+        } finally {
+            _releaseInProgress = false
+        }
     }
     deferredRelease.await()
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val deferredRelease = atomicCoroutineScope.async(LifecycleOperationPrecedence.Release.priority) { // CHA-RL2i
// If we're already released, this is a no-op
if (_statusLifecycle.status === RoomStatus.Released) {
return@async
}
// If we're already detached, then we can transition to released immediately
if (_statusLifecycle.status === RoomStatus.Detached ||
_statusLifecycle.status === RoomStatus.Initialized
) {
_statusLifecycle.setStatus(RoomStatus.Released)
return@async
}
// If we're in the process of releasing, we should wait for it to complete
if (_releaseInProgress) {
return@async listenToRoomRelease()
}
// We force the room status to be releasing
clearAllTransientDetachTimeouts()
_operationInProgress = true
_releaseInProgress = true
_statusLifecycle.setStatus(RoomStatus.Releasing)
// Do the release until it completes
return@async releaseChannels()
}
deferredRelease.await()
}
val deferredRelease = atomicCoroutineScope.async(LifecycleOperationPrecedence.Release.priority) { // CHA-RL2i
// If we're already released, this is a no-op
if (_statusLifecycle.status === RoomStatus.Released) {
return@async
}
// If we're already detached, then we can transition to released immediately
if (_statusLifecycle.status === RoomStatus.Detached ||
_statusLifecycle.status === RoomStatus.Initialized
) {
_statusLifecycle.setStatus(RoomStatus.Released)
return@async
}
// If we're in the process of releasing, we should wait for it to complete
if (_releaseInProgress) {
return@async listenToRoomRelease()
}
// We force the room status to be releasing
clearAllTransientDetachTimeouts()
_operationInProgress = true
_releaseInProgress = true
_statusLifecycle.setStatus(RoomStatus.Releasing)
try {
// Do the release until it completes
return@async releaseChannels()
} finally {
_releaseInProgress = false
}
}
deferredRelease.await()


private suspend fun listenToRoomRelease() = suspendCancellableCoroutine { continuation ->
_statusLifecycle.onChangeOnce {
if (it.current == RoomStatus.Released) {
continuation.resume(Unit)
} else {
val err = AblyException.fromErrorInfo(
ErrorInfo(
"failed to release room; existing attempt failed${it.errorMessage}",
HttpStatusCodes.InternalServerError,
ErrorCodes.PreviousOperationFailed.errorCode,
),
)
continuation.resumeWithException(err)
}
}
}

/**
* Releases the room by detaching all channels. If the release operation fails, we wait
* a short period and then try again.
*/
private suspend fun releaseChannels() {
var contributorsReleased = kotlin.runCatching { doRelease() }
while (contributorsReleased.isFailure) {
// Wait a short period and then try again
delay(_retryDurationInMs)
contributorsReleased = kotlin.runCatching { doRelease() }
}
}

/**
* Performs the release operation. This will detach all channels in the room that aren't
* already detached or in the failed state.
*/
@Suppress("RethrowCaughtException")
private suspend fun doRelease() = coroutineScope {
_contributors.map { contributor: ResolvedContributor ->
async {
// Failed channels, we can ignore
if (contributor.channel.state == ChannelState.failed) {
return@async
}
// Detached channels, we can ignore
if (contributor.channel.state == ChannelState.detached) {
return@async
}
try {
contributor.channel.detachCoroutine()
} catch (ex: Throwable) {
// TODO - log error here before rethrowing
throw ex
}
}
}.awaitAll()
_contributors.forEach {
it.contributor.release()
}
_releaseInProgress = false
_statusLifecycle.setStatus(RoomStatus.Released)
}
}
9 changes: 7 additions & 2 deletions chat-android/src/main/java/com/ably/chat/RoomReactions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ data class SendReactionParams(
internal class DefaultRoomReactions(
roomId: String,
private val clientId: String,
realtimeChannels: AblyRealtime.Channels,
private val realtimeChannels: AblyRealtime.Channels,
) : RoomReactions, ContributesToRoomLifecycleImpl(), ResolvedContributor {

override val featureName = "reactions"

private val roomReactionsChannelName = "$roomId::\$chat::\$reactions"

override val channel: AblyRealtimeChannel = realtimeChannels.get(roomReactionsChannelName, ChatChannelOptions())
override val featureName = "reactions"

override val contributor: ContributesToRoomLifecycle = this

Expand Down Expand Up @@ -161,4 +162,8 @@ internal class DefaultRoomReactions(
channel.subscribe(RoomReactionEventType.Reaction.eventName, messageListener)
return Subscription { channel.unsubscribe(RoomReactionEventType.Reaction.eventName, messageListener) }
}

override fun release() {
realtimeChannels.release(channel.name)
}
}
6 changes: 2 additions & 4 deletions chat-android/src/main/java/com/ably/chat/Rooms.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ internal class DefaultRooms(
}

override suspend fun release(roomId: String) {
synchronized(this) {
val room = roomIdToRoom.remove(roomId)
room?.release()
}
val room = roomIdToRoom.remove(roomId)
room?.release()
Comment on lines +75 to +76
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in release method

The current implementation silently handles errors from room.release(). Consider propagating errors for better debugging and monitoring in production.

Apply this diff to improve error handling:

    override suspend fun release(roomId: String) {
        synchronized(this) {
            val room = roomIdToRoom.remove(roomId)
-           room?.release()
+           try {
+               room?.release()
+           } catch (e: Exception) {
+               clientOptions.logHandler.error("Failed to release room $roomId", e)
+               throw AblyException.fromErrorInfo(
+                   ErrorInfo("Failed to release room: ${e.message}", 
+                           HttpStatusCodes.InternalServerError,
+                           ErrorCodes.InternalError.errorCode,
+                           e
+                   )
+               )
+           }
        }
    }

Committable suggestion skipped: line range outside the PR's diff.

}
}
4 changes: 4 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Typing.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ internal class DefaultTyping(
override suspend fun stop() {
TODO("Not yet implemented")
}

override fun release() {
realtimeClient.channels.release(channel.name)
}
Comment on lines +111 to +113
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider enhancing the release implementation

While the basic implementation is correct, consider these improvements:

  1. Add the release() method to the Typing interface to make it part of the contract
  2. Add error handling for potential channel release failures
  3. Add KDoc documentation explaining the purpose and behavior

Here's the suggested implementation:

interface Typing : EmitsDiscontinuities {
    // ... existing methods ...
+    /**
+     * Releases resources associated with typing indicators.
+     * This includes releasing the underlying Ably channel.
+     */
+    suspend fun release()
}

internal class DefaultTyping(
    // ... existing code ...
-    override fun release() {
-        realtimeClient.channels.release(channel.name)
-    }
+    override suspend fun release() {
+        try {
+            realtimeClient.channels.release(channel.name)
+        } catch (e: Exception) {
+            throw ErrorInfo(
+                "Failed to release typing indicators channel",
+                ErrorCodes.TypingReleaseFailed,
+                cause = e
+            )
+        }
+    }
}

Committable suggestion skipped: line range outside the PR's diff.

}
7 changes: 7 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ val Channel.errorMessage: String
", ${reason.message}"
}

val RoomStatusChange.errorMessage: String
get() = if (error == null) {
""
} else {
", ${error.message}"
}

@Suppress("FunctionName")
fun ChatChannelOptions(init: (ChannelOptions.() -> Unit)? = null): ChannelOptions {
val options = ChannelOptions()
Expand Down