Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster keys #69

Merged
merged 7 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ListeningCountDownLatch(
private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO)
private val clientId: String = UUID.randomUUID().toString()
private val keySpace = "countdownlatch"
private val channelSpace = "channels"
private val channelSpace = "channel"
private val currentCounter = AtomicInteger(count)
private val minimalMaxDuration = Duration.ofMillis(100)

Expand Down Expand Up @@ -150,7 +150,7 @@ class ListeningCountDownLatch(
) { backend ->
backend.count(
latchKeyName = buildKey(name),
channelName = buildKey(channelSpace, name),
channelName = buildKey(name, channelSpace),
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
Expand Down Expand Up @@ -203,10 +203,13 @@ class ListeningCountDownLatch(
retryDelay = retryDelay,
waitStrategy = WaitStrategy.MAJORITY,
) { backend ->
backend.listen(channelName = buildKey(channelSpace, name))
backend.listen(channelName = buildKey(name, channelSpace))
}
}

@Suppress("NOTHING_TO_INLINE")
private inline fun buildKey(vararg parts: String) = keySpace + ":" + parts.joinToString(":")
private inline fun buildKey(
name: String,
vararg parts: String,
) = "{$keySpace:$name}:" + parts.joinToString(":")
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Semaphore(
resourceName: String,
ttl: Duration,
): String? {
val leasersKey = buildKey(leasersKey, resourceName)
val leasersKey = buildKey(resourceName, leasersKey)
val leaserValidityKey = buildKey(resourceName, clientId)
return backend.setSemaphoreLock(leasersKey, leaserValidityKey, clientId, maxLeases, ttl)
}
Expand All @@ -64,7 +64,7 @@ class Semaphore(
backend: LocksBackend,
resourceName: String,
): String? {
val leasersKey = buildKey(leasersKey, resourceName)
val leasersKey = buildKey(resourceName, leasersKey)
val leaserValidityKey = buildKey(resourceName, clientId)
val removeSemaphoreLock = backend.removeSemaphoreLock(leasersKey, leaserValidityKey, clientId)
// clean up expired other leasers
Expand All @@ -76,11 +76,14 @@ class Semaphore(
backend: LocksBackend,
resourceName: String,
) {
val leasersKey = buildKey(leasersKey, resourceName)
val leasersKey = buildKey(resourceName, leasersKey)
val leaserValidityKeyPrefix = buildKey(resourceName)
backend.cleanUpExpiredSemaphoreLocks(leasersKey, leaserValidityKeyPrefix)
}

@Suppress("NOTHING_TO_INLINE")
private inline fun buildKey(vararg parts: String) = globalKeyPrefix + ":" + parts.joinToString(":")
private inline fun buildKey(
resourceName: String,
vararg parts: String,
) = "{$globalKeyPrefix:$resourceName}:" + parts.joinToString(":")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
local leasersKey = KEYS[1]
local leasers = redis.call("smembers", leasersKey)
for _, leaser in ipairs(leasers) do
local leaserValidityKey = ARGV[1] .. ":" .. leaser
local leaserValidityKey = ARGV[1] .. leaser
if redis.call("exists", leaserValidityKey) == 0 then
redis.call("srem", leasersKey, leaser)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class ListeningCountDownLatchTest {

@Test
fun `count down`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, returnVal = "OK")
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 3, 4, returnVal = "OK")
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 4, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 3, 4, returnVal = "OK")
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 4, returnVal = "OK")
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -57,8 +57,8 @@ class ListeningCountDownLatchTest {

@Test
fun `count down failing`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null)
backend.everyUndoCount("countdownlatch:test", 2, 1)
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null)
backend.everyUndoCount("{countdownlatch:test}:", 2, 1)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -76,8 +76,8 @@ class ListeningCountDownLatchTest {

@Test
fun `undo count failing`() {
backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null)
backend.everyUndoCount("countdownlatch:test", 2, null)
backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null)
backend.everyUndoCount("{countdownlatch:test}:", 2, null)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -96,7 +96,7 @@ class ListeningCountDownLatchTest {
@Test
fun await() {
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 2)
backend.everyCheckCount("{countdownlatch:test}:", 2)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -117,7 +117,7 @@ class ListeningCountDownLatchTest {

@Test
fun `await finished by global count`() {
backend.everyCheckCount("countdownlatch:test", 4)
backend.everyCheckCount("{countdownlatch:test}:", 4)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -138,7 +138,7 @@ class ListeningCountDownLatchTest {

@Test
fun `await - check count failed`() {
backend.everyCheckCount("countdownlatch:test", null)
backend.everyCheckCount("{countdownlatch:test}:", null)
val latch =
ListeningCountDownLatch(
"test",
Expand Down Expand Up @@ -175,13 +175,13 @@ class ListeningCountDownLatchTest {
@Test
fun `await timed out`() {
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} answers {
runBlocking { delay(1000) }
null
}

backend.everyCheckCount("countdownlatch:test", 2)
backend.everyCheckCount("{countdownlatch:test}:", 2)
val latch = ListeningCountDownLatch("test", 4, listOf(backend))

assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110)))
Expand All @@ -196,7 +196,7 @@ class ListeningCountDownLatchTest {
@Test
fun `await failed`() {
backend.everyListen(null)
backend.everyCheckCount("countdownlatch:test", 3)
backend.everyCheckCount("{countdownlatch:test}:", 3)
val latch =
ListeningCountDownLatch(
"test",
Expand All @@ -216,7 +216,7 @@ class ListeningCountDownLatchTest {
@ParameterizedTest(name = "current count: {0}")
@ValueSource(ints = [-123, -1, 0, 1, 2, 5])
fun `check count`(count: Long) {
backend.everyCheckCount("countdownlatch:test", count)
backend.everyCheckCount("{countdownlatch:test}:", count)
val latch = ListeningCountDownLatch("test", 5, listOf(backend))

assertEquals(5 - count.toInt(), latch.getCount())
Expand All @@ -226,7 +226,7 @@ class ListeningCountDownLatchTest {

@Test
fun `check count failed`() {
backend.everyCheckCount("countdownlatch:test", null)
backend.everyCheckCount("{countdownlatch:test}:", null)
val latch =
ListeningCountDownLatch(
"test",
Expand Down Expand Up @@ -391,8 +391,8 @@ class ListeningCountDownLatchTest {
fun `all instances are in quorum for count down`() {
instances.forEach { backend ->
backend.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
Expand All @@ -417,24 +417,24 @@ class ListeningCountDownLatchTest {
@Test
fun `two instances are in quorum for count down`() {
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
"OK",
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
Expand All @@ -458,31 +458,31 @@ class ListeningCountDownLatchTest {
@Test
fun `quorum wasn't reach for count down`() {
backend1.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend2.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
null,
)
backend3.everyCount(
"countdownlatch:test",
"countdownlatch:channels:test",
"{countdownlatch:test}:",
"{countdownlatch:test}:channel",
4,
4,
Duration.ofMinutes(10),
"OK",
)
instances.forEach { backend ->
backend.everyUndoCount("countdownlatch:test", 4, 1)
backend.everyUndoCount("{countdownlatch:test}:", 4, 1)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -503,7 +503,7 @@ class ListeningCountDownLatchTest {
fun `all instances are in quorum for await`() {
instances.forEach { backend ->
backend.everyListen("open")
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
val latch =
ListeningCountDownLatch(
Expand All @@ -526,7 +526,7 @@ class ListeningCountDownLatchTest {
@Test
fun `two instances are in quorum for await`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen("open")
backend2.everyListen(null)
Expand All @@ -552,7 +552,7 @@ class ListeningCountDownLatchTest {
@Test
fun `one instance will continue to wait`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen {
delay(50)
Expand Down Expand Up @@ -587,7 +587,7 @@ class ListeningCountDownLatchTest {
@Test
fun `failed instance return value first`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen {
delay(100)
Expand Down Expand Up @@ -622,7 +622,7 @@ class ListeningCountDownLatchTest {
@Test
fun `quorum wasn't reach at majority`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
}
backend1.everyListen(null)
backend2.everyListen("open")
Expand All @@ -647,7 +647,7 @@ class ListeningCountDownLatchTest {
@Test
fun `all instances are down`() {
instances.forEach { backend ->
backend.everyCheckCount("countdownlatch:test", 1)
backend.everyCheckCount("{countdownlatch:test}:", 1)
backend.everyListen(null)
}
val latch =
Expand All @@ -669,9 +669,9 @@ class ListeningCountDownLatchTest {

@Test
fun `check count return max value of majority`() {
backend1.everyCheckCount("countdownlatch:test", 1)
backend2.everyCheckCount("countdownlatch:test", 2)
backend3.everyCheckCount("countdownlatch:test", 1)
backend1.everyCheckCount("{countdownlatch:test}:", 1)
backend2.everyCheckCount("{countdownlatch:test}:", 2)
backend3.everyCheckCount("{countdownlatch:test}:", 1)

val latch =
ListeningCountDownLatch(
Expand All @@ -687,9 +687,9 @@ class ListeningCountDownLatchTest {

@Test
fun `check count return min int`() {
backend1.everyCheckCount("countdownlatch:test", null)
backend2.everyCheckCount("countdownlatch:test", null)
backend3.everyCheckCount("countdownlatch:test", 2)
backend1.everyCheckCount("{countdownlatch:test}:", null)
backend2.everyCheckCount("{countdownlatch:test}:", null)
backend3.everyCheckCount("{countdownlatch:test}:", 2)

val latch =
ListeningCountDownLatch(
Expand Down Expand Up @@ -745,14 +745,14 @@ class ListeningCountDownLatchTest {
private fun CountDownLatchBackend.everyListen(returnVal: String?) {
val backend = this
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} returns returnVal
}

private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope<String?, String?>.(Call) -> String?) {
val backend = this
coEvery {
backend.listen(eq("countdownlatch:channels:test"))
backend.listen(eq("{countdownlatch:test}:channel"))
} coAnswers answer
}

Expand Down
Loading
Loading