Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to java duration #32

Merged
merged 3 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import me.himadieiev.redpulsar.core.locks.api.CountDownLatch
import me.himadieiev.redpulsar.core.locks.excecutors.executeWithRetry
import me.himadieiev.redpulsar.core.locks.excecutors.waitAnyJobs
import me.himadieiev.redpulsar.core.utils.failsafe
import java.time.Duration
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes

/**
* A distributed locking mechanics, intended for synchronization of multiple workloads distributed across cloud.
Expand All @@ -30,23 +28,23 @@ class ListeningCountDownLatch(
private val name: String,
private val count: Int,
private val backends: List<CountDownLatchBackend>,
private val maxDuration: Duration = 5.minutes,
private val maxDuration: Duration = Duration.ofMinutes(5),
private val retryCount: Int = 3,
private val retryDelay: Duration = 100.milliseconds,
private val retryDelay: Duration = Duration.ofMillis(100),
) : CountDownLatch {
private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO)
private val clientId: String = UUID.randomUUID().toString()
private val keySpace = "countdownlatch"
private val channelSpace = "channels"
private val currentCounter = AtomicInteger(count)
private val minimalMaxDuration = 100.milliseconds
private val minimalMaxDuration = Duration.ofMillis(100)

init {
require(backends.isNotEmpty()) { "Backend instances must not be empty" }
require(count > 0) { "Count must be positive" }
require(name.isNotBlank()) { "Name must not be blank" }
require(maxDuration > minimalMaxDuration) { "Max duration must be greater that 0.1 second" }
require(retryDelay > 0.milliseconds) { "Retry delay must be positive" }
require(retryDelay.toMillis() > 0) { "Retry delay must be positive" }
require(retryCount > 0) { "Retry count must be positive" }
}

Expand Down Expand Up @@ -85,7 +83,7 @@ class ListeningCountDownLatch(
require(timeout > minimalMaxDuration) { "Timeout must be greater that [minimalMaxDuration]" }
val job =
scope.async {
withTimeout(timeout.inWholeMilliseconds) {
withTimeout(timeout.toMillis()) {
val globalCount = getCount(this)
if (globalCount == Int.MIN_VALUE) return@withTimeout CallResult.FAILED
// Open latch if internal counter or global one is already 0 or less
Expand Down Expand Up @@ -149,7 +147,7 @@ class ListeningCountDownLatch(
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
ttl = maxDuration * 2,
ttl = maxDuration.multipliedBy(2),
)
}
}
Expand All @@ -172,7 +170,7 @@ class ListeningCountDownLatch(
private fun checkCount(scope: CoroutineScope): List<Long?> {
return backends.executeWithRetry(
scope = scope,
timeout = maxDuration * 2,
timeout = maxDuration.multipliedBy(2),
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import me.himadieiev.redpulsar.core.locks.abstracts.AbstractMultyInstanceLock
import me.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import java.time.Duration

/**
* A distributed lock for single or multiple Redis instances / clusters.
Expand All @@ -15,11 +14,11 @@ import kotlin.time.Duration.Companion.milliseconds
class RedLock(
backends: List<LocksBackend>,
private val retryCount: Int = 3,
private val retryDelay: Duration = 100.milliseconds,
private val retryDelay: Duration = Duration.ofMillis(100),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO),
) : AbstractMultyInstanceLock(backends, scope) {
init {
require(retryDelay > 0.milliseconds) { "Retry delay must be positive" }
require(retryDelay.toMillis() > 0) { "Retry delay must be positive" }
require(retryCount > 0) { "Retry count must be positive" }
}

Expand All @@ -31,7 +30,7 @@ class RedLock(
resourceName: String,
ttl: Duration,
): Boolean {
require(ttl > 2.milliseconds) { "Timeout must be greater that min clock drift." }
return multyLock(resourceName, ttl, 2.milliseconds, retryCount, retryDelay)
require(ttl.toMillis() > 2) { "Timeout must be greater that min clock drift." }
return multyLock(resourceName, ttl, Duration.ofMillis(2), retryCount, retryDelay)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import me.himadieiev.redpulsar.core.locks.abstracts.AbstractMultyInstanceLock
import me.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import java.time.Duration

/**
* An implementation for Semaphore lock in distributed systems.
Expand All @@ -15,15 +14,15 @@ class Semaphore(
backends: List<LocksBackend>,
private val maxLeases: Int,
private val retryCount: Int = 3,
private val retryDelay: Duration = 100.milliseconds,
private val retryDelay: Duration = Duration.ofMillis(100),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO),
) : AbstractMultyInstanceLock(backends, scope) {
private val globalKeyPrefix = "semaphore"
private val leasersKey = "leasers"

init {
require(maxLeases > 0) { "Max leases should be positive number" }
require(retryDelay > 0.milliseconds) { "Retry delay must be positive" }
require(retryDelay.toMillis() > 0) { "Retry delay must be positive" }
require(retryCount > 0) { "Retry count must be positive" }
}

Expand All @@ -36,10 +35,9 @@ class Semaphore(
resourceName: String,
ttl: Duration,
): Boolean {
val minTtl = 10.milliseconds
// ttl is longer that other locks because process of acquiring lock is longer.
require(ttl > minTtl) { "Timeout is too small." }
return multyLock(resourceName, ttl, minTtl, retryCount, retryDelay)
require(ttl.toMillis() > 10) { "Timeout is too small." }
return multyLock(resourceName, ttl, Duration.ofMillis(10), retryCount, retryDelay)
}

override fun lockInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import me.himadieiev.redpulsar.core.locks.abstracts.AbstractLock
import me.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import java.time.Duration

/**
* A distributed lock implementation that using only single Redis Cluster or Redis instance.
*/
class SimpleLock(
private val backend: LocksBackend,
private val retryDelay: Duration = 100.milliseconds,
private val retryDelay: Duration = Duration.ofMillis(100),
private val retryCount: Int = 3,
) : AbstractLock() {
init {
require(retryDelay > 0.milliseconds) { "Retry delay must be positive" }
require(retryDelay.toMillis() > 0) { "Retry delay must be positive" }
require(retryCount > 0) { "Retry count must be positive" }
}

Expand All @@ -28,14 +27,14 @@ class SimpleLock(
resourceName: String,
ttl: Duration,
): Boolean {
require(ttl > 2.milliseconds) { "Timeout is too small." }
require(ttl.toMillis() > 2) { "Timeout is too small." }
var retries = retryCount
do {
if (lockInstance(backend, resourceName, ttl)) {
return true
}
runBlocking {
delay(retryDelay.inWholeMilliseconds)
delay(retryDelay.toMillis())
}
} while (--retries > 0)
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package me.himadieiev.redpulsar.core.locks.abstracts
import me.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import me.himadieiev.redpulsar.core.locks.api.Lock
import mu.KotlinLogging
import java.time.Duration
import java.util.UUID
import kotlin.time.Duration

/**
* Common functions for broad range of different locks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import me.himadieiev.redpulsar.core.locks.abstracts.backends.LocksBackend
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.cancellation.CancellationException
import kotlin.system.measureTimeMillis
import kotlin.time.Duration

/**
* A distributed lock implementation based on the Redlock algorithm.
Expand Down Expand Up @@ -41,7 +41,7 @@ abstract class AbstractMultyInstanceLock(
retryCount: Int,
retryDelay: Duration,
): Boolean {
val clockDrift = (ttl.inWholeMilliseconds * 0.01 + defaultDrift.inWholeMilliseconds).toInt()
val clockDrift = (ttl.toMillis() * 0.01 + defaultDrift.toMillis()).toInt()
var retries = retryCount
do {
val acceptedLocks = AtomicInteger(0)
Expand All @@ -51,7 +51,7 @@ abstract class AbstractMultyInstanceLock(
if (lockInstance(backend, resourceName, ttl)) acceptedLocks.incrementAndGet()
}
}
val validity = ttl.inWholeMilliseconds - timeDiff - clockDrift
val validity = ttl.toMillis() - timeDiff - clockDrift
if (acceptedLocks.get() >= quorum && validity > 0) {
return true
} else {
Expand All @@ -60,7 +60,7 @@ abstract class AbstractMultyInstanceLock(
}
}
runBlocking {
delay(retryDelay.inWholeMilliseconds)
delay(retryDelay.toMillis())
}
} while (--retries > 0)
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package me.himadieiev.redpulsar.core.locks.abstracts.backends

import kotlinx.coroutines.flow.Flow
import me.himadieiev.redpulsar.core.locks.abstracts.Backend
import kotlin.time.Duration
import java.time.Duration

/**
* An abstraction for underlying storage for distributed count down latch.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package me.himadieiev.redpulsar.core.locks.abstracts.backends

import me.himadieiev.redpulsar.core.locks.abstracts.Backend
import kotlin.time.Duration
import java.time.Duration

/**
* An abstraction for underlying storage for distributed locks.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package me.himadieiev.redpulsar.core.locks.api

import kotlin.time.Duration
import java.time.Duration

/**
* A distributed locking mechanics, intended for synchronization of multiple workloads distributed across cloud.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package me.himadieiev.redpulsar.core.locks.api

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import java.time.Duration

/**
* A distributed lock api. It is a simple interface that allows to lock and unlock a resource.
Expand All @@ -16,7 +15,7 @@ interface Lock {
*/
fun lock(
resourceName: String,
ttl: Duration = 10.seconds,
ttl: Duration = Duration.ofSeconds(10),
): Boolean

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import kotlinx.coroutines.runBlocking
import me.himadieiev.redpulsar.core.locks.abstracts.Backend
import me.himadieiev.redpulsar.core.utils.withRetry
import me.himadieiev.redpulsar.core.utils.withTimeoutInThread
import java.time.Duration
import java.util.Collections
import kotlin.system.measureTimeMillis
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* An algorithm for running closure on multiple remote instances proxied by [backends].
Expand All @@ -27,14 +26,14 @@ inline fun <T : Backend, R> multyInstanceExecute(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
defaultDrift: Duration = Duration.ofMillis(3),
crossinline waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
val jobs = mutableListOf<Job>()
val quorum: Int = backends.size / 2 + 1
val results = Collections.synchronizedList(mutableListOf<R>())
val clockDrift = (timeout.inWholeMilliseconds * 0.01).toLong() + defaultDrift.inWholeMilliseconds
val clockDrift = (timeout.toMillis() * 0.01).toLong() + defaultDrift.toMillis()
val timeDiff =
measureTimeMillis {
backends.forEach { backend ->
Expand All @@ -49,7 +48,7 @@ inline fun <T : Backend, R> multyInstanceExecute(
}
runBlocking(scope.coroutineContext) { waiter(jobs, results) }
}
val validity = timeout.inWholeMilliseconds - timeDiff - clockDrift
val validity = timeout.toMillis() - timeDiff - clockDrift
if (results.size < quorum || validity < 0) {
return emptyList()
}
Expand All @@ -60,9 +59,9 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
defaultDrift: Duration = Duration.ofMillis(3),
retryCount: Int = 3,
retryDelay: Duration = 100.milliseconds,
retryDelay: Duration = Duration.ofMillis(100),
crossinline waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
crossinline callee: suspend (backend: T) -> R,
): List<R> {
Expand All @@ -81,9 +80,9 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
fun <T : Backend, R> List<T>.executeWithRetry(
scope: CoroutineScope,
timeout: Duration,
defaultDrift: Duration = 3.milliseconds,
defaultDrift: Duration = Duration.ofMillis(3),
retryCount: Int = 3,
retryDelay: Duration = 100.milliseconds,
retryDelay: Duration = Duration.ofMillis(100),
waiter: suspend (jobs: List<Job>, results: MutableList<R>) -> Unit = ::waitAllJobs,
callee: suspend (backend: T) -> R,
): List<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package me.himadieiev.redpulsar.core.utils

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import java.time.Duration

/**
* If provided closure returns empty list, it will be retried [retryCount] times
* with exponentially growing [retryDelay] between each attempt.
*/
inline fun <R> withRetry(
retryCount: Int = 3,
retryDelay: Duration = 100.milliseconds,
retryDelay: Duration = Duration.ofMillis(100),
block: () -> List<R>,
): List<R> {
var retries = retryCount
var exponentialDelay = retryDelay.inWholeMilliseconds
var exponentialDelay = retryDelay.toMillis()
do {
val result = block()
if (result.isNotEmpty()) {
Expand Down
Loading