From 7843cdaf3aba75503b825a48a97b57ed94f77c08 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 17:32:01 -0500 Subject: [PATCH] add command comments Signed-off-by: jbrinkman --- go/api/base_client.go | 47 +++++++++++++++++++++++++++- go/integTest/shared_commands_test.go | 46 ++++++++++++++++++++++++--- 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index e4dbcbe172..f11e248c0d 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2678,6 +2678,52 @@ func (client *baseClient) XGroupCreateConsumer( } +// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the +// stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key: The key of the stream. +// group: The consumer group name. +// consumer: The newly created consumer. +// +// Return value: +// +// Returns `true` if the consumer is created. Otherwise, returns `false`. +// +// Example: +// +// //Creates the consumer "myconsumer" in consumer group "mygroup" +// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer").get(); +// if err == nill && success { +// fmt.Println("Consumer created") +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/ +// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The consumer to delete. +// +// Returns the number of pending messages the `consumer` had before it was deleted. +// +// Example: +// +// // Deletes the consumer "myconsumer" in consumer group "mygroup" +// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer") +// if err != nil { +// // handle error +// } +// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount) +// +// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/ func (client *baseClient) XGroupDelConsumer( key string, group string, @@ -2688,5 +2734,4 @@ func (client *baseClient) XGroupDelConsumer( return defaultIntResponse, err } return handleIntResponse(result) - } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 8de72aec08..26efa7143d 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6073,7 +6073,6 @@ func (suite *GlideTestSuite) TestEcho() { }) } -func (suite *GlideTestSuite) TestZRemRangeByRank() { suite.runWithDefaultClients(func(client api.BaseClient) { key1 := uuid.New().String() stringKey := uuid.New().String() @@ -6231,9 +6230,10 @@ func (suite *GlideTestSuite) TestZRemRangeByScore() { assert.IsType(suite.T(), &api.RequestError{}, err) }) } -func (suite *GlideTestSuite) TestXGroupCreateConsumer() { +func (suite *GlideTestSuite) TestXGroupStreamCommands() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String() + stringKey := uuid.New().String() groupName := "group" + uuid.New().String() zeroStreamId := "0" consumerName := "consumer-" + uuid.New().String() @@ -6271,7 +6271,9 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { assert.NoError(suite.T(), err) // read the stream for the consumer and mark messages as pending - expectedGroup := map[string]map[string][][]string{key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field12", "value2"}}}} + expectedGroup := map[string]map[string][][]string{ + key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}}, + } actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) assert.NoError(suite.T(), err) assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup), @@ -6286,9 +6288,12 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { // xreadgroup should return one empty stream and one non-empty stream resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), 2, len(resp)) + assert.Equal(suite.T(), 2, len(resp[key])) assert.Nil(suite.T(), resp[key][streamId1.Value()]) - assert.Equal(suite.T(), [][]string{{"field2", "value2"}}, resp[key][streamId2.Value()]) + assert.True(suite.T(), reflect.DeepEqual([][]string{{"field2", "value2"}}, resp[key][streamId2.Value()])) + + fmt.Printf("resp: %v\n", resp) + fmt.Printf("resp: %v\n", resp[key][streamId2.Value()]) // add a new stream entry streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -6299,9 +6304,40 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + // Delete the consumer group and expect 0 pending messages + respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), respInt64) + + // TODO: Use XAck when it is added to the Go client + // xack streamid_1, and streamid_2 already received returns 0L + command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(resp[key])) + + // TODO: Use XAck when it is added to the Go client + // Use non existent group, so xack streamid_3 returns 0 + command = []string{"XAck", key, "non-existent-group", streamId3.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + // Delete the consumer group and expect 1 pending message respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) assert.NoError(suite.T(), err) assert.Equal(suite.T(), int64(1), respInt64) + + // Set a string key, and expect an error when you try to create or delete a consumer group + _, err = client.Set(stringKey, "test") + assert.NoError(suite.T(), err) + respBool, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + respInt64, err = client.XGroupDelConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) }) }