diff --git a/go/api/base_client.go b/go/api/base_client.go index d424035e3c..301eff1aa6 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2504,7 +2504,7 @@ func (client *baseClient) XPendingWithOptions( return handleXPendingDetailResponse(result) } -// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`. // // See [valkey.io] for details. // @@ -2531,7 +2531,7 @@ func (client *baseClient) XGroupCreate(key string, group string, id string) (str return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions()) } -// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`. +// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`. // // See [valkey.io] for details. // @@ -2636,6 +2636,35 @@ func (client *baseClient) Echo(message string) (Result[string], error) { return handleStringOrNilResponse(result) } +// Destroys the consumer group `group` for the stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name to delete. +// +// Return value: +// +// `true` if the consumer group is destroyed. Otherwise, `false`. +// +// Example: +// +// ok, err := client.XGroupDestroy("mystream", "mygroup") +// if !ok || err != nil { +// // handle errors +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-destroy/ +func (client *baseClient) XGroupDestroy(key string, group string) (bool, error) { + result, err := client.executeCommand(C.XGroupDestroy, []string{key, group}) + if err != nil { + return defaultBoolResponse, err + } + return handleBoolResponse(result) +} + // Sets the last delivered ID for a consumer group. // // See [valkey.io] for details. @@ -2643,7 +2672,7 @@ func (client *baseClient) Echo(message string) (Result[string], error) { // Parameters: // // key - The key of the stream. -// group - The newly created consumer group name. +// group - The consumer group name. // id - The stream entry ID that should be set as the last delivered ID for the consumer group. // // Return value: @@ -2669,7 +2698,7 @@ func (client *baseClient) XGroupSetId(key string, group string, id string) (stri // Parameters: // // key - The key of the stream. -// group - The newly created consumer group name. +// group - The consumer group name. // id - The stream entry ID that should be set as the last delivered ID for the consumer group. // opts - The options for the command. See [options.XGroupSetIdOptions] for details. // @@ -2710,7 +2739,6 @@ func (client *baseClient) XGroupSetIdWithOptions( // // key - The key of the sorted set. // rangeQuery - The range query object representing the minimum and maximum bound of the lexicographical range. -// can be an implementation of [options.LexBoundary]. // // Return value: // diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 0e5f399f2b..0dc369482a 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -157,6 +157,8 @@ type StreamCommands interface { XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) + XGroupDestroy(key string, group string) (bool, error) + XGroupCreateConsumer(key string, group string, consumer string) (bool, error) XGroupDelConsumer(key string, group string, consumer string) (int64, error) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index fb942fc599..bc3e687c6c 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -5988,36 +5988,42 @@ func (suite *GlideTestSuite) TestXPendingFailures() { }) } -// TODO add XGroupDestroy tests there func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.NewString() - group1 := uuid.NewString() - group2 := uuid.NewString() + group := uuid.NewString() id := "0-1" // Stream not created results in error - _, err := client.XGroupCreate(key, group1, id) + _, err := client.XGroupCreate(key, group, id) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) // Stream with option to create creates stream & Group opts := options.NewXGroupCreateOptions().SetMakeStream() - suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts)) + suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts)) // ...and again results in BUSYGROUP error, because group names must be unique - _, err = client.XGroupCreate(key, group1, id) + _, err = client.XGroupCreate(key, group, id) assert.ErrorContains(suite.T(), err, "BUSYGROUP") assert.IsType(suite.T(), &api.RequestError{}, err) - // TODO add XGroupDestroy tests there + // Stream Group can be destroyed returns: true + destroyed, err := client.XGroupDestroy(key, group) + assert.NoError(suite.T(), err) + assert.True(suite.T(), destroyed) + + // ...and again results in: false + destroyed, err = client.XGroupDestroy(key, group) + assert.NoError(suite.T(), err) + assert.False(suite.T(), destroyed) // ENTRIESREAD option was added in valkey 7.0.0 opts = options.NewXGroupCreateOptions().SetEntriesRead(100) if suite.serverVersion >= "7.0.0" { - suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts)) + suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts)) } else { - _, err = client.XGroupCreateWithOptions(key, group2, id, opts) + _, err = client.XGroupCreateWithOptions(key, group, id, opts) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) } @@ -6025,7 +6031,7 @@ func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() { // key is not a stream key = uuid.NewString() suite.verifyOK(client.Set(key, id)) - _, err = client.XGroupCreate(key, group1, id) + _, err = client.XGroupCreate(key, group, id) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) })