Skip to content

Commit

Permalink
Update redis library version
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffrey Koehler committed Nov 30, 2023
1 parent 6b159ef commit c5797ea
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 299 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testAndTag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
tests:
env:
GOPRIVATE: github.com/streemtech
runs-on: ubuntu-latest
runs-on: self-hosted
# outputs:
# newTag: ${{ steps.bump_tag.outputs.new_tag }}
# tagMajor: ${{ steps.semver_parser.outputs.major }}
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/streemtech/divider"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
rd "github.com/streemtech/divider/redis"
)

Expand Down
4 changes: 2 additions & 2 deletions cmd/resize/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"sync"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/streemtech/divider"
"github.com/streemtech/divider/redisconsistent"
)
Expand All @@ -15,7 +15,7 @@ func main() {

r := redis.NewUniversalClient(&redis.UniversalOptions{
Password: "password",
Addrs: []string{"192.168.100.153:6379"},
Addrs: []string{"192.168.35.1:6379"},
})
l := divider.DefaultLogger{}

Expand Down
12 changes: 5 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
module github.com/streemtech/divider

go 1.18
go 1.20

require (
github.com/go-redis/cache/v8 v8.4.3
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/cache/v9 v9.0.0
github.com/google/uuid v1.3.0
github.com/prometheus/client_golang v1.13.0
github.com/redis/go-redis/v9 v9.3.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/vmihailenco/go-tinylfu v0.2.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/exp v0.0.0-20221012211006-4de253d81b95 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
golang.org/x/sys v0.4.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)
320 changes: 79 additions & 241 deletions go.sum

Large diffs are not rendered by default.

76 changes: 38 additions & 38 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"github.com/google/uuid"
"github.com/streemtech/divider"

cache "github.com/go-redis/cache/v8"
redis "github.com/go-redis/redis/v8"
cache "github.com/go-redis/cache/v9"
redis "github.com/redis/go-redis/v9"
)

//Divider is a redis backed implementation of divider.Divider. The
// Divider is a redis backed implementation of divider.Divider. The
type Divider struct {
redis redis.UniversalClient
redisCache *cache.Cache
Expand Down Expand Up @@ -57,11 +57,11 @@ type Divider struct {

var _ divider.Divider = (*Divider)(nil)

//NewDivider returns a Divider using the Go-Redis library as a backend.
//The keys beginning in <masterKey>:__meta are used to keep track of the different
//metainformation to divide the work.
// NewDivider returns a Divider using the Go-Redis library as a backend.
// The keys beginning in <masterKey>:__meta are used to keep track of the different
// metainformation to divide the work.
//
//:* is appended automatically to the work when using default work fetcher.!!
// :* is appended automatically to the work when using default work fetcher.!!
func NewDivider(r redis.UniversalClient, masterKey, name string, informer divider.Informer, timeout, masterTimeout int) *Divider {
var i divider.Informer
if informer == nil {
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewDivider(r redis.UniversalClient, masterKey, name string, informer divide
return d
}

//SetWorkFetcher tells the deivider how to look up the list of work that needs to be done.
// SetWorkFetcher tells the deivider how to look up the list of work that needs to be done.
func (r *Divider) SetWorkFetcher(f divider.WorkFetcher) error {
if f == nil {
return fmt.Errorf("work divider can not be nill")
Expand All @@ -128,65 +128,65 @@ func (r *Divider) SetWorkFetcher(f divider.WorkFetcher) error {
return nil
}

//Start is the trigger to make the divider begin checking for keys, and returning those keys to the channels.
//No values should return to the channels without start being called.
// Start is the trigger to make the divider begin checking for keys, and returning those keys to the channels.
// No values should return to the channels without start being called.
func (r *Divider) Start() {
r.mux.Lock()
//make so I cant start multiple times.
r.start()
r.mux.Unlock()
}

//Stop begins the process of stopping processing of all assigned keys.
//Releasing these keys via stop allows them to immediately be picked up by other nodes.
//Start must be called to begin picking up work keys again.
// Stop begins the process of stopping processing of all assigned keys.
// Releasing these keys via stop allows them to immediately be picked up by other nodes.
// Start must be called to begin picking up work keys again.
func (r *Divider) Stop() {
r.mux.Lock()
r.stop()
r.mux.Unlock()
}

//Close shuts down, closes and cleans up the process.
//If called before flushed, processing keys will be timed out instead of released.
// Close shuts down, closes and cleans up the process.
// If called before flushed, processing keys will be timed out instead of released.
func (r *Divider) Close() {
r.mux.Lock()
r.close()
r.mux.Unlock()
}

//GetAssignedProcessingArray returns a string array that represents the keys that this node is set to process.
// GetAssignedProcessingArray returns a string array that represents the keys that this node is set to process.
func (r *Divider) GetAssignedProcessingArray() []string {
r.mux.Lock()
v := r.getAssignedProcessingArray()
r.mux.Unlock()
return v
}

//GetReceiveStartProcessingChan returns a channel of strings.
//The string from this channel represents a key to a processable entity.
//This particular channel is for receiving keys that this node is to begin processing.
// GetReceiveStartProcessingChan returns a channel of strings.
// The string from this channel represents a key to a processable entity.
// This particular channel is for receiving keys that this node is to begin processing.
func (r *Divider) GetReceiveStartProcessingChan() <-chan string {
return r.startChan
}

//GetReceiveStopProcessingChan returns a channel of strings.
//The string from this channel represents a key to a processable entity.
//This particular channel is for receiving keys that this node is to stop processing.
// GetReceiveStopProcessingChan returns a channel of strings.
// The string from this channel represents a key to a processable entity.
// This particular channel is for receiving keys that this node is to stop processing.
func (r *Divider) GetReceiveStopProcessingChan() <-chan string {
return r.stopChan
}

//ConfirmStopProcessing takes in a string of a key that this node is no longer processing.
//This is to be used to confirm that the processing has stopped for a key gotten from the GetReceiveStopProcessingChan channel.
//To manually release processing of a key, use SendStopProcessing instead.
//ConfirmStopProcessing is expected to be required for the proper implementation of Flush()
// ConfirmStopProcessing takes in a string of a key that this node is no longer processing.
// This is to be used to confirm that the processing has stopped for a key gotten from the GetReceiveStopProcessingChan channel.
// To manually release processing of a key, use SendStopProcessing instead.
// ConfirmStopProcessing is expected to be required for the proper implementation of Flush()
func (r *Divider) StartProcessing(key string) error {
return fmt.Errorf("Unimplemented")
}

//SendStopProcessing takes in a string of a key that this node is no longer processing.
//This is to be used to release the processing to another node.
//To confirm that the processing stoppage is completed, use ConfirmStopProcessing instead.
// SendStopProcessing takes in a string of a key that this node is no longer processing.
// This is to be used to release the processing to another node.
// To confirm that the processing stoppage is completed, use ConfirmStopProcessing instead.
func (r *Divider) StopProcessing(key string) error {
return fmt.Errorf("Unimplemented")
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (r *Divider) watch() {
go r.watchForUpdates()
}

//watchForKeys is a function looped to constantly look for new keys that need results output to them.
// watchForKeys is a function looped to constantly look for new keys that need results output to them.
func (r *Divider) watchForKeys() {

for {
Expand All @@ -276,7 +276,7 @@ func (r *Divider) watchForKeys() {
}
}

//watchForUpdates is a function looped to constantly check the system and make sure that the work is divided right.
// watchForUpdates is a function looped to constantly check the system and make sure that the work is divided right.
func (r *Divider) watchForUpdates() {

for {
Expand Down Expand Up @@ -308,7 +308,7 @@ func (r *Divider) getAssignedProcessingArray() []string {
return assigned
}

//updatePing is used to consistently tell the system that the worker is online, and listening to work.
// updatePing is used to consistently tell the system that the worker is online, and listening to work.
func (r *Divider) updatePing() {
defer func() {
if rec := recover(); rec != nil {
Expand All @@ -331,7 +331,7 @@ func (r *Divider) updatePing() {

}

//updates all assignments if master.
// updates all assignments if master.
func (r *Divider) updateAssignments() {

defer func() {
Expand Down Expand Up @@ -365,8 +365,8 @@ func (r *Divider) updateAssignments() {

}

//updateData does the work to keep track of the work distribution.
//This work should only be done by the master.
// updateData does the work to keep track of the work distribution.
// This work should only be done by the master.
func (r *Divider) updateData() error {
data, err := r.getInfo()
data.divider = r
Expand Down Expand Up @@ -498,13 +498,13 @@ func (r *Divider) setInfo(data *DividerData) error {

}

//NodeCountChange is a simple structure to allow for sorting by what node needs to add what data.
// NodeCountChange is a simple structure to allow for sorting by what node needs to add what data.
type NodeCountChange struct {
Node string
ChangeCount int
}

//NodeCountChangesSort is used to implement a sorter on NodeChange.
// NodeCountChangesSort is used to implement a sorter on NodeChange.
type NodeCountChangesSort []NodeCountChange

func (a NodeCountChangesSort) Len() int { return len(a) }
Expand All @@ -519,7 +519,7 @@ func (a NodeCountChangesSort) Less(i, j int) bool {
return strings.Compare(a[i].Node, a[j].Node) > 0
}

//DividerData is a backup of the dividerData
// DividerData is a backup of the dividerData
type DividerData struct {
//Worker is map[workId]workerNode so that you can look up the worker based on the work
Worker map[string]string
Expand Down
16 changes: 8 additions & 8 deletions redisconsistent/multiworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"math"
"time"

redis "github.com/go-redis/redis/v8"
redis "github.com/redis/go-redis/v9"
)

type Worker interface {
Expand Down Expand Up @@ -76,7 +76,7 @@ func (r *redisWorkerImpl) AddWork(ctx context.Context, value string) error {
// if r.l != nil {
// r.l.Debugf("Adding work %s", value)
// }
return r.r.ZAdd(ctx, r.getDataKey(), &redis.Z{
return r.r.ZAdd(ctx, r.getDataKey(), redis.Z{
Score: float64(r.CalculateKey(value)),
Member: value,
}).Err()
Expand Down Expand Up @@ -244,16 +244,16 @@ func (r *redisWorkerImpl) GetAllWork(ctx context.Context) (workList []string, er
// this allows GetNextWorker to skip anything that has been timed out.
func (r *redisWorkerImpl) UpdateWorkers(ctx context.Context, Key []string) error {

nodeDat := make([]*redis.Z, len(Key))
timeoutDat := make([]*redis.Z, len(Key))
nodeDat := make([]redis.Z, len(Key))
timeoutDat := make([]redis.Z, len(Key))
for i, v := range Key {
ks := r.CalculateKey(v)
k := float64(ks)
nodeDat[i] = &redis.Z{
nodeDat[i] = redis.Z{
Score: k,
Member: ks,
}
timeoutDat[i] = &redis.Z{
timeoutDat[i] = redis.Z{
Score: float64(time.Now().UnixNano()),
Member: ks,
}
Expand All @@ -279,9 +279,9 @@ func (r *redisWorkerImpl) AddWorks(ctx context.Context, value []string) error {
if len(value) <= 0 {
return nil
}
dat := make([]*redis.Z, len(value))
dat := make([]redis.Z, len(value))
for i, v := range value {
dat[i] = &redis.Z{
dat[i] = redis.Z{
Member: v,
Score: float64(r.CalculateKey(v)),
}
Expand Down
2 changes: 1 addition & 1 deletion redisconsistent/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/streemtech/divider"
"github.com/streemtech/divider/internal/ticker"

redis "github.com/go-redis/redis/v8"
redis "github.com/redis/go-redis/v9"
)

var _ divider.Divider = (*Divider)(nil)
Expand Down

0 comments on commit c5797ea

Please sign in to comment.