Skip to content

Commit

Permalink
add: listing of channels
Browse files Browse the repository at this point in the history
Signed-off-by: Arvindh <[email protected]>
  • Loading branch information
arvindh123 committed Jan 17, 2025
1 parent 0d54895 commit 286d707
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 25 deletions.
12 changes: 12 additions & 0 deletions cmd/channels/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/absmach/supermq/channels/postgres"
pChannels "github.com/absmach/supermq/channels/private"
"github.com/absmach/supermq/channels/tracing"
gpostgres "github.com/absmach/supermq/groups/postgres"
smqlog "github.com/absmach/supermq/logger"
authsvcAuthn "github.com/absmach/supermq/pkg/authn/authsvc"
smqauthz "github.com/absmach/supermq/pkg/authz"
authsvcAuthz "github.com/absmach/supermq/pkg/authz/authsvc"
domainsAuthz "github.com/absmach/supermq/pkg/domains/grpcclient"
gconsumer "github.com/absmach/supermq/pkg/groups/events/consumer"
"github.com/absmach/supermq/pkg/grpcclient"
jaegerclient "github.com/absmach/supermq/pkg/jaeger"
"github.com/absmach/supermq/pkg/policies"
Expand Down Expand Up @@ -74,6 +76,7 @@ type config struct {
JaegerURL url.URL `env:"SMQ_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"SMQ_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"SMQ_ES_URL" envDefault:"nats://localhost:4222"`
ESConsumerName string `env:"SMQ_CLIENTS_EVENT_CONSUMER" envDefault:"channels"`
TraceRatio float64 `env:"SMQ_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"SMQ_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"SMQ_SPICEDB_PORT" envDefault:"50051"`
Expand Down Expand Up @@ -224,6 +227,15 @@ func main() {
return
}

gdatabase := pg.NewDatabase(db, dbConfig, tracer)
grepo := gpostgres.New(gdatabase)

if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create groups event store : %s", err))
exitCode = 1
return
}

grpcServerConfig := server.Config{Port: defSvcGRPCPort}
if err := env.ParseWithOptions(&grpcServerConfig, env.Options{Prefix: envPrefixGRPC}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s gRPC server configuration : %s", svcName, err))
Expand Down
18 changes: 9 additions & 9 deletions cmd/clients/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ func main() {
return
}

gdatabase := pg.NewDatabase(db, dbConfig, tracer)
grepo := gpostgres.New(gdatabase)

if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create groups event store : %s", err))
exitCode = 1
return
}

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
Expand All @@ -260,15 +269,6 @@ func main() {
return
}

gdatabase := pg.NewDatabase(db, dbConfig, tracer)
grepo := gpostgres.New(gdatabase)

if err := gconsumer.GroupsEventsSubscribe(ctx, grepo, cfg.ESURL, cfg.ESConsumerName, logger); err != nil {
logger.Error(fmt.Sprintf("failed to create groups event store : %s", err))
exitCode = 1
return
}

registerClientsServer := func(srv *grpc.Server) {
reflection.Register(srv)
grpcClientsV1.RegisterClientsServiceServer(srv, grpcapi.NewServer(psvc))
Expand Down
27 changes: 11 additions & 16 deletions pkg/sdk/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,8 @@ func TestListChannels(t *testing.T) {
offset: offset,
total: total,
channelsPageMeta: channels.PageMetadata{
Offset: offset,
Limit: limit,
Permission: defPermission,
Offset: offset,
Limit: limit,
},
svcRes: channels.Page{
PageMetadata: channels.PageMetadata{
Expand Down Expand Up @@ -444,9 +443,8 @@ func TestListChannels(t *testing.T) {
offset: offset,
limit: 0,
channelsPageMeta: channels.PageMetadata{
Offset: offset,
Limit: 10,
Permission: defPermission,
Offset: offset,
Limit: 10,
},
svcRes: channels.Page{
PageMetadata: channels.PageMetadata{
Expand Down Expand Up @@ -483,9 +481,8 @@ func TestListChannels(t *testing.T) {
limit: 1,
level: 1,
channelsPageMeta: channels.PageMetadata{
Offset: offset,
Limit: 1,
Permission: defPermission,
Offset: offset,
Limit: 1,
},
svcRes: channels.Page{
PageMetadata: channels.PageMetadata{
Expand All @@ -510,10 +507,9 @@ func TestListChannels(t *testing.T) {
limit: 10,
metadata: sdk.Metadata{"name": "client_89"},
channelsPageMeta: channels.PageMetadata{
Offset: offset,
Limit: 10,
Permission: defPermission,
Metadata: clients.Metadata{"name": "client_89"},
Offset: offset,
Limit: 10,
Metadata: clients.Metadata{"name": "client_89"},
},
svcRes: channels.Page{
PageMetadata: channels.PageMetadata{
Expand Down Expand Up @@ -552,9 +548,8 @@ func TestListChannels(t *testing.T) {
offset: 0,
limit: 10,
channelsPageMeta: channels.PageMetadata{
Offset: 0,
Limit: 10,
Permission: defPermission,
Offset: 0,
Limit: 10,
},
svcRes: channels.Page{
PageMetadata: channels.PageMetadata{
Expand Down

0 comments on commit 286d707

Please sign in to comment.