Skip to content

Commit

Permalink
Go: Add command XClaim (valkey-io#2984)
Browse files Browse the repository at this point in the history
* Go: Add command XClaim

Signed-off-by: TJ Zhang <[email protected]>
  • Loading branch information
tjzhang-BQ authored and Maayanshani25 committed Jan 22, 2025
1 parent 70584d5 commit fad318b
Show file tree
Hide file tree
Showing 5 changed files with 638 additions and 1 deletion.
182 changes: 182 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3310,3 +3310,185 @@ func (client *baseClient) BitCountWithOptions(key string, opts *options.BitCount
}
return handleIntResponse(result)
}

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
//
// Return value:
//
// A `map of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
//
// result, err := client.XClaim("key", "group", "consumer", 1000, []string{"streamId1", "streamId2"})
// fmt.Println(result) // Output: map[streamId1:[["entry1", "data1"], ["entry2", "data2"]] streamId2:[["entry3", "data3"]]]
//
// [valkey.io]: https://valkey.io/commands/xclaim/
func (client *baseClient) XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[string][][]string, error) {
return client.XClaimWithOptions(key, group, consumer, minIdleTime, ids, nil)
}

// Changes the ownership of a pending message.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
//
// A `map` of message entries with the format `{"entryId": [["entry", "data"], ...], ...}` that were claimed by
// the consumer.
//
// Example:
//
// result, err := client.XClaimWithOptions(
// "key",
// "group",
// "consumer",
// 1000,
// []string{"streamId1", "streamId2"},
// options.NewStreamClaimOptions().SetIdleTime(1),
// )
// fmt.Println(result) // Output: map[streamId1:[["entry1", "data1"], ["entry2", "data2"]] streamId2:[["entry3", "data3"]]]
//
// [valkey.io]: https://valkey.io/commands/xclaim/
func (client *baseClient) XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) (map[string][][]string, error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
if opts != nil {
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
}
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleMapOfArrayOfStringArrayResponse(result)
}

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
//
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
//
// result, err := client.XClaimJustId(
// "key",
// "group",
// "consumer",
// 1000,
// []string{"streamId1", "streamId2"},
// )
// fmt.Println(result) // Output: ["streamId1", "streamId2"]
//
// [valkey.io]: https://valkey.io/commands/xclaim/
func (client *baseClient) XClaimJustId(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) ([]string, error) {
return client.XClaimJustIdWithOptions(key, group, consumer, minIdleTime, ids, nil)
}

// Changes the ownership of a pending message. This function returns an `array` with
// only the message/entry IDs, and is equivalent to using `JUSTID` in the Valkey API.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The name of the consumer group.
// consumer - The name of the consumer.
// minIdleTime - The minimum idle time in milliseconds.
// ids - The ids of the entries to claim.
// options - Stream claim options.
//
// Return value:
//
// An array of the ids of the entries that were claimed by the consumer.
//
// Example:
//
// result, err := client.XClaimJustIdWithOptions(
// "key",
// "group",
// "consumer",
// 1000,
// []string{"streamId1", "streamId2"},
// options.NewStreamClaimOptions().SetIdleTime(1),
// )
// fmt.Println(result) // Output: ["streamId1", "streamId2"]
//
// [valkey.io]: https://valkey.io/commands/xclaim/
func (client *baseClient) XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
opts *options.StreamClaimOptions,
) ([]string, error) {
args := append([]string{key, group, consumer, utils.IntToString(minIdleTime)}, ids...)
if opts != nil {
optionArgs, err := opts.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
}
args = append(args, options.JUST_ID_VALKEY_API)
result, err := client.executeCommand(C.XClaim, args)
if err != nil {
return nil, err
}
return handleStringArrayResponse(result)
}
72 changes: 72 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,75 @@ func (xgsio *XGroupSetIdOptions) ToArgs() ([]string, error) {

return args, nil
}

// Optional arguments for `XClaim` in [StreamCommands]
type StreamClaimOptions struct {
idleTime int64
idleUnixTime int64
retryCount int64
isForce bool
}

func NewStreamClaimOptions() *StreamClaimOptions {
return &StreamClaimOptions{}
}

// Set the idle time in milliseconds.
func (sco *StreamClaimOptions) SetIdleTime(idleTime int64) *StreamClaimOptions {
sco.idleTime = idleTime
return sco
}

// Set the idle time in unix-milliseconds.
func (sco *StreamClaimOptions) SetIdleUnixTime(idleUnixTime int64) *StreamClaimOptions {
sco.idleUnixTime = idleUnixTime
return sco
}

// Set the retry count.
func (sco *StreamClaimOptions) SetRetryCount(retryCount int64) *StreamClaimOptions {
sco.retryCount = retryCount
return sco
}

// Set the force flag.
func (sco *StreamClaimOptions) SetForce() *StreamClaimOptions {
sco.isForce = true
return sco
}

// Valkey API keywords for stream claim options
const (
// ValKey API string to designate IDLE time in milliseconds
IDLE_VALKEY_API string = "IDLE"
// ValKey API string to designate TIME time in unix-milliseconds
TIME_VALKEY_API string = "TIME"
// ValKey API string to designate RETRYCOUNT
RETRY_COUNT_VALKEY_API string = "RETRYCOUNT"
// ValKey API string to designate FORCE
FORCE_VALKEY_API string = "FORCE"
// ValKey API string to designate JUSTID
JUST_ID_VALKEY_API string = "JUSTID"
)

func (sco *StreamClaimOptions) ToArgs() ([]string, error) {
optionArgs := []string{}

if sco.idleTime > 0 {
optionArgs = append(optionArgs, IDLE_VALKEY_API, utils.IntToString(sco.idleTime))
}

if sco.idleUnixTime > 0 {
optionArgs = append(optionArgs, TIME_VALKEY_API, utils.IntToString(sco.idleUnixTime))
}

if sco.retryCount > 0 {
optionArgs = append(optionArgs, RETRY_COUNT_VALKEY_API, utils.IntToString(sco.retryCount))
}

if sco.isForce {
optionArgs = append(optionArgs, FORCE_VALKEY_API)
}

return optionArgs, nil
}
37 changes: 37 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ type mapConverter[T any] struct {
canBeNil bool
}

// Converts an untyped map into a map[string]T
func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
if data == nil {
if node.canBeNil {
Expand All @@ -598,14 +599,17 @@ func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
}
result := make(map[string]T)

// Iterate over the map and convert each value to T
for key, value := range data.(map[string]interface{}) {
if node.next == nil {
// try direct conversion to T when there is no next converter
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", value, getType[T]())}
}
result[key] = valueT
} else {
// nested iteration when there is a next converter
val, err := node.next.convert(value)
if err != nil {
return nil, err
Expand All @@ -615,6 +619,7 @@ func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
result[key] = null
continue
}
// convert to T
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", val, getType[T]())}
Expand Down Expand Up @@ -674,6 +679,38 @@ func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {

// TODO: convert sets

func handleMapOfArrayOfStringArrayResponse(response *C.struct_CommandResponse) (map[string][][]string, error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Map, false)
if typeErr != nil {
return nil, typeErr
}
mapData, err := parseMap(response)
if err != nil {
return nil, err
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(mapData)
if err != nil {
return nil, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)}
}

return claimedEntries, nil
}

func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
defer C.free_command_response(response)
var null XAutoClaimResponse // default response
Expand Down
28 changes: 28 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,32 @@ type StreamCommands interface {
XGroupDelConsumer(key string, group string, consumer string) (int64, error)

XAck(key string, group string, ids []string) (int64, error)

XClaim(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
) (map[string][][]string, error)

XClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) (map[string][][]string, error)

XClaimJustId(key string, group string, consumer string, minIdleTime int64, ids []string) ([]string, error)

XClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
ids []string,
options *options.StreamClaimOptions,
) ([]string, error)
}
Loading

0 comments on commit fad318b

Please sign in to comment.