Skip to content

Commit

Permalink
WIP - Add API layer
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Borovcanin <[email protected]>
  • Loading branch information
dborovcanin committed Dec 25, 2024
1 parent 8e3b2a2 commit f478e1f
Show file tree
Hide file tree
Showing 19 changed files with 1,077 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
MG_DOCKER_IMAGE_NAME_PREFIX ?= ghcr.io/absmach/magistrala
BUILD_DIR = build
SERVICES = auth users things http coap ws postgres-writer postgres-reader timescale-writer \
timescale-reader cli bootstrap mqtt provision certs invitations journal
timescale-reader cli bootstrap mqtt provision certs invitations journal re
TEST_API_SERVICES = journal auth bootstrap certs http invitations notifiers provision readers things users
TEST_API = $(addprefix test_api_,$(TEST_API_SERVICES))
DOCKERS = $(addprefix docker_,$(SERVICES))
Expand Down
Binary file added cmd/re/__debug_bin1053626454
Binary file not shown.
223 changes: 223 additions & 0 deletions cmd/re/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

// Package main contains rule engine main function to start the service.
package main

import (
"context"
"fmt"
"log"
"log/slog"
"net/url"
"os"
"time"

chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
redisclient "github.com/absmach/magistrala/internal/clients/redis"
mglog "github.com/absmach/magistrala/logger"
mgauthz "github.com/absmach/magistrala/pkg/authz"
"github.com/absmach/magistrala/pkg/grpcclient"
jaegerclient "github.com/absmach/magistrala/pkg/jaeger"
"github.com/absmach/magistrala/pkg/policies"
"github.com/absmach/magistrala/pkg/policies/spicedb"
"github.com/absmach/magistrala/pkg/postgres"
pgclient "github.com/absmach/magistrala/pkg/postgres"
"github.com/absmach/magistrala/pkg/server"
httpserver "github.com/absmach/magistrala/pkg/server/http"
"github.com/absmach/magistrala/pkg/uuid"
"github.com/absmach/magistrala/re"
httpapi "github.com/absmach/magistrala/re/api"
repg "github.com/absmach/magistrala/re/postgres"
"github.com/authzed/authzed-go/v1"
"github.com/authzed/grpcutil"
"github.com/caarlos0/env/v11"
"github.com/jmoiron/sqlx"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
svcName = "rules_engine"
envPrefixDB = "MG_RE_DB_"
envPrefixHTTP = "MG_RE_HTTP_"
envPrefixAuth = "MG_AUTH_GRPC_"
defDB = "r"
defSvcHTTPPort = "9009"
defSvcAuthGRPCPort = "7000"
)

type config struct {
LogLevel string `env:"MG_RE_LOG_LEVEL" envDefault:"info"`
InstanceID string `env:"MG_RE_INSTANCE_ID" envDefault:""`
JaegerURL url.URL `env:"MG_JAEGER_URL" envDefault:"http://localhost:4318/v1/traces"`
SendTelemetry bool `env:"MG_SEND_TELEMETRY" envDefault:"true"`
ESURL string `env:"MG_ES_URL" envDefault:"nats://localhost:4222"`
CacheURL string `env:"MG_RE_CACHE_URL" envDefault:"redis://localhost:6379/0"`
CacheKeyDuration time.Duration `env:"MG_RE_CACHE_KEY_DURATION" envDefault:"10m"`
TraceRatio float64 `env:"MG_JAEGER_TRACE_RATIO" envDefault:"1.0"`
SpicedbHost string `env:"MG_SPICEDB_HOST" envDefault:"localhost"`
SpicedbPort string `env:"MG_SPICEDB_PORT" envDefault:"50051"`
SpicedbPreSharedKey string `env:"MG_SPICEDB_PRE_SHARED_KEY" envDefault:"12345678"`
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)

// Create new rule engine configuration
cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}

var logger *slog.Logger
logger, err := mglog.New(os.Stdout, cfg.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err.Error())
}

var exitCode int
defer mglog.ExitWithError(&exitCode)

if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}

// Create new database for rule engine.
dbConfig := pgclient.Config{Name: defDB}
if err := env.ParseWithOptions(&dbConfig, env.Options{Prefix: envPrefixDB}); err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
db, err := pgclient.Setup(dbConfig, *repg.Migration())
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer db.Close()

tp, err := jaegerclient.NewProvider(ctx, svcName, cfg.JaegerURL, cfg.InstanceID, cfg.TraceRatio)
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
exitCode = 1
return
}
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
}
}()
tracer := tp.Tracer(svcName)

// Setup new redis cache client
cacheclient, err := redisclient.Connect(cfg.CacheURL)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer cacheclient.Close()

policyEvaluator, policyService, err := newSpiceDBPolicyServiceEvaluator(cfg, logger)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
logger.Info("Policy evaluator and Policy manager are successfully connected to SpiceDB gRPC server")

grpcCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&grpcCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}
// authn, authnClient, err := authsvcAuthn.NewAuthentication(ctx, grpcCfg)
// if err != nil {
// logger.Error(err.Error())
// exitCode = 1
// return
// }
// defer authnClient.Close()
// logger.Info("AuthN successfully connected to auth gRPC server " + authnClient.Secure())

// authz, authzClient, err := authsvcAuthz.NewAuthorization(ctx, grpcCfg)
// if err != nil {
// logger.Error(err.Error())
// exitCode = 1
// return
// }
// defer authzClient.Close()
// logger.Info("AuthZ successfully connected to auth gRPC server " + authnClient.Secure())

svc, err := newService(ctx, db, dbConfig, nil, policyEvaluator, policyService, cacheclient, cfg.CacheKeyDuration, cfg.ESURL, tracer, logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to create services: %s", err))
exitCode = 1
return
}

httpServerConfig := server.Config{Port: defSvcHTTPPort}
if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
httpSvc := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, httpapi.MakeHandler(svc, nil, logger, cfg.InstanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
go chc.CallHome(ctx)
}

// Start all servers
g.Go(func() error {
return httpSvc.Start()
})

g.Go(func() error {
return server.StopSignalHandler(ctx, cancel, logger, svcName, httpSvc)
})

if err := g.Wait(); err != nil {
logger.Error(fmt.Sprintf("%s service terminated: %s", svcName, err))
}
}

func newService(ctx context.Context, db *sqlx.DB, dbConfig pgclient.Config, authz mgauthz.Authorization, pe policies.Evaluator, ps policies.Service, cacheClient *redis.Client, keyDuration time.Duration, esURL string, tracer trace.Tracer, logger *slog.Logger) (re.Service, error) {
database := postgres.NewDatabase(db, dbConfig, tracer)
repo := repg.NewRepository(database)
idp := uuid.New()

csvc := re.NewService(repo, repo, idp, nil)

// csvc = tmiddleware.AuthorizationMiddleware(csvc, authz)

return csvc, nil
}

func newSpiceDBPolicyServiceEvaluator(cfg config, logger *slog.Logger) (policies.Evaluator, policies.Service, error) {
client, err := authzed.NewClientWithExperimentalAPIs(
fmt.Sprintf("%s:%s", cfg.SpicedbHost, cfg.SpicedbPort),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpcutil.WithInsecureBearerToken(cfg.SpicedbPreSharedKey),
)
if err != nil {
return nil, nil, err
}
pe := spicedb.NewPolicyEvaluator(client, logger)
ps := spicedb.NewPolicyService(client, logger)

return pe, ps, nil
}
16 changes: 16 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,22 @@ MG_THINGS_AUTH_GRPC_CLIENT_CERT=${GRPC_MTLS:+./ssl/certs/things-grpc-client.crt}
MG_THINGS_AUTH_GRPC_CLIENT_KEY=${GRPC_MTLS:+./ssl/certs/things-grpc-client.key}
MG_THINGS_AUTH_GRPC_CLIENT_CA_CERTS=${GRPC_MTLS:+./ssl/certs/ca.crt}

### RE
MG_RE_LOG_LEVEL=debug
MG_RE_HTTP_PORT=9009
MG_RE_HTTP_SERVER_CERT=
MG_RE_HTTP_SERVER_KEY=
MG_RE_DB_HOST=re-db
MG_RE_DB_PORT=5432
MG_RE_DB_USER=magistrala
MG_RE_DB_PASS=magistrala
MG_RE_DB_NAME=rule_engine
MG_RE_DB_SSL_MODE=disable
MG_RE_DB_SSL_CERT=
MG_RE_DB_SSL_KEY=
MG_RE_DB_SSL_ROOT_CERT=
MG_RE_INSTANCE_ID=

### HTTP
MG_HTTP_ADAPTER_LOG_LEVEL=debug
MG_HTTP_ADAPTER_HOST=http-adapter
Expand Down
75 changes: 75 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ networks:
volumes:
magistrala-users-db-volume:
magistrala-things-db-volume:
magistrala-re-db-volume:
magistrala-things-redis-volume:
magistrala-broker-volume:
magistrala-mqtt-broker-volume:
Expand Down Expand Up @@ -483,6 +484,80 @@ services:
bind:
create_host_path: true


re-db:
image: postgres:16.2-alpine
container_name: magistrala-re-db
restart: on-failure
command: postgres -c "max_connections=${MG_POSTGRES_MAX_CONNECTIONS}"
environment:
POSTGRES_USER: ${MG_RE_DB_USER}
POSTGRES_PASSWORD: ${MG_RE_DB_PASS}
POSTGRES_DB: ${MG_RE_DB_NAME}
MG_POSTGRES_MAX_CONNECTIONS: ${MG_POSTGRES_MAX_CONNECTIONS}
ports:
- 6008:5432
networks:
- magistrala-base-net
volumes:
- magistrala-re-db-volume:/var/lib/postgresql/data

re:
image: ghcr.io/absmach/magistrala/re:${MG_RELEASE_TAG}
container_name: magistrala-re
depends_on:
- re-db
- auth
- nats
restart: on-failure
environment:
MG_RE_LOG_LEVEL: ${MG_RE_LOG_LEVEL}
MG_RE_HTTP_PORT: ${MG_RE_HTTP_PORT}
MG_RE_HTTP_SERVER_CERT: ${MG_RE_HTTP_SERVER_CERT}
MG_RE_HTTP_SERVER_KEY: ${MG_RE_HTTP_SERVER_KEY}
MG_RE_DB_HOST: ${MG_RE_DB_HOST}
MG_RE_DB_PORT: ${MG_RE_DB_PORT}
MG_RE_DB_USER: ${MG_RE_DB_USER}
MG_RE_DB_PASS: ${MG_RE_DB_PASS}
MG_RE_DB_NAME: ${MG_RE_DB_NAME}
MG_RE_DB_SSL_MODE: ${MG_RE_DB_SSL_MODE}
MG_RE_DB_SSL_CERT: ${MG_RE_DB_SSL_CERT}
MG_RE_DB_SSL_KEY: ${MG_RE_DB_SSL_KEY}
MG_RE_DB_SSL_ROOT_CERT: ${MG_RE_DB_SSL_ROOT_CERT}
MG_ES_URL: ${MG_ES_URL}
MG_JAEGER_URL: ${MG_JAEGER_URL}
MG_JAEGER_TRACE_RATIO: ${MG_JAEGER_TRACE_RATIO}
MG_SEND_TELEMETRY: ${MG_SEND_TELEMETRY}
MG_AUTH_GRPC_URL: ${MG_AUTH_GRPC_URL}
MG_AUTH_GRPC_TIMEOUT: ${MG_AUTH_GRPC_TIMEOUT}
MG_AUTH_GRPC_CLIENT_CERT: ${MG_AUTH_GRPC_CLIENT_CERT:+/auth-grpc-client.crt}
MG_AUTH_GRPC_CLIENT_KEY: ${MG_AUTH_GRPC_CLIENT_KEY:+/auth-grpc-client.key}
MG_AUTH_GRPC_SERVER_CA_CERTS: ${MG_AUTH_GRPC_SERVER_CA_CERTS:+/auth-grpc-server-ca.crt}
MG_SPICEDB_PRE_SHARED_KEY: ${MG_SPICEDB_PRE_SHARED_KEY}
MG_SPICEDB_HOST: ${MG_SPICEDB_HOST}
MG_SPICEDB_PORT: ${MG_SPICEDB_PORT}
ports:
- ${MG_RE_HTTP_PORT}:${MG_RE_HTTP_PORT}
networks:
- magistrala-base-net
volumes:
# Auth gRPC client certificates
- type: bind
source: ${MG_AUTH_GRPC_CLIENT_CERT:-ssl/certs/dummy/client_cert}
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_CERT:+.crt}
bind:
create_host_path: true
- type: bind
source: ${MG_AUTH_GRPC_CLIENT_KEY:-ssl/certs/dummy/client_key}
target: /auth-grpc-client${MG_AUTH_GRPC_CLIENT_KEY:+.key}
bind:
create_host_path: true
- type: bind
source: ${MG_AUTH_GRPC_SERVER_CA_CERTS:-ssl/certs/dummy/server_ca}
target: /auth-grpc-server-ca${MG_AUTH_GRPC_SERVER_CA_CERTS:+.crt}
bind:
create_host_path: true

jaeger:
image: jaegertracing/all-in-one:1.60
container_name: magistrala-jaeger
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/fatih/color v1.18.0
github.com/go-chi/chi v1.5.5
github.com/go-chi/chi/v5 v5.1.0
github.com/go-kit/kit v0.13.0
github.com/gofrs/uuid/v5 v5.3.0
Expand Down Expand Up @@ -43,6 +44,7 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
github.com/yuin/gopher-lua v1.1.1
go.etcd.io/etcd/client/v3 v3.5.12
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.57.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJu
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.etcd.io/etcd/api/v3 v3.5.12 h1:W4sw5ZoU2Juc9gBWuLk5U6fHfNVyY1WC5g9uiXZio/c=
go.etcd.io/etcd/api/v3 v3.5.12/go.mod h1:Ot+o0SWSyT6uHhA56al1oCED0JImsRiU9Dc26+C2a+4=
Expand Down
5 changes: 5 additions & 0 deletions re/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Magistrala Rule Engine


[doc]: https://docs.magistrala.abstractmachines.fr
[compose]: ../docker/docker-compose.yml
6 changes: 6 additions & 0 deletions re/api/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0

// Package api contains API-related concerns: endpoint definitions, middlewares
// and all resource representations.
package api
Loading

0 comments on commit f478e1f

Please sign in to comment.