Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-5009][CHA-RL1] Room ATTACH with retry #33

Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
6d2d24c
Added checks for current room state before room ATTACH operation
sacOO7 Oct 3, 2024
d018b0d
Implemented RoomStatus interface, added code to handle exceptions when
sacOO7 Oct 8, 2024
99c7363
Marked RoomOptions as optional as per spec
sacOO7 Oct 11, 2024
3067774
Added offAll method interface method to remove all listeners
sacOO7 Oct 11, 2024
8ffabb1
Refactored room interfaces, implemented DefaultRoomStatus class that
sacOO7 Oct 11, 2024
5b321c6
Replaced ChatEventEmitter with RoomStatusEventEmitter, implemented mi…
sacOO7 Oct 14, 2024
7943aba
Added proper logging for DefaultRoom room attach operation failure
sacOO7 Oct 15, 2024
21ac68a
Added RoomLifecycleManager class, defined basic interfaces needed for
sacOO7 Oct 15, 2024
0d7f9a9
Updated coroutine core as a direct dependency to chat-android package
sacOO7 Oct 16, 2024
ba0f81e
Implemented missing interfaces as a part of RoomLifeCycleManager
sacOO7 Oct 16, 2024
5daeaf3
Merge branch 'main' into feature/room-ATTACH
sacOO7 Oct 18, 2024
899363b
Fixed linting issues recommended by detekt
sacOO7 Oct 18, 2024
102513c
Updated ably-java dependency to latest version
sacOO7 Oct 21, 2024
572fd8d
Updated ContributesToRoomLifecycle channel property to CompletableDef…
sacOO7 Oct 21, 2024
da00860
Upgraded kotlinx.coroutines dependency to latest version
sacOO7 Oct 21, 2024
be11428
Renamed DefaultRoomStatusStatus class to DefaultStatus, same as chat-js
sacOO7 Oct 21, 2024
a68a2a7
Implemented basic definition for RoomLifeCycleManager
sacOO7 Oct 21, 2024
a5e837c
Added interface impl. for ContributesToRoomLifecycle and ResolvedCont…
sacOO7 Oct 21, 2024
5dfb474
Fixed linting errors in local files
sacOO7 Oct 21, 2024
e0d5dab
Made AblyRealtimeChannel as a explicit type for all room feature chan…
sacOO7 Oct 22, 2024
0460c5c
Added TODOs before initializing RoomLifeCycleManager, refactored room
sacOO7 Oct 22, 2024
b5a5e54
Removed unused coroutinescope from the RoomLifeCycleManager, added mi…
sacOO7 Oct 22, 2024
164f500
Implemented AtomicCoroutineScope using priorityQueue, added documenta…
sacOO7 Oct 22, 2024
b1a68de
Implemented RoomLifeCycleManager attach operation using AtomicCorouti…
sacOO7 Oct 22, 2024
bf60d1c
Added separate interface for HandlesDiscontinuity
sacOO7 Oct 22, 2024
98f38c1
Added interface implementation for EmitsDiscontinuities, added partial
sacOO7 Oct 22, 2024
0013be5
Implemented DiscontinuityEmitter, extended across all contributors
sacOO7 Oct 22, 2024
2b62982
Implemented runDownChannelsOnFailedAttach, doChannelWindDown to detach
sacOO7 Oct 22, 2024
f091c71
Implemented doRetry mechanism when room attach fails, added comments …
sacOO7 Oct 23, 2024
1eded83
Refactored roomlifecyclemanager, fixed properties backed by fields
sacOO7 Oct 25, 2024
6718da2
Updated AtomicCoroutineScope, made async method thread safe, added fe…
sacOO7 Oct 28, 2024
95a6a2c
Updated atomicCoroutineScope to run operations under supplied scope,
sacOO7 Oct 29, 2024
3fdfd39
Merge branch 'feature/room-lifecycle-using-atomic-coroutinescope' int…
sacOO7 Oct 29, 2024
d9094b5
Added private property roomScope to class DefaultRoom,
sacOO7 Oct 29, 2024
4fe42d6
Removed experimental annotation from finishedProcessing property on
sacOO7 Nov 5, 2024
7405d9c
Added cancel method for AtomicCoroutineScope to cancel pending jobs
sacOO7 Nov 7, 2024
456224b
Merge branch 'feature/room-lifecycle-using-atomic-coroutinescope' int…
sacOO7 Nov 7, 2024
ce9c0ba
Fixed RoomStatusEventEmitter typo,
sacOO7 Nov 8, 2024
d689419
Merge branch 'main' into feature/room-ATTACH
sacOO7 Nov 8, 2024
62a5dae
Merge branch 'feature/room-ATTACH' into feature/room-lifecycle-using-…
sacOO7 Nov 8, 2024
7718ad2
Updated code as per detekt linter suggestions, disabled LabeledReturn…
sacOO7 Nov 8, 2024
23e39fa
Merge branch 'feature/room-lifecycle-using-atomic-coroutinescope' int…
sacOO7 Nov 8, 2024
c874b5b
Fixed linting issues suggested by detekt, updated detekt.yml accordingly
sacOO7 Nov 8, 2024
0aac6ed
Marked DiscontinuityEmitter class as final instead of open
sacOO7 Nov 8, 2024
fc3983c
Fixed flaky test for atomic coroutine scope
sacOO7 Nov 8, 2024
290fb84
Moved DiscontinuityEmitter impl. duplication from contributors to
sacOO7 Nov 8, 2024
d0d2c9c
Refactored AtomicCoroutineScope, updated priorityQueue to accept any …
sacOO7 Nov 8, 2024
9d6b1a9
Appending current coroutineContext to make sure running/pending jobs …
sacOO7 Nov 9, 2024
05f4397
Removed flaky assertion from concurrent atomic coroutinescope test
sacOO7 Nov 9, 2024
e9f3335
Merge branch 'feature/room-lifecycle-using-atomic-coroutinescope' int…
sacOO7 Nov 11, 2024
6e3145f
Fixed flaky assertions inside AtomicCoroutineTest
sacOO7 Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chat-android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ buildConfig {
dependencies {
api(libs.ably.android)
implementation(libs.gson)
implementation(libs.coroutine.core)

testImplementation(libs.junit)
testImplementation(libs.mockk)
Expand Down
82 changes: 82 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.ably.chat

import io.ably.annotation.Experimental
import java.util.concurrent.PriorityBlockingQueue
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

/**
* AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive.
* All operations are atomic and run with given priority.
* Accepts scope as a constructor parameter to run operations under the given scope.
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
*/
class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {

private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))

private class Job(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val deferredResult: CompletableDeferred<Any>,
val queuedPriority: Int,
) :
Comparable<Job> {
override fun compareTo(other: Job): Int {
if (this.priority == other.priority) {
return this.queuedPriority.compareTo(other.queuedPriority)
}
return this.priority.compareTo(other.priority)
}
}

private val jobs: PriorityBlockingQueue<Job> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method
private var isRunning = false // Only accessed from sequentialScope
private var queueCounter = 0 // Only accessed from synchronized async method

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
@Synchronized
fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
sequentialScope.launch {
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
safeExecute(it)
}
}
isRunning = false
}
}
sacOO7 marked this conversation as resolved.
Show resolved Hide resolved

@Suppress("UNCHECKED_CAST")
return deferredResult as CompletableDeferred<T>
}

private suspend fun safeExecute(job: Job) {
runCatching {
scope.launch {
try {
val result = job.coroutineBlock(this)
job.deferredResult.complete(result)
} catch (t: Throwable) {
job.deferredResult.completeExceptionally(t)
}
}.join()
}.onFailure {
job.deferredResult.completeExceptionally(it)
}
}

@Experimental
val finishedProcessing: Boolean
get() = jobs.isEmpty() && !isRunning
}
4 changes: 2 additions & 2 deletions chat-android/src/main/java/com/ably/chat/ChatApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal class ChatApi(private val realtimeClient: RealtimeClient, private val c
ErrorInfo(
"Metadata contains reserved 'ably-chat' key",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
ErrorCodes.InvalidRequestBody.errorCode,
),
)
}
Expand All @@ -98,7 +98,7 @@ internal class ChatApi(private val realtimeClient: RealtimeClient, private val c
ErrorInfo(
"Headers contains reserved key with reserved 'ably-chat' prefix",
HttpStatusCodes.BadRequest,
ErrorCodes.InvalidRequestBody,
ErrorCodes.InvalidRequestBody.errorCode,
),
)
}
Expand Down
55 changes: 55 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Discontinuities.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.ably.chat

import io.ably.lib.types.ErrorInfo
import io.ably.lib.util.EventEmitter
import io.ably.lib.util.Log
import io.ably.lib.realtime.ChannelBase as AblyRealtimeChannel

/**
* Represents an object that has a channel and therefore may care about discontinuities.
*/
interface HandlesDiscontinuity {
/**
* A promise of the channel that this object is associated with. The promise
* is resolved when the feature has finished initializing.
*/
val channel: AblyRealtimeChannel

/**
* Called when a discontinuity is detected on the channel.
* @param reason The error that caused the discontinuity.
*/
fun discontinuityDetected(reason: ErrorInfo?)
}

/**
* An interface to be implemented by objects that can emit discontinuities to listeners.
*/
interface EmitsDiscontinuities {
/**
* Register a listener to be called when a discontinuity is detected.
* @param listener The listener to be called when a discontinuity is detected.
*/
fun onDiscontinuity(listener: Listener): Subscription

/**
* An interface for listening when discontinuity happens
*/
fun interface Listener {
/**
* A function that can be called when discontinuity happens.
* @param reason reason for discontinuity
*/
fun discontinuityEmitted(reason: ErrorInfo?)
}
}

open class DiscontinuityEmitter : EventEmitter<String, EmitsDiscontinuities.Listener>() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be much better if the class were final. I can’t find any place where we inherit from it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed 0aac6ed

override fun apply(listener: EmitsDiscontinuities.Listener?, event: String?, vararg args: Any?) {
try {
listener?.discontinuityEmitted(args[0] as ErrorInfo?)
} catch (t: Throwable) {
Log.e("DiscontinuityEmitter", "Unexpected exception calling Discontinuity Listener", t)
}
}
}
25 changes: 0 additions & 25 deletions chat-android/src/main/java/com/ably/chat/EmitsDiscontinuities.kt

This file was deleted.

45 changes: 23 additions & 22 deletions chat-android/src/main/java/com/ably/chat/ErrorCodes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,105 +3,106 @@ package com.ably.chat
/**
* Error codes for the Chat SDK.
*/
object ErrorCodes {
enum class ErrorCodes(val errorCode: Int) {

/**
* The messages feature failed to attach.
*/
const val MessagesAttachmentFailed = 102_001
MessagesAttachmentFailed(102_001),

/**
* The presence feature failed to attach.
*/
const val PresenceAttachmentFailed = 102_002
PresenceAttachmentFailed(102_002),

/**
* The reactions feature failed to attach.
*/
const val ReactionsAttachmentFailed = 102_003
ReactionsAttachmentFailed(102_003),

/**
* The occupancy feature failed to attach.
*/
const val OccupancyAttachmentFailed = 102_004
OccupancyAttachmentFailed(102_004),

/**
* The typing feature failed to attach.
*/
const val TypingAttachmentFailed = 102_005
// 102006 - 102049 reserved for future use for attachment errors
TypingAttachmentFailed(102_005),
// 102_006 - 102_049 reserved for future use for attachment errors

/**
* The messages feature failed to detach.
*/
const val MessagesDetachmentFailed = 102_050
MessagesDetachmentFailed(102_050),

/**
* The presence feature failed to detach.
*/
const val PresenceDetachmentFailed = 102_051
PresenceDetachmentFailed(102_051),

/**
* The reactions feature failed to detach.
*/
const val ReactionsDetachmentFailed = 102_052
ReactionsDetachmentFailed(102_052),

/**
* The occupancy feature failed to detach.
*/
const val OccupancyDetachmentFailed = 102_053
OccupancyDetachmentFailed(102_053),

/**
* The typing feature failed to detach.
*/
const val TypingDetachmentFailed = 102_054
// 102055 - 102099 reserved for future use for detachment errors
TypingDetachmentFailed(102_054),
// 102_055 - 102_099 reserved for future use for detachment errors

/**
* The room has experienced a discontinuity.
*/
const val RoomDiscontinuity = 102_100
RoomDiscontinuity(102_100),

// Unable to perform operation;

/**
* Cannot perform operation because the room is in a failed state.
*/
const val RoomInFailedState = 102_101
RoomInFailedState(102_101),

/**
* Cannot perform operation because the room is in a releasing state.
*/
const val RoomIsReleasing = 102_102
RoomIsReleasing(102_102),

/**
* Cannot perform operation because the room is in a released state.
*/
const val RoomIsReleased = 102_103
RoomIsReleased(102_103),

/**
* Cannot perform operation because the previous operation failed.
*/
const val PreviousOperationFailed = 102_104
PreviousOperationFailed(102_104),

/**
* An unknown error has happened in the room lifecycle.
*/
const val RoomLifecycleError = 102_105
RoomLifecycleError(102_105),

/**
* The request cannot be understood
*/
const val BadRequest = 40_000
BadRequest(40_000),

/**
* Invalid request body
*/
const val InvalidRequestBody = 40_001
InvalidRequestBody(40_001),

/**
* Internal error
*/
const val InternalError = 50_000
InternalError(50_000),
}

/**
Expand Down
Loading
Loading