Skip to content

Commit

Permalink
Go: XGROUP DESTROY. (#2974)
Browse files Browse the repository at this point in the history
* Go: `XGROUP CREATE`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Jan 21, 2025
1 parent cf88d66 commit 2990264
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
38 changes: 33 additions & 5 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,7 @@ func (client *baseClient) XPendingWithOptions(
return handleXPendingDetailResponse(result)
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
Expand All @@ -2531,7 +2531,7 @@ func (client *baseClient) XGroupCreate(key string, group string, id string) (str
return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions())
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
// Creates a new consumer group uniquely identified by `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
Expand Down Expand Up @@ -2636,14 +2636,43 @@ func (client *baseClient) Echo(message string) (Result[string], error) {
return handleStringOrNilResponse(result)
}

// Destroys the consumer group `group` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name to delete.
//
// Return value:
//
// `true` if the consumer group is destroyed. Otherwise, `false`.
//
// Example:
//
// ok, err := client.XGroupDestroy("mystream", "mygroup")
// if !ok || err != nil {
// // handle errors
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-destroy/
func (client *baseClient) XGroupDestroy(key string, group string) (bool, error) {
result, err := client.executeCommand(C.XGroupDestroy, []string{key, group})
if err != nil {
return defaultBoolResponse, err
}
return handleBoolResponse(result)
}

// Sets the last delivered ID for a consumer group.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The newly created consumer group name.
// group - The consumer group name.
// id - The stream entry ID that should be set as the last delivered ID for the consumer group.
//
// Return value:
Expand All @@ -2669,7 +2698,7 @@ func (client *baseClient) XGroupSetId(key string, group string, id string) (stri
// Parameters:
//
// key - The key of the stream.
// group - The newly created consumer group name.
// group - The consumer group name.
// id - The stream entry ID that should be set as the last delivered ID for the consumer group.
// opts - The options for the command. See [options.XGroupSetIdOptions] for details.
//
Expand Down Expand Up @@ -2710,7 +2739,6 @@ func (client *baseClient) XGroupSetIdWithOptions(
//
// key - The key of the sorted set.
// rangeQuery - The range query object representing the minimum and maximum bound of the lexicographical range.
// can be an implementation of [options.LexBoundary].
//
// Return value:
//
Expand Down
2 changes: 2 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ type StreamCommands interface {

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)

XGroupDestroy(key string, group string) (bool, error)

XGroupCreateConsumer(key string, group string, consumer string) (bool, error)

XGroupDelConsumer(key string, group string, consumer string) (int64, error)
Expand Down
26 changes: 16 additions & 10 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5988,44 +5988,50 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
})
}

// TODO add XGroupDestroy tests there
func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group1 := uuid.NewString()
group2 := uuid.NewString()
group := uuid.NewString()
id := "0-1"

// Stream not created results in error
_, err := client.XGroupCreate(key, group1, id)
_, err := client.XGroupCreate(key, group, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

// Stream with option to create creates stream & Group
opts := options.NewXGroupCreateOptions().SetMakeStream()
suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts))
suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts))

// ...and again results in BUSYGROUP error, because group names must be unique
_, err = client.XGroupCreate(key, group1, id)
_, err = client.XGroupCreate(key, group, id)
assert.ErrorContains(suite.T(), err, "BUSYGROUP")
assert.IsType(suite.T(), &api.RequestError{}, err)

// TODO add XGroupDestroy tests there
// Stream Group can be destroyed returns: true
destroyed, err := client.XGroupDestroy(key, group)
assert.NoError(suite.T(), err)
assert.True(suite.T(), destroyed)

// ...and again results in: false
destroyed, err = client.XGroupDestroy(key, group)
assert.NoError(suite.T(), err)
assert.False(suite.T(), destroyed)

// ENTRIESREAD option was added in valkey 7.0.0
opts = options.NewXGroupCreateOptions().SetEntriesRead(100)
if suite.serverVersion >= "7.0.0" {
suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts))
suite.verifyOK(client.XGroupCreateWithOptions(key, group, id, opts))
} else {
_, err = client.XGroupCreateWithOptions(key, group2, id, opts)
_, err = client.XGroupCreateWithOptions(key, group, id, opts)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
}

// key is not a stream
key = uuid.NewString()
suite.verifyOK(client.Set(key, id))
_, err = client.XGroupCreate(key, group1, id)
_, err = client.XGroupCreate(key, group, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
Expand Down

0 comments on commit 2990264

Please sign in to comment.