onDisconnected Method Takes Significant Time to Detect Disconnection Despite Low KeepAlive Setting #76

Open
qwerty470 opened this issue Jan 11, 2025 · 0 comments

I have implemented auto-reconnect and use onDisconnected to detect disconnections and trigger reconnection. However, onDisconnected is only called 2–3 minutes after the connection drops, even with keepAlive set to 5 seconds, so reconnection is delayed significantly.

Here’s the relevant code snippet:

internal val MQTTClient.isConnected: Boolean
    get() = isRunning() && isConnackReceived()

class MQTTManager(
    private val host: String,
    private val port: Int,
    private val clientId: String,
    private val mqttProtocol: MqttProtocolVersion = MqttProtocolVersion.MQTT_5,
    private val tlsSettings: MqttTLSSettings? = null,
    private val cleanStart: Boolean = false,
    private val keepAliveSeconds: Long = 5L,
    private val sessionExpiry: Long = 300L,
    private val publishIntervalMs: Long = 1000L,
    private val debugLog: Boolean = false
) {

    /** Reference to the underlying KMQTT client. */
    private var client: MQTTClient? = null

    /** Protect shared queue with ReentrantLock. */
    private val queueLock = ReentrantLock()
    private val packetQueue = mutableListOf<MqttPacket>()

    /** Whether we’re connected (i.e., we received CONNACK). */
    private val _isConnected = MutableStateFlow(false)
    val isConnected = _isConnected.asStateFlow()

    /** Parent Job for coroutines. Cancel this to terminate everything. */
    private val supervisorJob = SupervisorJob()

    private val scope = CoroutineScope(Dispatchers.Default + supervisorJob)

    /** Jobs for reconnect loop, step loop, and publishing loop. */
    private var reconnectJob: Job? = null
    private var stepLoopJob: Job? = null
    private var publishLoopJob: Job? = null

    /** For exponential backoff. */
    private var reconnectDelayMs = 2_000L

    /** Whether this is the first-ever connection attempt. Used for “Clean Start” logic. */
    private var isFirstConnection = true

    /** QoS Mapping utility. */
    private val qosMap = mapOf(
        0 to Qos.AT_MOST_ONCE,
        1 to Qos.AT_LEAST_ONCE,
        2 to Qos.EXACTLY_ONCE
    )

    // ---------------
    // PUBLIC METHODS
    // ---------------

    /**
     * Start the MQTT manager:
     * - Launch the reconnect logic in one coroutine.
     * - It will create a new client, connect, and if successful, start the step loop & publish loop.
     */
    fun start() {
        if (reconnectJob?.isActive == true) {
            if (debugLog) println("Reconnect job already active. Skipping.")
            return
        }
        reconnectJob = scope.launch {
            while (isActive) {
                if (client == null || (client?.isRunning() == false)) {
                    try {
                        createAndConnectClient()
                        // Start step loop to process the handshake and keep connection alive
                        startStepLoop()
                        // Start publish loop
                        startPublishLoop()
                    } catch (e: Exception) {
                        if (debugLog) println("Connection attempt failed: ${e.message}")
                        _isConnected.value = false
                        delay(reconnectDelayMs)
                        reconnectDelayMs = (reconnectDelayMs * 2).coerceAtMost(30_000L)
                    }
                }
                delay(1000)
            }
        }
    }

    /**
     * Stop the MQTT manager:
     * - Cancel reconnect, step, and publish loops.
     * - Disconnect the client gracefully.
     */
    fun stop() {
        reconnectJob?.cancel()
        reconnectJob = null

        stepLoopJob?.cancel()
        stepLoopJob = null

        publishLoopJob?.cancel()
        publishLoopJob = null

        _isConnected.value = false
        isFirstConnection = true
        // Graceful disconnect from broker, if client is present
        client?.disconnect(ReasonCode.SUCCESS)
        client = null

        // Cancel the entire scope
        supervisorJob.cancel()
    }

    /**
     * Enqueue an MQTT packet (e.g., for publishing).
     * If we are disconnected, it remains in queue until we reconnect.
     */
    fun enqueuePacket(packet: MqttPacket) {
        queueLock.withLock {
            packetQueue.add(packet)
        }
    }

    // ---------------
    // INTERNAL LOGIC
    // ---------------

    /**
     * Create a new MQTTClient instance and connect.
     *
     * CONNACK arrives asynchronously; the onConnected callback then sets _isConnected.value = true.
     */
    private fun createAndConnectClient() {
        // If we already have a client, close it
        client?.disconnect(ReasonCode.SUCCESS)
        client = null

        // Construct the new MQTT client, which automatically sends CONNECT
        val newClient = MQTTClient(
            mqttVersion = toKMQTTVersion(mqttProtocol),
            address = host,
            port = port,
            tls = toKMQTTTLSSettings(tlsSettings),
            properties = MQTT5Properties(sessionExpiryInterval = sessionExpiry.toUInt()),
            keepAlive = keepAliveSeconds.toInt(),
            cleanStart = isFirstConnection || cleanStart,
            clientId = clientId.ifBlank { null },
            debugLog = debugLog,
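            onConnected = { connack ->
                if (debugLog) println("MQTTClient onConnected: CONNACK arrived!")
                _isConnected.value = true
                // Reset the backoff so the next outage starts from the initial delay again
                reconnectDelayMs = 2_000L
            },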
            onDisconnected = { disconnect ->
                if (debugLog) println("MQTTClient onDisconnected: $disconnect")
                _isConnected.value = false
            },
            publishReceived = { incoming ->
                if (debugLog) {
                    println("Received PUBLISH from broker: topic=${incoming.topicName}")
                }
            }
        )

        if (debugLog) println("createAndConnectClient: constructor done, waiting for CONNACK...")

        isFirstConnection = false
        client = newClient
    }

    /**
     * Start a coroutine that repeatedly calls client.step() every ~50ms as long as the client is running.
     */
    private fun startStepLoop() {
        // Cancel any previous step loop: it is bound to the old client instance
        stepLoopJob?.cancel()
        val c = client ?: return

        stepLoopJob = scope.launch {
            try {
                while (isActive && c.isRunning()) {
                    c.step()
                    delay(50)
                }
            } catch (e: Exception) {
                if (debugLog) println("stepLoopJob caught exception: ${e.message}")
            } finally {
                _isConnected.value = false
            }
        }
    }

    /**
     * Start a coroutine that periodically flushes the outbound queue (publish messages).
     */
    private fun startPublishLoop() {
        if (publishLoopJob?.isActive == true) return
        publishLoopJob = scope.launch {
            while (isActive) {
                flushQueue()
                delay(publishIntervalMs.milliseconds)
            }
        }
    }

    /**
     * Publish all queued messages.
     * If a publish fails, re-queue it for later.
     */
    private fun flushQueue() {
        val c = client ?: return
        if (!_isConnected.value) return

        // Snapshot the queue
        val localList = queueLock.withLock {
            if (packetQueue.isEmpty()) return
            val snapshot = packetQueue.toList()
            packetQueue.clear()
            snapshot
        }

        localList.forEach { packet ->
            try {
                val qos = qosMap.getOrElse(packet.qos) { Qos.AT_MOST_ONCE }
                c.publish(
                    retain = packet.retain,
                    qos = qos,
                    topic = packet.topic,
                    payload = packet.payload.toUByteArray()
                )

                // Debug/log
                if (debugLog) {
                    try {
                        val jsonPayload = packet.payload.decodeToString()
                        val ecgPacket = Json.decodeFromString<ECGPacket>(jsonPayload)
                        println("Publishing packet number: ${ecgPacket.packetNo}")
                    } catch (e: Exception) {
                        println("Failed to parse packet payload: ${e.message}")
                    }
                }

            } catch (e: Exception) {
                // If publish fails, put it back in the queue for a retry
                if (debugLog) {
                    println("Failed to publish message: ${e.message}. Re-enqueueing.")
                }
                enqueuePacket(packet)
            }
        }
    }
}
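
For illustration, the kind of offline buffering being asked for could look roughly like the sketch below. It is not part of KMQTT: `OfflineBuffer`, its `capacity` parameter, and the `send` callback are hypothetical names, and the sketch targets the JVM. Unlike flushQueue above, which re-enqueues a failed packet at the tail, it stops at the first failure so the original order is kept. It also assumes that a `send` call returning normally means the message was delivered; a real at-least-once guarantee would need the broker's acknowledgement.

```kotlin
/**
 * Hypothetical bounded, order-preserving offline buffer (not a KMQTT API).
 * Methods are synchronized, so producers block while a flush is in progress.
 */
class OfflineBuffer<T>(private val capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val items = ArrayDeque<T>()

    /** Adds an item; when full, drops and returns the oldest item, else returns null. */
    @Synchronized
    fun add(item: T): T? {
        val dropped = if (items.size >= capacity) items.removeFirst() else null
        items.addLast(item)
        return dropped
    }

    /**
     * Sends items oldest-first. Stops at the first failure and leaves the
     * failed item and everything after it queued. Returns the number sent.
     */
    @Synchronized
    fun flush(send: (T) -> Unit): Int {
        var sent = 0
        while (items.isNotEmpty()) {
            try {
                send(items.first())
            } catch (e: Exception) {
                break // keep the failed item at the head for the next flush
            }
            items.removeFirst()
            sent++
        }
        return sent
    }

    @Synchronized
    fun size(): Int = items.size
}

fun main() {
    val buffer = OfflineBuffer<String>(capacity = 3)
    listOf("a", "b", "c", "d").forEach { buffer.add(it) } // "a" is dropped as the oldest

    val delivered = mutableListOf<String>()
    // Simulate a link that goes down while sending "c".
    buffer.flush { item ->
        if (item == "c") throw IllegalStateException("link down")
        delivered.add(item)
    }
    println("$delivered, still queued: ${buffer.size()}") // [b], still queued: 2

    // Link is back: the remaining items go out in their original order.
    buffer.flush { item -> delivered.add(item) }
    println(delivered) // [b, c, d]
}
```

Holding the lock during `send` keeps the sketch short; a variant that takes a snapshot under the lock, as flushQueue does, would avoid blocking producers but needs extra care to put unsent items back at the head.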

Could you suggest a way to reduce this delay or improve disconnection detection? Additionally, auto-reconnect and offline buffering are important features for an MQTT client, since they help avoid message loss during network disruptions. It would be great if they could be implemented in the library itself.
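
One workaround that could be tried on the application side, independent of when onDisconnected fires, is a liveness watchdog: record the time of the last inbound traffic and treat the connection as dead once nothing has arrived for a multiple of the keep-alive interval. The sketch below is a minimal illustration under stated assumptions, not a KMQTT feature. `LivenessWatchdog`, `graceFactor`, and `nowMs` are hypothetical names; the 1.5 default mirrors the grace period the MQTT specification gives a broker before it drops a silent client; and `System.currentTimeMillis()` makes it JVM-only. It also assumes the app can observe inbound traffic at least once per keep-alive interval, for example through the publishReceived callback on a topic that receives regular messages.

```kotlin
/**
 * Hypothetical application-level liveness watchdog (not a KMQTT API).
 * The clock is injectable so the timing logic can be tested without sleeping.
 */
class LivenessWatchdog(
    keepAliveSeconds: Long,
    graceFactor: Double = 1.5, // assumption: same factor a broker applies to clients
    private val nowMs: () -> Long = { System.currentTimeMillis() }
) {
    private val timeoutMs: Long = (keepAliveSeconds * 1000 * graceFactor).toLong()

    @Volatile
    private var lastActivityMs: Long = nowMs()

    /** Call whenever inbound traffic is observed (e.g. from a receive callback). */
    fun recordActivity() {
        lastActivityMs = nowMs()
    }

    /** True once no activity has been recorded for longer than the timeout. */
    fun isStale(): Boolean = nowMs() - lastActivityMs > timeoutMs
}

fun main() {
    var fakeNow = 0L
    val watchdog = LivenessWatchdog(keepAliveSeconds = 5, nowMs = { fakeNow })

    fakeNow = 7_000
    println(watchdog.isStale()) // false: 7.0 s is within the 7.5 s timeout

    fakeNow = 8_000
    println(watchdog.isStale()) // true: 8.0 s exceeds the 7.5 s timeout

    watchdog.recordActivity()
    println(watchdog.isStale()) // false: activity was just recorded
}
```

In the manager above, the reconnect loop could poll `isStale()` once per second and, when it returns true, discard the current client and create a new one instead of waiting for onDisconnected.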

qwerty470 changed the title from "Help with Auto-Reconnect and Publishing Queued Messages" to "onDisconnected Method Takes Significant Time to Detect Disconnection Despite Low KeepAlive Setting" on Jan 12, 2025