Skip to content

Commit

Permalink
Merge pull request #11 from go-admin-team/dev
Browse files Browse the repository at this point in the history
feat ✨: 全面支持runtime cache
  • Loading branch information
mss-boot authored Mar 25, 2021
2 parents 7c164d7 + e173e1c commit f63f944
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 203 deletions.
28 changes: 24 additions & 4 deletions cache/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type Memory struct {
PoolNum uint
}

func (*Memory) String() string {
return "memory"
}

func (r *Memory) SetPrefix(string) {}

func (m *Memory) Connect() error {
Expand Down Expand Up @@ -156,25 +160,25 @@ func (m *Memory) Expire(key string, dur time.Duration) error {
return m.setItem(key, item)
}

func (m *Memory) Append(name string, message Message) error {
func (m *Memory) Append(message Message) error {
m.mutex.RLock()
defer m.mutex.RUnlock()
memoryMessage := new(MemoryMessage)
memoryMessage.SetID(message.GetID())
memoryMessage.SetStream(message.GetStream())
memoryMessage.SetValues(message.GetValues())
v, ok := m.queue.Load(name)
v, ok := m.queue.Load(message.GetStream())
if !ok {
v = m.makeQueue()
m.queue.Store(name, v)
m.queue.Store(message.GetStream(), v)
}
var q queue
switch v.(type) {
case queue:
q = v.(queue)
default:
q = m.makeQueue()
m.queue.Store(name, q)
m.queue.Store(message.GetStream(), q)
}
go func(gm Message, gq queue) {
gm.SetID(uuid.New().String())
Expand Down Expand Up @@ -252,3 +256,19 @@ func (m *MemoryMessage) SetStream(stream string) {
func (m *MemoryMessage) SetValues(values map[string]interface{}) {
m.Values = values
}

func (m *MemoryMessage) GetPrefix() (prefix string) {
if m.Values == nil {
return
}
v, _ := m.Values[prefixKey]
prefix, _ = v.(string)
return
}

func (m *MemoryMessage) SetPrefix(prefix string) {
if m.Values == nil {
m.Values = make(map[string]interface{})
}
m.Values[prefixKey] = prefix
}
4 changes: 2 additions & 2 deletions cache/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestMemory_Append(t *testing.T) {
t.Errorf("Connect() error = %v", err)
return
}
if err := m.Append(tt.args.name, tt.args.message); (err != nil) != tt.wantErr {
if err := m.Append(tt.args.message); (err != nil) != tt.wantErr {
t.Errorf("Append() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestMemory_Register(t *testing.T) {
return
}
m.Register(tt.name, tt.args.f)
if err := m.Append(tt.name, &MemoryMessage{redisqueue.Message{
if err := m.Append(&MemoryMessage{redisqueue.Message{
ID: "",
Stream: "test",
Values: map[string]interface{}{
Expand Down
24 changes: 22 additions & 2 deletions cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type Redis struct {
mutex *redislock.Client
}

func (*Redis) String() string {
return "redis"
}

// Connect Setup
func (r *Redis) Connect() error {
var err error
Expand Down Expand Up @@ -77,10 +81,10 @@ func (r *Redis) Expire(key string, dur time.Duration) error {
return r.client.Expire(key, dur).Err()
}

func (r *Redis) Append(name string, message Message) error {
func (r *Redis) Append(message Message) error {
err := r.producer.Enqueue(&redisqueue.Message{
ID: message.GetID(),
Stream: name,
Stream: message.GetStream(),
Values: message.GetValues(),
})
return err
Expand Down Expand Up @@ -159,3 +163,19 @@ func (m *RedisMessage) SetStream(stream string) {
func (m *RedisMessage) SetValues(values map[string]interface{}) {
m.Values = values
}

func (m *RedisMessage) GetPrefix() (prefix string) {
if m.Values == nil {
return
}
v, _ := m.Values[prefixKey]
prefix, _ = v.(string)
return
}

func (m *RedisMessage) SetPrefix(prefix string) {
if m.Values == nil {
m.Values = make(map[string]interface{})
}
m.Values[prefixKey] = prefix
}
4 changes: 2 additions & 2 deletions cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestRedis_Append(t *testing.T) {
},
ProducerOptions: &redisqueue.ProducerOptions{
StreamMaxLength: 100,
ApproximateMaxLength: true,
ApproximateMaxLength: false,
},
},
args{
Expand All @@ -68,7 +68,7 @@ func TestRedis_Append(t *testing.T) {
if err := r.Connect(); err != nil {
t.Errorf("SetQueue() error = %v", err)
}
if err := r.Append(tt.args.name, tt.args.message); (err != nil) != tt.wantErr {
if err := r.Append(tt.args.message); (err != nil) != tt.wantErr {
t.Errorf("SetQueue() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
9 changes: 8 additions & 1 deletion cache/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import (
"github.com/bsm/redislock"
)

const (
prefixKey = "__host"
)

type Adapter interface {
String() string
SetPrefix(string)
Connect() error
Get(key string) (string, error)
Expand All @@ -22,7 +27,7 @@ type Adapter interface {
}

type AdapterQueue interface {
Append(name string, message Message) error
Append(message Message) error
Register(name string, f ConsumerFunc)
Run()
Shutdown()
Expand All @@ -35,6 +40,8 @@ type Message interface {
GetID() string
GetStream() string
GetValues() map[string]interface{}
GetPrefix() string
SetPrefix(string)
}

type ConsumerFunc func(Message) error
Expand Down
16 changes: 12 additions & 4 deletions sdk/config/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"github.com/go-admin-team/go-admin-core/cache"
"github.com/go-redis/redis/v7"
Expand Down Expand Up @@ -49,7 +50,10 @@ type Tls struct {
// CacheConfig cache配置
var CacheConfig = new(Cache)

func (e Cache) Setup() cache.Adapter {
func (e Cache) Setup() (cache.Adapter, error) {
if e.Driver == "" {
e.Driver = MemoryCache
}
var c cache.Adapter
switch e.Driver {
case MemoryCache:
Expand Down Expand Up @@ -108,8 +112,12 @@ func (e Cache) Setup() cache.Adapter {
}
default:
//没有配置,跳过
return nil
return nil, errors.New("cache driver[] not support")
}
err := c.Connect()
if err != nil {
return nil, err
}
c.Connect()
return c
go c.Run()
return c, err
}
6 changes: 0 additions & 6 deletions sdk/pkg/cache/doc.go

This file was deleted.

15 changes: 0 additions & 15 deletions sdk/pkg/cache/memory.go

This file was deleted.

46 changes: 0 additions & 46 deletions sdk/pkg/cache/memory_test.go

This file was deleted.

32 changes: 0 additions & 32 deletions sdk/pkg/cache/redis.go

This file was deleted.

46 changes: 0 additions & 46 deletions sdk/pkg/cache/redis_test.go

This file was deleted.

29 changes: 0 additions & 29 deletions sdk/pkg/cache/type.go

This file was deleted.

Loading

0 comments on commit f63f944

Please sign in to comment.