diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt index d812a32..48eba47 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatch.kt @@ -40,7 +40,7 @@ class ListeningCountDownLatch( private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO) private val clientId: String = UUID.randomUUID().toString() private val keySpace = "countdownlatch" - private val channelSpace = "channels" + private val channelSpace = "channel" private val currentCounter = AtomicInteger(count) private val minimalMaxDuration = Duration.ofMillis(100) @@ -150,7 +150,7 @@ class ListeningCountDownLatch( ) { backend -> backend.count( latchKeyName = buildKey(name), - channelName = buildKey(channelSpace, name), + channelName = buildKey(name, channelSpace), clientId = clientId, count = currentCounter.get(), initialCount = count, @@ -203,10 +203,13 @@ class ListeningCountDownLatch( retryDelay = retryDelay, waitStrategy = WaitStrategy.MAJORITY, ) { backend -> - backend.listen(channelName = buildKey(channelSpace, name)) + backend.listen(channelName = buildKey(name, channelSpace)) } } @Suppress("NOTHING_TO_INLINE") - private inline fun buildKey(vararg parts: String) = keySpace + ":" + parts.joinToString(":") + private inline fun buildKey( + name: String, + vararg parts: String, + ) = "{$keySpace:$name}:" + parts.joinToString(":") } diff --git a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/Semaphore.kt b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/Semaphore.kt index 0909c11..312bf1f 100644 --- a/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/Semaphore.kt +++ b/redpulsar-core/src/main/kotlin/com/himadieiev/redpulsar/core/locks/Semaphore.kt @@ -55,7 +55,7 @@ class Semaphore( resourceName: String, ttl: Duration, ): String? { - val leasersKey = buildKey(leasersKey, resourceName) + val leasersKey = buildKey(resourceName, leasersKey) val leaserValidityKey = buildKey(resourceName, clientId) return backend.setSemaphoreLock(leasersKey, leaserValidityKey, clientId, maxLeases, ttl) } @@ -64,7 +64,7 @@ class Semaphore( backend: LocksBackend, resourceName: String, ): String? { - val leasersKey = buildKey(leasersKey, resourceName) + val leasersKey = buildKey(resourceName, leasersKey) val leaserValidityKey = buildKey(resourceName, clientId) val removeSemaphoreLock = backend.removeSemaphoreLock(leasersKey, leaserValidityKey, clientId) // clean up expired other leasers @@ -76,11 +76,14 @@ class Semaphore( backend: LocksBackend, resourceName: String, ) { - val leasersKey = buildKey(leasersKey, resourceName) + val leasersKey = buildKey(resourceName, leasersKey) val leaserValidityKeyPrefix = buildKey(resourceName) backend.cleanUpExpiredSemaphoreLocks(leasersKey, leaserValidityKeyPrefix) } @Suppress("NOTHING_TO_INLINE") - private inline fun buildKey(vararg parts: String) = globalKeyPrefix + ":" + parts.joinToString(":") + private inline fun buildKey( + resourceName: String, + vararg parts: String, + ) = "{$globalKeyPrefix:$resourceName}:" + parts.joinToString(":") } diff --git a/redpulsar-core/src/main/resources/lua/CleanUpExpiredSemaphoreLocksScript.lua b/redpulsar-core/src/main/resources/lua/CleanUpExpiredSemaphoreLocksScript.lua index 1bc1700..cf63fa7 100644 --- a/redpulsar-core/src/main/resources/lua/CleanUpExpiredSemaphoreLocksScript.lua +++ b/redpulsar-core/src/main/resources/lua/CleanUpExpiredSemaphoreLocksScript.lua @@ -1,7 +1,7 @@ local leasersKey = KEYS[1] local leasers = redis.call("smembers", leasersKey) for _, leaser in ipairs(leasers) do - local leaserValidityKey = ARGV[1] .. ":" .. leaser + local leaserValidityKey = ARGV[1] .. leaser if redis.call("exists", leaserValidityKey) == 0 then redis.call("srem", leasersKey, leaser) end diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt index ac3b470..bee15ea 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/ListeningCountDownLatchTest.kt @@ -39,9 +39,9 @@ class ListeningCountDownLatchTest { @Test fun `count down`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 4, 4, returnVal = "OK") - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 3, 4, returnVal = "OK") - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 4, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 3, 4, returnVal = "OK") + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 4, returnVal = "OK") val latch = ListeningCountDownLatch( "test", @@ -57,8 +57,8 @@ class ListeningCountDownLatchTest { @Test fun `count down failing`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null) - backend.everyUndoCount("countdownlatch:test", 2, 1) + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null) + backend.everyUndoCount("{countdownlatch:test}:", 2, 1) val latch = ListeningCountDownLatch( "test", @@ -76,8 +76,8 @@ class ListeningCountDownLatchTest { @Test fun `undo count failing`() { - backend.everyCount("countdownlatch:test", "countdownlatch:channels:test", 2, 2, returnVal = null) - backend.everyUndoCount("countdownlatch:test", 2, null) + backend.everyCount("{countdownlatch:test}:", "{countdownlatch:test}:channel", 2, 2, returnVal = null) + backend.everyUndoCount("{countdownlatch:test}:", 2, null) val latch = ListeningCountDownLatch( "test", @@ -96,7 +96,7 @@ class ListeningCountDownLatchTest { @Test fun await() { backend.everyListen("open") - backend.everyCheckCount("countdownlatch:test", 2) + backend.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch( "test", @@ -117,7 +117,7 @@ class ListeningCountDownLatchTest { @Test fun `await finished by global count`() { - backend.everyCheckCount("countdownlatch:test", 4) + backend.everyCheckCount("{countdownlatch:test}:", 4) val latch = ListeningCountDownLatch( "test", @@ -138,7 +138,7 @@ class ListeningCountDownLatchTest { @Test fun `await - check count failed`() { - backend.everyCheckCount("countdownlatch:test", null) + backend.everyCheckCount("{countdownlatch:test}:", null) val latch = ListeningCountDownLatch( "test", @@ -175,13 +175,13 @@ class ListeningCountDownLatchTest { @Test fun `await timed out`() { coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } answers { runBlocking { delay(1000) } null } - backend.everyCheckCount("countdownlatch:test", 2) + backend.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch("test", 4, listOf(backend)) assertEquals(CallResult.FAILED, latch.await(Duration.ofMillis(110))) @@ -196,7 +196,7 @@ class ListeningCountDownLatchTest { @Test fun `await failed`() { backend.everyListen(null) - backend.everyCheckCount("countdownlatch:test", 3) + backend.everyCheckCount("{countdownlatch:test}:", 3) val latch = ListeningCountDownLatch( "test", @@ -216,7 +216,7 @@ class ListeningCountDownLatchTest { @ParameterizedTest(name = "current count: {0}") @ValueSource(ints = [-123, -1, 0, 1, 2, 5]) fun `check count`(count: Long) { - backend.everyCheckCount("countdownlatch:test", count) + backend.everyCheckCount("{countdownlatch:test}:", count) val latch = ListeningCountDownLatch("test", 5, listOf(backend)) assertEquals(5 - count.toInt(), latch.getCount()) @@ -226,7 +226,7 @@ class ListeningCountDownLatchTest { @Test fun `check count failed`() { - backend.everyCheckCount("countdownlatch:test", null) + backend.everyCheckCount("{countdownlatch:test}:", null) val latch = ListeningCountDownLatch( "test", @@ -391,8 +391,8 @@ class ListeningCountDownLatchTest { fun `all instances are in quorum for count down`() { instances.forEach { backend -> backend.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), @@ -417,24 +417,24 @@ class ListeningCountDownLatchTest { @Test fun `two instances are in quorum for count down`() { backend1.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), "OK", ) backend2.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend3.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), @@ -458,31 +458,31 @@ class ListeningCountDownLatchTest { @Test fun `quorum wasn't reach for count down`() { backend1.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend2.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), null, ) backend3.everyCount( - "countdownlatch:test", - "countdownlatch:channels:test", + "{countdownlatch:test}:", + "{countdownlatch:test}:channel", 4, 4, Duration.ofMinutes(10), "OK", ) instances.forEach { backend -> - backend.everyUndoCount("countdownlatch:test", 4, 1) + backend.everyUndoCount("{countdownlatch:test}:", 4, 1) } val latch = ListeningCountDownLatch( @@ -503,7 +503,7 @@ class ListeningCountDownLatchTest { fun `all instances are in quorum for await`() { instances.forEach { backend -> backend.everyListen("open") - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } val latch = ListeningCountDownLatch( @@ -526,7 +526,7 @@ class ListeningCountDownLatchTest { @Test fun `two instances are in quorum for await`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen("open") backend2.everyListen(null) @@ -552,7 +552,7 @@ class ListeningCountDownLatchTest { @Test fun `one instance will continue to wait`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen { delay(50) @@ -587,7 +587,7 @@ class ListeningCountDownLatchTest { @Test fun `failed instance return value first`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen { delay(100) @@ -622,7 +622,7 @@ class ListeningCountDownLatchTest { @Test fun `quorum wasn't reach at majority`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) } backend1.everyListen(null) backend2.everyListen("open") @@ -647,7 +647,7 @@ class ListeningCountDownLatchTest { @Test fun `all instances are down`() { instances.forEach { backend -> - backend.everyCheckCount("countdownlatch:test", 1) + backend.everyCheckCount("{countdownlatch:test}:", 1) backend.everyListen(null) } val latch = @@ -669,9 +669,9 @@ class ListeningCountDownLatchTest { @Test fun `check count return max value of majority`() { - backend1.everyCheckCount("countdownlatch:test", 1) - backend2.everyCheckCount("countdownlatch:test", 2) - backend3.everyCheckCount("countdownlatch:test", 1) + backend1.everyCheckCount("{countdownlatch:test}:", 1) + backend2.everyCheckCount("{countdownlatch:test}:", 2) + backend3.everyCheckCount("{countdownlatch:test}:", 1) val latch = ListeningCountDownLatch( @@ -687,9 +687,9 @@ class ListeningCountDownLatchTest { @Test fun `check count return min int`() { - backend1.everyCheckCount("countdownlatch:test", null) - backend2.everyCheckCount("countdownlatch:test", null) - backend3.everyCheckCount("countdownlatch:test", 2) + backend1.everyCheckCount("{countdownlatch:test}:", null) + backend2.everyCheckCount("{countdownlatch:test}:", null) + backend3.everyCheckCount("{countdownlatch:test}:", 2) val latch = ListeningCountDownLatch( @@ -745,14 +745,14 @@ class ListeningCountDownLatchTest { private fun CountDownLatchBackend.everyListen(returnVal: String?) { val backend = this coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } returns returnVal } private fun CountDownLatchBackend.everyListen(answer: suspend MockKAnswerScope.(Call) -> String?) { val backend = this coEvery { - backend.listen(eq("countdownlatch:channels:test")) + backend.listen(eq("{countdownlatch:test}:channel")) } coAnswers answer } diff --git a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/SemaphoreTest.kt b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/SemaphoreTest.kt index 13b1910..b77cdc4 100644 --- a/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/SemaphoreTest.kt +++ b/redpulsar-core/src/test/kotlin/com/himadieiev/redpulsar/core/locks/SemaphoreTest.kt @@ -32,8 +32,8 @@ class SemaphoreTest { fun `lock acquired`(ttl: Long) { every { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(ttl)), @@ -46,8 +46,8 @@ class SemaphoreTest { assertTrue(permit) verify(exactly = 1) { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), any(), any(), @@ -63,8 +63,8 @@ class SemaphoreTest { fun `lock already taken or instance is down`() { every { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(1)), @@ -72,14 +72,14 @@ class SemaphoreTest { } returns null every { backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), match { it.startsWith("semaphore:test:") }, any(), + eq("{semaphore:test}:leasers"), match { it.startsWith("{semaphore:test}:") }, any(), ) } returns "OK" // cleaning up every { backend.cleanUpExpiredSemaphoreLocks( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test") }, + eq("{semaphore:test}:leasers"), + eq("{semaphore:test}:"), ) } returns "OK" @@ -90,22 +90,22 @@ class SemaphoreTest { verify(exactly = 4) { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), any(), ) // unlocking backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), ) // cleaning up backend.cleanUpExpiredSemaphoreLocks( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test") }, + eq("{semaphore:test}:leasers"), + eq("{semaphore:test}:"), ) } } @@ -114,13 +114,13 @@ class SemaphoreTest { fun `unlock resource`() { every { backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), ) } returns "OK" every { - backend.cleanUpExpiredSemaphoreLocks(eq("semaphore:leasers:test"), eq("semaphore:test")) + backend.cleanUpExpiredSemaphoreLocks(eq("{semaphore:test}:leasers"), eq("{semaphore:test}:")) } returns "OK" val semaphore = Semaphore(listOf(backend), 3) @@ -129,12 +129,12 @@ class SemaphoreTest { verify(exactly = 1) { // unlocking backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), ) // cleaning up - backend.cleanUpExpiredSemaphoreLocks(eq("semaphore:leasers:test"), eq("semaphore:test")) + backend.cleanUpExpiredSemaphoreLocks(eq("{semaphore:test}:leasers"), eq("{semaphore:test}:")) } verify(exactly = 0) { backend.setSemaphoreLock(any(), any(), any(), any(), any()) @@ -194,8 +194,8 @@ class SemaphoreTest { fun `validate ttl`(ttl: Long) { every { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofMillis(ttl)), @@ -231,8 +231,8 @@ class SemaphoreTest { instances.forEach { backend -> every { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(10)), @@ -247,8 +247,8 @@ class SemaphoreTest { instances.forEach { backend -> verify(exactly = 1) { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), any(), @@ -265,8 +265,8 @@ class SemaphoreTest { fun `two instances are in quorum`() { every { backend1.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(10)), @@ -274,8 +274,8 @@ class SemaphoreTest { } returns "OK" every { backend2.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(10)), @@ -283,8 +283,8 @@ class SemaphoreTest { } returns null every { backend3.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(10)), @@ -299,8 +299,8 @@ class SemaphoreTest { instances.forEach { backend -> verify(exactly = 1) { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), any(), @@ -317,8 +317,8 @@ class SemaphoreTest { fun `quorum wasn't reach`() { every { backend1.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(3)), @@ -326,8 +326,8 @@ class SemaphoreTest { } returns null every { backend2.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(3)), @@ -335,8 +335,8 @@ class SemaphoreTest { } returns null every { backend3.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), eq(Duration.ofSeconds(3)), @@ -345,16 +345,16 @@ class SemaphoreTest { instances.forEach { backend -> every { backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), ) } returns "OK" // cleaning up every { backend.cleanUpExpiredSemaphoreLocks( - eq("semaphore:leasers:test"), - eq("semaphore:test"), + eq("{semaphore:test}:leasers"), + eq("{semaphore:test}:"), ) } returns "OK" } @@ -367,20 +367,20 @@ class SemaphoreTest { instances.forEach { backend -> verify(exactly = 3) { backend.setSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), eq(3), any(), ) // unlocking backend.removeSemaphoreLock( - eq("semaphore:leasers:test"), - match { it.startsWith("semaphore:test:") }, + eq("{semaphore:test}:leasers"), + match { it.startsWith("{semaphore:test}:") }, any(), ) // cleaning up - backend.cleanUpExpiredSemaphoreLocks(eq("semaphore:leasers:test"), eq("semaphore:test")) + backend.cleanUpExpiredSemaphoreLocks(eq("{semaphore:test}:leasers"), eq("{semaphore:test}:")) } } } diff --git a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/integrationtests/SemaphoreIntegrationTest.kt b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/integrationtests/SemaphoreIntegrationTest.kt index b358dae..7d1b85e 100644 --- a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/integrationtests/SemaphoreIntegrationTest.kt +++ b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/integrationtests/SemaphoreIntegrationTest.kt @@ -37,12 +37,13 @@ class SemaphoreIntegrationTest { assertTrue(permit) - val clients = instances.map { it.smembers("semaphore:lasers:test") } + val clients = instances.map { it.smembers("{semaphore:test}:leasers") } + clients.forEach { client -> assertTrue(client.isNotEmpty()) } assertTrue(clients[0] == clients[1] && clients[1] == clients[2]) clients[0].forEach { leaser -> instances.forEach { - assertTrue(it.exists("semaphore:test:$leaser")) + assertTrue(it.exists("{semaphore:test}:$leaser")) } } } @@ -52,19 +53,20 @@ class SemaphoreIntegrationTest { val semaphore = Semaphore(backends, 3) semaphore.lock("test", Duration.ofSeconds(10)) - val clients = instances.map { it.smembers("semaphore:lasers:test") } + val clients = instances.map { it.smembers("{semaphore:test}:leasers") } + clients.forEach { client -> assertTrue(client.isNotEmpty()) } assertTrue(clients[0] == clients[1] && clients[1] == clients[2]) clients[0].forEach { leaser -> instances.forEach { - assertTrue(it.exists("semaphore:test:$leaser")) + assertTrue(it.exists("{semaphore:test}:$leaser")) } } semaphore.unlock("test") - assertTrue(instances.map { it.smembers("semaphore:lasers:test") }.none { it.isNotEmpty() }) + assertTrue(instances.map { it.smembers("{semaphore:test}:leasers") }.none { it.isNotEmpty() }) clients[0].forEach { leaser -> instances.forEach { - assertFalse(it.exists("semaphore:test:$leaser")) + assertFalse(it.exists("{semaphore:test}:$leaser")) } } } @@ -73,12 +75,12 @@ class SemaphoreIntegrationTest { @ValueSource(ints = [1, 2, 3, 5, 7, 10]) fun `another client can re-acquire lock`(maxLeases: Int) { val semaphores = mutableListOf() - (1..maxLeases + 1) + (1..maxLeases) .forEach { semaphores.add( Semaphore( backends = backends, - maxLeases = it, + maxLeases = maxLeases, retryCount = 2, retryDelay = Duration.ofMillis(30), ), @@ -111,12 +113,12 @@ class SemaphoreIntegrationTest { @ValueSource(ints = [1, 2, 3, 5, 7, 10]) fun `another client can re-acquire lock due to expiration`(maxLeases: Int) { val semaphores = mutableListOf() - (1..maxLeases + 1) + (1..maxLeases) .forEach { semaphores.add( Semaphore( backends = backends, - maxLeases = it, + maxLeases = maxLeases, retryCount = 2, retryDelay = Duration.ofMillis(30), ), diff --git a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt index a1f1bee..a6d0a59 100644 --- a/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt +++ b/redpulsar-jedis/src/test/kotlin/com/himadieiev/redpulsar/jedis/locks/backends/JedisCountDownLatchBackendTest.kt @@ -44,12 +44,12 @@ class JedisCountDownLatchBackendTest { every { redis.eval( any(), - eq(listOf("latch:test", "latch:channel:test")), + eq(listOf("latch:test", "latch:test:channel")), eq(listOf("${clientId}0", "5000", "4")), ) } returns "OK" val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) assertEquals("OK", callResult) verify(exactly = 1) { @@ -63,12 +63,12 @@ class JedisCountDownLatchBackendTest { every { redis.eval( any(), - eq(listOf("latch:test", "latch:channel:test")), + eq(listOf("latch:test", "latch:test:channel")), eq(listOf("${clientId}0", "5000", "4")), ) } throws IOException("test exception") val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) assertNull(callResult) verify(exactly = 1) { @@ -132,14 +132,14 @@ class JedisCountDownLatchBackendTest { val channel = slot() every { redis.subscribe(capture(pubSubSlot), capture(channel)) } returns Unit CoroutineScope(CoroutineName("test")).launch { - val result = countDownLatchBackend.listen("latch:channel:test") + val result = countDownLatchBackend.listen("latch:test:channel") assertEquals("open", result) } runBlocking { delay(200) } repeat(messageCount) { - pubSubSlot.captured.onMessage("latch:channel:test", "open") + pubSubSlot.captured.onMessage("latch:test:channel", "open") } - assertEquals("latch:channel:test", channel.captured) + assertEquals("latch:test:channel", channel.captured) verify(exactly = 1) { redis.subscribe(any(), any()) } @@ -147,11 +147,11 @@ class JedisCountDownLatchBackendTest { @Test fun `message not received`() { - every { redis.subscribe(any(), eq("latch:channel:test")) } returns Unit + every { redis.subscribe(any(), eq("latch:test:channel")) } returns Unit runBlocking { assertThrows { - withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } + withTimeout(100) { countDownLatchBackend.listen("latch:test:channel") } } } verify(exactly = 1) { diff --git a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/integrationtests/SemaphoreIntegrationTest.kt b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/integrationtests/SemaphoreIntegrationTest.kt index 122a6ae..472e09c 100644 --- a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/integrationtests/SemaphoreIntegrationTest.kt +++ b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/integrationtests/SemaphoreIntegrationTest.kt @@ -37,12 +37,13 @@ class SemaphoreIntegrationTest { assertTrue(permit) - val clients = instances.map { it.sync { redis -> redis.smembers("semaphore:lasers:test") } } + val clients = instances.map { it.sync { redis -> redis.smembers("{semaphore:test}:leasers") } } + clients.forEach { client -> assertTrue(client.isNotEmpty()) } assertTrue(clients[0] == clients[1] && clients[1] == clients[2]) clients[0].forEach { leaser -> instances.forEach { - assertTrue(it.sync { redis -> redis.exists("semaphore:test:$leaser") == 1L }) + assertTrue(it.sync { redis -> redis.exists("{semaphore:test}:$leaser") == 1L }) } } } @@ -52,22 +53,23 @@ class SemaphoreIntegrationTest { val semaphore = Semaphore(backends, 3) semaphore.lock("test", Duration.ofSeconds(10)) - val clients = instances.map { it.sync { redis -> redis.smembers("semaphore:lasers:test") } } + val clients = instances.map { it.sync { redis -> redis.smembers("{semaphore:test}:leasers") } } + clients.forEach { client -> assertTrue(client.isNotEmpty()) } assertTrue(clients[0] == clients[1] && clients[1] == clients[2]) clients[0].forEach { leaser -> instances.forEach { - assertTrue(it.sync { redis -> redis.exists("semaphore:test:$leaser") == 1L }) + assertTrue(it.sync { redis -> redis.exists("{semaphore:test}:$leaser") == 1L }) } } semaphore.unlock("test") assertTrue( - instances.map { it.sync { redis -> redis.smembers("semaphore:lasers:test") } } + instances.map { it.sync { redis -> redis.smembers("{semaphore:test}:leasers") } } .none { it.isNotEmpty() }, ) clients[0].forEach { leaser -> instances.forEach { - assertFalse(it.sync { redis -> redis.exists("semaphore:test:$leaser") == 1L }) + assertFalse(it.sync { redis -> redis.exists("{semaphore:test}:$leaser") == 1L }) } } } @@ -76,12 +78,12 @@ class SemaphoreIntegrationTest { @ValueSource(ints = [1, 2, 3, 5, 7, 10]) fun `another client can re-acquire lock`(maxLeases: Int) { val semaphores = mutableListOf() - (1..maxLeases + 1) + (1..maxLeases) .forEach { semaphores.add( Semaphore( backends = backends, - maxLeases = it, + maxLeases = maxLeases, retryCount = 2, retryDelay = Duration.ofMillis(30), ), @@ -114,12 +116,12 @@ class SemaphoreIntegrationTest { @ValueSource(ints = [1, 2, 3, 5, 7, 10]) fun `another client can re-acquire lock due to expiration`(maxLeases: Int) { val semaphores = mutableListOf() - (1..maxLeases + 1) + (1..maxLeases) .forEach { semaphores.add( Semaphore( backends = backends, - maxLeases = it, + maxLeases = maxLeases, retryCount = 2, retryDelay = Duration.ofMillis(30), ), @@ -136,7 +138,7 @@ class SemaphoreIntegrationTest { semaphores2.add( Semaphore( backends = backends, - maxLeases = it, + maxLeases = maxLeases, retryCount = 2, retryDelay = Duration.ofMillis(30), ), diff --git a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt index 2829ae4..162ba02 100644 --- a/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt +++ b/redpulsar-lettuce/src/test/kotlin/com/himadieiev/redpulsar/lettuce/locks/backends/LettuceCountDownLatchBackendTest.kt @@ -55,12 +55,12 @@ class LettuceCountDownLatchBackendTest { sync.eval( any(), eq(ScriptOutputType.STATUS), - eq(arrayOf("latch:test", "latch:channel:test")), + eq(arrayOf("latch:test", "latch:test:channel")), eq("${clientId}0"), eq("5000"), eq("4"), ) } returns "OK" val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) Assertions.assertEquals("OK", callResult) verify(exactly = 1) { @@ -75,12 +75,12 @@ class LettuceCountDownLatchBackendTest { sync.eval( any(), eq(ScriptOutputType.STATUS), - eq(arrayOf("latch:test", "latch:channel:test")), + eq(arrayOf("latch:test", "latch:test:channel")), eq("${clientId}0"), eq("5000"), eq("4"), ) } throws IOException("test exception") val callResult = - countDownLatchBackend.count("latch:test", "latch:channel:test", clientId, 0, 4, Duration.ofSeconds(5)) + countDownLatchBackend.count("latch:test", "latch:test:channel", clientId, 0, 4, Duration.ofSeconds(5)) Assertions.assertNull(callResult) verify(exactly = 1) { @@ -142,8 +142,8 @@ class LettuceCountDownLatchBackendTest { @BeforeEach fun setUp() { connection = mockk() - every { sync.subscribe(eq("latch:channel:test")) } returns Unit - every { sync.unsubscribe(eq("latch:channel:test")) } returns Unit + every { sync.subscribe(eq("latch:test:channel")) } returns Unit + every { sync.unsubscribe(eq("latch:test:channel")) } returns Unit every { sync.statefulConnection } returns connection every { connection.addListener(any>()) } returns Unit every { connection.removeListener(any>()) } returns Unit @@ -154,19 +154,19 @@ class LettuceCountDownLatchBackendTest { fun `listen produce value`(messageCount: Int) { val listener = slot>() every { connection.addListener(capture(listener)) } returns Unit - CoroutineScope(CoroutineName("latch:channel:test")).launch { - val result = countDownLatchBackend.listen("latch:channel:test") + CoroutineScope(CoroutineName("latch:test:channel")).launch { + val result = countDownLatchBackend.listen("latch:test:channel") Assertions.assertEquals("open", result) } runBlocking { Thread.sleep(200) } repeat(messageCount) { - listener.captured.message("latch:channel:test", "open") + listener.captured.message("latch:test:channel", "open") } runBlocking { Thread.sleep(100) } verify(exactly = 1) { - sync.subscribe(eq("latch:channel:test")) - sync.unsubscribe(eq("latch:channel:test")) + sync.subscribe(eq("latch:test:channel")) + sync.unsubscribe(eq("latch:test:channel")) } } @@ -174,11 +174,11 @@ class LettuceCountDownLatchBackendTest { fun `message not received`() { runBlocking { assertThrows { - withTimeout(100) { countDownLatchBackend.listen("latch:channel:test") } + withTimeout(100) { countDownLatchBackend.listen("latch:test:channel") } } } verify(exactly = 1) { - sync.subscribe("latch:channel:test") + sync.subscribe("latch:test:channel") } } }