Skip to content

Commit

Permalink
add command comments
Browse files Browse the repository at this point in the history
Signed-off-by: jbrinkman <[email protected]>
  • Loading branch information
jbrinkman committed Jan 18, 2025
1 parent 090c3f1 commit 7843cda
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 6 deletions.
47 changes: 46 additions & 1 deletion go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2678,6 +2678,52 @@ func (client *baseClient) XGroupCreateConsumer(

}

// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the
// stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key: The key of the stream.
// group: The consumer group name.
// consumer: The newly created consumer.
//
// Return value:
//
// Returns `true` if the consumer is created. Otherwise, returns `false`.
//
// Example:
//
// //Creates the consumer "myconsumer" in consumer group "mygroup"
// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer").get();
// if err == nill && success {
// fmt.Println("Consumer created")
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/
// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The consumer to delete.
//
// Returns the number of pending messages the `consumer` had before it was deleted.
//
// Example:
//
// // Deletes the consumer "myconsumer" in consumer group "mygroup"
// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer")
// if err != nil {
// // handle error
// }
// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount)
//
// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/
func (client *baseClient) XGroupDelConsumer(
key string,
group string,
Expand All @@ -2688,5 +2734,4 @@ func (client *baseClient) XGroupDelConsumer(
return defaultIntResponse, err
}
return handleIntResponse(result)

}
46 changes: 41 additions & 5 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6073,7 +6073,6 @@ func (suite *GlideTestSuite) TestEcho() {
})
}

func (suite *GlideTestSuite) TestZRemRangeByRank() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key1 := uuid.New().String()
stringKey := uuid.New().String()
Expand Down Expand Up @@ -6231,9 +6230,10 @@ func (suite *GlideTestSuite) TestZRemRangeByScore() {
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}
func (suite *GlideTestSuite) TestXGroupCreateConsumer() {
func (suite *GlideTestSuite) TestXGroupStreamCommands() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
stringKey := uuid.New().String()
groupName := "group" + uuid.New().String()
zeroStreamId := "0"
consumerName := "consumer-" + uuid.New().String()
Expand Down Expand Up @@ -6271,7 +6271,9 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() {
assert.NoError(suite.T(), err)

// read the stream for the consumer and mark messages as pending
expectedGroup := map[string]map[string][][]string{key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field12", "value2"}}}}
expectedGroup := map[string]map[string][][]string{
key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}},
}
actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup),
Expand All @@ -6286,9 +6288,12 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() {
// xreadgroup should return one empty stream and one non-empty stream
resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), 2, len(resp))
assert.Equal(suite.T(), 2, len(resp[key]))
assert.Nil(suite.T(), resp[key][streamId1.Value()])
assert.Equal(suite.T(), [][]string{{"field2", "value2"}}, resp[key][streamId2.Value()])
assert.True(suite.T(), reflect.DeepEqual([][]string{{"field2", "value2"}}, resp[key][streamId2.Value()]))

fmt.Printf("resp: %v\n", resp)
fmt.Printf("resp: %v\n", resp[key][streamId2.Value()])

// add a new stream entry
streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}})
Expand All @@ -6299,9 +6304,40 @@ func (suite *GlideTestSuite) TestXGroupCreateConsumer() {
command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 0 pending messages
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// TODO: Use XAck when it is added to the Go client
// xack streamid_1, and streamid_2 already received returns 0L
command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Consume the last message with the previously deleted consumer (creates the consumer anew)
resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), 1, len(resp[key]))

// TODO: Use XAck when it is added to the Go client
// Use non existent group, so xack streamid_3 returns 0
command = []string{"XAck", key, "non-existent-group", streamId3.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 1 pending message
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), respInt64)

// Set a string key, and expect an error when you try to create or delete a consumer group
_, err = client.Set(stringKey, "test")
assert.NoError(suite.T(), err)
respBool, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

respInt64, err = client.XGroupDelConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

0 comments on commit 7843cda

Please sign in to comment.