Skip to content

Commit

Permalink
Added functionality of bzmpop
Browse files Browse the repository at this point in the history
Signed-off-by: Edward Liang <[email protected]>
  • Loading branch information
edlng committed Jan 24, 2025
1 parent 8ba7ccd commit e548f45
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 0 deletions.
130 changes: 130 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2954,6 +2954,136 @@ func (client *baseClient) BLMPopCount(
return handleStringToStringArrayMapOrNilResponse(result)
}

// Blocks the connection until it pops and returns a member-score pair from the first non-empty sorted set, with the
// given keys being checked in the order they are provided.
// BZMPop is the blocking variant of [api.ZMPop].
//
// Note:
// - When in cluster mode, all keys must map to the same hash slot.
// - BZMPop is a client blocking command, see [Blocking Commands] for more details and best practices.
//
// Since:
//
// Valkey 7.0 and above.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keys - An array of keys to lists.
// scoreFilter - The element pop criteria - either MIN or MAX to pop members with the lowest/highest scores accordingly.
// timeoutSecs - The number of seconds to wait for a blocking operation to complete. A value of 0 will block indefinitely.
//
// Return value:
//
// An object containing the following elements:
// - The key name of the set from which the element was popped
// - An array of member scores of the popped elements in ascending order.
// Returns nil if no member could be popped and the timeout expired.
//
// For example:
//
// result, err := client.ZAdd("my_list", map[string]float64{"five": 5.0, "six": 6.0})
// result, err := client.BZMPop([]string{"my_list"}, api.MAX, float64(0.1))
// result["my_list"] = []MemberAndScore{{Member: "six", Score: 6.0}}
//
// [valkey.io]: https://valkey.io/commands/bzmpop/
// [Blocking Commands]: https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands
func (client *baseClient) BZMPop(
keys []string,
scoreFilter ScoreFilter,
timeoutSecs float64,
) (Result[KeyWithArrayOfMembersAndScores], error) {
scoreFilterStr, err := scoreFilter.toString()
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

// Check for potential length overflow.
if len(keys) > math.MaxInt-3 {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: "Length overflow for the provided keys",
}
}

// args slice will have 3 more arguments with the keys provided.
args := make([]string, 0, len(keys)+3)
args = append(args, utils.FloatToString(timeoutSecs), strconv.Itoa(len(keys)))
args = append(args, keys...)
args = append(args, scoreFilterStr)
result, err := client.executeCommand(C.BZMPop, args)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}
return handleKeyWithArrayOfMembersAndScoresResponse(result)
}

// Blocks the connection until it pops and returns a member-score pair from the first non-empty sorted set, with the
// given keys being checked in the order they are provided.
// BZMPop is the blocking variant of [api.ZMPop].
//
// Note:
// - When in cluster mode, all keys must map to the same hash slot.
// - BZMPop is a client blocking command, see [Blocking Commands] for more details and best practices.
//
// Since:
//
// Valkey 7.0 and above.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keys - An array of keys to lists.
// scoreFilter - The element pop criteria - either MIN or MAX to pop members with the lowest/highest scores accordingly.
// count - The maximum number of popped elements.
// timeoutSecs - The number of seconds to wait for a blocking operation to complete. A value of 0 will block indefinitely.
//
// Return value:
//
// An object containing the following elements:
// - The key name of the set from which the element was popped
// - An array of member scores of the popped elements in ascending order.
// Returns nil if no member could be popped and the timeout expired.
//
// For example:
//
// result, err := client.ZAdd("my_list", map[string]float64{"five": 5.0, "six": 6.0})
// result, err := client.BZMPopCount([]string{"my_list"}, api.MAX, 2, 0.1)
// result["my_list"] = []MemberAndScore{{Member: "six", Score: 6.0}, {Member: "five", Score 5.0}}
//
// [valkey.io]: https://valkey.io/commands/lmpop/
func (client *baseClient) BZMPopCount(
keys []string,
scoreFilter ScoreFilter,
count int64,
timeoutSecs float64,
) (Result[KeyWithArrayOfMembersAndScores], error) {
scoreFilterStr, err := scoreFilter.toString()
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

// Check for potential length overflow.
if len(keys) > math.MaxInt-5 {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: "Length overflow for the provided keys",
}
}

// args slice will have 5 more arguments with the keys provided.
args := make([]string, 0, len(keys)+5)
args = append(args, utils.FloatToString(timeoutSecs), strconv.Itoa(len(keys)))
args = append(args, keys...)
args = append(args, scoreFilterStr, CountKeyword, utils.IntToString(count))
result, err := client.executeCommand(C.BZMPop, args)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

return handleKeyWithArrayOfMembersAndScoresResponse(result)
}

// Sets the list element at index to element.
// The index is zero-based, so 0 means the first element,1 the second element and so on. Negative indices can be used to
// designate elements starting at the tail of the list. Here, -1 means the last element, -2 means the penultimate and so
Expand Down
19 changes: 19 additions & 0 deletions go/api/command_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,25 @@ func (listDirection ListDirection) toString() (string, error) {
}
}

// Enumeration representing which elements to pop from the sorted set.
type ScoreFilter string

const (
MAX ScoreFilter = "MAX"
MIN ScoreFilter = "MIN"
)

func (scoreFilter ScoreFilter) toString() (string, error) {
switch scoreFilter {
case MAX:
return string(MAX), nil
case MIN:
return string(MIN), nil
default:
return "", &errors.RequestError{Msg: "Invalid score filter"}
}
}

// Optional arguments to Restore(key string, ttl int64, value string, option *RestoreOptions)
//
// Note IDLETIME and FREQ modifiers cannot be set at the same time.
Expand Down
52 changes: 52 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "C"
import (
"fmt"
"reflect"
"sort"
"strconv"
"unsafe"

Expand Down Expand Up @@ -536,6 +537,57 @@ func handleKeyWithMemberAndScoreResponse(response *C.struct_CommandResponse) (Re
return CreateKeyWithMemberAndScoreResult(KeyWithMemberAndScore{key, member, score}), nil
}

func handleKeyWithArrayOfMembersAndScoresResponse(
response *C.struct_CommandResponse,
) (Result[KeyWithArrayOfMembersAndScores], error) {
defer C.free_command_response(response)

if response == nil || response.response_type == uint32(C.Null) {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), nil
}

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), typeErr
}

slice, err := parseArray(response)
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}

arr := slice.([]interface{})
key := arr[0].(string)
converted, err := mapConverter[float64]{
nil,
false,
}.convert(arr[1])
if err != nil {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), err
}
res, ok := converted.(map[string]float64)

if !ok {
return CreateNilKeyWithArrayOfMembersAndScoresResult(), &errors.RequestError{
Msg: fmt.Sprintf("unexpected type of second element: %T", converted),
}
}
memberAndScoreArray := make([]MemberAndScore, len(res))

idx := 0
for k, v := range res {
memberAndScoreArray[idx] = MemberAndScore{k, v}
idx++
}

// Ensure consistent output
sort.Slice(memberAndScoreArray, func(i, j int) bool {
return memberAndScoreArray[i].Score < memberAndScoreArray[j].Score
})

return CreateKeyWithArrayOfMembersAndScoresResult(KeyWithArrayOfMembersAndScores{key, memberAndScoreArray}), nil
}

func handleScanResponse(response *C.struct_CommandResponse) (string, []string, error) {
defer C.free_command_response(response)

Expand Down
21 changes: 21 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ type KeyWithMemberAndScore struct {
Score float64
}

// Response of [BZMPop] command.
type KeyWithArrayOfMembersAndScores struct {
Key string
MembersAndScores []MemberAndScore
}

type MemberAndScore struct {
Member string
Score float64
}

// Response type of [XAutoClaim] command.
type XAutoClaimResponse struct {
NextEntry string
Expand Down Expand Up @@ -77,6 +88,16 @@ func CreateNilKeyWithMemberAndScoreResult() Result[KeyWithMemberAndScore] {
return Result[KeyWithMemberAndScore]{val: KeyWithMemberAndScore{"", "", 0.0}, isNil: true}
}

func CreateKeyWithArrayOfMembersAndScoresResult(
kmsVals KeyWithArrayOfMembersAndScores,
) Result[KeyWithArrayOfMembersAndScores] {
return Result[KeyWithArrayOfMembersAndScores]{val: kmsVals, isNil: false}
}

func CreateNilKeyWithArrayOfMembersAndScoresResult() Result[KeyWithArrayOfMembersAndScores] {
return Result[KeyWithArrayOfMembersAndScores]{val: KeyWithArrayOfMembersAndScores{"", nil}, isNil: true}
}

// Enum to distinguish value types stored in `ClusterValue`
type ValueType int

Expand Down
9 changes: 9 additions & 0 deletions go/api/sorted_set_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ type SortedSetCommands interface {

BZPopMin(keys []string, timeoutSecs float64) (Result[KeyWithMemberAndScore], error)

BZMPop(keys []string, scoreFilter ScoreFilter, timeoutSecs float64) (Result[KeyWithArrayOfMembersAndScores], error)

BZMPopCount(
keys []string,
scoreFilter ScoreFilter,
count int64,
timeoutSecs float64,
) (Result[KeyWithArrayOfMembersAndScores], error)

ZRange(key string, rangeQuery options.ZRangeQuery) ([]string, error)

ZRangeWithScores(key string, rangeQuery options.ZRangeQueryWithScores) (map[string]float64, error)
Expand Down
85 changes: 85 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2886,6 +2886,91 @@ func (suite *GlideTestSuite) TestBLMPopAndBLMPopCount() {
})
}

func (suite *GlideTestSuite) TestBZMPopAndBZMPopCount() {
if suite.serverVersion < "7.0.0" {
suite.T().Skip("This feature is added in version 7")
}
suite.runWithDefaultClients(func(client api.BaseClient) {
key1 := "{key}-1" + uuid.NewString()
key2 := "{key}-2" + uuid.NewString()
key3 := "{key}-3" + uuid.NewString()

res1, err := client.BZMPop([]string{key1}, api.MIN, float64(0.1))
assert.Nil(suite.T(), err)
assert.True(suite.T(), res1.IsNil())

membersScoreMap := map[string]float64{
"one": 1.0,
"two": 2.0,
"three": 3.0,
}

res3, err := client.ZAdd(key1, membersScoreMap)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), int64(3), res3)
res4, err := client.ZAdd(key2, membersScoreMap)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), int64(3), res4)

// Try to pop the top 2 elements from key1
res5, err := client.BZMPopCount([]string{key1}, api.MAX, int64(2), float64(0.1))
assert.Nil(suite.T(), err)
assert.Equal(
suite.T(),
api.CreateKeyWithArrayOfMembersAndScoresResult(
api.KeyWithArrayOfMembersAndScores{
Key: key1,
MembersAndScores: []api.MemberAndScore{
{Member: "two", Score: 2.0},
{Member: "three", Score: 3.0},
},
},
),
res5,
)

// Try to pop the minimum value from key2
res6, err := client.BZMPop([]string{key2}, api.MIN, float64(0.1))
assert.Nil(suite.T(), err)
assert.Equal(
suite.T(),
api.CreateKeyWithArrayOfMembersAndScoresResult(
api.KeyWithArrayOfMembersAndScores{
Key: key2,
MembersAndScores: []api.MemberAndScore{
{Member: "one", Score: 1.0},
},
},
),
res6,
)

// Pop the minimum value from multiple keys
res7, err := client.BZMPop([]string{key1, key2}, api.MIN, float64(0.1))
assert.Nil(suite.T(), err)
assert.Equal(
suite.T(),
api.CreateKeyWithArrayOfMembersAndScoresResult(
api.KeyWithArrayOfMembersAndScores{
Key: key1,
MembersAndScores: []api.MemberAndScore{
{Member: "one", Score: 1.0},
},
},
),
res7,
)

suite.verifyOK(client.Set(key3, "value"))

// Popping a non-existent value in key3
res8, err := client.BZMPop([]string{key3}, api.MIN, float64(0.1))
assert.True(suite.T(), res8.IsNil())
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &errors.RequestError{}, err)
})
}

func (suite *GlideTestSuite) TestLSet() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.NewString()
Expand Down

0 comments on commit e548f45

Please sign in to comment.