From cc147b23b75a1a43e27904e111d3c402500bc943 Mon Sep 17 00:00:00 2001 From: Trent Daniel Date: Thu, 30 Jan 2025 17:07:27 -0500 Subject: [PATCH] EBP-471: Added test to verify cache response that takes a long time with 100k+ messages in response. --- test/cache_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/test/cache_test.go b/test/cache_test.go index cc66824..56a41e4 100644 --- a/test/cache_test.go +++ b/test/cache_test.go @@ -230,6 +230,77 @@ var _ = Describe("Cache Strategy", func() { <-cacheResponseSignalChan } }) + DescribeTable("long running cache requests with live data queue and live data to fill", func(cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { + numExpectedCachedMessages := 3 + numExpectedLiveMessages := 100000 + delay := 10000 + numExpectedReceivedMessages := numExpectedCachedMessages + numExpectedLiveMessages + receivedMsgChan := make(chan message.InboundMessage, numExpectedReceivedMessages) + err := receiver.ReceiveAsync(func(msg message.InboundMessage) { + receivedMsgChan <- msg + }) + Expect(err).To(BeNil()) + cacheName := fmt.Sprintf("MaxMsgs%d/delay=%d,msgs=%d", numExpectedCachedMessages, delay, numExpectedLiveMessages) + topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) + cacheRequestID := message.CacheRequestID(1) + cacheRequestConfig := resource.NewCachedMessageSubscriptionRequest(resource.CachedFirst, cacheName, resource.TopicSubscriptionOf(topic), 45000,0, 50000) + var cacheResponse solace.CacheResponse + /* NOTE: We need to wait for longer than usual for the cache response (10s) since the cache response is + * given to the application only after all messages related to the cache request have been received by + * the API. Since 100000 live messages are being received as a part of the cache response, the cache + * response ends up taking a lot longer. + */ + switch cacheResponseProcessStrategy { + case helpers.ProcessCacheResponseThroughCallback: + channel := make(chan solace.CacheResponse, 1) + callback := func(cacheResponse solace.CacheResponse) { + channel <- cacheResponse + } + err = receiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, callback) + Expect(err).To(BeNil()) + Eventually(func () uint64 {return messagingService.Metrics().GetValue(metrics.CacheRequestsSent)}).Should(BeNumerically("==", 1)) + Consistently(channel, "9.5s").ShouldNot(Receive()) + Eventually(channel, "10s").Should(Receive(&cacheResponse)) + case helpers.ProcessCacheResponseThroughChannel: + channel, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(err).To(BeNil()) + Expect(channel).ToNot(BeNil()) + Consistently(channel, "9.5s").ShouldNot(Receive(&cacheResponse)) + Eventually(channel, "10s").Should(Receive(&cacheResponse)) + default: + Fail("Got unexpected cache response process strategy") + } + Expect(cacheResponse).ToNot(BeNil()) + /* EBP-25: Assert cache request ID from response is the same as the request */ + /* EBP-26: Assert cache request Outcome is Ok. */ + /* EBP-28: Assert error from cache response is nil */ + + /* NOTE: Check the cached messages first. */ + for i := 0; i < numExpectedCachedMessages; i++ { + var msg message.InboundMessage + Eventually(receivedMsgChan).Should(Receive(&msg)) + Expect(msg).ToNot(BeNil()) + Expect(msg.GetDestinationName()).To(Equal(topic)) + id, ok := msg.GetCacheRequestID() + Expect(ok).To(BeTrue()) + Expect(id).To(BeNumerically("==", cacheRequestID)) + /* EBP-21: Assert that this message is a cached message. */ + } + /* NOTE: Check the live messages second. */ + for i := 0; i < numExpectedLiveMessages; i++ { + var msg message.InboundMessage + Eventually(receivedMsgChan).Should(Receive(&msg)) + Expect(msg).ToNot(BeNil()) + Expect(msg.GetDestinationName()).To(Equal(topic)) + id, ok := msg.GetCacheRequestID() + Expect(ok).To(BeFalse()) + Expect(id).To(BeNumerically("==", 0)) + /* EBP-21: Assert that this is a live message */ + } + }, + Entry("with channel", helpers.ProcessCacheResponseThroughChannel), + Entry("with callback", helpers.ProcessCacheResponseThroughCallback), + ) DescribeTable("wildcard request are rejected with error of not live data flow on live data queue", func(cacheRequestStrategy resource.CachedMessageSubscriptionStrategy, cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { numExpectedCachedMessages := 3