Skip to content

Commit

Permalink
Go: Add stream commands XGroupCreateConsumer/XGroupDelConsumer (valke…
Browse files Browse the repository at this point in the history
…y-io#2975)

* go xGroupCreateConsumer and XGroupDelConsumer

Signed-off-by: jbrinkman <[email protected]>

Co-authored-by: Yury-Fridlyand <[email protected]>
Signed-off-by: Edward Liang <[email protected]>
  • Loading branch information
2 people authored and edlng committed Jan 20, 2025
1 parent 5096018 commit c37ca67
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 0 deletions.
70 changes: 70 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5813,3 +5813,73 @@ func (client *baseClient) SortStoreWithOptions(
}
return handleIntOrNilResponse(result)
}

// XGroupCreateConsumer creates a consumer named `consumer` in the consumer group `group` for the
// stream stored at `key`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The newly created consumer.
//
// Return value:
//
// Returns `true` if the consumer is created. Otherwise, returns `false`.
//
// Example:
//
// //Creates the consumer "myconsumer" in consumer group "mygroup"
// success, err := client.xgroupCreateConsumer("mystream", "mygroup", "myconsumer")
// if err == nil && success {
// fmt.Println("Consumer created")
// }
//
// [valkey.io]: https://valkey.io/commands/xgroup-createconsumer/
func (client *baseClient) XGroupCreateConsumer(
key string,
group string,
consumer string,
) (bool, error) {
result, err := client.executeCommand(C.XGroupCreateConsumer, []string{key, group, consumer})
if err != nil {
return false, err
}
return handleBoolResponse(result)
}

// XGroupDelConsumer deletes a consumer named `consumer` in the consumer group `group`.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The consumer to delete.
//
// Returns the number of pending messages the `consumer` had before it was deleted.
//
// Example:
//
// // Deletes the consumer "myconsumer" in consumer group "mygroup"
// pendingMsgCount, err := client.XGroupDelConsumer("mystream", "mygroup", "myconsumer")
// if err != nil {
// // handle error
// }
// fmt.Printf("Consumer 'myconsumer' had %d pending messages unclaimed.\n", pendingMsgCount)
//
// [valkey.io]: https://valkey.io/commands/xgroup-delconsumer/
func (client *baseClient) XGroupDelConsumer(
key string,
group string,
consumer string,
) (int64, error) {
result, err := client.executeCommand(C.XGroupDelConsumer, []string{key, group, consumer})
if err != nil {
return defaultIntResponse, err
}
return handleIntResponse(result)
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ type StreamCommands interface {
XGroupCreate(key string, group string, id string) (string, error)

XGroupCreateWithOptions(key string, group string, id string, opts *options.XGroupCreateOptions) (string, error)

XGroupCreateConsumer(key string, group string, consumer string) (bool, error)

XGroupDelConsumer(key string, group string, consumer string) (int64, error)
}
112 changes: 112 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6708,3 +6708,115 @@ func (suite *GlideTestSuite) TestSortStoreWithOptions_ByPattern() {
assert.Equal(suite.T(), resultList, sortedValues)
})
}

func (suite *GlideTestSuite) TestXGroupStreamCommands() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
stringKey := uuid.New().String()
groupName := "group" + uuid.New().String()
zeroStreamId := "0"
consumerName := "consumer-" + uuid.New().String()

sendWithCustomCommand(
suite,
client,
[]string{"xgroup", "create", key, groupName, zeroStreamId, "MKSTREAM"},
"Can't send XGROUP CREATE as a custom command",
)
respBool, err := client.XGroupCreateConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.True(suite.T(), respBool)

// create a consumer for a group that doesn't exist should result in a NOGROUP error
_, err = client.XGroupCreateConsumer(key, "non-existent-group", consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
assert.True(suite.T(), strings.Contains(err.Error(), "NOGROUP"))

// create consumer that already exists should return false
respBool, err = client.XGroupCreateConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.False(suite.T(), respBool)

// Delete a consumer that hasn't been created should return 0
respInt64, err := client.XGroupDelConsumer(key, groupName, "non-existent-consumer")
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// Add two stream entries
streamId1, err := client.XAdd(key, [][]string{{"field1", "value1"}})
assert.NoError(suite.T(), err)
streamId2, err := client.XAdd(key, [][]string{{"field2", "value2"}})
assert.NoError(suite.T(), err)

// read the stream for the consumer and mark messages as pending
expectedGroup := map[string]map[string][][]string{
key: {streamId1.Value(): {{"field1", "value1"}}, streamId2.Value(): {{"field2", "value2"}}},
}
actualGroup, err := client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.True(suite.T(), reflect.DeepEqual(expectedGroup, actualGroup),
"Expected and actual results do not match",
)

// delete one of the streams using XDel
respInt64, err = client.XDel(key, []string{streamId1.Value()})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), respInt64)

// xreadgroup should return one empty stream and one non-empty stream
resp, err := client.XReadGroup(groupName, consumerName, map[string]string{key: zeroStreamId})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key: {
streamId1.Value(): nil,
streamId2.Value(): {{"field2", "value2"}},
},
}, resp)

// add a new stream entry
streamId3, err := client.XAdd(key, [][]string{{"field3", "value3"}})
assert.NoError(suite.T(), err)
assert.NotNil(suite.T(), streamId3)

// xack that streamid1 and streamid2 have been processed
command := []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 0 pending messages
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(0), respInt64)

// TODO: Use XAck when it is added to the Go client
// xack streamid_1, and streamid_2 already received returns 0L
command = []string{"XAck", key, groupName, streamId1.Value(), streamId2.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Consume the last message with the previously deleted consumer (creates the consumer anew)
resp, err = client.XReadGroup(groupName, consumerName, map[string]string{key: ">"})
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), 1, len(resp[key]))

// TODO: Use XAck when it is added to the Go client
// Use non existent group, so xack streamid_3 returns 0
command = []string{"XAck", key, "non-existent-group", streamId3.Value()}
sendWithCustomCommand(suite, client, command, "Can't send XACK as a custom command")

// Delete the consumer group and expect 1 pending message
respInt64, err = client.XGroupDelConsumer(key, groupName, consumerName)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), int64(1), respInt64)

// Set a string key, and expect an error when you try to create or delete a consumer group
_, err = client.Set(stringKey, "test")
assert.NoError(suite.T(), err)
_, err = client.XGroupCreateConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

_, err = client.XGroupDelConsumer(stringKey, groupName, consumerName)
assert.Error(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
})
}

0 comments on commit c37ca67

Please sign in to comment.