From 95beeb0c5d71932b98072b5d9b26b09a204ba1fa Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 17 Jan 2025 15:30:38 -0800 Subject: [PATCH 1/2] Go: `XGROUP SETID`. Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 61 ++++++++++++++++++- go/api/options/stream_options.go | 29 ++++++++- go/api/stream_commands.go | 4 ++ go/integTest/shared_commands_test.go | 88 ++++++++++++++++++++++++++++ 4 files changed, 180 insertions(+), 2 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 5b27cf2d11..3712d923af 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1425,7 +1425,7 @@ func (client *baseClient) XReadWithOptions( // // Return value: // A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if -// a key does not exist or does not contain requiested entries. +// a key does not exist or does not contain requested entries. // // For example: // @@ -2183,3 +2183,62 @@ func (client *baseClient) Echo(message string) (Result[string], error) { } return handleStringOrNilResponse(result) } + +// Sets the last delivered ID for a consumer group. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - The stream entry ID that should be set as the last delivered ID for the consumer group. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// client.XGroupSetId("mystream", "mygroup", "0-0") +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupSetId(key string, group string, id string) (string, error) { + return client.XGroupSetIdWithOptions(key, group, id, options.NewXGroupSetIdOptionsOptions()) +} + +// Sets the last delivered ID for a consumer group. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - The stream entry ID that should be set as the last delivered ID for the consumer group. +// opts - The options for the command. See [options.XGroupSetIdOptions] for details. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42) +// client.XGroupSetIdWithOptions("mystream", "mygroup", "0-0", opts) +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupSetIdWithOptions( + key string, + group string, + id string, + opts *options.XGroupSetIdOptions, +) (string, error) { + optionArgs, _ := opts.ToArgs() + args := append([]string{key, group, id}, optionArgs...) + result, err := client.executeCommand(C.XGroupSetId, args) + if err != nil { + return defaultStringResponse, err + } + return handleStringResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index ff40c224ac..d4a9a18951 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -232,7 +232,6 @@ func (xpo *XPendingOptions) SetConsumer(consumer string) *XPendingOptions { func (xpo *XPendingOptions) ToArgs() ([]string, error) { args := []string{} - // if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime if xpo.minIdleTime > 0 { args = append(args, "IDLE") args = append(args, utils.IntToString(xpo.minIdleTime)) @@ -248,3 +247,31 @@ func (xpo *XPendingOptions) ToArgs() ([]string, error) { return args, nil } + +// Optional arguments for `XGroupSetId` in [StreamCommands] +type XGroupSetIdOptions struct { + entriesRead int64 +} + +// Create new empty `XGroupSetIdOptions` +func NewXGroupSetIdOptionsOptions() *XGroupSetIdOptions { + return &XGroupSetIdOptions{-1} +} + +// A value representing the number of stream entries already read by the group. +// +// Since Valkey version 7.0.0. +func (xgsio *XGroupSetIdOptions) SetEntriesRead(entriesRead int64) *XGroupSetIdOptions { + xgsio.entriesRead = entriesRead + return xgsio +} + +func (xgsio *XGroupSetIdOptions) ToArgs() ([]string, error) { + var args []string + + if xgsio.entriesRead > -1 { + args = append(args, "ENTRIESREAD", utils.IntToString(xgsio.entriesRead)) + } + + return args, nil +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 211d27cdaa..df5c878953 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -120,4 +120,8 @@ type StreamCommands interface { XPending(key string, group string) (XPendingSummary, error) XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error) + + XGroupSetId(key string, group string, id string) (string, error) + + XGroupSetIdWithOptions(key string, group string, id string, opts *options.XGroupSetIdOptions) (string, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 6f3d019872..7fac431d2e 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4322,6 +4322,94 @@ func (suite *GlideTestSuite) TestXRead() { }) } +func (suite *GlideTestSuite) TestXGroupSetId() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group := uuid.NewString() + consumer := uuid.NewString() + + // Setup: Create stream with 3 entries, create consumer group, read entries to add them to the Pending Entries List + xadd, err := client.XAddWithOptions( + key, + [][]string{{"f0", "v0"}}, + options.NewXAddOptions().SetId("1-0"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-0", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"f1", "v1"}}, + options.NewXAddOptions().SetId("1-1"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-1", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"f2", "v2"}}, + options.NewXAddOptions().SetId("1-2"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "1-2", xadd.Value()) + + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, group, "0"}, + "Can't send XGROUP CREATE as a custom command", + ) + + xreadgroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "1-0": {{"f0", "v0"}}, + "1-1": {{"f1", "v1"}}, + "1-2": {{"f2", "v2"}}, + }, + }, xreadgroup) + + // Sanity check: xreadgroup should not return more entries since they're all already in the + // Pending Entries List. + xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Nil(suite.T(), xreadgroup) + + // Reset the last delivered ID for the consumer group to "1-1" + if suite.serverVersion < "7.0.0" { + suite.verifyOK(client.XGroupSetId(key, group, "1-1")) + } else { + opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42) + suite.verifyOK(client.XGroupSetIdWithOptions(key, group, "1-1", opts)) + } + + // xreadgroup should only return entry 1-2 since we reset the last delivered ID to 1-1 + xreadgroup, err = client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "1-2": {{"f2", "v2"}}, + }, + }, xreadgroup) + + // An error is raised if XGROUP SETID is called with a non-existing key + _, err = client.XGroupSetId(uuid.NewString(), group, "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + + // An error is raised if XGROUP SETID is called with a non-existing group + _, err = client.XGroupSetId(key, uuid.NewString(), "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + + // Setting the ID to a non-existing ID is allowed + suite.verifyOK(client.XGroupSetId(key, group, "99-99")) + + // key exists, but is not a stream + key = uuid.NewString() + suite.verifyOK(client.Set(key, "xgroup setid")) + _, err = client.XGroupSetId(key, group, "1-1") + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +} + func (suite *GlideTestSuite) TestZAddAndZAddIncr() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String() From a660acb38fe96e785b9809adc690826cd0b1da00 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Mon, 20 Jan 2025 10:02:28 -0800 Subject: [PATCH 2/2] doc Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 839339543c..b16a2e159c 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2451,7 +2451,10 @@ func (client *baseClient) Echo(message string) (Result[string], error) { // // Example: // -// client.XGroupSetId("mystream", "mygroup", "0-0") +// ok, err := client.XGroupSetId("mystream", "mygroup", "0-0") +// if ok != "OK" || err != nil { +// // handle error +// } // // [valkey.io]: https://valkey.io/commands/xgroup-create/ func (client *baseClient) XGroupSetId(key string, group string, id string) (string, error) { @@ -2476,7 +2479,10 @@ func (client *baseClient) XGroupSetId(key string, group string, id string) (stri // Example: // // opts := options.NewXGroupSetIdOptionsOptions().SetEntriesRead(42) -// client.XGroupSetIdWithOptions("mystream", "mygroup", "0-0", opts) +// ok, err := client.XGroupSetIdWithOptions("mystream", "mygroup", "0-0", opts) +// if ok != "OK" || err != nil { +// // handle error +// } // // [valkey.io]: https://valkey.io/commands/xgroup-create/ func (client *baseClient) XGroupSetIdWithOptions(