Skip to content

Commit

Permalink
Go: XGROUP CREATE. (valkey-io#2966)
Browse files Browse the repository at this point in the history
* Go: `XGROUP CREATE`.

Signed-off-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Edward Liang <[email protected]>
  • Loading branch information
Yury-Fridlyand authored and edlng committed Jan 20, 2025
1 parent 2030000 commit 5096018
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 12 deletions.
67 changes: 67 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5449,6 +5449,73 @@ func (client *baseClient) XPendingWithOptions(
return handleXPendingDetailResponse(result)
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The newly created consumer group name.
// id - Stream entry ID that specifies the last delivered entry in the stream from the new
// group’s perspective. The special ID `"$"` can be used to specify the last entry in the stream.
//
// Return value:
//
// `"OK"`.
//
// Example:
//
// ok, err := client.XGroupCreate("mystream", "mygroup", "0-0")
// if ok != "OK" || err != nil {
// // handle error
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-create/
func (client *baseClient) XGroupCreate(key string, group string, id string) (string, error) {
return client.XGroupCreateWithOptions(key, group, id, options.NewXGroupCreateOptions())
}

// Creates a new consumer group uniquely identified by `groupname` for the stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The newly created consumer group name.
// id - Stream entry ID that specifies the last delivered entry in the stream from the new
// group's perspective. The special ID `"$"` can be used to specify the last entry in the stream.
// opts - The options for the command. See [options.XGroupCreateOptions] for details.
//
// Return value:
//
// `"OK"`.
//
// Example:
//
// opts := options.NewXGroupCreateOptions().SetMakeStream()
// ok, err := client.XGroupCreateWithOptions("mystream", "mygroup", "0-0", opts)
// if ok != "OK" || err != nil {
// // handle error
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-create/
func (client *baseClient) XGroupCreateWithOptions(
key string,
group string,
id string,
opts *options.XGroupCreateOptions,
) (string, error) {
optionArgs, _ := opts.ToArgs()
args := append([]string{key, group, id}, optionArgs...)
result, err := client.executeCommand(C.XGroupCreate, args)
if err != nil {
return defaultStringResponse, err
}
return handleStringResponse(result)
}

func (client *baseClient) Restore(key string, ttl int64, value string) (Result[string], error) {
return client.RestoreWithOptions(key, ttl, value, NewRestoreOptionsBuilder())
}
Expand Down
40 changes: 40 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,43 @@ func (xpo *XPendingOptions) ToArgs() ([]string, error) {

return args, nil
}

// Optional arguments for `XGroupCreate` in [StreamCommands]
type XGroupCreateOptions struct {
mkStream bool
entriesRead int64
}

// Create new empty `XGroupCreateOptions`
func NewXGroupCreateOptions() *XGroupCreateOptions {
return &XGroupCreateOptions{false, -1}
}

// Once set and if the stream doesn't exist, creates a new stream with a length of `0`.
func (xgco *XGroupCreateOptions) SetMakeStream() *XGroupCreateOptions {
xgco.mkStream = true
return xgco
}

// A value representing the number of stream entries already read by the group.
//
// Since Valkey version 7.0.0.
func (xgco *XGroupCreateOptions) SetEntriesRead(entriesRead int64) *XGroupCreateOptions {
xgco.entriesRead = entriesRead
return xgco
}

func (xgco *XGroupCreateOptions) ToArgs() ([]string, error) {
var args []string

// if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime
if xgco.mkStream {
args = append(args, "MKSTREAM")
}

if xgco.entriesRead > -1 {
args = append(args, "ENTRIESREAD", utils.IntToString(xgco.entriesRead))
}

return args, nil
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ type StreamCommands interface {
XPending(key string, group string) (XPendingSummary, error)

XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error)

XGroupCreate(key string, group string, id string) (string, error)

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)
}
63 changes: 51 additions & 12 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5865,15 +5865,13 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
consumer1 := "consumer-1-" + uuid.New().String()
invalidConsumer := "invalid-consumer-" + uuid.New().String()

command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"}
suite.verifyOK(
client.XGroupCreateWithOptions(key, groupName, zeroStreamId, options.NewXGroupCreateOptions().SetMakeStream()),
)

command := []string{"XGroup", "CreateConsumer", key, groupName, consumer1}
resp, err := client.CustomCommand(command)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "OK", resp.(string))

command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1}
resp, err = client.CustomCommand(command)
assert.NoError(suite.T(), err)
assert.True(suite.T(), resp.(bool))

_, err = client.XAdd(key, [][]string{{"field1", "value1"}})
Expand Down Expand Up @@ -6017,15 +6015,13 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
consumer1 := "consumer-1-" + uuid.New().String()
invalidConsumer := "invalid-consumer-" + uuid.New().String()

command := []string{"XGroup", "Create", key, groupName, zeroStreamId, "MKSTREAM"}
suite.verifyOK(
client.XGroupCreateWithOptions(key, groupName, zeroStreamId, options.NewXGroupCreateOptions().SetMakeStream()),
)

command := []string{"XGroup", "CreateConsumer", key, groupName, consumer1}
resp, err := client.CustomCommand(command)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), "OK", resp.Value().(string))

command = []string{"XGroup", "CreateConsumer", key, groupName, consumer1}
resp, err = client.CustomCommand(command)
assert.NoError(suite.T(), err)
assert.True(suite.T(), resp.Value().(bool))

_, err = client.XAdd(key, [][]string{{"field1", "value1"}})
Expand Down Expand Up @@ -6173,6 +6169,49 @@ func (suite *GlideTestSuite) TestXPendingFailures() {
})
}

// TODO add XGroupDestroy tests there
func (suite *GlideTestSuite) TestXGroupCreate_XGroupDestroy() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
group1 := uuid.NewString()
group2 := uuid.NewString()
id := "0-1"

// Stream not created results in error
_, err := client.XGroupCreate(key, group1, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

// Stream with option to create creates stream & Group
opts := options.NewXGroupCreateOptions().SetMakeStream()
suite.verifyOK(client.XGroupCreateWithOptions(key, group1, id, opts))

// ...and again results in BUSYGROUP error, because group names must be unique
_, err = client.XGroupCreate(key, group1, id)
assert.ErrorContains(suite.T(), err, "BUSYGROUP")
assert.IsType(suite.T(), &api.RequestError{}, err)

// TODO add XGroupDestroy tests there

// ENTRIESREAD option was added in valkey 7.0.0
opts = options.NewXGroupCreateOptions().SetEntriesRead(100)
if suite.serverVersion >= "7.0.0" {
suite.verifyOK(client.XGroupCreateWithOptions(key, group2, id, opts))
} else {
_, err = client.XGroupCreateWithOptions(key, group2, id, opts)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
}

// key is not a stream
key = uuid.NewString()
suite.verifyOK(client.Set(key, id))
_, err = client.XGroupCreate(key, group1, id)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

func (suite *GlideTestSuite) TestObjectEncoding() {
suite.runWithDefaultClients(func(client api.BaseClient) {
// Test 1: Check object encoding for embstr
Expand Down

0 comments on commit 5096018

Please sign in to comment.