diff --git a/cmd/channels/main.go b/cmd/channels/main.go index 3157343cc5..2afe764680 100644 --- a/cmd/channels/main.go +++ b/cmd/channels/main.go @@ -25,11 +25,13 @@ import ( "github.com/absmach/supermq/channels/postgres" pChannels "github.com/absmach/supermq/channels/private" "github.com/absmach/supermq/channels/tracing" + gpostgres "github.com/absmach/supermq/groups/postgres" smqlog "github.com/absmach/supermq/logger" authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc" smqauthz "github.com/absmach/supermq/pkg/authz" authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc" domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient" + gconsumer "github.com/absmach/supermq/pkg/groups/events/consumer" "github.com/absmach/supermq/pkg/grpcclient" jaegerclient "github.com/absmach/supermq/pkg/jaeger" "github.com/absmach/supermq/pkg/policies" @@ -74,6 +76,7 @@ type config struct { JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"` SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"` ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"` + ESConsumerName string `env:"SMQ_CLIENTS_EVENT_CONSUMER" envDefault:"channels"` TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"` SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"` SpicedbPort string `env:"SMQ_SPICEDB_PORT" envDefault:"50051"` @@ -224,6 +227,15 @@ func main() { return } + gdatabase := pg.NewDatabase(db, dbConfig, tracer) + grepo := gpostgres.New(gdatabase) + + if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create groups event store : %s", err)) + exitCode = 1 + return + } + grpcServerConfig := server.Config{Port: defSvcGRPCPort} if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil { logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err)) diff --git a/cmd/clients/main.go b/cmd/clients/main.go index 5164b1e388..de3b089f39 100644 --- a/cmd/clients/main.go +++ b/cmd/clients/main.go @@ -244,6 +244,15 @@ func main() { return } + gdatabase := pg.NewDatabase(db, dbConfig, tracer) + grepo := gpostgres.New(gdatabase) + + if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { + logger.Error(fmt.Sprintf("failed to create groups event store : %s", err)) + exitCode = 1 + return + } + httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) @@ -260,15 +269,6 @@ func main() { return } - gdatabase := pg.NewDatabase(db, dbConfig, tracer) - grepo := gpostgres.New(gdatabase) - - if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil { - logger.Error(fmt.Sprintf("failed to create groups event store : %s", err)) - exitCode = 1 - return - } - registerClientsServer := func(srv *grpc.Server) { reflection.Register(srv) grpcClientsV1.RegisterClientsServiceServer(srv, grpcapi.NewServer(psvc)) diff --git a/pkg/sdk/channels_test.go b/pkg/sdk/channels_test.go index 4cad6ef9d1..1554a162be 100644 --- a/pkg/sdk/channels_test.go +++ b/pkg/sdk/channels_test.go @@ -392,9 +392,8 @@ func TestListChannels(t *testing.T) { offset: offset, total: total, channelsPageMeta: channels.PageMetadata{ - Offset: offset, - Limit: limit, - Permission: defPermission, + Offset: offset, + Limit: limit, }, svcRes: channels.Page{ PageMetadata: channels.PageMetadata{ @@ -444,9 +443,8 @@ func TestListChannels(t *testing.T) { offset: offset, limit: 0, channelsPageMeta: channels.PageMetadata{ - Offset: offset, - Limit: 10, - Permission: defPermission, + Offset: offset, + Limit: 10, }, svcRes: channels.Page{ PageMetadata: channels.PageMetadata{ @@ -483,9 +481,8 @@ func TestListChannels(t *testing.T) { limit: 1, level: 1, channelsPageMeta: channels.PageMetadata{ - Offset: offset, - Limit: 1, - Permission: defPermission, + Offset: offset, + Limit: 1, }, svcRes: channels.Page{ PageMetadata: channels.PageMetadata{ @@ -510,10 +507,9 @@ func TestListChannels(t *testing.T) { limit: 10, metadata: sdk.Metadata{"name": "client_89"}, channelsPageMeta: channels.PageMetadata{ - Offset: offset, - Limit: 10, - Permission: defPermission, - Metadata: clients.Metadata{"name": "client_89"}, + Offset: offset, + Limit: 10, + Metadata: clients.Metadata{"name": "client_89"}, }, svcRes: channels.Page{ PageMetadata: channels.PageMetadata{ @@ -552,9 +548,8 @@ func TestListChannels(t *testing.T) { offset: 0, limit: 10, channelsPageMeta: channels.PageMetadata{ - Offset: 0, - Limit: 10, - Permission: defPermission, + Offset: 0, + Limit: 10, }, svcRes: channels.Page{ PageMetadata: channels.PageMetadata{