Skip to content

Commit

Permalink
- Handled retry flow for guarnteed delivery of logs for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
umair13adil committed Jul 15, 2020
1 parent 36b2014 commit fa5426c
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 55 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ Add following block for initializing MQTT logging.
brokerUrl = "YOUR_URL", //Without Scheme
certificateRes = R.raw.m2mqtt_ca,
clientId = "5aa39cef4d544d658ecaf23db701099c",
writeLogsToLocalStorage = true
writeLogsToLocalStorage = true,
initialDelaySecondsForPublishing = 30,
debug = true
)
```

Expand Down
4 changes: 2 additions & 2 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ android {
applicationId "com.blackbox.library.plog"
minSdkVersion 16
targetSdkVersion 29
versionCode 23
versionName "1.0.6"
versionCode 24
versionName "1.0.7"
testInstrumentationRunner 'androidx.test.runner.AndroidJUnitRunner'
}
buildTypes {
Expand Down
27 changes: 11 additions & 16 deletions app/src/main/java/com/blackbox/library/plog/MainActivity.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,27 @@ class MainActivity : AppCompatActivity() {
))

//MQTT Setup
/*PLogMQTTProvider.initMQTTClient(applicationContext,
//Uncomment following block to enable MQTT feature
PLogMQTTProvider.initMQTTClient(this,
topic = "",
brokerUrl = "", //Without Scheme
certificateRes = R.raw.m2mqtt_ca,
clientId = "5aa39cef4d544d658ecaf23db701099c"
)*/
port = "8883",
writeLogsToLocalStorage = true,
initialDelaySecondsForPublishing = 30,
debug = true
)

//Initialize FakeIt
Fakeit.initWithLocale(Locale.ENGLISH)

//If permission granted
setupLoggerControls()

/*object : CountDownTimer(5000, 1000) {
override fun onFinish() {
}
override fun onTick(millisUntilFinished: Long) {
//Write Fake Data to Logs
for (i in 0..10) {
PLog.logThis(TAG, Fakeit.gameOfThrones().house(), Fakeit.gameOfThrones().quote(), LogLevel.INFO)
}
}
}.start();*/
//Write Fake Data to Logs
for (i in 0..250) {
PLog.logThis(TAG, Fakeit.gameOfThrones().house(), Fakeit.gameOfThrones().quote(), LogLevel.INFO)
}

run_test.setOnClickListener {
startActivity(Intent(this, HourlyLogsTest::class.java))
Expand Down
4 changes: 2 additions & 2 deletions plog/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ android {
defaultConfig {
minSdkVersion 16
targetSdkVersion 29
versionCode 34
versionName "1.0.6"
versionCode 35
versionName "1.0.7"
testInstrumentationRunner 'androidx.test.runner.AndroidJUnitRunner'

}
Expand Down
43 changes: 37 additions & 6 deletions plog/src/main/java/com/blackbox/plog/mqtt/MQTTSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import com.blackbox.plog.mqtt.client.PahoMqqtClient
import com.blackbox.plog.pLogs.impl.PLogImpl
import com.blackbox.plog.pLogs.workers.LogsPublishWorker
import com.blackbox.plog.utils.PLogUtils
import io.reactivex.Observable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit


object MQTTSender {

Expand Down Expand Up @@ -34,22 +40,39 @@ object MQTTSender {
return
}
}
}

sendMessage(message)
sendMessage(message, context)
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.subscribeBy(
onNext = {
if (it) {
if (PLogMQTTProvider.debug) {
doOnMessageDelivered()
printMQTTMessagesSummary("deliveryComplete")
}
}
},
onError = {

},
onComplete = { }
)
}
}
}

fun sendMessage(message: String) {
fun sendMessage(message: String, context: Context): Observable<Boolean>? {
try {
PLogMQTTProvider.androidClient?.let { androidClient ->
PahoMqqtClient.instance?.publishMessage(androidClient, message, PLogMQTTProvider.qos, PLogMQTTProvider.topic)
return PahoMqqtClient.instance?.publishMessage(androidClient, message, PLogMQTTProvider.qos, PLogMQTTProvider.topic, context)
}
} catch (e: Exception) {
if (PLogMQTTProvider.debug) {
Log.e(TAG, PLogUtils.getStackTrace(e))
}
}
return null
}

private fun createInputData(message: String): Data {
Expand All @@ -64,12 +87,19 @@ object MQTTSender {
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()


val request: WorkRequest = OneTimeWorkRequestBuilder<LogsPublishWorker>()
.setConstraints(constraints)
.setInitialDelay(PLogMQTTProvider.initialDelaySecondsForPublishing, TimeUnit.SECONDS)
.setInputData(createInputData(message))
.build()


/*val request = PeriodicWorkRequest.Builder(LogsPublishWorker::class.java,
10, TimeUnit.SECONDS, 10, TimeUnit.SECONDS)
.setInitialDelay(20, TimeUnit.SECONDS)
.setConstraints(constraints)
.build()*/

WorkManager.getInstance(context)
.enqueue(request)

Expand All @@ -85,7 +115,8 @@ object MQTTSender {
if (totalQueued > 0) {
Log.i(TAG, "Event: [$eventName] Total Messages: $totalAdded, Total Delivered: $totalSent, Total Queued: $totalQueued")
} else {
Log.i(TAG, "Event: [$eventName] Total Messages: $totalAdded, Total Delivered: $totalSent")
if (totalSent <= totalAdded)
Log.i(TAG, "Event: [$eventName] Total Messages: $totalAdded, Total Delivered: $totalSent")
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions plog/src/main/java/com/blackbox/plog/mqtt/PLogMQTTProvider.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object PLogMQTTProvider {
private var clientId: String = MqttClient.generateClientId() //Provide if needed
internal var keepAliveIntervalSeconds = 180 //Default
internal var connectionTimeout = 60 //Default
internal var initialDelaySecondsForPublishing = 30L //Default
internal var isCleanSession = true //Default
internal var isAutomaticReconnect = true //Default
internal var debug = true //Default
Expand All @@ -39,6 +40,7 @@ object PLogMQTTProvider {
clientId: String = this.clientId,
keepAliveIntervalSeconds: Int = this.keepAliveIntervalSeconds,
connectionTimeout: Int = this.connectionTimeout,
initialDelaySecondsForPublishing: Long = this.initialDelaySecondsForPublishing,
isCleanSession: Boolean = this.isCleanSession,
isAutomaticReconnect: Boolean = this.isAutomaticReconnect,
@RawRes certificateRes: Int? = null,
Expand All @@ -60,6 +62,7 @@ object PLogMQTTProvider {
this.clientId = clientId
this.keepAliveIntervalSeconds = keepAliveIntervalSeconds
this.connectionTimeout = connectionTimeout
this.initialDelaySecondsForPublishing = initialDelaySecondsForPublishing
this.isCleanSession = isCleanSession
this.isAutomaticReconnect = isAutomaticReconnect
this.debug = debug
Expand Down
42 changes: 27 additions & 15 deletions plog/src/main/java/com/blackbox/plog/mqtt/client/PahoMqqtClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package com.blackbox.plog.mqtt.client
import android.content.Context
import android.util.Log
import androidx.annotation.RawRes
import com.blackbox.plog.mqtt.MQTTSender
import com.blackbox.plog.mqtt.PLogMQTTProvider
import com.blackbox.plog.utils.PLogUtils
import io.reactivex.Observable
import org.eclipse.paho.android.service.MqttAndroidClient
import org.eclipse.paho.client.mqttv3.*
import java.io.IOException
Expand Down Expand Up @@ -55,10 +54,7 @@ class PahoMqqtClient {
}

override fun deliveryComplete(token: IMqttDeliveryToken?) {
if (PLogMQTTProvider.debug) {
MQTTSender.doOnMessageDelivered()
MQTTSender.printMQTTMessagesSummary("deliveryComplete")
}

}
})
}
Expand Down Expand Up @@ -189,15 +185,31 @@ class PahoMqqtClient {
}

@Throws(MqttException::class, UnsupportedEncodingException::class)
fun publishMessage(client: MqttAndroidClient, msg: String, qos: Int, topic: String) {
if (isConnected) {
var encodedPayload = ByteArray(0)
encodedPayload = msg.toByteArray(charset("UTF-8"))
val message = MqttMessage(encodedPayload)
message.id = msg.hashCode()
message.isRetained = PLogMQTTProvider.retained
message.qos = qos
client.publish(topic, message)
fun publishMessage(client: MqttAndroidClient?, msg: String, qos: Int, topic: String, context: Context): Observable<Boolean> {
return Observable.create { emitter ->
if (isConnected) {
var encodedPayload = ByteArray(0)
encodedPayload = msg.toByteArray(charset("UTF-8"))
val message = MqttMessage(encodedPayload)
message.id = msg.hashCode()
message.isRetained = PLogMQTTProvider.retained
message.qos = qos
client?.publish(topic, message, context, object : IMqttActionListener {
override fun onSuccess(asyncActionToken: IMqttToken?) {
if (!emitter.isDisposed) {
emitter.onNext(true)
emitter.onComplete()
}
}

override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
if (!emitter.isDisposed) {
emitter.onNext(false)
emitter.onComplete()
}
}
})
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,65 @@ package com.blackbox.plog.pLogs.workers

import android.content.Context
import android.util.Log
import androidx.work.Worker
import androidx.work.RxWorker
import androidx.work.WorkerParameters
import com.blackbox.plog.mqtt.MQTTSender
import com.blackbox.plog.mqtt.PLogMQTTProvider
import com.blackbox.plog.mqtt.client.PahoMqqtClient
import com.blackbox.plog.pLogs.impl.PLogImpl
import com.blackbox.plog.utils.PLogUtils
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

class LogsPublishWorker(appContext: Context, workerParams: WorkerParameters) :
Worker(appContext, workerParams) {
RxWorker(appContext, workerParams) {

companion object {
private val TAG = "LogsPublishWorker"
val KEY_LOG_MESSAGE = "log_message"
}

override fun doWork(): Result {
override fun createWork(): Single<Result> {

//Send Pending reports
return Single.create {
doWork(it)
}
}

private fun doWork(emitter: SingleEmitter<Result>) {
PahoMqqtClient.instance?.setConnected()

return try {
try {
val message = inputData.getString(KEY_LOG_MESSAGE)

message?.let {
if (PLogMQTTProvider.mqttEnabled && PLogMQTTProvider.topic.isNotEmpty() && message.isNotEmpty()) {
MQTTSender.printMQTTMessagesSummary("retrySending")
MQTTSender.sendMessage(message)
MQTTSender.sendMessage(message, context = applicationContext)
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.delay(1, TimeUnit.SECONDS)
?.retryWhen(RetryWithDelay(2, 5000))
?.subscribeBy(
onNext = {
if (it) {
MQTTSender.doOnMessageDelivered()
MQTTSender.printMQTTMessagesSummary("sentOnRetry")
emitter.onSuccess(Result.success())
}
},
onError = {
it.printStackTrace()
},
onComplete = { }
)
}
}

Result.success()
} catch (throwable: Throwable) {
if (PLogMQTTProvider.debug) {
Log.e(TAG, PLogUtils.getStackTrace(throwable))
}
Result.failure()
throwable.printStackTrace()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.blackbox.plog.pLogs.workers;

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.functions.Function;

/**
* Created by Umair Adil on 13/03/2017.
*/
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {

private final int maxRetries;
private final int retryDelayMillis;
private String TAG = "RetryWithDelay";
private int retryCount;

public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}

@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts
.flatMap((Function<Throwable, Observable<?>>) throwable -> {

if (++retryCount < maxRetries) {
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}

// Max retries hit. Just pass the error along.
return Observable.error(throwable);
});
}
}

0 comments on commit fa5426c

Please sign in to comment.