diff --git a/go/api/base_client.go b/go/api/base_client.go index f7313e05d6..337e88a96f 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1561,7 +1561,7 @@ func (client *baseClient) XReadWithOptions( // // Return value: // A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if -// a key does not exist or does not contain requiested entries. +// a key does not exist or does not contain requested entries. // // For example: // @@ -2638,6 +2638,71 @@ func (client *baseClient) Echo(message string) (Result[string], error) { return handleStringOrNilResponse(result) } +// Sets the last delivered ID for a consumer group. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - The stream entry ID that should be set as the last delivered ID for the consumer group. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// ok, err := client.XGroupSetId("mystream", "mygroup", "0-0") +// if ok != "OK" || err != nil { +// // handle error +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupSetId(key string, group string, id string) (string, error) { + return client.XGroupSetIdWithOptions(key, group, id, options.NewXGroupSetIdOptionsOptions()) +} + +// Sets the last delivered ID for a consumer group. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - The stream entry ID that should be set as the last delivered ID for the consumer group. +// opts - The options for the command. See [options.XGroupSetIdOptions] for details. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42) +// ok, err := client.XGroupSetIdWithOptions("mystream", "mygroup", "0-0", opts) +// if ok != "OK" || err != nil { +// // handle error +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupSetIdWithOptions( + key string, + group string, + id string, + opts *options.XGroupSetIdOptions, +) (string, error) { + optionArgs, _ := opts.ToArgs() + args := append([]string{key, group, id}, optionArgs...) + result, err := client.executeCommand(C.XGroupSetId, args) + if err != nil { + return defaultStringResponse, err + } + return handleStringResponse(result) +} + // Removes all elements in the sorted set stored at `key` with a lexicographical order // between `rangeQuery.Start` and `rangeQuery.End`. // diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index cb27269f3b..71a76dc284 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -246,7 +246,6 @@ func (xpo *XPendingOptions) SetConsumer(consumer string) *XPendingOptions { func (xpo *XPendingOptions) ToArgs() ([]string, error) { args := []string{} - // if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime if xpo.minIdleTime > 0 { args = append(args, "IDLE") args = append(args, utils.IntToString(xpo.minIdleTime)) @@ -280,9 +279,6 @@ func (xgco *XGroupCreateOptions) SetMakeStream() *XGroupCreateOptions { return xgco } -// A value representing the number of stream entries already read by the group. -// -// Since Valkey version 7.0.0. func (xgco *XGroupCreateOptions) SetEntriesRead(entriesRead int64) *XGroupCreateOptions { xgco.entriesRead = entriesRead return xgco @@ -302,3 +298,31 @@ func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) { return args, nil } + +// Optional arguments for `XGroupSetId` in [StreamCommands] +type XGroupSetIdOptions struct { + entriesRead int64 +} + +// Create new empty `XGroupSetIdOptions` +func NewXGroupSetIdOptionsOptions() *XGroupSetIdOptions { + return &XGroupSetIdOptions{-1} +} + +// A value representing the number of stream entries already read by the group. +// +// Since Valkey version 7.0.0. +func (xgsio *XGroupSetIdOptions) SetEntriesRead(entriesRead int64) *XGroupSetIdOptions { + xgsio.entriesRead = entriesRead + return xgsio +} + +func (xgsio *XGroupSetIdOptions) ToArgs() ([]string, error) { + var args []string + + if xgsio.entriesRead > -1 { + args = append(args, "ENTRIESREAD", utils.IntToString(xgsio.entriesRead)) + } + + return args, nil +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index c212879d54..d504176d15 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -149,6 +149,10 @@ type StreamCommands interface { XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error) + XGroupSetId(key string, group string, id string) (string, error) + + XGroupSetIdWithOptions(key string, group string, id string, opts *options.XGroupSetIdOptions) (string, error) + XGroupCreate(key string, group string, id string) (string, error) XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 2e941aafb4..9ee878b0ae 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4650,6 +4650,94 @@ func (suite *GlideTestSuite) TestXRead() { }) } +func (suite *GlideTestSuite) TestXGroupSetId() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group := uuid.NewString() + consumer := uuid.NewString() + + // Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List + xadd, err := client.XAddWithOptions( + key, + [][]string{{"f0", "v0"}}, + options.NewXAddOptions().SetId("1-0"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-0", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"f1", "v1"}}, + options.NewXAddOptions().SetId("1-1"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-1", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"f2", "v2"}}, + options.NewXAddOptions().SetId("1-2"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-2", xadd.Value()) + + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, group, "0"}, + "Can't send XGROUP CREATE as a custom command", + ) + + xreadgroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "1-0": {{"f0", "v0"}}, + "1-1": {{"f1", "v1"}}, + "1-2": {{"f2", "v2"}}, + }, + }, xreadgroup) + + // Sanity check: xreadgroup should not return more entries since they're all already in the + // Pending Entries List. + xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Nil(suite.T(), xreadgroup) + + // Reset the last delivered ID for the consumer group to "1-1" + if suite.serverVersion < "7.0.0" { + suite.verifyOK(client.XGroupSetId(key, group, "1-1")) + } else { + opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42) + suite.verifyOK(client.XGroupSetIdWithOptions(key, group, "1-1", opts)) + } + + // xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1 + xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "1-2": {{"f2", "v2"}}, + }, + }, xreadgroup) + + // An error is raised if XGROUP SETID is called with a non-existing key + _, err = client.XGroupSetId(uuid.NewString(), group, "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + + // An error is raised if XGROUP SETID is called with a non-existing group + _, err = client.XGroupSetId(key, uuid.NewString(), "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + + // Setting the ID to a non-existing ID is allowed + suite.verifyOK(client.XGroupSetId(key, group, "99-99")) + + // key exists, but is not a stream + key = uuid.NewString() + suite.verifyOK(client.Set(key, "xgroup setid")) + _, err = client.XGroupSetId(key, group, "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +} + func (suite *GlideTestSuite) TestZAddAndZAddIncr() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String()