Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: Add stream commands XGroupCreateConsumer/XGroupDelConsumer #2975

Merged
merged 14 commits into from
Jan 20, 2025
70 changes: 70 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2870,3 +2870,73 @@ func (client *baseClient) SortStoreWithOptions(
}
return handleIntOrNilResponse(result)
}

// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
// stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The newly created consumer.
//
// Return value:
//
// Returns `true` if the consumer is created. Otherwise, returns `false`.
//
// Example:
//
// //Creates the consumer "myconsumer" in consumer group "mygroup"
// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer")
// if err == nil && success {
// fmt.Println("Consumer created")
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/
func (client *baseClient) XGroupCreateConsumer(
key string,
group string,
consumer string,
) (bool, error) {
result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer})
if err != nil {
return false, err
}
return handleBoolResponse(result)
}

// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The consumer to delete.
//
// Returns the number of pending messages the `consumer` had before it was deleted.
//
// Example:
//
// // Deletes the consumer "myconsumer" in consumer group "mygroup"
// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer")
// if err != nil {
// // handle error
// }
// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount)
//
// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/
func (client *baseClient) XGroupDelConsumer(
key string,
group string,
consumer string,
) (int64, error) {
result, err := client.executeCommand(C.XGroupDelConsumer, []string{key, group, consumer})
if err != nil {
return defaultIntResponse, err
}
return handleIntResponse(result)
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,8 @@ type StreamCommands interface {
XGroupCreate(key string, group string, id string) (string, error)

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)

XGroupCreateConsumer(key string, group string, consumer string) (bool, error)

XGroupDelConsumer(key string, group string, consumer string) (int64, error)
}
112 changes: 112 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6708,3 +6708,115 @@ func (suite *GlideTestSuite) TestSortStoreWithOptions_ByPattern() {
assert.Equal(suite.T(), resultList, sortedValues)
})
}

func (suite *GlideTestSuite) TestXGroupStreamCommands() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
stringKey := uuid.New().String()
groupName := "group" + uuid.New().String()
zeroStreamId := "0"
consumerName := "consumer-" + uuid.New().String()

sendWithCustomCommand(
suite,
client,
[]string{"xgroup", "create", key, groupName, zeroStreamId, "MKSTREAM"},
"Can't send XGROUP CREATE as a custom command",
)
respBool, err := client.XGroupCreateConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.True(suite.T(), respBool)

// create a consumer for a group that doesn't exist should result in a NOGROUP error
_, err = client.XGroupCreateConsumer(key, "non-existent-group", consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP"))

// create consumer that already exists should return false
respBool, err = client.XGroupCreateConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.False(suite.T(), respBool)

// Delete a consumer that hasn't been created should return 0
respInt64, err := client.XGroupDelConsumer(key, groupName, "non-existent-consumer")
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// Add two stream entries
streamId1, err := client.XAdd(key, [][]string{{"field1", "value1"}})
assert.NoError(suite.T(), err)
streamId2, err := client.XAdd(key, [][]string{{"field2", "value2"}})
assert.NoError(suite.T(), err)

// read the stream for the consumer and mark messages as pending
expectedGroup := map[string]map[string][][]string{
key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}},
}
actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup),
"Expected and actual results do not match",
)

// delete one of the streams using XDel
respInt64, err = client.XDel(key, []string{streamId1.Value()})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), respInt64)

// xreadgroup should return one empty stream and one non-empty stream
resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key: {
streamId1.Value(): nil,
streamId2.Value(): {{"field2", "value2"}},
},
}, resp)

// add a new stream entry
streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}})
assert.NoError(suite.T(), err)
assert.NotNil(suite.T(), streamId3)

// xack that streamid1 and streamid2 have been processed
command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 0 pending messages
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// TODO: Use XAck when it is added to the Go client
// xack streamid_1, and streamid_2 already received returns 0L
command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Consume the last message with the previously deleted consumer (creates the consumer anew)
resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), 1, len(resp[key]))

// TODO: Use XAck when it is added to the Go client
// Use non existent group, so xack streamid_3 returns 0
command = []string{"XAck", key, "non-existent-group", streamId3.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 1 pending message
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), respInt64)

// Set a string key, and expect an error when you try to create or delete a consumer group
_, err = client.Set(stringKey, "test")
assert.NoError(suite.T(), err)
_, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

_, err = client.XGroupDelConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}
Loading