From fa14fd1bec86c4510d7b9bbcf77169a9e5cd3a90 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 09:47:48 -0500 Subject: [PATCH 01/14] go xGroupCreateConsumer and XGroupDelConsumer Signed-off-by: jbrinkman --- go/api/base_client.go | 26 ++++++++++++++++++++++++++ go/api/stream_commands.go | 4 ++++ go/integTest/shared_commands_test.go | 11 +++++++++++ go/integTest/test_utils.go | 2 ++ 4 files changed, 43 insertions(+) diff --git a/go/api/base_client.go b/go/api/base_client.go index f7313e05d6..8c549a9f49 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2870,3 +2870,29 @@ func (client *baseClient) SortStoreWithOptions( } return handleIntOrNilResponse(result) } + +func (client *baseClient) XGroupCreateConsumer( + key string, + group string, + consumer string, +) (bool, error) { + result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer}) + if err != nil { + return false, err + } + return handleBoolResponse(result) + +} + +func (client *baseClient) XGroupDelConsumer( + key string, + group string, + consumer string, +) (int64, error) { + result, err := client.executeCommand(C.XGroupDelConsumer, []string{key, group, consumer}) + if err != nil { + return defaultIntResponse, err + } + return handleIntResponse(result) + +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index c212879d54..b5febb245d 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -152,4 +152,8 @@ type StreamCommands interface { XGroupCreate(key string, group string, id string) (string, error) XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error) + + XGroupCreateConsumer(key string, group string, consumer string) (bool, error) + + XGroupDelConsumer(key string, group string, consumer string) (int64, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 2e941aafb4..a9f574c651 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6708,3 +6708,14 @@ func (suite *GlideTestSuite) TestSortStoreWithOptions_ByPattern() { assert.Equal(suite.T(), resultList, sortedValues) }) } + +func (suite *GlideTestSuite) TestXGroupCreateConsumer() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.New().String() + groupName := "group" + uuid.New().String() + zeroStreamId := "0" + consumerName := "consumer-" + uuid.New().String() + + client.XGroupCreate(key, groupName, zeroStreamId, "MKSTREAM") + }) +} diff --git a/go/integTest/test_utils.go b/go/integTest/test_utils.go index 8e7b37bb8f..f628db6044 100644 --- a/go/integTest/test_utils.go +++ b/go/integTest/test_utils.go @@ -2,6 +2,8 @@ package integTest +import "github.com/valkey-io/valkey-glide/go/glide/api" + // check if sliceA is a subset of sliceB func isSubset[T comparable](sliceA []T, sliceB []T) bool { setB := make(map[T]struct{}) From 8bafc225164b9235aaa1cb50a3cd57e21f916294 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 15:07:45 -0500 Subject: [PATCH 02/14] add integration tests Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 50 +++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index a9f574c651..74a289919b 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6716,6 +6716,54 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { zeroStreamId := "0" consumerName := "consumer-" + uuid.New().String() - client.XGroupCreate(key, groupName, zeroStreamId, "MKSTREAM") + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, groupName, zeroStreamId, "MKSTREAM"}, + "Can't send XGROUP CREATE as a custom command", + ) + respBool, err := client.XGroupCreateConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.True(suite.T(), respBool) + + // create a consumer for a group that doesn't exist should result in a NOGROUP error + _, err = client.XGroupCreateConsumer(key, "non-existent-group", consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP")) + + // create consumer that already exists should return false + respBool, err = client.XGroupCreateConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.False(suite.T(), respBool) + + // Delete a consumer that hasn't been created should return 0 + respInt64, err := client.XGroupDelConsumer(key, groupName, "non-existent-consumer") + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), respInt64) + + // Add two stream entries + streamId1, err := client.XAdd(key, [][]string{{"field1", "value1"}}) + assert.NoError(suite.T(), err) + streamId2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) + assert.NoError(suite.T(), err) + + // read the stream for the consumer and mark messages as pending + command := []string{"XReadGroup", "GROUP", groupName, consumerName, "STREAMS", key, ">"} + sendWithCustomCommand(suite, client, command, "Can't send XREADGROUP as a custom command") + + // delete one of the streams using XDel + respInt64, err = client.XDel(key, []string{streamId1.Value()}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), respInt64) + + // xreadgroup should return one empty stream and one non-empty stream + command = []string{"XReadGroup", "GROUP", groupName, consumerName, "STREAMS", key, zeroStreamId} + sendWithCustomCommand(suite, client, command, "Can't send XREADGROUP as a custom command") + + // xack that streamid1 and streamid2 have been processed + command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + }) } From 71fdaf9f62ac9f9dca80f3e7281396a0f3499a98 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 15:34:29 -0500 Subject: [PATCH 03/14] add integration tests Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 74a289919b..191e560613 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6749,8 +6749,12 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { assert.NoError(suite.T(), err) // read the stream for the consumer and mark messages as pending - command := []string{"XReadGroup", "GROUP", groupName, consumerName, "STREAMS", key, ">"} - sendWithCustomCommand(suite, client, command, "Can't send XREADGROUP as a custom command") + expectedGroup := map[string]map[string][][]string{key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field12", "value2"}}}} + actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup), + "Expected and actual results do not match", + ) // delete one of the streams using XDel respInt64, err = client.XDel(key, []string{streamId1.Value()}) @@ -6758,12 +6762,24 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { assert.Equal(suite.T(), int64(1), respInt64) // xreadgroup should return one empty stream and one non-empty stream - command = []string{"XReadGroup", "GROUP", groupName, consumerName, "STREAMS", key, zeroStreamId} - sendWithCustomCommand(suite, client, command, "Can't send XREADGROUP as a custom command") + resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 2, len(resp)) + assert.Nil(suite.T(), resp[key][streamId1.Value()]) + assert.Equal(suite.T(), [][]string{{"field2", "value2"}}, resp[key][streamId2.Value()]) + + // add a new stream entry + streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.NoError(suite.T(), err) + assert.NotNil(suite.T(), streamId3) // xack that streamid1 and streamid2 have been processed - command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + // Delete the consumer group and expect 1 pending message + respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(1), respInt64) }) } From 9feedaac8f0878770cdb522310157ae0b56c8e76 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 17:32:01 -0500 Subject: [PATCH 04/14] add command comments Signed-off-by: jbrinkman --- go/api/base_client.go | 47 +++++++++++++++++++++++++++- go/integTest/shared_commands_test.go | 46 ++++++++++++++++++++++++--- 2 files changed, 87 insertions(+), 6 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 8c549a9f49..1c90992d2a 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2884,6 +2884,52 @@ func (client *baseClient) XGroupCreateConsumer( } +// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the +// stream stored at `key`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key: The key of the stream. +// group: The consumer group name. +// consumer: The newly created consumer. +// +// Return value: +// +// Returns `true` if the consumer is created. Otherwise, returns `false`. +// +// Example: +// +// //Creates the consumer "myconsumer" in consumer group "mygroup" +// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer").get(); +// if err == nill && success { +// fmt.Println("Consumer created") +// } +// +// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/ +// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`. +// +// See [valkey.io] for details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The consumer to delete. +// +// Returns the number of pending messages the `consumer` had before it was deleted. +// +// Example: +// +// // Deletes the consumer "myconsumer" in consumer group "mygroup" +// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer") +// if err != nil { +// // handle error +// } +// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount) +// +// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/ func (client *baseClient) XGroupDelConsumer( key string, group string, @@ -2894,5 +2940,4 @@ func (client *baseClient) XGroupDelConsumer( return defaultIntResponse, err } return handleIntResponse(result) - } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 191e560613..1bd5dce076 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6331,7 +6331,6 @@ func (suite *GlideTestSuite) TestEcho() { }) } -func (suite *GlideTestSuite) TestZRemRangeByRank() { suite.runWithDefaultClients(func(client api.BaseClient) { key1 := uuid.New().String() stringKey := uuid.New().String() @@ -6709,9 +6708,10 @@ func (suite *GlideTestSuite) TestSortStoreWithOptions_ByPattern() { }) } -func (suite *GlideTestSuite) TestXGroupCreateConsumer() { +func (suite *GlideTestSuite) TestXGroupStreamCommands() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String() + stringKey := uuid.New().String() groupName := "group" + uuid.New().String() zeroStreamId := "0" consumerName := "consumer-" + uuid.New().String() @@ -6749,7 +6749,9 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { assert.NoError(suite.T(), err) // read the stream for the consumer and mark messages as pending - expectedGroup := map[string]map[string][][]string{key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field12", "value2"}}}} + expectedGroup := map[string]map[string][][]string{ + key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}}, + } actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) assert.NoError(suite.T(), err) assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup), @@ -6764,9 +6766,12 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { // xreadgroup should return one empty stream and one non-empty stream resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), 2, len(resp)) + assert.Equal(suite.T(), 2, len(resp[key])) assert.Nil(suite.T(), resp[key][streamId1.Value()]) - assert.Equal(suite.T(), [][]string{{"field2", "value2"}}, resp[key][streamId2.Value()]) + assert.True(suite.T(), reflect.DeepEqual([][]string{{"field2", "value2"}}, resp[key][streamId2.Value()])) + + fmt.Printf("resp: %v\n", resp) + fmt.Printf("resp: %v\n", resp[key][streamId2.Value()]) // add a new stream entry streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -6777,9 +6782,40 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() { command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + // Delete the consumer group and expect 0 pending messages + respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), int64(0), respInt64) + + // TODO: Use XAck when it is added to the Go client + // xack streamid_1, and streamid_2 already received returns 0L + command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + + // Consume the last message with the previously deleted consumer (creates the consumer anew) + resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), 1, len(resp[key])) + + // TODO: Use XAck when it is added to the Go client + // Use non existent group, so xack streamid_3 returns 0 + command = []string{"XAck", key, "non-existent-group", streamId3.Value()} + sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command") + // Delete the consumer group and expect 1 pending message respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName) assert.NoError(suite.T(), err) assert.Equal(suite.T(), int64(1), respInt64) + + // Set a string key, and expect an error when you try to create or delete a consumer group + _, err = client.Set(stringKey, "test") + assert.NoError(suite.T(), err) + respBool, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) + + respInt64, err = client.XGroupDelConsumer(stringKey, groupName, consumerName) + assert.Error(suite.T(), err) + assert.IsType(suite.T(), &api.RequestError{}, err) }) } From 54c96b2ab52ba5f398834ef69fbeb8c585545a49 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 20:41:48 -0500 Subject: [PATCH 05/14] fix linting error with unused variables Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 1bd5dce076..a0ba6f9b30 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -3,6 +3,7 @@ package integTest import ( + "fmt" "math" "reflect" "strconv" @@ -6331,6 +6332,7 @@ func (suite *GlideTestSuite) TestEcho() { }) } +func (suite *GlideTestSuite) TestZRemRangeByRank() { suite.runWithDefaultClients(func(client api.BaseClient) { key1 := uuid.New().String() stringKey := uuid.New().String() @@ -6810,11 +6812,11 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() { // Set a string key, and expect an error when you try to create or delete a consumer group _, err = client.Set(stringKey, "test") assert.NoError(suite.T(), err) - respBool, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName) + _, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) - respInt64, err = client.XGroupDelConsumer(stringKey, groupName, consumerName) + _, err = client.XGroupDelConsumer(stringKey, groupName, consumerName) assert.Error(suite.T(), err) assert.IsType(suite.T(), &api.RequestError{}, err) }) From 3e1b4191d7d2cd4a4cd193ffc823689d744291af Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 20:54:58 -0500 Subject: [PATCH 06/14] fix unused import Signed-off-by: jbrinkman --- go/integTest/test_utils.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/integTest/test_utils.go b/go/integTest/test_utils.go index f628db6044..8e7b37bb8f 100644 --- a/go/integTest/test_utils.go +++ b/go/integTest/test_utils.go @@ -2,8 +2,6 @@ package integTest -import "github.com/valkey-io/valkey-glide/go/glide/api" - // check if sliceA is a subset of sliceB func isSubset[T comparable](sliceA []T, sliceB []T) bool { setB := make(map[T]struct{}) From 44a7676ff60b6c37e310a67aa67dd005ed070f79 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Fri, 17 Jan 2025 21:05:56 -0500 Subject: [PATCH 07/14] fix format Signed-off-by: jbrinkman --- go/api/base_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 1c90992d2a..8b4344612d 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2881,7 +2881,6 @@ func (client *baseClient) XGroupCreateConsumer( return false, err } return handleBoolResponse(result) - } // XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the From 46b59260c960d56f50b08990f7bc1893e141bd12 Mon Sep 17 00:00:00 2001 From: Joseph Brinkman Date: Mon, 20 Jan 2025 13:07:42 -0500 Subject: [PATCH 08/14] Update go/api/base_client.go Co-authored-by: Yury-Fridlyand Signed-off-by: Joseph Brinkman Signed-off-by: jbrinkman --- go/api/base_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 8b4344612d..c922b02fd1 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2890,9 +2890,9 @@ func (client *baseClient) XGroupCreateConsumer( // // Parameters: // -// key: The key of the stream. -// group: The consumer group name. -// consumer: The newly created consumer. +// key - The key of the stream. +// group - The consumer group name. +// consumer - The newly created consumer. // // Return value: // From 9fd744ee0f9c02cf1d3381ebff13fd261aa83f5c Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Mon, 20 Jan 2025 13:14:19 -0500 Subject: [PATCH 09/14] chore: cleanup doc comment Signed-off-by: jbrinkman --- go/api/base_client.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index c922b02fd1..b46b736b51 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2871,18 +2871,6 @@ func (client *baseClient) SortStoreWithOptions( return handleIntOrNilResponse(result) } -func (client *baseClient) XGroupCreateConsumer( - key string, - group string, - consumer string, -) (bool, error) { - result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer}) - if err != nil { - return false, err - } - return handleBoolResponse(result) -} - // XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the // stream stored at `key`. // @@ -2907,6 +2895,18 @@ func (client *baseClient) XGroupCreateConsumer( // } // // [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/ +func (client *baseClient) XGroupCreateConsumer( + key string, + group string, + consumer string, +) (bool, error) { + result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer}) + if err != nil { + return false, err + } + return handleBoolResponse(result) +} + // XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`. // // See [valkey.io] for details. From 0b5385ff97bbc66ad6220c3dda33a3af71146ab5 Mon Sep 17 00:00:00 2001 From: Joseph Brinkman Date: Mon, 20 Jan 2025 13:16:20 -0500 Subject: [PATCH 10/14] Update go/integTest/shared_commands_test.go Co-authored-by: Yury-Fridlyand Signed-off-by: Joseph Brinkman Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index a0ba6f9b30..701c7d2dc9 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6768,9 +6768,12 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() { // xreadgroup should return one empty stream and one non-empty stream resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), 2, len(resp[key])) - assert.Nil(suite.T(), resp[key][streamId1.Value()]) - assert.True(suite.T(), reflect.DeepEqual([][]string{{"field2", "value2"}}, resp[key][streamId2.Value()])) + assert.Equal(suite.T(), map[string]map[string][][]string { + key: { + streamId1.Value(): nil, + streamId2.Value(): {{"field2", "value2"}}, + }, + }, resp)) fmt.Printf("resp: %v\n", resp) fmt.Printf("resp: %v\n", resp[key][streamId2.Value()]) From 7fd126da678228defd622ee01d360693f28e42f4 Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Mon, 20 Jan 2025 13:18:42 -0500 Subject: [PATCH 11/14] fix compilation error Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 701c7d2dc9..4752993a52 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -6768,12 +6768,12 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() { // xreadgroup should return one empty stream and one non-empty stream resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId}) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), map[string]map[string][][]string { + assert.Equal(suite.T(), map[string]map[string][][]string{ key: { streamId1.Value(): nil, streamId2.Value(): {{"field2", "value2"}}, }, - }, resp)) + }, resp) fmt.Printf("resp: %v\n", resp) fmt.Printf("resp: %v\n", resp[key][streamId2.Value()]) From ffb5fc260da521e763715c0523720c38e1a0b079 Mon Sep 17 00:00:00 2001 From: Joseph Brinkman Date: Mon, 20 Jan 2025 13:22:08 -0500 Subject: [PATCH 12/14] Update go/api/base_client.go Co-authored-by: Yury-Fridlyand Signed-off-by: Joseph Brinkman Signed-off-by: jbrinkman --- go/api/base_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index b46b736b51..709332cce5 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2890,7 +2890,7 @@ func (client *baseClient) SortStoreWithOptions( // // //Creates the consumer "myconsumer" in consumer group "mygroup" // success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer").get(); -// if err == nill && success { +// if err == nil && success { // fmt.Println("Consumer created") // } // From 09be0f9e27fd87a4cd03eef127cb827b23596620 Mon Sep 17 00:00:00 2001 From: Joseph Brinkman Date: Mon, 20 Jan 2025 13:22:30 -0500 Subject: [PATCH 13/14] Update go/api/base_client.go Co-authored-by: Yury-Fridlyand Signed-off-by: Joseph Brinkman Signed-off-by: jbrinkman --- go/api/base_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 709332cce5..3c52edb79a 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2889,7 +2889,7 @@ func (client *baseClient) SortStoreWithOptions( // Example: // // //Creates the consumer "myconsumer" in consumer group "mygroup" -// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer").get(); +// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer") // if err == nil && success { // fmt.Println("Consumer created") // } From 74ae8219fc314e62404a9f61e387dee40ac841de Mon Sep 17 00:00:00 2001 From: jbrinkman Date: Mon, 20 Jan 2025 13:27:43 -0500 Subject: [PATCH 14/14] code review feedback updates Signed-off-by: jbrinkman --- go/integTest/shared_commands_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 4752993a52..28421c428e 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -3,7 +3,6 @@ package integTest import ( - "fmt" "math" "reflect" "strconv" @@ -6775,9 +6774,6 @@ func (suite *GlideTestSuite) TestXGroupStreamCommands() { }, }, resp) - fmt.Printf("resp: %v\n", resp) - fmt.Printf("resp: %v\n", resp[key][streamId2.Value()]) - // add a new stream entry streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}}) assert.NoError(suite.T(), err)