diff --git a/go/api/base_client.go b/go/api/base_client.go index 0d7a0b1ed2..fc4b1abc35 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -5449,6 +5449,73 @@ func (client *baseClient) XPendingWithOptions( return handleXPendingDetailResponse(result) } +// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - Stream entry ID that specifies the last delivered entry in the stream from the new +// group’s perspective. The special ID `"$"` can be used to specify the last entry in the stream. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// ok, err := client.XGroupCreate("mystream", "mygroup", "0-0") +// if ok != "OK" || err != nil { +// // handle error +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupCreate(key string, group string, id string) (string, error) { + return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions()) +} + +// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The newly created consumer group name. +// id - Stream entry ID that specifies the last delivered entry in the stream from the new +// group's perspective. The special ID `"$"` can be used to specify the last entry in the stream. +// opts - The options for the command. See [options.XGroupCreateOptions] for details. +// +// Return value: +// +// `"OK"`. +// +// Example: +// +// opts := options.NewXGroupCreateOptions().SetMakeStream() +// ok, err := client.XGroupCreateWithOptions("mystream", "mygroup", "0-0", opts) +// if ok != "OK" || err != nil { +// // handle error +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-create/ +func (client *baseClient) XGroupCreateWithOptions( + key string, + group string, + id string, + opts *options.XGroupCreateOptions, +) (string, error) { + optionArgs, _ := opts.ToArgs() + args := append([]string{key, group, id}, optionArgs...) + result, err := client.executeCommand(C.XGroupCreate, args) + if err != nil { + return defaultStringResponse, err + } + return handleStringResponse(result) +} + func (client *baseClient) Restore(key string, ttl int64, value string) (Result[string], error) { return client.RestoreWithOptions(key, ttl, value, NewRestoreOptionsBuilder()) } diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 4507b0478c..cb27269f3b 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -262,3 +262,43 @@ func (xpo *XPendingOptions) ToArgs() ([]string, error) { return args, nil } + +// Optional arguments for `XGroupCreate` in [StreamCommands] +type XGroupCreateOptions struct { + mkStream bool + entriesRead int64 +} + +// Create new empty `XGroupCreateOptions` +func NewXGroupCreateOptions() *XGroupCreateOptions { + return &XGroupCreateOptions{false, -1} +} + +// Once set and if the stream doesn't exist, creates a new stream with a length of `0`. +func (xgco *XGroupCreateOptions) SetMakeStream() *XGroupCreateOptions { + xgco.mkStream = true + return xgco +} + +// A value representing the number of stream entries already read by the group. +// +// Since Valkey version 7.0.0. +func (xgco *XGroupCreateOptions) SetEntriesRead(entriesRead int64) *XGroupCreateOptions { + xgco.entriesRead = entriesRead + return xgco +} + +func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) { + var args []string + + // if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime + if xgco.mkStream { + args = append(args, "MKSTREAM") + } + + if xgco.entriesRead > -1 { + args = append(args, "ENTRIESREAD", utils.IntToString(xgco.entriesRead)) + } + + return args, nil +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 564b8b8109..c5e32bf331 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -65,4 +65,8 @@ type StreamCommands interface { XPending(key string, group string) (XPendingSummary, error) XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error) + + XGroupCreate(key string, group string, id string) (string, error) + + XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 5a2f34050c..2e941aafb4 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -5865,15 +5865,13 @@ func (suite *GlideTestSuite) TestXPendingFailures() { consumer1 := "consumer-1-" + uuid.New().String() invalidConsumer := "invalid-consumer-" + uuid.New().String() - command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + suite.verifyOK( + client.XGroupCreateWithOptions(key, groupName, zeroStreamId, options.NewXGroupCreateOptions().SetMakeStream()), + ) + command := []string{"XGroup", "CreateConsumer", key, groupName, consumer1} resp, err := client.CustomCommand(command) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), "OK", resp.(string)) - - command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} - resp, err = client.CustomCommand(command) - assert.NoError(suite.T(), err) assert.True(suite.T(), resp.(bool)) _, err = client.XAdd(key, [][]string{{"field1", "value1"}}) @@ -6017,15 +6015,13 @@ func (suite *GlideTestSuite) TestXPendingFailures() { consumer1 := "consumer-1-" + uuid.New().String() invalidConsumer := "invalid-consumer-" + uuid.New().String() - command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"} + suite.verifyOK( + client.XGroupCreateWithOptions(key, groupName, zeroStreamId, options.NewXGroupCreateOptions().SetMakeStream()), + ) + command := []string{"XGroup", "CreateConsumer", key, groupName, consumer1} resp, err := client.CustomCommand(command) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), "OK", resp.Value().(string)) - - command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1} - resp, err = client.CustomCommand(command) - assert.NoError(suite.T(), err) assert.True(suite.T(), resp.Value().(bool)) _, err = client.XAdd(key, [][]string{{"field1", "value1"}}) @@ -6173,6 +6169,49 @@ func (suite *GlideTestSuite) TestXPendingFailures() { }) } +// TODO add XGroupDestroy tests there +func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group1 := uuid.NewString() + group2 := uuid.NewString() + id := "0-1" + + // Stream not created results in error + _, err := client.XGroupCreate(key, group1, id) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + // Stream with option to create creates stream & Group + opts := options.NewXGroupCreateOptions().SetMakeStream() + suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts)) + + // ...and again results in BUSYGROUP error, because group names must be unique + _, err = client.XGroupCreate(key, group1, id) + assert.ErrorContains(suite.T(), err, "BUSYGROUP") + assert.IsType(suite.T(), &api.RequestError{}, err) + + // TODO add XGroupDestroy tests there + + // ENTRIESREAD option was added in valkey 7.0.0 + opts = options.NewXGroupCreateOptions().SetEntriesRead(100) + if suite.serverVersion >= "7.0.0" { + suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts)) + } else { + _, err = client.XGroupCreateWithOptions(key, group2, id, opts) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + } + + // key is not a stream + key = uuid.NewString() + suite.verifyOK(client.Set(key, id)) + _, err = client.XGroupCreate(key, group1, id) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +} + func (suite *GlideTestSuite) TestObjectEncoding() { suite.runWithDefaultClients(func(client api.BaseClient) { // Test 1: Check object encoding for embstr