Skip to content

Commit

Permalink
Implement Echo Cluster
Browse files Browse the repository at this point in the history
Signed-off-by: EdricCua <[email protected]>
  • Loading branch information
EdricCua committed Jan 27, 2025
1 parent 8195945 commit bf6cb9d
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 46 deletions.
68 changes: 68 additions & 0 deletions go/api/command_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package api
import (
"strconv"

"github.com/valkey-io/valkey-glide/go/glide/api/config"
"github.com/valkey-io/valkey-glide/go/glide/api/errors"
"github.com/valkey-io/valkey-glide/go/glide/utils"
)
Expand Down Expand Up @@ -394,3 +395,70 @@ func (opts *CopyOptions) toArgs() ([]string, error) {
}
return args, err
}

// Optional arguments for `Info` for standalone client
type EchoOptions struct {
// A list of [Section] values specifying which sections of information to retrieve.
// When no parameter is provided, [Section.Default] is assumed.
// Starting with server version 7.0.0 `INFO` command supports multiple sections.
Sections []Section
}

type Section string

const (
// SERVER: General information about the server
Server Section = "server"
// CLIENTS: Client connections section
Clients Section = "clients"
// MEMORY: Memory consumption related information
Memory Section = "memory"
// PERSISTENCE: RDB and AOF related information
Persistence Section = "persistence"
// STATS: General statistics
Stats Section = "stats"
// REPLICATION: Master/replica replication information
Replication Section = "replication"
// CPU: CPU consumption statistics
Cpu Section = "cpu"
// COMMANDSTATS: Valkey command statistics
Commandstats Section = "commandstats"
// LATENCYSTATS: Valkey command latency percentile distribution statistics
Latencystats Section = "latencystats"
// SENTINEL: Valkey Sentinel section (only applicable to Sentinel instances)
Sentinel Section = "sentinel"
// CLUSTER: Valkey Cluster section
Cluster Section = "cluster"
// MODULES: Modules section
Modules Section = "modules"
// KEYSPACE: Database related statistics
Keyspace Section = "keyspace"
// ERRORSTATS: Valkey error statistics
Errorstats Section = "errorstats"
// ALL: Return all sections (excluding module generated ones)
All Section = "all"
// DEFAULT: Return only the default set of sections
Default Section = "default"
// EVERYTHING: Includes all and modules
Everything Section = "everything"
)

// Optional arguments for `Echo` for cluster client
type ClusterEchoOptions struct {
*EchoOptions
// Specifies the routing configuration for the command.
// The client will route the command to the nodes defined by `Route`.
// The command will be routed to all primary nodes, unless `Route` is provided.
Route *config.Route
}

func (opts *EchoOptions) toArgs() []string {
if opts == nil {
return []string{}
}
args := make([]string, 0, len(opts.Sections))
for _, section := range opts.Sections {
args = append(args, string(section))
}
return args
}
17 changes: 17 additions & 0 deletions go/api/config/request_routing_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ import (
// - [config.ByAddressRoute]
type Route interface {
ToRoutesProtobuf() (*protobuf.Routes, error)
IsMultiNode() bool
}

type notMultiNode struct{}

func (*notMultiNode) IsMultiNode() bool { return false }

type SimpleNodeRoute int

const (
Expand Down Expand Up @@ -47,6 +52,15 @@ func (simpleNodeRoute SimpleNodeRoute) ToRoutesProtobuf() (*protobuf.Routes, err
return request, nil
}

func (route SimpleNodeRoute) IsMultiNode() bool {
return route != RandomRoute
}

func (snr SimpleNodeRoute) ToPtr() *Route {
a := Route(snr)
return &a
}

func mapSimpleNodeRoute(simpleNodeRoute SimpleNodeRoute) (protobuf.SimpleRoutes, error) {
switch simpleNodeRoute {
case AllNodes:
Expand Down Expand Up @@ -86,6 +100,7 @@ func mapSlotType(slotType SlotType) (protobuf.SlotTypes, error) {
type SlotIdRoute struct {
slotType SlotType
slotID int32
notMultiNode
}

// - slotType: Defines type of the node being addressed.
Expand Down Expand Up @@ -117,6 +132,7 @@ func (slotIdRoute *SlotIdRoute) ToRoutesProtobuf() (*protobuf.Routes, error) {
type SlotKeyRoute struct {
slotType SlotType
slotKey string
notMultiNode
}

// - slotType: Defines type of the node being addressed.
Expand Down Expand Up @@ -146,6 +162,7 @@ func (slotKeyRoute *SlotKeyRoute) ToRoutesProtobuf() (*protobuf.Routes, error) {
type ByAddressRoute struct {
host string
port int32
notMultiNode
}

// Create a route using hostname/address and port.
Expand Down
6 changes: 3 additions & 3 deletions go/api/connection_management_cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

package api

import "github.com/valkey-io/valkey-glide/go/glide/api/options"

// Supports commands and transactions for the "Connection Management" group of commands for cluster client.
//
// See [valkey.io] for details.
//
// [valkey.io]: https://valkey.io/commands/#connection
type ConnectionManagementClusterCommands interface {
EchoWithOptions(echoOptions *options.EchoOptions) (ClusterValue[interface{}], error)
Echo(message string) (Result[string], error)

EchoWithOptions(options ClusterEchoOptions) (ClusterValue[string], error)
}
91 changes: 68 additions & 23 deletions go/api/glide_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ package api
// #include "../lib.h"
import "C"

import (
"github.com/valkey-io/valkey-glide/go/glide/api/options"
)

// GlideClusterClient interface compliance check.
var _ GlideClusterClientCommands = (*GlideClusterClient)(nil)

// GlideClusterClientCommands is a client used for connection in cluster mode.
type GlideClusterClientCommands interface {
BaseClient
GenericClusterCommands
ServerManagementClusterCommands
}

// GlideClusterClient implements cluster mode operations by extending baseClient functionality.
Expand Down Expand Up @@ -65,45 +62,93 @@ func NewGlideClusterClient(config *GlideClusterClientConfiguration) (GlideCluste
func (client *GlideClusterClient) CustomCommand(args []string) (ClusterValue[interface{}], error) {
res, err := client.executeCommand(C.CustomCommand, args)
if err != nil {
return CreateEmptyClusterValue(), err
return createEmptyClusterValue[interface{}](), err
}
data, err := handleInterfaceResponse(res)
if err != nil {
return CreateEmptyClusterValue(), err
return createEmptyClusterValue[interface{}](), err
}
return createClusterValue[interface{}](data), nil
}

// Gets information and statistics about the server.
//
// The command will be routed to all primary nodes.
//
// See [valkey.io] for details.
//
// Return value:
//
// A map where each address is the key and its corresponding node response is the information for the default sections.
//
// Example:
//
// response, err := clusterClient.Info(opts)
// if err != nil {
// // handle error
// }
// for node, data := range response {
// fmt.Printf("%s node returned %s\n", node, data)
// }
//
// [valkey.io]: https://valkey.io/commands/info/
func (client *GlideClusterClient) Info() (map[string]string, error) {
result, err := client.executeCommand(C.Info, []string{})
if err != nil {
return nil, err
}
return CreateClusterValue(data), nil

return handleStringToStringMapResponse(result)
}

// Echoes the provided message back.
// Echo the provided message back.
// The command will be routed a random node.
//
// Parameters:
//
// echoOptions - The EchoOptions type.
// message - The provided message.
//
// Return value:
//
// Returns "PONG" or the copy of message.
// A map where each address is the key and its corresponding node response is the information for the default sections.
//
// For example:
//
// route := config.SimpleNodeRoute(config.RandomRoute)
// options := options.NewEchoOptionsBuilder().SetRoute(route).SetMessage("Hello")
// result, err := client.EchoWithOptions(options)
// fmt.Println(result) // Output: "Hello"
// response, err := clusterClient.Echo(opts)
// if err != nil {
// // handle error
// }
// for node, data := range response {
// fmt.Printf("%s node returned %s\n", node, data)
// }
//
// [valkey.io]: https://valkey.io/commands/echo/
func (client *glideClusterClient) EchoWithOptions(opts *options.EchoOptions) (ClusterValue[interface{}], error) {
args, err := opts.ToArgs()
if err != nil {
return CreateEmptyClusterValue(), err
func (client *GlideClusterClient) EchoWithOptions(options ClusterEchoOptions) (ClusterValue[string], error) {
if options.Route == nil {
response, err := client.executeCommand(C.Echo, options.toArgs())
if err != nil {
return createEmptyClusterValue[string](), err
}
data, err := handleStringToStringMapResponse(response)
if err != nil {
return createEmptyClusterValue[string](), err
}
return createClusterMultiValue[string](data), nil
}
res, err := client.executeCommandWithRoute(C.Echo, args, opts.Route)
response, err := client.executeCommandWithRoute(C.Echo, options.toArgs(), *options.Route)
if err != nil {
return CreateEmptyClusterValue(), err
return createEmptyClusterValue[string](), err
}
data, err := handleInterfaceResponse(res)
if (*options.Route).IsMultiNode() {
data, err := handleStringToStringMapResponse(response)
if err != nil {
return createEmptyClusterValue[string](), err
}
return createClusterMultiValue[string](data), nil
}
data, err := handleStringResponse(response)
if err != nil {
return CreateEmptyClusterValue(), err
return createEmptyClusterValue[string](), err
}
return CreateClusterValue(data), nil
return createClusterSingleValue[string](data), nil
}
47 changes: 27 additions & 20 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,33 +88,40 @@ const (
// Enum-like structure which stores either a single-node response or multi-node response.
// Multi-node response stored in a map, where keys are hostnames or "<ip>:<port>" strings.
//
// For example:
// Example:
//
// // Command failed:
// value, err := clusterClient.CustomCommand(args)
// value.IsEmpty(): true
// err != nil: true
//
// // Command returns response from multiple nodes:
// value, _ := clusterClient.info()
// node, nodeResponse := range value.Value().(map[string]interface{}) {
// response := nodeResponse.(string)
// value, _ := clusterClient.Info()
// for node, nodeResponse := range value.MultiValue() {
// response := nodeResponse
// // `node` stores cluster node IP/hostname, `response` stores the command output from that node
// }
//
// // Command returns a response from single node:
// value, _ := clusterClient.infoWithRoute(Random{})
// response := value.Value().(string)
// value, _ := clusterClient.InfoWithOptions(api.ClusterInfoOptions{InfoOptions: nil, Route: api.RandomRoute.ToPtr()})
// response := value.SingleValue()
// // `response` stores the command output from a cluster node
type ClusterValue[T any] struct {
valueType ValueType
value Result[T]
value Result[any]
}

func (value ClusterValue[T]) Value() T {
return value.value.Value()
// Get the single value stored (value returned by a single cluster node).
func (value ClusterValue[T]) SingleValue() T {
return value.value.Value().(T)
}

// Get the multi value stored (value returned by multiple cluster nodes).
func (value ClusterValue[T]) MultiValue() map[string]T {
return value.value.Value().(map[string]T)
}

// Get the value type
func (value ClusterValue[T]) ValueType() ValueType {
return value.valueType
}
Expand All @@ -131,33 +138,33 @@ func (value ClusterValue[T]) IsEmpty() bool {
return value.value.IsNil()
}

func CreateClusterValue[T any](data T) ClusterValue[T] {
func createClusterValue[T any](data any) ClusterValue[T] {
switch any(data).(type) {
case map[string]interface{}:
return CreateClusterMultiValue(data)
return createClusterMultiValue(data.(map[string]T))
default:
return CreateClusterSingleValue(data)
return createClusterSingleValue(data.(T))
}
}

func CreateClusterSingleValue[T any](data T) ClusterValue[T] {
func createClusterSingleValue[T any](data T) ClusterValue[T] {
return ClusterValue[T]{
valueType: SingleValue,
value: Result[T]{val: data, isNil: false},
value: Result[any]{val: data, isNil: false},
}
}

func CreateClusterMultiValue[T any](data T) ClusterValue[T] {
func createClusterMultiValue[T any](data map[string]T) ClusterValue[T] {
return ClusterValue[T]{
valueType: MultiValue,
value: Result[T]{val: data, isNil: false},
value: Result[any]{val: data, isNil: false},
}
}

func CreateEmptyClusterValue() ClusterValue[interface{}] {
var empty interface{}
return ClusterValue[interface{}]{
value: Result[interface{}]{val: empty, isNil: true},
func createEmptyClusterValue[T any]() ClusterValue[T] {
var empty T
return ClusterValue[T]{
value: Result[any]{val: empty, isNil: true},
}
}

Expand Down
Loading

0 comments on commit bf6cb9d

Please sign in to comment.