Skip to content

Commit

Permalink
Merge pull request #100 from SolaceDev/EBP-482
Browse files Browse the repository at this point in the history
EBP-482: test to validate only flow thru is not rejected while there is outstanding flow thru
  • Loading branch information
TrentDaniel authored Feb 20, 2025
2 parents b74f9ea + 7bde3f2 commit e9a3cab
Showing 1 changed file with 121 additions and 0 deletions.
121 changes: 121 additions & 0 deletions test/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,127 @@ var _ = Describe("Cache Strategy", func() {
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 2))
})
It("requests subsequent to non-wildcard AsAvailable request are rejected, except for AsAvailable and CachedOnly", func() {
firstCacheRequestID := message.CacheRequestID(1)
numExpectedCachedMessages := 3
cacheName := fmt.Sprintf("MaxMsgs%d/delay=10000", numExpectedCachedMessages)
topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn)
cacheRequestConfig := resource.NewCachedMessageSubscriptionRequest(resource.AsAvailable, cacheName, resource.TopicSubscriptionOf(topic), int32(12000), int32(0), int32(0))
/* NOTE: We expect the first AsAvailable, second AsAvailable, and CachedOnly requests to succeed, so our
* application buffer may contain as many as 3 times the number of cached messages expected from a single
* cache request to MaxMsgs3 before we are able to clear it.
*/
receivedMsgChan := make(chan message.InboundMessage, numExpectedCachedMessages*3)
receiver.ReceiveAsync(func(msg message.InboundMessage) {
receivedMsgChan <- msg
})
firstChannel, err := receiver.RequestCachedAsync(cacheRequestConfig, firstCacheRequestID)
Expect(err).To(BeNil())
Expect(firstChannel).ToNot(BeNil())
cacheName = fmt.Sprintf("MaxMsgs%d", numExpectedCachedMessages)

Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }).Should(BeNumerically("==", 1))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 0))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))

/* NOTE: Subsequent LiveCancelsCached fails. */
cacheRequestConfig = helpers.GetValidLiveCancelsCachedRequestConfig(cacheName, topic)
secondCacheRequestID := message.CacheRequestID(2)
secondChannel, err := receiver.RequestCachedAsync(cacheRequestConfig, secondCacheRequestID)
Expect(err).To(BeAssignableToTypeOf(&solace.NativeError{}))
helpers.ValidateNativeError(err, subcode.CacheAlreadyInProgress)
Expect(secondChannel).To(BeNil())
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", 1))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 0))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))

/* NOTE: Subsequent CachedFirst fails. */
cacheRequestConfig = helpers.GetValidCachedFirstCacheRequestConfig(cacheName, topic)
thirdCacheRequestID := message.CacheRequestID(3)
thirdChannel, err := receiver.RequestCachedAsync(cacheRequestConfig, thirdCacheRequestID)
Expect(err).To(BeAssignableToTypeOf(&solace.NativeError{}))
helpers.ValidateNativeError(err, subcode.CacheAlreadyInProgress)
Expect(thirdChannel).To(BeNil())
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", 1))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 0))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))

var cacheResponse1 solace.CacheResponse
cacheName = fmt.Sprintf("MaxMsgs%d", numExpectedCachedMessages)

/* NOTE: Subsequent CachedOnly succeeds. */
cacheRequestConfig = helpers.GetValidCachedOnlyCacheRequestConfig(cacheName, topic)
fourthCacheRequestID := message.CacheRequestID(4)
fourthChannel, err := receiver.RequestCachedAsync(cacheRequestConfig, fourthCacheRequestID)
Expect(err).To(BeNil())
Expect(fourthChannel).ToNot(BeNil())
Eventually(fourthChannel, "10s").Should(Receive(&cacheResponse1))
Expect(cacheResponse1).ToNot(BeNil())
/* EBP-25: Assert cache request ID matches cache response ID. */
/* EBP-26: Assert CacheRequestOutcome Ok. */
/* EBP-28: Assert err is nil. */
for i := 0; i < numExpectedCachedMessages; i++ {
var msg message.InboundMessage
Eventually(receivedMsgChan).Should(Receive(&msg))
Expect(msg).ToNot(BeNil())
Expect(msg.GetDestinationName()).To(Equal(topic))
id, ok := msg.GetCacheRequestID()
Expect(ok).To(BeTrue())
Expect(id).To(BeNumerically("==", fourthCacheRequestID))
/* EBP-21: Assert that this message is cached. */
}
Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }).Should(BeNumerically("==", 2))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 1))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))

/* NOTE: Subsequent AsAvailable succeeds. */
var cacheResponse2 solace.CacheResponse
cacheRequestConfig = helpers.GetValidAsAvailableCacheRequestConfig(cacheName, topic)
fifthCacheRequestID := message.CacheRequestID(5)
fifthChannel, err := receiver.RequestCachedAsync(cacheRequestConfig, fifthCacheRequestID)
Expect(err).To(BeNil())
Expect(fifthChannel).ToNot(BeNil())
Eventually(fifthChannel, "10s").Should(Receive(&cacheResponse2))
Expect(cacheResponse2).ToNot(BeNil())
/* EBP-25: Assert cache request ID matches cache response ID. */
/* EBP-26: Assert CacheRequestOutcome Ok. */
/* EBP-28: Assert err is nil. */
for i := 0; i < numExpectedCachedMessages; i++ {
var msg message.InboundMessage
Eventually(receivedMsgChan).Should(Receive(&msg))
Expect(msg).ToNot(BeNil())
Expect(msg.GetDestinationName()).To(Equal(topic))
id, ok := msg.GetCacheRequestID()
Expect(ok).To(BeTrue())
Expect(id).To(BeNumerically("==", fifthCacheRequestID))
/* EBP-21: Assert that this message is cached. */
}
Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }).Should(BeNumerically("==", 3))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 2))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))

/* NOTE: First AsAvailable cache request should succeed. */
var cacheResponse3 solace.CacheResponse
Eventually(firstChannel, "10s").Should(Receive(&cacheResponse3))
Expect(cacheResponse3).ToNot(BeNil())
/* EBP-25: Assert cache request ID matches cache response ID. */
/* EBP-26: Assert CacheRequestOutcome Ok. */
/* EBP-28: Assert err is nil. */

for i := 0; i < numExpectedCachedMessages; i++ {
var msg message.InboundMessage
Eventually(receivedMsgChan).Should(Receive(&msg))
Expect(msg).ToNot(BeNil())
Expect(msg.GetDestinationName()).To(Equal(topic))
id, ok := msg.GetCacheRequestID()
Expect(ok).To(BeTrue())
Expect(id).To(BeNumerically("==", firstCacheRequestID))
/* EBP-21: Assert that this message is cached. */
}
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", 3))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", 3))
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0))
})
It("a direct receiver that tries to submit more than the maximum number of cache requests should get an IllegalStateError", func() {
err := receiver.ReceiveAsync(func(message.InboundMessage) {})
Expect(err).To(BeNil())
Expand Down

0 comments on commit e9a3cab

Please sign in to comment.