diff --git a/go/api/base_client.go b/go/api/base_client.go index f7313e05d6..3c52edb79a 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2870,3 +2870,73 @@ func (client *baseClient) SortStoreWithOptions( } return handleIntOrNilResponse(result) } + +// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the +// stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The newly created consumer. +// +// Return value: +// +// Returns `true` if the consumer is created. Otherwise, returns `false`. +// +// Example: +// +// //Creates the consumer "myconsumer" in consumer group "mygroup" +// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer") +// if err == nil && success { +// fmt.Println("Consumer created") +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/ +func (client *baseClient) XGroupCreateConsumer( + key string, + group string, + consumer string, +) (bool, error) { + result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer}) + if err != nil { + return false, err + } + return handleBoolResponse(result) +} + +// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The consumer to delete. +// +// Returns the number of pending messages the `consumer` had before it was deleted. +// +// Example: +// +// // Deletes the consumer "myconsumer" in consumer group "mygroup" +// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer") +// if err != nil { +// // handle error +// } +// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount) +// +// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/ +func (client *baseClient) XGroupDelConsumer( + key string, + group string, + consumer string, +) (int64, error) { + result, err := client.executeCommand(C.XGroupDelConsumer, []string{key, group, consumer}) + if err != nil { + return defaultIntResponse, err + } + return handleIntResponse(result) +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index c212879d54..b5febb245d 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -152,4 +152,8 @@ type StreamCommands interface { XGroupCreate(key string, group string, id string) (string, error) XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) + + XGroupCreateConsumer(key string, group string, consumer string) (bool, error) + + XGroupDelConsumer(key string, group string, consumer string) (int64, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 2e941aafb4..28421c428e 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6708,3 +6708,115 @@ func (suite *GlideTestSuite) TestSortStoreWithOptions_ByPattern() { assert.Equal(suite.T(), resultList, sortedValues) }) } + +func (suite *GlideTestSuite) TestXGroupStreamCommands() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.New().String() + stringKey := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumerName := "consumer-" + uuid.New().String() + + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, groupName, zeroStreamId, "MKSTREAM"}, + "Can't send XGROUP CREATE as a custom command", + ) + respBool, err := client.XGroupCreateConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.True(suite.T(), respBool) + + // create a consumer for a group that doesn't exist should result in a NOGROUP error + _, err = client.XGroupCreateConsumer(key, "non-existent-group", consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // create consumer that already exists should return false + respBool, err = client.XGroupCreateConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.False(suite.T(), respBool) + + // Delete a consumer that hasn't been created should return 0 + respInt64, err := client.XGroupDelConsumer(key, groupName, "non-existent-consumer") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), respInt64) + + // Add two stream entries + streamId1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + streamId2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + // read the stream for the consumer and mark messages as pending + expectedGroup := map[string]map[string][][]string{ + key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}}, + } + actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup), + "Expected and actual results do not match", + ) + + // delete one of the streams using XDel + respInt64, err = client.XDel(key, []string{streamId1.Value()}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), respInt64) + + // xreadgroup should return one empty stream and one non-empty stream + resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + streamId1.Value(): nil, + streamId2.Value(): {{"field2", "value2"}}, + }, + }, resp) + + // add a new stream entry + streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamId3) + + // xack that streamid1 and streamid2 have been processed + command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + + // Delete the consumer group and expect 0 pending messages + respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), respInt64) + + // TODO: Use XAck when it is added to the Go client + // xack streamid_1, and streamid_2 already received returns 0L + command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(resp[key])) + + // TODO: Use XAck when it is added to the Go client + // Use non existent group, so xack streamid_3 returns 0 + command = []string{"XAck", key, "non-existent-group", streamId3.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + + // Delete the consumer group and expect 1 pending message + respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), respInt64) + + // Set a string key, and expect an error when you try to create or delete a consumer group + _, err = client.Set(stringKey, "test") + assert.NoError(suite.T(), err) + _, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + _, err = client.XGroupDelConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +}