Skip to content

Commit

Permalink
revert: redis to []chan event.Event for subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
sreeram77 committed Apr 9, 2023
1 parent f051d45 commit 275a954
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 17 deletions.
5 changes: 4 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/sreeram77/pubsub/manager"
"github.com/sreeram77/pubsub/publisher"
"github.com/sreeram77/pubsub/storage"
"github.com/sreeram77/pubsub/subscriber"
"google.golang.org/grpc"
)
Expand All @@ -23,7 +24,9 @@ func main() {
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)

eventManager := manager.New()
cache := storage.New()

eventManager := manager.New(cache)

publisherServer := publisher.NewServer(eventManager)
subscriberServer := subscriber.NewServer(eventManager)
Expand Down
2 changes: 1 addition & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (em *eventManager) Broadcast(e event.Event) error {
for _, conn := range conns {
go func(c chan event.Event) {
c <- e
}(conn.(chan event.Event))
}(conn)
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions storage/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storage

import "github.com/sreeram77/pubsub/event"

type Cache interface {
Get(string) []any
Set(string, any)
Get(string) []chan event.Event
Set(string, chan event.Event)
}
32 changes: 19 additions & 13 deletions storage/redis.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
package storage

import (
"github.com/redis/go-redis/v9"
)
import "github.com/sreeram77/pubsub/event"

type subscribers []chan event.Event

type cache struct {
client *redis.Client
connections map[string]subscribers
}

func New() Cache {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
connections := make(map[string]subscribers)

return &cache{
client: rdb,
connections: connections,
}
}

func (c *cache) Get(key string) []any {
return []any{}
func (c *cache) Get(key string) []chan event.Event {
if value, found := c.connections[key]; found {
return value
}

return nil
}

func (c *cache) Set(key string, value any) {
func (c *cache) Set(key string, value chan event.Event) {
_, found := c.connections[key]
if found {
c.connections[key] = append(c.connections[key], value)
return
}

c.connections[key] = []chan event.Event{value}
}

0 comments on commit 275a954

Please sign in to comment.