From 9995c231a5ce7509517341cae8f0ad8d2804f5ab Mon Sep 17 00:00:00 2001 From: Joseph Brinkman Date: Thu, 16 Jan 2025 18:08:34 -0500 Subject: [PATCH] GO: xpending command (#2957) * GO: add xpending command Signed-off-by: jbrinkman --- go/Makefile | 6 +- go/api/base_client.go | 90 +++++ go/api/options/stream_options.go | 54 +++ go/api/response_handlers.go | 92 +++++ go/api/response_types.go | 47 +++ go/api/stream_commands.go | 4 + go/integTest/shared_commands_test.go | 508 +++++++++++++++++++++++++++ 7 files changed, 798 insertions(+), 3 deletions(-) diff --git a/go/Makefile b/go/Makefile index 62eabbaa8b..014bf962a4 100644 --- a/go/Makefile +++ b/go/Makefile @@ -82,15 +82,15 @@ unit-test: mkdir -p reports set -o pipefail; \ LD_LIBRARY_PATH=$(shell find . -name libglide_rs.so|grep -w release|tail -1|xargs dirname|xargs readlink -f):${LD_LIBRARY_PATH} \ - go test -v -race ./... -skip TestGlideTestSuite $(if $(test-filter), -run $(test-filter)) \ + go test -v -race ./... -skip TestGlideTestSuite $(if $(test-filter), -testify.m $(test-filter)) \ | tee >(go tool test2json -t -p github.com/valkey-io/valkey-glide/go/glide/utils | go-test-report -o reports/unit-tests.html -t unit-test > /dev/null) # integration tests - run subtask with skipping modules tests -integ-test: export TEST_FILTER = -skip TestGlideTestSuite/TestModule $(if $(test-filter), -run $(test-filter)) +integ-test: export TEST_FILTER = -skip TestGlideTestSuite/TestModule $(if $(test-filter), -testify.m $(test-filter)) integ-test: __it # modules tests - run substask with default filter -modules-test: export TEST_FILTER = $(if $(test-filter), -run $(test-filter), -run TestGlideTestSuite/TestModule) +modules-test: export TEST_FILTER = $(if $(test-filter), -run $(test-filter), -testify.m TestGlideTestSuite/TestModule) modules-test: __it __it: diff --git a/go/api/base_client.go b/go/api/base_client.go index 1a67892934..79f8518623 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1905,3 +1905,93 @@ func (client *baseClient) ZScanWithOptions( } return handleScanResponse(result) } + +// Returns stream message summary information for pending messages matching a stream and group. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// +// Return value: +// An XPendingSummary struct that includes a summary with the following fields: +// +// NumOfMessages: The total number of pending messages for this consumer group. +// StartId: The smallest ID among the pending messages or nil if no pending messages exist. +// EndId: The greatest ID among the pending messages or nil if no pending messages exists. +// GroupConsumers: An array of ConsumerPendingMessages with the following fields: +// ConsumerName: The name of the consumer. +// MessageCount: The number of pending messages for this consumer. +// +// Example +// +// result, err := client.XPending("myStream", "myGroup") +// if err != nil { +// return err +// } +// fmt.Println("Number of pending messages: ", result.NumOfMessages) +// fmt.Println("Start and End ID of messages: ", result.StartId, result.EndId) +// for _, consumer := range result.ConsumerMessages { +// fmt.Printf("Consumer messages: %s: $v\n", consumer.ConsumerName, consumer.MessageCount) +// } +// +// [valkey.io]: https://valkey.io/commands/xpending/ +func (client *baseClient) XPending(key string, group string) (XPendingSummary, error) { + result, err := client.executeCommand(C.XPending, []string{key, group}) + if err != nil { + return XPendingSummary{}, err + } + + return handleXPendingSummaryResponse(result) +} + +// Returns stream message summary information for pending messages matching a given range of IDs. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// opts - The options for the command. See [options.XPendingOptions] for details. +// +// Return value: +// A slice of XPendingDetail structs, where each detail struct includes the following fields: +// +// Id - The ID of the pending message. +// ConsumerName - The name of the consumer that fetched the message and has still to acknowledge it. +// IdleTime - The time in milliseconds since the last time the message was delivered to the consumer. +// DeliveryCount - The number of times this message was delivered. +// +// Example +// +// detailResult, err := client.XPendingWithOptions(key, groupName, options.NewXPendingOptions("-", "+", 10)) +// if err != nil { +// return err +// } +// fmt.Println("=========================") +// for _, detail := range detailResult { +// fmt.Println(detail.Id) +// fmt.Println(detail.ConsumerName) +// fmt.Println(detail.IdleTime) +// fmt.Println(detail.DeliveryCount) +// fmt.Println("=========================") +// } +// +// [valkey.io]: https://valkey.io/commands/xpending/ +func (client *baseClient) XPendingWithOptions( + key string, + group string, + opts *options.XPendingOptions, +) ([]XPendingDetail, error) { + optionArgs, _ := opts.ToArgs() + args := append([]string{key, group}, optionArgs...) + + result, err := client.executeCommand(C.XPending, args) + if err != nil { + return nil, err + } + return handleXPendingDetailResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 2d2f2318a2..19f5e6d5c4 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -149,3 +149,57 @@ func (xro *XReadOptions) ToArgs() ([]string, error) { } return args, nil } + +// Optional arguments for `XPending` in [StreamCommands] +type XPendingOptions struct { + minIdleTime int64 + start string + end string + count int64 + consumer string +} + +// Create new empty `XPendingOptions`. The `start`, `end` and `count` arguments are required. +func NewXPendingOptions(start string, end string, count int64) *XPendingOptions { + options := &XPendingOptions{} + options.start = start + options.end = end + options.count = count + return options +} + +// SetMinIdleTime sets the minimum idle time for the XPendingOptions. +// minIdleTime is the amount of time (in milliseconds) that a message must be idle to be considered. +// It returns the updated XPendingOptions. +func (xpo *XPendingOptions) SetMinIdleTime(minIdleTime int64) *XPendingOptions { + xpo.minIdleTime = minIdleTime + return xpo +} + +// SetConsumer sets the consumer for the XPendingOptions. +// consumer is the name of the consumer to filter the pending messages. +// It returns the updated XPendingOptions. +func (xpo *XPendingOptions) SetConsumer(consumer string) *XPendingOptions { + xpo.consumer = consumer + return xpo +} + +func (xpo *XPendingOptions) ToArgs() ([]string, error) { + args := []string{} + + // if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime + if xpo.minIdleTime > 0 { + args = append(args, "IDLE") + args = append(args, utils.IntToString(xpo.minIdleTime)) + } + + args = append(args, xpo.start) + args = append(args, xpo.end) + args = append(args, utils.IntToString(xpo.count)) + + if xpo.consumer != "" { + args = append(args, xpo.consumer) + } + + return args, nil +} diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index adfba889b1..07ab7e1a09 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -9,6 +9,7 @@ import "C" import ( "fmt" "reflect" + "strconv" "unsafe" ) @@ -601,3 +602,94 @@ func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[str } return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)} } + +func handleXPendingSummaryResponse(response *C.struct_CommandResponse) (XPendingSummary, error) { + defer C.free_command_response(response) + + typeErr := checkResponseType(response, C.Array, true) + if typeErr != nil { + return CreateNilXPendingSummary(), typeErr + } + + slice, err := parseArray(response) + if err != nil { + return CreateNilXPendingSummary(), err + } + + arr := slice.([]interface{}) + NumOfMessages := arr[0].(int64) + var StartId, EndId Result[string] + if arr[1] == nil { + StartId = CreateNilStringResult() + } else { + StartId = CreateStringResult(arr[1].(string)) + } + if arr[2] == nil { + EndId = CreateNilStringResult() + } else { + EndId = CreateStringResult(arr[2].(string)) + } + + if pendingMessages, ok := arr[3].([]interface{}); ok { + var ConsumerPendingMessages []ConsumerPendingMessage + for _, msg := range pendingMessages { + consumerMessage := msg.([]interface{}) + count, err := strconv.ParseInt(consumerMessage[1].(string), 10, 64) + if err == nil { + ConsumerPendingMessages = append(ConsumerPendingMessages, ConsumerPendingMessage{ + ConsumerName: consumerMessage[0].(string), + MessageCount: count, + }) + } + } + return XPendingSummary{NumOfMessages, StartId, EndId, ConsumerPendingMessages}, nil + } else { + return XPendingSummary{NumOfMessages, StartId, EndId, make([]ConsumerPendingMessage, 0)}, nil + } +} + +func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendingDetail, error) { + // response should be [][]interface{} + + defer C.free_command_response(response) + + // TODO: Not sure if this is correct for a nill response + if response == nil || response.response_type == uint32(C.Null) { + return make([]XPendingDetail, 0), nil + } + + typeErr := checkResponseType(response, C.Array, true) + if typeErr != nil { + return make([]XPendingDetail, 0), typeErr + } + + // parse first level of array + slice, err := parseArray(response) + arr := slice.([]interface{}) + + if err != nil { + return make([]XPendingDetail, 0), err + } + + pendingDetails := make([]XPendingDetail, 0, len(arr)) + + for _, message := range arr { + switch detail := message.(type) { + case []interface{}: + pDetail := XPendingDetail{ + Id: detail[0].(string), + ConsumerName: detail[1].(string), + IdleTime: detail[2].(int64), + DeliveryCount: detail[3].(int64), + } + pendingDetails = append(pendingDetails, pDetail) + + case XPendingDetail: + pendingDetails = append(pendingDetails, detail) + default: + fmt.Printf("handleXPendingDetailResponse - unhandled type: %s\n", reflect.TypeOf(detail)) + } + } + + return pendingDetails, nil +} diff --git a/go/api/response_types.go b/go/api/response_types.go index 2c7f3244b8..2e6e527c43 100644 --- a/go/api/response_types.go +++ b/go/api/response_types.go @@ -146,3 +146,50 @@ func CreateEmptyClusterValue() ClusterValue[interface{}] { value: Result[interface{}]{val: empty, isNil: true}, } } + +// XPendingSummary represents a summary of pending messages in a stream group. +// It includes the total number of pending messages, the ID of the first and last pending messages, +// and a list of consumer pending messages. +type XPendingSummary struct { + // NumOfMessages is the total number of pending messages in the stream group. + NumOfMessages int64 + + // StartId is the ID of the first pending message in the stream group. + StartId Result[string] + + // EndId is the ID of the last pending message in the stream group. + EndId Result[string] + + // ConsumerMessages is a list of pending messages for each consumer in the stream group. + ConsumerMessages []ConsumerPendingMessage +} + +// ConsumerPendingMessage represents a pending message for a consumer in a Redis stream group. +// It includes the consumer's name and the count of pending messages for that consumer. +type ConsumerPendingMessage struct { + // ConsumerName is the name of the consumer. + ConsumerName string + + // MessageCount is the number of pending messages for the consumer. + MessageCount int64 +} + +// XPendingDetail represents the details of a pending message in a stream group. +// It includes the message ID, the consumer's name, the idle time, and the delivery count. +type XPendingDetail struct { + // Id is the ID of the pending message. + Id string + + // ConsumerName is the name of the consumer who has the pending message. + ConsumerName string + + // IdleTime is the amount of time (in milliseconds) that the message has been idle. + IdleTime int64 + + // DeliveryCount is the number of times the message has been delivered. + DeliveryCount int64 +} + +func CreateNilXPendingSummary() XPendingSummary { + return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)} +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index e5fd216848..0cbb994676 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -107,4 +107,8 @@ type StreamCommands interface { XReadWithOptions(keysAndIds map[string]string, options *options.XReadOptions) (map[string]map[string][][]string, error) XDel(key string, ids []string) (int64, error) + + XPending(key string, group string) (XPendingSummary, error) + + XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index ce9f7d5309..be729bdad4 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -5208,3 +5208,511 @@ func (suite *GlideTestSuite) TestZScan() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } + +func (suite *GlideTestSuite) TestXPending() { + suite.runWithDefaultClients(func(client api.BaseClient) { + // TODO: Update tests when XGroupCreate, XGroupCreateConsumer, XReadGroup, XClaim, XClaimJustId and XAck are added to + // the Go client. + // + // This test splits out the cluster and standalone tests into their own functions because we are forced to use + // CustomCommands for many stream commands which are not included in the preview Go client. Using a type switch for + // each use of CustomCommand would make the tests difficult to read and maintain. These tests can be + // collapsed once the native commands are added in a subsequent release. + + execStandalone := func(client api.GlideClient) { + // 1. Arrange the data + key := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + consumer2 := "consumer-2-" + uuid.New().String() + + command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + + resp, err := client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", resp.(string)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.(bool)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer2} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.(bool)) + + streamid_1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.NoError(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"field4", "value4"}}) + assert.NoError(suite.T(), err) + streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) + assert.NoError(suite.T(), err) + + command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + expectedSummary := api.XPendingSummary{ + NumOfMessages: 5, + StartId: streamid_1, + EndId: streamid_5, + ConsumerMessages: []api.ConsumerPendingMessage{ + {ConsumerName: consumer1, MessageCount: 2}, + {ConsumerName: consumer2, MessageCount: 3}, + }, + } + + // 2. Act + summaryResult, err := client.XPending(key, groupName) + + // 3a. Assert that we get 5 messages in total, 2 for consumer1 and 3 for consumer2 + assert.NoError(suite.T(), err) + assert.True( + suite.T(), + reflect.DeepEqual(expectedSummary, summaryResult), + "Expected and actual results do not match", + ) + + // 3b. Assert that we get 2 details for consumer1 that includes + detailResult, _ := client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetConsumer(consumer1), + ) + assert.Equal(suite.T(), len(detailResult), 2) + assert.Equal(suite.T(), streamid_1.Value(), detailResult[0].Id) + assert.Equal(suite.T(), streamid_2.Value(), detailResult[1].Id) + } + + execCluster := func(client api.GlideClusterClient) { + // 1. Arrange the data + key := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + consumer2 := "consumer-2-" + uuid.New().String() + + command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + + resp, err := client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", resp.Value().(string)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.Value().(bool)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer2} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.Value().(bool)) + + streamid_1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.NoError(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"field4", "value4"}}) + assert.NoError(suite.T(), err) + streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) + assert.NoError(suite.T(), err) + + command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + expectedSummary := api.XPendingSummary{ + NumOfMessages: 5, + StartId: streamid_1, + EndId: streamid_5, + ConsumerMessages: []api.ConsumerPendingMessage{ + {ConsumerName: consumer1, MessageCount: 2}, + {ConsumerName: consumer2, MessageCount: 3}, + }, + } + + // 2. Act + summaryResult, err := client.XPending(key, groupName) + + // 3a. Assert that we get 5 messages in total, 2 for consumer1 and 3 for consumer2 + assert.NoError(suite.T(), err) + assert.True( + suite.T(), + reflect.DeepEqual(expectedSummary, summaryResult), + "Expected and actual results do not match", + ) + + // 3b. Assert that we get 2 details for consumer1 that includes + detailResult, _ := client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetConsumer(consumer1), + ) + assert.Equal(suite.T(), len(detailResult), 2) + assert.Equal(suite.T(), streamid_1.Value(), detailResult[0].Id) + assert.Equal(suite.T(), streamid_2.Value(), detailResult[1].Id) + + // + } + + assert.Equal(suite.T(), "OK", "OK") + + // create group and consumer for the group + // this is only needed in order to be able to use custom commands. + // Once the native commands are added, this logic will be refactored. + switch c := client.(type) { + case api.GlideClient: + execStandalone(c) + case api.GlideClusterClient: + execCluster(c) + } + }) +} + +func (suite *GlideTestSuite) TestXPendingFailures() { + suite.runWithDefaultClients(func(client api.BaseClient) { + // TODO: Update tests when XGroupCreate, XGroupCreateConsumer, XReadGroup, XClaim, XClaimJustId and XAck are added to + // the Go client. + // + // This test splits out the cluster and standalone tests into their own functions because we are forced to use + // CustomCommands for many stream commands which are not included in the preview Go client. Using a type switch for + // each use of CustomCommand would make the tests difficult to read and maintain. These tests can be + // collapsed once the native commands are added in a subsequent release. + + execStandalone := func(client api.GlideClient) { + // 1. Arrange the data + key := uuid.New().String() + missingKey := uuid.New().String() + nonStreamKey := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + invalidConsumer := "invalid-consumer-" + uuid.New().String() + + command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + + resp, err := client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", resp.(string)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.(bool)) + + _, err = client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + // no pending messages yet... + summaryResult, err := client.XPending(key, groupName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), summaryResult.NumOfMessages) + + detailResult, err := client.XPendingWithOptions(key, groupName, options.NewXPendingOptions("-", "+", 10)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // read the entire stream for the consumer and mark messages as pending + command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + // sanity check - expect some results: + summaryResult, err = client.XPending(key, groupName) + assert.NoError(suite.T(), err) + assert.True(suite.T(), summaryResult.NumOfMessages > 0) + + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 1).SetConsumer(consumer1), + ) + assert.NoError(suite.T(), err) + assert.True(suite.T(), len(detailResult) > 0) + + // returns empty if + before - + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("+", "-", 10).SetConsumer(consumer1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // min idletime of 100 seconds shouldn't produce any results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetMinIdleTime(100000), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // invalid consumer - no results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetConsumer(invalidConsumer), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // Return an error when range bound is not a valid ID + _, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("invalid-id", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "invalid-id", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + // invalid count should return no results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", -1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // Return an error when an invalid group is provided + _, err = client.XPending( + key, + "invalid-group", + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // non-existent key throws a RequestError (NOGROUP) + _, err = client.XPending( + missingKey, + groupName, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + _, err = client.XPendingWithOptions( + missingKey, + groupName, + options.NewXPendingOptions("-", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // Key exists, but it is not a stream + _, _ = client.Set(nonStreamKey, "bar") + _, err = client.XPending( + nonStreamKey, + groupName, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "WRONGTYPE")) + + _, err = client.XPendingWithOptions( + nonStreamKey, + groupName, + options.NewXPendingOptions("-", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "WRONGTYPE")) + } + + execCluster := func(client api.GlideClusterClient) { + // 1. Arrange the data + key := uuid.New().String() + missingKey := uuid.New().String() + nonStreamKey := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumer1 := "consumer-1-" + uuid.New().String() + invalidConsumer := "invalid-consumer-" + uuid.New().String() + + command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + + resp, err := client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "OK", resp.Value().(string)) + + command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} + resp, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + assert.True(suite.T(), resp.Value().(bool)) + + _, err = client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + // no pending messages yet... + summaryResult, err := client.XPending(key, groupName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), summaryResult.NumOfMessages) + + detailResult, err := client.XPendingWithOptions(key, groupName, options.NewXPendingOptions("-", "+", 10)) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // read the entire stream for the consumer and mark messages as pending + command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} + _, err = client.CustomCommand(command) + assert.NoError(suite.T(), err) + + // sanity check - expect some results: + summaryResult, err = client.XPending(key, groupName) + assert.NoError(suite.T(), err) + assert.True(suite.T(), summaryResult.NumOfMessages > 0) + + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 1).SetConsumer(consumer1), + ) + assert.NoError(suite.T(), err) + assert.True(suite.T(), len(detailResult) > 0) + + // returns empty if + before - + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("+", "-", 10).SetConsumer(consumer1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // min idletime of 100 seconds shouldn't produce any results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetMinIdleTime(100000), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // invalid consumer - no results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", 10).SetConsumer(invalidConsumer), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // Return an error when range bound is not a valid ID + _, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("invalid-id", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "invalid-id", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + // invalid count should return no results + detailResult, err = client.XPendingWithOptions( + key, + groupName, + options.NewXPendingOptions("-", "+", -1), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 0, len(detailResult)) + + // Return an error when an invalid group is provided + _, err = client.XPending( + key, + "invalid-group", + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // non-existent key throws a RequestError (NOGROUP) + _, err = client.XPending( + missingKey, + groupName, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + _, err = client.XPendingWithOptions( + missingKey, + groupName, + options.NewXPendingOptions("-", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // Key exists, but it is not a stream + _, _ = client.Set(nonStreamKey, "bar") + _, err = client.XPending( + nonStreamKey, + groupName, + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "WRONGTYPE")) + + _, err = client.XPendingWithOptions( + nonStreamKey, + groupName, + options.NewXPendingOptions("-", "+", 10), + ) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "WRONGTYPE")) + } + + assert.Equal(suite.T(), "OK", "OK") + + // create group and consumer for the group + // this is only needed in order to be able to use custom commands. + // Once the native commands are added, this logic will be refactored. + switch c := client.(type) { + case api.GlideClient: + execStandalone(c) + case api.GlideClusterClient: + execCluster(c) + } + }) +}