Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Add Event Subscriber Config #2054

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
httpserver "github.com/absmach/magistrala/internal/server/http"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/auth"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
mgsdk "github.com/absmach/magistrala/pkg/sdk/go"
"github.com/absmach/magistrala/pkg/uuid"
Expand All @@ -45,7 +46,7 @@ const (
defDB = "bootstrap"
defSvcHTTPPort = "9013"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
)

Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
Expand Down Expand Up @@ -197,14 +200,15 @@ func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db
}

func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := consumer.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: consumer.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}
22 changes: 13 additions & 9 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/lora"
"github.com/absmach/magistrala/lora/api"
"github.com/absmach/magistrala/lora/events"
loraevents "github.com/absmach/magistrala/lora/events"
"github.com/absmach/magistrala/lora/mqtt"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/absmach/magistrala/pkg/messaging/brokers"
Expand All @@ -44,7 +45,7 @@ const (
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down Expand Up @@ -147,6 +148,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -198,21 +201,22 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du
}

func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: loraevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepository(client *redis.Client, prefix string, logger *slog.Logger) lora.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return loraevents.NewRouteMapRepository(client, prefix)
}

func newService(pub messaging.Publisher, rmConn *redis.Client, thingsRMPrefix, channelsRMPrefix, connsRMPrefix string, logger *slog.Logger) lora.Service {
Expand Down
22 changes: 13 additions & 9 deletions cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/absmach/magistrala/opcua"
"github.com/absmach/magistrala/opcua/api"
"github.com/absmach/magistrala/opcua/db"
"github.com/absmach/magistrala/opcua/events"
opcuaevents "github.com/absmach/magistrala/opcua/events"
"github.com/absmach/magistrala/opcua/gopcua"
"github.com/absmach/magistrala/pkg/events"
"github.com/absmach/magistrala/pkg/events/store"
"github.com/absmach/magistrala/pkg/messaging/brokers"
brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing"
Expand All @@ -43,7 +44,7 @@ const (
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down Expand Up @@ -142,6 +143,8 @@ func main() {
return
}

logger.Info("Subscribed to Event Store")

hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)

if cfg.SendTelemetry {
Expand Down Expand Up @@ -181,21 +184,22 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua.
}

func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger *slog.Logger) error {
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger)
subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger)
if err != nil {
return err
}

handler := events.NewEventHandler(svc)

logger.Info("Subscribed to Redis Event Store")

return subscriber.Subscribe(ctx, handler)
subConfig := events.SubscriberConfig{
Stream: thingsStream,
Consumer: cfg.ESConsumerName,
Handler: opcuaevents.NewEventHandler(svc),
}
return subscriber.Subscribe(ctx, subConfig)
}

func newRouteMapRepositoy(client *redis.Client, prefix string, logger *slog.Logger) opcua.RouteMapRepository {
logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix))
return events.NewRouteMapRepository(client, prefix)
return opcuaevents.NewRouteMapRepository(client, prefix)
}

func newService(sub opcua.Subscriber, browser opcua.Browser, thingRM, chanRM, connRM opcua.RouteMapRepository, opcuaConfig opcua.Config, logger *slog.Logger) opcua.Service {
Expand Down
9 changes: 8 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@ type EventHandler interface {
Handle(ctx context.Context, event Event) error
}

// SubscriberConfig represents event subscriber configuration.
type SubscriberConfig struct {
Consumer string
Stream string
Handler EventHandler
}

// Subscriber specifies event subscription API.
//
//go:generate mockery --name Subscriber --output=./mocks --filename subscriber.go --quiet --note "Copyright (c) Abstract Machines"
type Subscriber interface {
// Subscribe subscribes to the event stream and consumes events.
Subscribe(ctx context.Context, handler EventHandler) error
Subscribe(ctx context.Context, cfg SubscriberConfig) error

// Close gracefully closes event subscriber's connection.
Close() error
Expand Down
10 changes: 5 additions & 5 deletions pkg/events/mocks/subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading