From 337a678f88b1afecba40cdb567235537c49bd981 Mon Sep 17 00:00:00 2001 From: gatici Date: Mon, 21 Oct 2024 15:11:34 +0300 Subject: [PATCH 01/30] fix: observe GRPC client Signed-off-by: gatici --- VERSION | 2 +- factory/factory.go | 7 +------ go.mod | 4 ++-- go.sum | 8 ++++---- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/VERSION b/VERSION index 06bb452..9c6d629 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.6.1-dev +1.6.1 diff --git a/factory/factory.go b/factory/factory.go index 3d94b97..0e3b2a7 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -14,7 +14,6 @@ import ( "fmt" "os" - "github.com/omec-project/config5g/proto/client" protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/udr/logger" "gopkg.in/yaml.v2" @@ -52,12 +51,8 @@ func InitConfigFactory(f string) error { if UdrConfig.Configuration.WebuiUri == "" { UdrConfig.Configuration.WebuiUri = "webui:9876" } - roc := os.Getenv("MANAGED_BY_CONFIG_POD") - if roc == "true" { + if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") - commChannel := client.ConfigWatcher(UdrConfig.Configuration.WebuiUri) - ConfigUpdateDbTrigger = make(chan *UpdateDb, 10) - go UdrConfig.updateConfig(commChannel, ConfigUpdateDbTrigger) } else { go func() { logger.InitLog.Infoln("use helm chart config") diff --git a/go.mod b/go.mod index 40fd5b5..1ae710b 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/omec-project/config5g v1.5.1 + github.com/omec-project/config5g v1.5.3 github.com/omec-project/openapi v1.3.1 github.com/omec-project/util v1.2.3 github.com/prometheus/client_golang v1.20.5 @@ -70,7 +70,7 @@ require ( golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/grpc v1.67.1 // indirect - google.golang.org/protobuf v1.34.2 // indirect + google.golang.org/protobuf v1.35.1 // indirect gopkg.in/h2non/gock.v1 v1.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 0ef5662..82945b5 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= -github.com/omec-project/config5g v1.5.1 h1:JaVgr76tnjJIb7Uoesv5a9GI72NdOXtCvfukj0/ONio= -github.com/omec-project/config5g v1.5.1/go.mod h1:o04ZdwGcM7tbGjuT5t/WzYSKLXOSnFl6vH7b6BGAspU= +github.com/omec-project/config5g v1.5.3 h1:FPetMFU1/BXGzihcoRHJm0q1vU1AnYyR4Tq1T1gyvUQ= +github.com/omec-project/config5g v1.5.3/go.mod h1:HOvQtmi79f8cw35AiFHWHDoCTuZbXfMjeFJWgtPbwaI= github.com/omec-project/openapi v1.3.1 h1:NCteMRdMtWnMhf1CXYduuLgeu8fEhc/7XO1CiE7fN3Y= github.com/omec-project/openapi v1.3.1/go.mod h1:cR6Iharp2TLOzEmskQ/EdCVFZnpKh0zTvUSSuyXAYLE= github.com/omec-project/util v1.2.3 h1:h32ZYFT99+fB9VPp1CQUIKwrSP6RtX+PbFDcqmEHmn0= @@ -183,8 +183,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From 8caa68337717c34bb0f09a0e4856920427724880 Mon Sep 17 00:00:00 2001 From: gatici Date: Mon, 21 Oct 2024 15:11:57 +0300 Subject: [PATCH 02/30] chore: add fmt and golint to `make` options Signed-off-by: gatici --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index 78c1f4e..028bebb 100644 --- a/Makefile +++ b/Makefile @@ -95,3 +95,8 @@ test: .coverage -v \ ./ ./... +fmt: + @go fmt ./... + +golint: + @docker run --rm -v $(CURDIR):/app -w /app golangci/golangci-lint:latest golangci-lint run -v --config /app/.golangci.yml From cf9f7f3a9982c622242a2049e8d44f26e52fb4d7 Mon Sep 17 00:00:00 2001 From: gatici Date: Tue, 22 Oct 2024 23:49:33 +0300 Subject: [PATCH 03/30] fix: rename and organize a method Signed-off-by: gatici --- factory/factory.go | 5 +++++ go.mod | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/factory/factory.go b/factory/factory.go index 0e3b2a7..514481d 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -16,6 +16,11 @@ import ( protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/udr/logger" +<<<<<<< HEAD +======= + "go.uber.org/zap" + "google.golang.org/grpc/connectivity" +>>>>>>> 4fa3dfc (fix: rename and organize a method) "gopkg.in/yaml.v2" ) diff --git a/go.mod b/go.mod index 1ae710b..5d23aac 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/urfave/cli v1.22.16 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -69,7 +70,6 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect - google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/h2non/gock.v1 v1.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect From 64edc1039c425b0ac735aeb8ff5bb2d9b49ffa4a Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 14:17:38 +0300 Subject: [PATCH 04/30] chore: change config5g Signed-off-by: gatici --- go.mod | 2 ++ go.sum | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 5d23aac..eef53a0 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/omec-project/udr go 1.21 +replace github.com/omec-project/config5g => /home/gatici/omec/config5g/ + require ( github.com/evanphx/json-patch v5.9.0+incompatible github.com/gin-gonic/gin v1.10.0 diff --git a/go.sum b/go.sum index 82945b5..3c6f917 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= -github.com/omec-project/config5g v1.5.3 h1:FPetMFU1/BXGzihcoRHJm0q1vU1AnYyR4Tq1T1gyvUQ= -github.com/omec-project/config5g v1.5.3/go.mod h1:HOvQtmi79f8cw35AiFHWHDoCTuZbXfMjeFJWgtPbwaI= github.com/omec-project/openapi v1.3.1 h1:NCteMRdMtWnMhf1CXYduuLgeu8fEhc/7XO1CiE7fN3Y= github.com/omec-project/openapi v1.3.1/go.mod h1:cR6Iharp2TLOzEmskQ/EdCVFZnpKh0zTvUSSuyXAYLE= github.com/omec-project/util v1.2.3 h1:h32ZYFT99+fB9VPp1CQUIKwrSP6RtX+PbFDcqmEHmn0= From 9b16a51aeff6c9b6bee181b6a34246f603f10395 Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 14:29:23 +0300 Subject: [PATCH 05/30] adding gClient Signed-off-by: gatici --- factory/factory.go | 1 + factory/gclient.go | 201 +++++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 - go.sum | 2 + 4 files changed, 204 insertions(+), 2 deletions(-) create mode 100644 factory/gclient.go diff --git a/factory/factory.go b/factory/factory.go index 514481d..61ff515 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -58,6 +58,7 @@ func InitConfigFactory(f string) error { } if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") + } else { go func() { logger.InitLog.Infoln("use helm chart config") diff --git a/factory/gclient.go b/factory/gclient.go new file mode 100644 index 0000000..91a4fb2 --- /dev/null +++ b/factory/gclient.go @@ -0,0 +1,201 @@ +package factory + +import ( + "context" + "fmt" + "math/rand" + "os" + "time" + + "github.com/omec-project/config5g/logger" + protos "github.com/omec-project/config5g/proto/sdcoreConfig" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" +) + +var ( + selfRestartCounter uint32 + configPodRestartCounter uint32 = 0 +) + +func init() { + s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + selfRestartCounter = r1.Uint32() +} + +type PlmnId struct { + Mcc string + Mnc string +} + +type Nssai struct { + Sst string + Sd string +} + +type ConfigClient struct { + Client protos.ConfigServiceClient + Conn *grpc.ClientConn + Channel chan *protos.NetworkSliceResponse + Host string + Version string + MetadataRequested bool +} + +type ConfClient interface { + // PublishOnConfigChange creates a channel to perform the subscription using it. + // On Receiving Configuration from ConfigServer, this api publishes + // on created channel and returns the channel + PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse + + // getConfigClientConn returns grpc connection object + GetConfigClientConn() *grpc.ClientConn + + // Client Subscribing channel to ConfigPod to receive configuration + subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) + + // CheckGrpcConnectivity checks the connectivity status and + // subscribes to a stream of NetworkSlice if connectivity is ready + CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) +} + +// ConnectToConfigServer this API is added to control metadata from NF clients +// Connects to the ConfigServer using host address +func ConnectToConfigServer(host string) (ConfClient, error) { + confClient := CreateConfClient(host) + if confClient == nil { + return nil, fmt.Errorf("create grpc channel to config pod failed") + } + return confClient, nil +} + +// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel +// then NFs gets the messages +func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { + confClient.MetadataRequested = metadataFlag + commChan := make(chan *protos.NetworkSliceResponse) + confClient.Channel = commChan + logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") + go confClient.subscribeToConfigPod(commChan, stream) + return commChan +} + +// CreateConfClient creates a GRPC client by connecting to GRPC server (host). +func CreateConfClient(host string) ConfClient { + logger.GrpcLog.Debugln("create config client") + // Second, check to see if we can reuse the gRPC connection for a new P4RT client + conn, err := newClientConnection(host) + if err != nil { + logger.GrpcLog.Errorf("grpc connection failed %v", err) + return nil + } + + client := &ConfigClient{ + Client: protos.NewConfigServiceClient(conn), + Conn: conn, + Host: host, + } + + return client +} + +var kacp = keepalive.ClientParameters{ + Time: 20 * time.Second, // send pings every 20 seconds if there is no activity + Timeout: 2 * time.Second, // wait 1 second for ping ack before considering the connection dead + PermitWithoutStream: true, // send pings even without active streams +} + +var retryPolicy = `{ + "methodConfig": [{ + "name": [{"service": "grpc.Config"}], + "waitForReady": true, + "retryPolicy": { + "MaxAttempts": 4, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "UNAVAILABLE" ] + }}]}` + +// newClientConnection opens a GRPC connection to the host +func newClientConnection(host string) (conn *grpc.ClientConn, err error) { + logger.GrpcLog.Debugln("dial grpc connection:", host) + + bd := 1 * time.Second + mltpr := 1.0 + jitter := 0.2 + MaxDelay := 5 * time.Second + bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay} + + crt := grpc.ConnectParams{Backoff: bc} + dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)} + conn, err = grpc.NewClient(host, dialOptions...) + if err != nil { + return nil, fmt.Errorf("grpc newclient creation failed: %v", err) + } + conn.Connect() + return conn, nil +} + +// GetConfigClientConn exposes the GRPC client connection +func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { + return confClient.Conn +} + +// CheckGrpcConnectivity checks the connectivity status and subscribes to a stream of NetworkSlice +// if connectivity is Ready. It returns a stream if connection is successful else returns nil. +func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { + logger.GrpcLog.Debugln("connectToGrpcServer") + myid := os.Getenv("HOSTNAME") + status := confClient.Conn.GetState() + if status == connectivity.Ready { + logger.GrpcLog.Debugln("connectivity ready") + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested} + if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { + return stream, fmt.Errorf("failed to subscribe: %v", err) + } + return stream, nil + } else if status == connectivity.Idle { + return nil, fmt.Errorf("connectivity status idle") + } else { + return nil, fmt.Errorf("connectivity status not ready") + } +} + +// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration +// using stream and communication channel as inputs +func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { + for { + rsp, err := stream.Recv() + if err != nil { + logger.GrpcLog.Errorf("failed to receive message from stream: %v", err) + return + } + + logger.GrpcLog.Infoln("stream message received") + logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) + if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { + // first time connection or config update + configPodRestartCounter = rsp.RestartCounter + if len(rsp.NetworkSlice) > 0 { + // always carries full config copy + logger.GrpcLog.Infoln("first time config received", rsp) + commChan <- rsp + } else if rsp.ConfigUpdated == 1 { + // config delete, all slices deleted + logger.GrpcLog.Infoln("complete config deleted") + commChan <- rsp + } + } else if len(rsp.NetworkSlice) > 0 { + logger.GrpcLog.Errorln("config received after config pod restart") + configPodRestartCounter = rsp.RestartCounter + commChan <- rsp + } else { + logger.GrpcLog.Errorln("config pod is restarted and no config received") + } + } +} diff --git a/go.mod b/go.mod index eef53a0..5d23aac 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/omec-project/udr go 1.21 -replace github.com/omec-project/config5g => /home/gatici/omec/config5g/ - require ( github.com/evanphx/json-patch v5.9.0+incompatible github.com/gin-gonic/gin v1.10.0 diff --git a/go.sum b/go.sum index 3c6f917..82945b5 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= +github.com/omec-project/config5g v1.5.3 h1:FPetMFU1/BXGzihcoRHJm0q1vU1AnYyR4Tq1T1gyvUQ= +github.com/omec-project/config5g v1.5.3/go.mod h1:HOvQtmi79f8cw35AiFHWHDoCTuZbXfMjeFJWgtPbwaI= github.com/omec-project/openapi v1.3.1 h1:NCteMRdMtWnMhf1CXYduuLgeu8fEhc/7XO1CiE7fN3Y= github.com/omec-project/openapi v1.3.1/go.mod h1:cR6Iharp2TLOzEmskQ/EdCVFZnpKh0zTvUSSuyXAYLE= github.com/omec-project/util v1.2.3 h1:h32ZYFT99+fB9VPp1CQUIKwrSP6RtX+PbFDcqmEHmn0= From d2ef0a865a971f0e1a6d93658a77cc9fc650d9cb Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 14:56:46 +0300 Subject: [PATCH 06/30] do not return if message is not available Signed-off-by: gatici --- factory/gclient.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/factory/gclient.go b/factory/gclient.go index 91a4fb2..ba5a026 100644 --- a/factory/gclient.go +++ b/factory/gclient.go @@ -173,7 +173,8 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo rsp, err := stream.Recv() if err != nil { logger.GrpcLog.Errorf("failed to receive message from stream: %v", err) - return + time.Sleep(10 * time.Second) + continue } logger.GrpcLog.Infoln("stream message received") From 66c7cc2fed98d95c7e1c3d3d10c0e7c8125d61f2 Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 15:30:45 +0300 Subject: [PATCH 07/30] chore: change the orders Signed-off-by: gatici --- factory/factory.go | 1 - 1 file changed, 1 deletion(-) diff --git a/factory/factory.go b/factory/factory.go index 61ff515..514481d 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -58,7 +58,6 @@ func InitConfigFactory(f string) error { } if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") - } else { go func() { logger.InitLog.Infoln("use helm chart config") From 2b4faafe3481181ceb41e87ffcd60b230add2053 Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 16:17:33 +0300 Subject: [PATCH 08/30] fix: get the new stream Signed-off-by: gatici --- factory/gclient.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/factory/gclient.go b/factory/gclient.go index ba5a026..54a8afc 100644 --- a/factory/gclient.go +++ b/factory/gclient.go @@ -170,6 +170,16 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigSer // using stream and communication channel as inputs func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { for { + var err error + stream, err = confClient.CheckGrpcConnectivity() + if err != nil { + logger.GrpcLog.Errorf("failed to receive message: %v", err) + // Clearing the stream will force the client to resubscribe on next iteration + stream = nil + time.Sleep(time.Second * 5) + // Retry on failure + continue + } rsp, err := stream.Recv() if err != nil { logger.GrpcLog.Errorf("failed to receive message from stream: %v", err) From 13c240ee29c1cba12f0d0838df1126f35000bd87 Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 17:15:36 +0300 Subject: [PATCH 09/30] fix: change method Signed-off-by: gatici --- factory/factory.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/factory/factory.go b/factory/factory.go index 514481d..23dd81d 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -19,8 +19,11 @@ import ( <<<<<<< HEAD ======= "go.uber.org/zap" +<<<<<<< HEAD "google.golang.org/grpc/connectivity" >>>>>>> 4fa3dfc (fix: rename and organize a method) +======= +>>>>>>> bd9df4e (fix: change method) "gopkg.in/yaml.v2" ) From da162bb964f8744dbe753c9217376857e5daaa82 Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 23 Oct 2024 23:31:54 +0300 Subject: [PATCH 10/30] modify subscribeToConfigPod Signed-off-by: gatici --- factory/factory.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++ factory/gclient.go | 31 +++++++++++++++--------------- 2 files changed, 63 insertions(+), 16 deletions(-) diff --git a/factory/factory.go b/factory/factory.go index 23dd81d..245f88a 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -19,11 +19,15 @@ import ( <<<<<<< HEAD ======= "go.uber.org/zap" +<<<<<<< HEAD <<<<<<< HEAD "google.golang.org/grpc/connectivity" >>>>>>> 4fa3dfc (fix: rename and organize a method) ======= >>>>>>> bd9df4e (fix: change method) +======= + "google.golang.org/grpc/connectivity" +>>>>>>> 7bb8bf6 (modify subscribeToConfigPod) "gopkg.in/yaml.v2" ) @@ -61,6 +65,7 @@ func InitConfigFactory(f string) error { } if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") + } else { go func() { logger.InitLog.Infoln("use helm chart config") @@ -72,6 +77,49 @@ func InitConfigFactory(f string) error { return nil } +<<<<<<< HEAD +======= +// manageGrpcClient connects the config pod GRPC server and subscribes the config changes. +// Then it updates UDR configuration. +func manageGrpcClient(webuiUri string) { + var configChannel chan *protos.NetworkSliceResponse + var client ConfClient + var err error + for { + if client != nil { + _, err = client.CheckGrpcConnectivity() + if err != nil { + initLog.Infoln("Connectivity error, waiting 30 seconds") + time.Sleep(time.Second * 30) + } + time.Sleep(time.Second * 30) + if client.GetConfigClientConn().GetState() != connectivity.Ready { + err = client.GetConfigClientConn().Close() + if err != nil { + initLog.Infof("failing ConfigClient is not closed properly: %+v", err) + } + client = nil + continue + } + if configChannel == nil { + configChannel = client.PublishOnConfigChange(true) + initLog.Infoln("PublishOnConfigChange is triggered.") + ConfigUpdateDbTrigger = make(chan *UpdateDb, 10) + go UdrConfig.updateConfig(configChannel, ConfigUpdateDbTrigger) + initLog.Infoln("UDR updateConfig is triggered.") + } + } else { + client, err = ConnectToConfigServer(webuiUri) + initLog.Infoln("Connecting to config server.") + if err != nil { + logger.InitLog.Errorf("%+v", err) + } + continue + } + } +} + +>>>>>>> 7bb8bf6 (modify subscribeToConfigPod) func CheckConfigVersion() error { currentVersion := UdrConfig.GetVersion() diff --git a/factory/gclient.go b/factory/gclient.go index 54a8afc..e3cc110 100644 --- a/factory/gclient.go +++ b/factory/gclient.go @@ -50,13 +50,13 @@ type ConfClient interface { // PublishOnConfigChange creates a channel to perform the subscription using it. // On Receiving Configuration from ConfigServer, this api publishes // on created channel and returns the channel - PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse + PublishOnConfigChange(metadataRequested bool) chan *protos.NetworkSliceResponse // getConfigClientConn returns grpc connection object GetConfigClientConn() *grpc.ClientConn // Client Subscribing channel to ConfigPod to receive configuration - subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) + subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) // CheckGrpcConnectivity checks the connectivity status and // subscribes to a stream of NetworkSlice if connectivity is ready @@ -75,12 +75,12 @@ func ConnectToConfigServer(host string) (ConfClient, error) { // PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel // then NFs gets the messages -func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { +func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool) chan *protos.NetworkSliceResponse { confClient.MetadataRequested = metadataFlag commChan := make(chan *protos.NetworkSliceResponse) confClient.Channel = commChan logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") - go confClient.subscribeToConfigPod(commChan, stream) + go confClient.subscribeToConfigPod(commChan) return commChan } @@ -168,27 +168,25 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigSer // subscribeToConfigPod subscribing channel to ConfigPod to receive configuration // using stream and communication channel as inputs -func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { +func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) { for { - var err error - stream, err = confClient.CheckGrpcConnectivity() + stream, err := confClient.CheckGrpcConnectivity() if err != nil { - logger.GrpcLog.Errorf("failed to receive message: %v", err) - // Clearing the stream will force the client to resubscribe on next iteration - stream = nil - time.Sleep(time.Second * 5) - // Retry on failure + logger.GrpcLog.Errorf("%v", err) + } + if stream == nil { + time.Sleep(time.Second * 30) continue } rsp, err := stream.Recv() if err != nil { - logger.GrpcLog.Errorf("failed to receive message from stream: %v", err) - time.Sleep(10 * time.Second) + stream = nil + logger.GrpcLog.Errorf("failed to receive message: %v", err) + time.Sleep(time.Second * 5) continue } - logger.GrpcLog.Infoln("stream message received") - logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) + logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { // first time connection or config update configPodRestartCounter = rsp.RestartCounter @@ -208,5 +206,6 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo } else { logger.GrpcLog.Errorln("config pod is restarted and no config received") } + time.Sleep(time.Second * 10) } } From e03d0faad6d66c8339bfa84e574432bd71c14838 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 14:22:56 +0300 Subject: [PATCH 11/30] compare old response and new response Signed-off-by: gatici --- factory/gclient.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/factory/gclient.go b/factory/gclient.go index e3cc110..3ddd12c 100644 --- a/factory/gclient.go +++ b/factory/gclient.go @@ -169,6 +169,7 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigSer // subscribeToConfigPod subscribing channel to ConfigPod to receive configuration // using stream and communication channel as inputs func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) { + var oldRsp *protos.NetworkSliceResponse = nil for { stream, err := confClient.CheckGrpcConnectivity() if err != nil { @@ -185,7 +186,9 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo time.Sleep(time.Second * 5) continue } - logger.GrpcLog.Infoln("stream message received") + if rsp != oldRsp { + logger.GrpcLog.Infoln("stream message received") + } logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { // first time connection or config update @@ -207,5 +210,7 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo logger.GrpcLog.Errorln("config pod is restarted and no config received") } time.Sleep(time.Second * 10) + oldRsp = rsp + rsp = nil } } From 01f1bcf0332241e821a33fa8b999e7963ea305a7 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 15:23:16 +0300 Subject: [PATCH 12/30] Trigger config updates in service Signed-off-by: gatici --- factory/config.go | 2 +- factory/factory.go | 47 +++------------------------------ {factory => service}/gclient.go | 5 ++-- service/init.go | 47 +++++++++++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 48 deletions(-) rename {factory => service}/gclient.go (98%) diff --git a/factory/config.go b/factory/config.go index 638901f..669e222 100644 --- a/factory/config.go +++ b/factory/config.go @@ -106,7 +106,7 @@ func (c *Config) addSmPolicyInfo(nwSlice *protos.NetworkSlice, dbUpdateChannel c return nil } -func (c *Config) updateConfig(commChannel chan *protos.NetworkSliceResponse, dbUpdateChannel chan *UpdateDb) bool { +func (c *Config) UpdateConfig(commChannel chan *protos.NetworkSliceResponse, dbUpdateChannel chan *UpdateDb) bool { var minConfig bool for rsp := range commChannel { logger.GrpcLog.Infoln("received updateConfig in the udr app:", rsp) diff --git a/factory/factory.go b/factory/factory.go index 245f88a..aa12854 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -20,6 +20,7 @@ import ( ======= "go.uber.org/zap" <<<<<<< HEAD +<<<<<<< HEAD <<<<<<< HEAD "google.golang.org/grpc/connectivity" >>>>>>> 4fa3dfc (fix: rename and organize a method) @@ -28,6 +29,8 @@ import ( ======= "google.golang.org/grpc/connectivity" >>>>>>> 7bb8bf6 (modify subscribeToConfigPod) +======= +>>>>>>> 12f8d52 (Trigger config updates in service) "gopkg.in/yaml.v2" ) @@ -65,7 +68,6 @@ func InitConfigFactory(f string) error { } if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") - } else { go func() { logger.InitLog.Infoln("use helm chart config") @@ -77,49 +79,6 @@ func InitConfigFactory(f string) error { return nil } -<<<<<<< HEAD -======= -// manageGrpcClient connects the config pod GRPC server and subscribes the config changes. -// Then it updates UDR configuration. -func manageGrpcClient(webuiUri string) { - var configChannel chan *protos.NetworkSliceResponse - var client ConfClient - var err error - for { - if client != nil { - _, err = client.CheckGrpcConnectivity() - if err != nil { - initLog.Infoln("Connectivity error, waiting 30 seconds") - time.Sleep(time.Second * 30) - } - time.Sleep(time.Second * 30) - if client.GetConfigClientConn().GetState() != connectivity.Ready { - err = client.GetConfigClientConn().Close() - if err != nil { - initLog.Infof("failing ConfigClient is not closed properly: %+v", err) - } - client = nil - continue - } - if configChannel == nil { - configChannel = client.PublishOnConfigChange(true) - initLog.Infoln("PublishOnConfigChange is triggered.") - ConfigUpdateDbTrigger = make(chan *UpdateDb, 10) - go UdrConfig.updateConfig(configChannel, ConfigUpdateDbTrigger) - initLog.Infoln("UDR updateConfig is triggered.") - } - } else { - client, err = ConnectToConfigServer(webuiUri) - initLog.Infoln("Connecting to config server.") - if err != nil { - logger.InitLog.Errorf("%+v", err) - } - continue - } - } -} - ->>>>>>> 7bb8bf6 (modify subscribeToConfigPod) func CheckConfigVersion() error { currentVersion := UdrConfig.GetVersion() diff --git a/factory/gclient.go b/service/gclient.go similarity index 98% rename from factory/gclient.go rename to service/gclient.go index 3ddd12c..bbdf32e 100644 --- a/factory/gclient.go +++ b/service/gclient.go @@ -1,4 +1,4 @@ -package factory +package service import ( "context" @@ -188,8 +188,8 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo } if rsp != oldRsp { logger.GrpcLog.Infoln("stream message received") + logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) } - logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { // first time connection or config update configPodRestartCounter = rsp.RestartCounter @@ -211,6 +211,5 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo } time.Sleep(time.Second * 10) oldRsp = rsp - rsp = nil } } diff --git a/service/init.go b/service/init.go index 3a90f77..84f040f 100644 --- a/service/init.go +++ b/service/init.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/openapi/models" "github.com/omec-project/udr/consumer" "github.com/omec-project/udr/context" @@ -32,6 +33,7 @@ import ( "github.com/urfave/cli" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "google.golang.org/grpc/connectivity" ) type UDR struct{} @@ -85,9 +87,54 @@ func (udr *UDR) Initialize(c *cli.Context) error { return err } + if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { + logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") + go manageGrpcClient(factory.UdrConfig.Configuration.WebuiUri) + } + return nil } +// manageGrpcClient connects the config pod GRPC server and subscribes the config changes. +// Then it updates UDR configuration. +func manageGrpcClient(webuiUri string) { + var configChannel chan *protos.NetworkSliceResponse + var client ConfClient + var err error + for { + if client != nil { + _, err = client.CheckGrpcConnectivity() + if err != nil { + logger.InitLog.Infoln("GRPC connectivity error, waiting 15 seconds") + time.Sleep(time.Second * 15) + } + time.Sleep(time.Second * 15) + if client.GetConfigClientConn().GetState() != connectivity.Ready { + err = client.GetConfigClientConn().Close() + if err != nil { + logger.InitLog.Infof("failing ConfigClient is not closed properly: %+v", err) + } + client = nil + continue + } + if configChannel == nil { + configChannel = client.PublishOnConfigChange(true) + logger.InitLog.Infoln("PublishOnConfigChange is triggered.") + factory.ConfigUpdateDbTrigger = make(chan *factory.UpdateDb, 10) + go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) + logger.InitLog.Infoln("UDR updateConfig is triggered.") + } + } else { + client, err = ConnectToConfigServer(webuiUri) + logger.InitLog.Infoln("Connecting to config server.") + if err != nil { + logger.InitLog.Errorf("%+v", err) + } + continue + } + } +} + func (udr *UDR) setLogLevel() { if factory.UdrConfig.Logger == nil { logger.InitLog.Warnln("UDR config without log level setting") From ada6214975a25ea2b86c03d86ceaef4459fb3559 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 15:43:21 +0300 Subject: [PATCH 13/30] add log Signed-off-by: gatici --- service/gclient.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/gclient.go b/service/gclient.go index bbdf32e..1f4396a 100644 --- a/service/gclient.go +++ b/service/gclient.go @@ -169,7 +169,6 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigSer // subscribeToConfigPod subscribing channel to ConfigPod to receive configuration // using stream and communication channel as inputs func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) { - var oldRsp *protos.NetworkSliceResponse = nil for { stream, err := confClient.CheckGrpcConnectivity() if err != nil { @@ -186,7 +185,9 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo time.Sleep(time.Second * 5) continue } - if rsp != oldRsp { + + logger.GrpcLog.Infof("new rsp: %v", rsp) + if rsp != nil { logger.GrpcLog.Infoln("stream message received") logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) } @@ -210,6 +211,5 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo logger.GrpcLog.Errorln("config pod is restarted and no config received") } time.Sleep(time.Second * 10) - oldRsp = rsp } } From 2c57586a2994e6efa43b71e8f10dc43c0a3a3164 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 18:14:04 +0300 Subject: [PATCH 14/30] get stream once Signed-off-by: gatici --- service/gclient.go | 14 +++++--------- service/init.go | 11 ++++------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/service/gclient.go b/service/gclient.go index 1f4396a..d64ee33 100644 --- a/service/gclient.go +++ b/service/gclient.go @@ -50,13 +50,13 @@ type ConfClient interface { // PublishOnConfigChange creates a channel to perform the subscription using it. // On Receiving Configuration from ConfigServer, this api publishes // on created channel and returns the channel - PublishOnConfigChange(metadataRequested bool) chan *protos.NetworkSliceResponse + PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse // getConfigClientConn returns grpc connection object GetConfigClientConn() *grpc.ClientConn // Client Subscribing channel to ConfigPod to receive configuration - subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) + subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) // CheckGrpcConnectivity checks the connectivity status and // subscribes to a stream of NetworkSlice if connectivity is ready @@ -75,12 +75,12 @@ func ConnectToConfigServer(host string) (ConfClient, error) { // PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel // then NFs gets the messages -func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool) chan *protos.NetworkSliceResponse { +func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { confClient.MetadataRequested = metadataFlag commChan := make(chan *protos.NetworkSliceResponse) confClient.Channel = commChan logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") - go confClient.subscribeToConfigPod(commChan) + go confClient.subscribeToConfigPod(commChan, stream) return commChan } @@ -168,12 +168,8 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigSer // subscribeToConfigPod subscribing channel to ConfigPod to receive configuration // using stream and communication channel as inputs -func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) { +func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { for { - stream, err := confClient.CheckGrpcConnectivity() - if err != nil { - logger.GrpcLog.Errorf("%v", err) - } if stream == nil { time.Sleep(time.Second * 30) continue diff --git a/service/init.go b/service/init.go index 84f040f..3e7e70e 100644 --- a/service/init.go +++ b/service/init.go @@ -103,12 +103,8 @@ func manageGrpcClient(webuiUri string) { var err error for { if client != nil { - _, err = client.CheckGrpcConnectivity() - if err != nil { - logger.InitLog.Infoln("GRPC connectivity error, waiting 15 seconds") - time.Sleep(time.Second * 15) - } - time.Sleep(time.Second * 15) + logger.InitLog.Infoln("waiting for connectivity to be ready") + time.Sleep(time.Second * 30) if client.GetConfigClientConn().GetState() != connectivity.Ready { err = client.GetConfigClientConn().Close() if err != nil { @@ -118,7 +114,8 @@ func manageGrpcClient(webuiUri string) { continue } if configChannel == nil { - configChannel = client.PublishOnConfigChange(true) + stream, _ := client.CheckGrpcConnectivity() + configChannel = client.PublishOnConfigChange(true, stream) logger.InitLog.Infoln("PublishOnConfigChange is triggered.") factory.ConfigUpdateDbTrigger = make(chan *factory.UpdateDb, 10) go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) From 1b7929f0a81194c9e2b58824db23b3ad5ec4c671 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 21:14:04 +0300 Subject: [PATCH 15/30] increasing timeout Signed-off-by: gatici --- service/init.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/init.go b/service/init.go index 3e7e70e..2c689bb 100644 --- a/service/init.go +++ b/service/init.go @@ -103,8 +103,8 @@ func manageGrpcClient(webuiUri string) { var err error for { if client != nil { - logger.InitLog.Infoln("waiting for connectivity to be ready") - time.Sleep(time.Second * 30) + logger.InitLog.Infoln("Checking the connectivity readiness") + time.Sleep(time.Second * 60) if client.GetConfigClientConn().GetState() != connectivity.Ready { err = client.GetConfigClientConn().Close() if err != nil { From fe5f645cd6dc5e16b2596f2275aa50bda90c8764 Mon Sep 17 00:00:00 2001 From: gatici Date: Thu, 24 Oct 2024 22:04:57 +0300 Subject: [PATCH 16/30] check stream availability Signed-off-by: gatici --- service/init.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/service/init.go b/service/init.go index 2c689bb..35be59f 100644 --- a/service/init.go +++ b/service/init.go @@ -100,6 +100,7 @@ func (udr *UDR) Initialize(c *cli.Context) error { func manageGrpcClient(webuiUri string) { var configChannel chan *protos.NetworkSliceResponse var client ConfClient + var stream protos.ConfigService_NetworkSliceSubscribeClient var err error for { if client != nil { @@ -113,14 +114,17 @@ func manageGrpcClient(webuiUri string) { client = nil continue } + if stream == nil { + stream, _ = client.CheckGrpcConnectivity() + } if configChannel == nil { - stream, _ := client.CheckGrpcConnectivity() configChannel = client.PublishOnConfigChange(true, stream) logger.InitLog.Infoln("PublishOnConfigChange is triggered.") factory.ConfigUpdateDbTrigger = make(chan *factory.UpdateDb, 10) go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) logger.InitLog.Infoln("UDR updateConfig is triggered.") } + } else { client, err = ConnectToConfigServer(webuiUri) logger.InitLog.Infoln("Connecting to config server.") From 13d923fbd5c0ebae36f704dad547f0e7e4794608 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 01:47:31 +0300 Subject: [PATCH 17/30] create stream Signed-off-by: gatici --- service/init.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/init.go b/service/init.go index 35be59f..dd7edec 100644 --- a/service/init.go +++ b/service/init.go @@ -114,9 +114,9 @@ func manageGrpcClient(webuiUri string) { client = nil continue } - if stream == nil { - stream, _ = client.CheckGrpcConnectivity() - } + + stream, _ = client.CheckGrpcConnectivity() + if configChannel == nil { configChannel = client.PublishOnConfigChange(true, stream) logger.InitLog.Infoln("PublishOnConfigChange is triggered.") From 15a2ac95a616dcf06df5ff26bc4d2c710563e304 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 09:30:33 +0300 Subject: [PATCH 18/30] use random client subscription id Signed-off-by: gatici --- service/gclient.go | 6 ++++-- service/init.go | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/service/gclient.go b/service/gclient.go index d64ee33..944e6b2 100644 --- a/service/gclient.go +++ b/service/gclient.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "os" + "strconv" "time" "github.com/omec-project/config5g/logger" @@ -150,11 +151,12 @@ func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { // if connectivity is Ready. It returns a stream if connection is successful else returns nil. func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { logger.GrpcLog.Debugln("connectToGrpcServer") - myid := os.Getenv("HOSTNAME") + hostname := os.Getenv("HOSTNAME") + randomId := hostname + "-" + strconv.Itoa(rand.Int()) status := confClient.Conn.GetState() if status == connectivity.Ready { logger.GrpcLog.Debugln("connectivity ready") - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested} + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: randomId, MetadataRequested: confClient.MetadataRequested} if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { return stream, fmt.Errorf("failed to subscribe: %v", err) } diff --git a/service/init.go b/service/init.go index dd7edec..23dfeb2 100644 --- a/service/init.go +++ b/service/init.go @@ -115,7 +115,9 @@ func manageGrpcClient(webuiUri string) { continue } - stream, _ = client.CheckGrpcConnectivity() + if stream == nil { + stream, _ = client.CheckGrpcConnectivity() + } if configChannel == nil { configChannel = client.PublishOnConfigChange(true, stream) From 744a84f6a477fdb3f32cebc0619f338b06fbc47b Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 10:19:15 +0300 Subject: [PATCH 19/30] refactoring methods Signed-off-by: gatici --- service/gclient.go | 59 ++++++++++++++++++++++++++-------------------- service/init.go | 10 +++++--- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/service/gclient.go b/service/gclient.go index 944e6b2..44bdd0e 100644 --- a/service/gclient.go +++ b/service/gclient.go @@ -53,15 +53,19 @@ type ConfClient interface { // on created channel and returns the channel PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse - // getConfigClientConn returns grpc connection object + // GetConfigClientConn returns grpc connection object GetConfigClientConn() *grpc.ClientConn - // Client Subscribing channel to ConfigPod to receive configuration - subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) + // subscribeToConfigPod receives the configuration changes using stream + // send messages to communication channel + sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) - // CheckGrpcConnectivity checks the connectivity status and - // subscribes to a stream of NetworkSlice if connectivity is ready - CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) + // CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity + CheckGrpcConnectivity() (state string) + + // SubscribeToConfigServer Subscribes to a stream of NetworkSlice + // It returns a stream if subscription is successful else returns nil. + SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) } // ConnectToConfigServer this API is added to control metadata from NF clients @@ -81,7 +85,7 @@ func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream commChan := make(chan *protos.NetworkSliceResponse) confClient.Channel = commChan logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") - go confClient.subscribeToConfigPod(commChan, stream) + go confClient.sendMessagesToChannel(commChan, stream) return commChan } @@ -147,30 +151,36 @@ func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { return confClient.Conn } -// CheckGrpcConnectivity checks the connectivity status and subscribes to a stream of NetworkSlice -// if connectivity is Ready. It returns a stream if connection is successful else returns nil. -func (confClient *ConfigClient) CheckGrpcConnectivity() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { - logger.GrpcLog.Debugln("connectToGrpcServer") - hostname := os.Getenv("HOSTNAME") - randomId := hostname + "-" + strconv.Itoa(rand.Int()) +// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity +func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) { + logger.GrpcLog.Debugln("Checking GRPC connectivity status") status := confClient.Conn.GetState() if status == connectivity.Ready { - logger.GrpcLog.Debugln("connectivity ready") - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: randomId, MetadataRequested: confClient.MetadataRequested} - if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { - return stream, fmt.Errorf("failed to subscribe: %v", err) - } - return stream, nil + return "ready" } else if status == connectivity.Idle { - return nil, fmt.Errorf("connectivity status idle") + return "idle" } else { - return nil, fmt.Errorf("connectivity status not ready") + return "unconnected" + } +} + +// SubscribeToConfigServer Subscribes to a stream of NetworkSlice +// It returns a stream if subscription is successful else returns nil. +func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { + logger.GrpcLog.Debugln("connectToGrpcServer") + hostname := os.Getenv("HOSTNAME") + randomId := hostname + "-" + strconv.Itoa(rand.Int()) + logger.GrpcLog.Debugln("connectivity ready") + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: randomId, MetadataRequested: confClient.MetadataRequested} + if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { + return stream, fmt.Errorf("failed to subscribe: %v", err) } + return stream, nil } -// subscribeToConfigPod subscribing channel to ConfigPod to receive configuration -// using stream and communication channel as inputs -func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { +// subscribeToConfigPod receives the configuration changes using stream +// send messages to communication channel +func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { for { if stream == nil { time.Sleep(time.Second * 30) @@ -184,7 +194,6 @@ func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.Netwo continue } - logger.GrpcLog.Infof("new rsp: %v", rsp) if rsp != nil { logger.GrpcLog.Infoln("stream message received") logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) diff --git a/service/init.go b/service/init.go index 23dfeb2..89854e9 100644 --- a/service/init.go +++ b/service/init.go @@ -33,7 +33,6 @@ import ( "github.com/urfave/cli" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "google.golang.org/grpc/connectivity" ) type UDR struct{} @@ -106,7 +105,7 @@ func manageGrpcClient(webuiUri string) { if client != nil { logger.InitLog.Infoln("Checking the connectivity readiness") time.Sleep(time.Second * 60) - if client.GetConfigClientConn().GetState() != connectivity.Ready { + if client.CheckGrpcConnectivity() != "ready" { err = client.GetConfigClientConn().Close() if err != nil { logger.InitLog.Infof("failing ConfigClient is not closed properly: %+v", err) @@ -116,7 +115,11 @@ func manageGrpcClient(webuiUri string) { } if stream == nil { - stream, _ = client.CheckGrpcConnectivity() + stream, err = client.SubscribeToConfigServer() + if err != nil { + logger.InitLog.Infof("failing SubscribeToConfigServer: %+v", err) + continue + } } if configChannel == nil { @@ -129,6 +132,7 @@ func manageGrpcClient(webuiUri string) { } else { client, err = ConnectToConfigServer(webuiUri) + stream = nil logger.InitLog.Infoln("Connecting to config server.") if err != nil { logger.InitLog.Errorf("%+v", err) From f6a61d4d08f13514f27e48a23b0ce0b54f93c628 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 10:52:11 +0300 Subject: [PATCH 20/30] If config server restarted, reset the channel Signed-off-by: gatici --- service/init.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/init.go b/service/init.go index 89854e9..e074dcd 100644 --- a/service/init.go +++ b/service/init.go @@ -133,6 +133,7 @@ func manageGrpcClient(webuiUri string) { } else { client, err = ConnectToConfigServer(webuiUri) stream = nil + configChannel = nil logger.InitLog.Infoln("Connecting to config server.") if err != nil { logger.InitLog.Errorf("%+v", err) From 0952a2d847857f013a1a5389ac1fd23937aa9bea Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 11:33:56 +0300 Subject: [PATCH 21/30] Replacing config5g with local folder for testing Signed-off-by: gatici --- go.mod | 4 +- go.sum | 2 - service/gclient.go | 222 --------------------------------------------- service/init.go | 5 +- 4 files changed, 6 insertions(+), 227 deletions(-) delete mode 100644 service/gclient.go diff --git a/go.mod b/go.mod index 5d23aac..4215066 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/omec-project/udr +replace github.com/omec-project/config5g => /home/gatici/omec/config5g + go 1.21 require ( @@ -15,7 +17,6 @@ require ( github.com/urfave/cli v1.22.16 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/zap v1.27.0 - google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -70,6 +71,7 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/h2non/gock.v1 v1.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 82945b5..3c6f917 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,6 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= -github.com/omec-project/config5g v1.5.3 h1:FPetMFU1/BXGzihcoRHJm0q1vU1AnYyR4Tq1T1gyvUQ= -github.com/omec-project/config5g v1.5.3/go.mod h1:HOvQtmi79f8cw35AiFHWHDoCTuZbXfMjeFJWgtPbwaI= github.com/omec-project/openapi v1.3.1 h1:NCteMRdMtWnMhf1CXYduuLgeu8fEhc/7XO1CiE7fN3Y= github.com/omec-project/openapi v1.3.1/go.mod h1:cR6Iharp2TLOzEmskQ/EdCVFZnpKh0zTvUSSuyXAYLE= github.com/omec-project/util v1.2.3 h1:h32ZYFT99+fB9VPp1CQUIKwrSP6RtX+PbFDcqmEHmn0= diff --git a/service/gclient.go b/service/gclient.go deleted file mode 100644 index 44bdd0e..0000000 --- a/service/gclient.go +++ /dev/null @@ -1,222 +0,0 @@ -package service - -import ( - "context" - "fmt" - "math/rand" - "os" - "strconv" - "time" - - "github.com/omec-project/config5g/logger" - protos "github.com/omec-project/config5g/proto/sdcoreConfig" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" -) - -var ( - selfRestartCounter uint32 - configPodRestartCounter uint32 = 0 -) - -func init() { - s1 := rand.NewSource(time.Now().UnixNano()) - r1 := rand.New(s1) - selfRestartCounter = r1.Uint32() -} - -type PlmnId struct { - Mcc string - Mnc string -} - -type Nssai struct { - Sst string - Sd string -} - -type ConfigClient struct { - Client protos.ConfigServiceClient - Conn *grpc.ClientConn - Channel chan *protos.NetworkSliceResponse - Host string - Version string - MetadataRequested bool -} - -type ConfClient interface { - // PublishOnConfigChange creates a channel to perform the subscription using it. - // On Receiving Configuration from ConfigServer, this api publishes - // on created channel and returns the channel - PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse - - // GetConfigClientConn returns grpc connection object - GetConfigClientConn() *grpc.ClientConn - - // subscribeToConfigPod receives the configuration changes using stream - // send messages to communication channel - sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) - - // CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity - CheckGrpcConnectivity() (state string) - - // SubscribeToConfigServer Subscribes to a stream of NetworkSlice - // It returns a stream if subscription is successful else returns nil. - SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) -} - -// ConnectToConfigServer this API is added to control metadata from NF clients -// Connects to the ConfigServer using host address -func ConnectToConfigServer(host string) (ConfClient, error) { - confClient := CreateConfClient(host) - if confClient == nil { - return nil, fmt.Errorf("create grpc channel to config pod failed") - } - return confClient, nil -} - -// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel -// then NFs gets the messages -func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { - confClient.MetadataRequested = metadataFlag - commChan := make(chan *protos.NetworkSliceResponse) - confClient.Channel = commChan - logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") - go confClient.sendMessagesToChannel(commChan, stream) - return commChan -} - -// CreateConfClient creates a GRPC client by connecting to GRPC server (host). -func CreateConfClient(host string) ConfClient { - logger.GrpcLog.Debugln("create config client") - // Second, check to see if we can reuse the gRPC connection for a new P4RT client - conn, err := newClientConnection(host) - if err != nil { - logger.GrpcLog.Errorf("grpc connection failed %v", err) - return nil - } - - client := &ConfigClient{ - Client: protos.NewConfigServiceClient(conn), - Conn: conn, - Host: host, - } - - return client -} - -var kacp = keepalive.ClientParameters{ - Time: 20 * time.Second, // send pings every 20 seconds if there is no activity - Timeout: 2 * time.Second, // wait 1 second for ping ack before considering the connection dead - PermitWithoutStream: true, // send pings even without active streams -} - -var retryPolicy = `{ - "methodConfig": [{ - "name": [{"service": "grpc.Config"}], - "waitForReady": true, - "retryPolicy": { - "MaxAttempts": 4, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - }}]}` - -// newClientConnection opens a GRPC connection to the host -func newClientConnection(host string) (conn *grpc.ClientConn, err error) { - logger.GrpcLog.Debugln("dial grpc connection:", host) - - bd := 1 * time.Second - mltpr := 1.0 - jitter := 0.2 - MaxDelay := 5 * time.Second - bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay} - - crt := grpc.ConnectParams{Backoff: bc} - dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)} - conn, err = grpc.NewClient(host, dialOptions...) - if err != nil { - return nil, fmt.Errorf("grpc newclient creation failed: %v", err) - } - conn.Connect() - return conn, nil -} - -// GetConfigClientConn exposes the GRPC client connection -func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { - return confClient.Conn -} - -// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity -func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) { - logger.GrpcLog.Debugln("Checking GRPC connectivity status") - status := confClient.Conn.GetState() - if status == connectivity.Ready { - return "ready" - } else if status == connectivity.Idle { - return "idle" - } else { - return "unconnected" - } -} - -// SubscribeToConfigServer Subscribes to a stream of NetworkSlice -// It returns a stream if subscription is successful else returns nil. -func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { - logger.GrpcLog.Debugln("connectToGrpcServer") - hostname := os.Getenv("HOSTNAME") - randomId := hostname + "-" + strconv.Itoa(rand.Int()) - logger.GrpcLog.Debugln("connectivity ready") - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: randomId, MetadataRequested: confClient.MetadataRequested} - if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { - return stream, fmt.Errorf("failed to subscribe: %v", err) - } - return stream, nil -} - -// subscribeToConfigPod receives the configuration changes using stream -// send messages to communication channel -func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { - for { - if stream == nil { - time.Sleep(time.Second * 30) - continue - } - rsp, err := stream.Recv() - if err != nil { - stream = nil - logger.GrpcLog.Errorf("failed to receive message: %v", err) - time.Sleep(time.Second * 5) - continue - } - - if rsp != nil { - logger.GrpcLog.Infoln("stream message received") - logger.GrpcLog.Infof("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) - } - if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { - // first time connection or config update - configPodRestartCounter = rsp.RestartCounter - if len(rsp.NetworkSlice) > 0 { - // always carries full config copy - logger.GrpcLog.Infoln("first time config received", rsp) - commChan <- rsp - } else if rsp.ConfigUpdated == 1 { - // config delete, all slices deleted - logger.GrpcLog.Infoln("complete config deleted") - commChan <- rsp - } - } else if len(rsp.NetworkSlice) > 0 { - logger.GrpcLog.Errorln("config received after config pod restart") - configPodRestartCounter = rsp.RestartCounter - commChan <- rsp - } else { - logger.GrpcLog.Errorln("config pod is restarted and no config received") - } - time.Sleep(time.Second * 10) - } -} diff --git a/service/init.go b/service/init.go index e074dcd..78e6aff 100644 --- a/service/init.go +++ b/service/init.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + grpcClient "github.com/omec-project/config5g/proto/client" protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/openapi/models" "github.com/omec-project/udr/consumer" @@ -98,7 +99,7 @@ func (udr *UDR) Initialize(c *cli.Context) error { // Then it updates UDR configuration. func manageGrpcClient(webuiUri string) { var configChannel chan *protos.NetworkSliceResponse - var client ConfClient + var client grpcClient.ConfClient var stream protos.ConfigService_NetworkSliceSubscribeClient var err error for { @@ -131,7 +132,7 @@ func manageGrpcClient(webuiUri string) { } } else { - client, err = ConnectToConfigServer(webuiUri) + client, err = grpcClient.ConnectToConfigServer(webuiUri) stream = nil configChannel = nil logger.InitLog.Infoln("Connecting to config server.") From 95eb71df05930890426472022311fde122e0b075 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 12:17:23 +0300 Subject: [PATCH 22/30] adding gclient again Signed-off-by: gatici --- go.mod | 4 +- service/gclient.go | 221 +++++++++++++++++++++++++++++++++++++++++++++ service/init.go | 7 +- 3 files changed, 226 insertions(+), 6 deletions(-) create mode 100644 service/gclient.go diff --git a/go.mod b/go.mod index 4215066..85d7e1c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/omec-project/udr -replace github.com/omec-project/config5g => /home/gatici/omec/config5g +//replace github.com/omec-project/config5g => /home/gatici/omec/config5g go 1.21 @@ -17,6 +17,7 @@ require ( github.com/urfave/cli v1.22.16 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -71,7 +72,6 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect - google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/h2non/gock.v1 v1.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/service/gclient.go b/service/gclient.go new file mode 100644 index 0000000..c17a9a8 --- /dev/null +++ b/service/gclient.go @@ -0,0 +1,221 @@ +package service + +import ( + "context" + "fmt" + "math/rand" + "os" + "time" + + "github.com/omec-project/config5g/logger" + protos "github.com/omec-project/config5g/proto/sdcoreConfig" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" +) + +var ( + selfRestartCounter uint32 + configPodRestartCounter uint32 = 0 +) + +func init() { + s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + selfRestartCounter = r1.Uint32() +} + +type PlmnId struct { + Mcc string + Mnc string +} + +type Nssai struct { + Sst string + Sd string +} + +type ConfigClient struct { + Client protos.ConfigServiceClient + Conn *grpc.ClientConn + Channel chan *protos.NetworkSliceResponse + Host string + Version string + MetadataRequested bool +} + +type ConfClient interface { + // PublishOnConfigChange creates a channel to perform the subscription using it. + // On Receiving Configuration from ConfigServer, this api publishes + // on created channel and returns the channel + PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse + + // GetConfigClientConn returns grpc connection object + GetConfigClientConn() *grpc.ClientConn + + // sendMessagesToChannel receives the configuration changes using stream + // send messages to communication channel + sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) + + // CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity + CheckGrpcConnectivity() (state string) + + // SubscribeToConfigServer Subscribes to a stream of NetworkSlice + // It returns a stream if subscription is successful else returns nil. + SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) +} + +// ConnectToConfigServer this API is added to control metadata from NF clients +// Connects to the ConfigServer using host address +func ConnectToConfigServer(host string) (ConfClient, error) { + confClient := CreateConfClient(host) + if confClient == nil { + return nil, fmt.Errorf("create grpc channel to config pod failed") + } + return confClient, nil +} + +// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel +// then NFs gets the messages +func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { + confClient.MetadataRequested = metadataFlag + commChan := make(chan *protos.NetworkSliceResponse) + confClient.Channel = commChan + logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") + go confClient.sendMessagesToChannel(commChan, stream) + return commChan +} + +// CreateConfClient creates a GRPC client by connecting to GRPC server (host). +func CreateConfClient(host string) ConfClient { + logger.GrpcLog.Debugln("create config client") + // Second, check to see if we can reuse the gRPC connection for a new P4RT client + conn, err := newClientConnection(host) + if err != nil { + logger.GrpcLog.Errorf("grpc connection failed %v", err) + return nil + } + + client := &ConfigClient{ + Client: protos.NewConfigServiceClient(conn), + Conn: conn, + Host: host, + } + + return client +} + +var kacp = keepalive.ClientParameters{ + Time: 20 * time.Second, // send pings every 20 seconds if there is no activity + Timeout: 2 * time.Second, // wait 1 second for ping ack before considering the connection dead + PermitWithoutStream: true, // send pings even without active streams +} + +var retryPolicy = `{ + "methodConfig": [{ + "name": [{"service": "grpc.Config"}], + "waitForReady": true, + "retryPolicy": { + "MaxAttempts": 4, + "InitialBackoff": ".01s", + "MaxBackoff": ".01s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "UNAVAILABLE" ] + }}]}` + +// newClientConnection opens a GRPC connection to the host +func newClientConnection(host string) (conn *grpc.ClientConn, err error) { + logger.GrpcLog.Debugln("dial grpc connection:", host) + + bd := 1 * time.Second + mltpr := 1.0 + jitter := 0.2 + MaxDelay := 5 * time.Second + bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay} + + crt := grpc.ConnectParams{Backoff: bc} + dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)} + conn, err = grpc.NewClient(host, dialOptions...) + if err != nil { + return nil, fmt.Errorf("grpc newclient creation failed: %v", err) + } + conn.Connect() + return conn, nil +} + +// GetConfigClientConn exposes the GRPC client connection +func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { + return confClient.Conn +} + +// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity +func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) { + logger.GrpcLog.Debugln("checking GRPC connectivity status") + status := confClient.Conn.GetState() + if status == connectivity.Ready { + return "ready" + } else if status == connectivity.Idle { + return "idle" + } else { + return "unconnected" + } +} + +// SubscribeToConfigServer Subscribes to a stream of NetworkSlice +// It returns a stream if subscription is successful else returns nil. +func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { + logger.GrpcLog.Debugln("SubscribeToConfigServer") + hostname := os.Getenv("HOSTNAME") + // randomId := hostname + "-" + strconv.Itoa(rand.Int()) + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: hostname, MetadataRequested: confClient.MetadataRequested} + if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { + return stream, fmt.Errorf("failed to subscribe: %v", err) + } + logger.GrpcLog.Debugln("subscribed to config server successfully") + return stream, nil +} + +// sendMessagesToChannel receives the configuration changes using stream +// and send messages to communication channel +func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { + for { + if stream == nil { + time.Sleep(time.Second * 30) + continue + } + rsp, err := stream.Recv() + if err != nil { + stream = nil + logger.GrpcLog.Errorf("failed to receive message: %v", err) + time.Sleep(time.Second * 5) + continue + } + + if rsp != nil { + logger.GrpcLog.Infoln("stream message received") + logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) + } + if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { + // first time connection or config update + configPodRestartCounter = rsp.RestartCounter + if len(rsp.NetworkSlice) > 0 { + // always carries full config copy + logger.GrpcLog.Infoln("first time config received", rsp) + commChan <- rsp + } else if rsp.ConfigUpdated == 1 { + // config delete, all slices deleted + logger.GrpcLog.Infoln("complete config deleted") + commChan <- rsp + } + } else if len(rsp.NetworkSlice) > 0 { + logger.GrpcLog.Errorln("config received after config pod restart") + configPodRestartCounter = rsp.RestartCounter + commChan <- rsp + } else { + logger.GrpcLog.Errorln("config pod is restarted and no config received") + } + time.Sleep(time.Second * 10) + } +} diff --git a/service/init.go b/service/init.go index 78e6aff..57c58e8 100644 --- a/service/init.go +++ b/service/init.go @@ -18,7 +18,6 @@ import ( "syscall" "time" - grpcClient "github.com/omec-project/config5g/proto/client" protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/openapi/models" "github.com/omec-project/udr/consumer" @@ -99,13 +98,13 @@ func (udr *UDR) Initialize(c *cli.Context) error { // Then it updates UDR configuration. func manageGrpcClient(webuiUri string) { var configChannel chan *protos.NetworkSliceResponse - var client grpcClient.ConfClient + var client ConfClient var stream protos.ConfigService_NetworkSliceSubscribeClient var err error for { if client != nil { logger.InitLog.Infoln("Checking the connectivity readiness") - time.Sleep(time.Second * 60) + time.Sleep(time.Second * 120) if client.CheckGrpcConnectivity() != "ready" { err = client.GetConfigClientConn().Close() if err != nil { @@ -132,7 +131,7 @@ func manageGrpcClient(webuiUri string) { } } else { - client, err = grpcClient.ConnectToConfigServer(webuiUri) + client, err = ConnectToConfigServer(webuiUri) stream = nil configChannel = nil logger.InitLog.Infoln("Connecting to config server.") From eef705d691cb8a3796acf16eeaaecb13d2bae79c Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 13:13:23 +0300 Subject: [PATCH 23/30] arrranging sleep seconds Signed-off-by: gatici --- service/gclient.go | 7 +++---- service/init.go | 14 +++++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/service/gclient.go b/service/gclient.go index c17a9a8..535bbfc 100644 --- a/service/gclient.go +++ b/service/gclient.go @@ -167,9 +167,8 @@ func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) { // It returns a stream if subscription is successful else returns nil. func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { logger.GrpcLog.Debugln("SubscribeToConfigServer") - hostname := os.Getenv("HOSTNAME") - // randomId := hostname + "-" + strconv.Itoa(rand.Int()) - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: hostname, MetadataRequested: confClient.MetadataRequested} + clientId := os.Getenv("HOSTNAME") + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: clientId, MetadataRequested: confClient.MetadataRequested} if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { return stream, fmt.Errorf("failed to subscribe: %v", err) } @@ -216,6 +215,6 @@ func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.Netw } else { logger.GrpcLog.Errorln("config pod is restarted and no config received") } - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 5) } } diff --git a/service/init.go b/service/init.go index 57c58e8..6050cbf 100644 --- a/service/init.go +++ b/service/init.go @@ -103,14 +103,18 @@ func manageGrpcClient(webuiUri string) { var err error for { if client != nil { + count := 0 logger.InitLog.Infoln("Checking the connectivity readiness") - time.Sleep(time.Second * 120) if client.CheckGrpcConnectivity() != "ready" { - err = client.GetConfigClientConn().Close() - if err != nil { - logger.InitLog.Infof("failing ConfigClient is not closed properly: %+v", err) + time.Sleep(time.Second * 30) + count++ + if count > 5 { + err = client.GetConfigClientConn().Close() + if err != nil { + logger.InitLog.Infof("failing ConfigClient is not closed properly: %+v", err) + } + client = nil } - client = nil continue } From eead42bb4d0bbe34ab0582675422ba917f2cbc5c Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 13:26:48 +0300 Subject: [PATCH 24/30] importing gclient from config5g Signed-off-by: gatici --- go.mod | 4 +- service/gclient.go | 220 --------------------------------------------- service/init.go | 5 +- 3 files changed, 5 insertions(+), 224 deletions(-) delete mode 100644 service/gclient.go diff --git a/go.mod b/go.mod index 85d7e1c..4215066 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/omec-project/udr -//replace github.com/omec-project/config5g => /home/gatici/omec/config5g +replace github.com/omec-project/config5g => /home/gatici/omec/config5g go 1.21 @@ -17,7 +17,6 @@ require ( github.com/urfave/cli v1.22.16 go.mongodb.org/mongo-driver v1.17.1 go.uber.org/zap v1.27.0 - google.golang.org/grpc v1.67.1 gopkg.in/yaml.v2 v2.4.0 ) @@ -72,6 +71,7 @@ require ( golang.org/x/sys v0.26.0 // indirect golang.org/x/text v0.19.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect gopkg.in/h2non/gock.v1 v1.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/service/gclient.go b/service/gclient.go deleted file mode 100644 index 535bbfc..0000000 --- a/service/gclient.go +++ /dev/null @@ -1,220 +0,0 @@ -package service - -import ( - "context" - "fmt" - "math/rand" - "os" - "time" - - "github.com/omec-project/config5g/logger" - protos "github.com/omec-project/config5g/proto/sdcoreConfig" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/keepalive" -) - -var ( - selfRestartCounter uint32 - configPodRestartCounter uint32 = 0 -) - -func init() { - s1 := rand.NewSource(time.Now().UnixNano()) - r1 := rand.New(s1) - selfRestartCounter = r1.Uint32() -} - -type PlmnId struct { - Mcc string - Mnc string -} - -type Nssai struct { - Sst string - Sd string -} - -type ConfigClient struct { - Client protos.ConfigServiceClient - Conn *grpc.ClientConn - Channel chan *protos.NetworkSliceResponse - Host string - Version string - MetadataRequested bool -} - -type ConfClient interface { - // PublishOnConfigChange creates a channel to perform the subscription using it. - // On Receiving Configuration from ConfigServer, this api publishes - // on created channel and returns the channel - PublishOnConfigChange(metadataRequested bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse - - // GetConfigClientConn returns grpc connection object - GetConfigClientConn() *grpc.ClientConn - - // sendMessagesToChannel receives the configuration changes using stream - // send messages to communication channel - sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) - - // CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity - CheckGrpcConnectivity() (state string) - - // SubscribeToConfigServer Subscribes to a stream of NetworkSlice - // It returns a stream if subscription is successful else returns nil. - SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) -} - -// ConnectToConfigServer this API is added to control metadata from NF clients -// Connects to the ConfigServer using host address -func ConnectToConfigServer(host string) (ConfClient, error) { - confClient := CreateConfClient(host) - if confClient == nil { - return nil, fmt.Errorf("create grpc channel to config pod failed") - } - return confClient, nil -} - -// PublishOnConfigChange creates a communication channel to publish the messages from ConfigServer to the channel -// then NFs gets the messages -func (confClient *ConfigClient) PublishOnConfigChange(metadataFlag bool, stream protos.ConfigService_NetworkSliceSubscribeClient) chan *protos.NetworkSliceResponse { - confClient.MetadataRequested = metadataFlag - commChan := make(chan *protos.NetworkSliceResponse) - confClient.Channel = commChan - logger.GrpcLog.Debugln("a communication channel is created for ConfigServer") - go confClient.sendMessagesToChannel(commChan, stream) - return commChan -} - -// CreateConfClient creates a GRPC client by connecting to GRPC server (host). -func CreateConfClient(host string) ConfClient { - logger.GrpcLog.Debugln("create config client") - // Second, check to see if we can reuse the gRPC connection for a new P4RT client - conn, err := newClientConnection(host) - if err != nil { - logger.GrpcLog.Errorf("grpc connection failed %v", err) - return nil - } - - client := &ConfigClient{ - Client: protos.NewConfigServiceClient(conn), - Conn: conn, - Host: host, - } - - return client -} - -var kacp = keepalive.ClientParameters{ - Time: 20 * time.Second, // send pings every 20 seconds if there is no activity - Timeout: 2 * time.Second, // wait 1 second for ping ack before considering the connection dead - PermitWithoutStream: true, // send pings even without active streams -} - -var retryPolicy = `{ - "methodConfig": [{ - "name": [{"service": "grpc.Config"}], - "waitForReady": true, - "retryPolicy": { - "MaxAttempts": 4, - "InitialBackoff": ".01s", - "MaxBackoff": ".01s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] - }}]}` - -// newClientConnection opens a GRPC connection to the host -func newClientConnection(host string) (conn *grpc.ClientConn, err error) { - logger.GrpcLog.Debugln("dial grpc connection:", host) - - bd := 1 * time.Second - mltpr := 1.0 - jitter := 0.2 - MaxDelay := 5 * time.Second - bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay} - - crt := grpc.ConnectParams{Backoff: bc} - dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)} - conn, err = grpc.NewClient(host, dialOptions...) - if err != nil { - return nil, fmt.Errorf("grpc newclient creation failed: %v", err) - } - conn.Connect() - return conn, nil -} - -// GetConfigClientConn exposes the GRPC client connection -func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { - return confClient.Conn -} - -// CheckGrpcConnectivity checks the connectivity status and returns the state of connectivity -func (confClient *ConfigClient) CheckGrpcConnectivity() (state string) { - logger.GrpcLog.Debugln("checking GRPC connectivity status") - status := confClient.Conn.GetState() - if status == connectivity.Ready { - return "ready" - } else if status == connectivity.Idle { - return "idle" - } else { - return "unconnected" - } -} - -// SubscribeToConfigServer Subscribes to a stream of NetworkSlice -// It returns a stream if subscription is successful else returns nil. -func (confClient *ConfigClient) SubscribeToConfigServer() (stream protos.ConfigService_NetworkSliceSubscribeClient, err error) { - logger.GrpcLog.Debugln("SubscribeToConfigServer") - clientId := os.Getenv("HOSTNAME") - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: clientId, MetadataRequested: confClient.MetadataRequested} - if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { - return stream, fmt.Errorf("failed to subscribe: %v", err) - } - logger.GrpcLog.Debugln("subscribed to config server successfully") - return stream, nil -} - -// sendMessagesToChannel receives the configuration changes using stream -// and send messages to communication channel -func (confClient *ConfigClient) sendMessagesToChannel(commChan chan *protos.NetworkSliceResponse, stream protos.ConfigService_NetworkSliceSubscribeClient) { - for { - if stream == nil { - time.Sleep(time.Second * 30) - continue - } - rsp, err := stream.Recv() - if err != nil { - stream = nil - logger.GrpcLog.Errorf("failed to receive message: %v", err) - time.Sleep(time.Second * 5) - continue - } - - if rsp != nil { - logger.GrpcLog.Infoln("stream message received") - logger.GrpcLog.Debugf("network slices %d, RC of config pod %d", len(rsp.NetworkSlice), rsp.RestartCounter) - } - if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { - // first time connection or config update - configPodRestartCounter = rsp.RestartCounter - if len(rsp.NetworkSlice) > 0 { - // always carries full config copy - logger.GrpcLog.Infoln("first time config received", rsp) - commChan <- rsp - } else if rsp.ConfigUpdated == 1 { - // config delete, all slices deleted - logger.GrpcLog.Infoln("complete config deleted") - commChan <- rsp - } - } else if len(rsp.NetworkSlice) > 0 { - logger.GrpcLog.Errorln("config received after config pod restart") - configPodRestartCounter = rsp.RestartCounter - commChan <- rsp - } else { - logger.GrpcLog.Errorln("config pod is restarted and no config received") - } - time.Sleep(time.Second * 5) - } -} diff --git a/service/init.go b/service/init.go index 6050cbf..6870ee4 100644 --- a/service/init.go +++ b/service/init.go @@ -18,6 +18,7 @@ import ( "syscall" "time" + grpcClient "github.com/omec-project/config5g/proto/client" protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/openapi/models" "github.com/omec-project/udr/consumer" @@ -98,7 +99,7 @@ func (udr *UDR) Initialize(c *cli.Context) error { // Then it updates UDR configuration. func manageGrpcClient(webuiUri string) { var configChannel chan *protos.NetworkSliceResponse - var client ConfClient + var client grpcClient.ConfClient var stream protos.ConfigService_NetworkSliceSubscribeClient var err error for { @@ -135,7 +136,7 @@ func manageGrpcClient(webuiUri string) { } } else { - client, err = ConnectToConfigServer(webuiUri) + client, err = grpcClient.ConnectToConfigServer(webuiUri) stream = nil configChannel = nil logger.InitLog.Infoln("Connecting to config server.") From e5dd1a015b075284b551a2889a6d56fb92e3b037 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 13:50:33 +0300 Subject: [PATCH 25/30] resolve merge conflicts Signed-off-by: gatici --- factory/factory.go | 15 --------------- service/init.go | 5 +++-- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/factory/factory.go b/factory/factory.go index aa12854..0e3b2a7 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -16,21 +16,6 @@ import ( protos "github.com/omec-project/config5g/proto/sdcoreConfig" "github.com/omec-project/udr/logger" -<<<<<<< HEAD -======= - "go.uber.org/zap" -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD - "google.golang.org/grpc/connectivity" ->>>>>>> 4fa3dfc (fix: rename and organize a method) -======= ->>>>>>> bd9df4e (fix: change method) -======= - "google.golang.org/grpc/connectivity" ->>>>>>> 7bb8bf6 (modify subscribeToConfigPod) -======= ->>>>>>> 12f8d52 (Trigger config updates in service) "gopkg.in/yaml.v2" ) diff --git a/service/init.go b/service/init.go index 6870ee4..d0b3602 100644 --- a/service/init.go +++ b/service/init.go @@ -102,10 +102,9 @@ func manageGrpcClient(webuiUri string) { var client grpcClient.ConfClient var stream protos.ConfigService_NetworkSliceSubscribeClient var err error + count := 0 for { if client != nil { - count := 0 - logger.InitLog.Infoln("Checking the connectivity readiness") if client.CheckGrpcConnectivity() != "ready" { time.Sleep(time.Second * 30) count++ @@ -115,8 +114,10 @@ func manageGrpcClient(webuiUri string) { logger.InitLog.Infof("failing ConfigClient is not closed properly: %+v", err) } client = nil + count = 0 } continue + logger.InitLog.Infoln("checking the connectivity readiness") } if stream == nil { From 76e96deb8367202b207dd49cad062fad37d695b5 Mon Sep 17 00:00:00 2001 From: gatici Date: Fri, 25 Oct 2024 13:57:19 +0300 Subject: [PATCH 26/30] fix linting issues Signed-off-by: gatici --- service/init.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/service/init.go b/service/init.go index d0b3602..070d919 100644 --- a/service/init.go +++ b/service/init.go @@ -116,8 +116,8 @@ func manageGrpcClient(webuiUri string) { client = nil count = 0 } - continue logger.InitLog.Infoln("checking the connectivity readiness") + continue } if stream == nil { @@ -135,7 +135,6 @@ func manageGrpcClient(webuiUri string) { go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) logger.InitLog.Infoln("UDR updateConfig is triggered.") } - } else { client, err = grpcClient.ConnectToConfigServer(webuiUri) stream = nil From 2ab654f6956da4358a76c4caf3eb2a18a7c58ed2 Mon Sep 17 00:00:00 2001 From: gatici Date: Sat, 26 Oct 2024 15:52:13 +0300 Subject: [PATCH 27/30] chore: use updated config5g version Signed-off-by: gatici --- go.mod | 4 +--- go.sum | 2 ++ service/init.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 4215066..f44f4f2 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,5 @@ module github.com/omec-project/udr -replace github.com/omec-project/config5g => /home/gatici/omec/config5g - go 1.21 require ( @@ -9,7 +7,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.6.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/omec-project/config5g v1.5.3 + github.com/omec-project/config5g v1.5.4 github.com/omec-project/openapi v1.3.1 github.com/omec-project/util v1.2.3 github.com/prometheus/client_golang v1.20.5 diff --git a/go.sum b/go.sum index 3c6f917..2b98089 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= +github.com/omec-project/config5g v1.5.4 h1:5JMw5Fsr5qyLZpQi3IZQaQPj78QJMFQXDVS3QDMOY9Y= +github.com/omec-project/config5g v1.5.4/go.mod h1:HOvQtmi79f8cw35AiFHWHDoCTuZbXfMjeFJWgtPbwaI= github.com/omec-project/openapi v1.3.1 h1:NCteMRdMtWnMhf1CXYduuLgeu8fEhc/7XO1CiE7fN3Y= github.com/omec-project/openapi v1.3.1/go.mod h1:cR6Iharp2TLOzEmskQ/EdCVFZnpKh0zTvUSSuyXAYLE= github.com/omec-project/util v1.2.3 h1:h32ZYFT99+fB9VPp1CQUIKwrSP6RtX+PbFDcqmEHmn0= diff --git a/service/init.go b/service/init.go index 070d919..6e42696 100644 --- a/service/init.go +++ b/service/init.go @@ -139,7 +139,7 @@ func manageGrpcClient(webuiUri string) { client, err = grpcClient.ConnectToConfigServer(webuiUri) stream = nil configChannel = nil - logger.InitLog.Infoln("Connecting to config server.") + logger.InitLog.Infoln("connecting to config server.") if err != nil { logger.InitLog.Errorf("%+v", err) } From 78cd8b162fffc02b0d5289100906319e49b495bf Mon Sep 17 00:00:00 2001 From: gatici Date: Sat, 26 Oct 2024 15:56:12 +0300 Subject: [PATCH 28/30] removing spots Signed-off-by: gatici --- service/init.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/init.go b/service/init.go index 6e42696..37826e9 100644 --- a/service/init.go +++ b/service/init.go @@ -130,16 +130,16 @@ func manageGrpcClient(webuiUri string) { if configChannel == nil { configChannel = client.PublishOnConfigChange(true, stream) - logger.InitLog.Infoln("PublishOnConfigChange is triggered.") + logger.InitLog.Infoln("PublishOnConfigChange is triggered") factory.ConfigUpdateDbTrigger = make(chan *factory.UpdateDb, 10) go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) - logger.InitLog.Infoln("UDR updateConfig is triggered.") + logger.InitLog.Infoln("UDR updateConfig is triggered") } } else { client, err = grpcClient.ConnectToConfigServer(webuiUri) stream = nil configChannel = nil - logger.InitLog.Infoln("connecting to config server.") + logger.InitLog.Infoln("connecting to config server") if err != nil { logger.InitLog.Errorf("%+v", err) } From 5a55120e9e7f7ab30dfe0cbd416f665b190b3a33 Mon Sep 17 00:00:00 2001 From: gatici Date: Sun, 27 Oct 2024 14:05:40 +0300 Subject: [PATCH 29/30] move if clause to check MANAGED_BY_CONFIG_POD env value to service Signed-off-by: gatici --- factory/factory.go | 8 -------- service/init.go | 5 +++++ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/factory/factory.go b/factory/factory.go index 0e3b2a7..2678ea8 100644 --- a/factory/factory.go +++ b/factory/factory.go @@ -51,14 +51,6 @@ func InitConfigFactory(f string) error { if UdrConfig.Configuration.WebuiUri == "" { UdrConfig.Configuration.WebuiUri = "webui:9876" } - if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { - logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") - } else { - go func() { - logger.InitLog.Infoln("use helm chart config") - ConfigPodTrigger <- true - }() - } } return nil diff --git a/service/init.go b/service/init.go index 37826e9..4edebe7 100644 --- a/service/init.go +++ b/service/init.go @@ -90,6 +90,11 @@ func (udr *UDR) Initialize(c *cli.Context) error { if os.Getenv("MANAGED_BY_CONFIG_POD") == "true" { logger.InitLog.Infoln("MANAGED_BY_CONFIG_POD is true") go manageGrpcClient(factory.UdrConfig.Configuration.WebuiUri) + } else { + go func() { + logger.InitLog.Infoln("use helm chart config") + factory.ConfigPodTrigger <- true + }() } return nil From 861633af0015a0cc7fb07fb034f2caa5b4aae06c Mon Sep 17 00:00:00 2001 From: gatici Date: Wed, 6 Nov 2024 17:22:17 +0300 Subject: [PATCH 30/30] Create UpdateDB channel in config.go init function Signed-off-by: gatici --- factory/config.go | 1 + service/init.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/factory/config.go b/factory/config.go index 669e222..9b4c3e5 100644 --- a/factory/config.go +++ b/factory/config.go @@ -80,6 +80,7 @@ var ( func init() { ConfigPodTrigger = make(chan bool) + ConfigUpdateDbTrigger = make(chan *UpdateDb, 10) } func (c *Config) GetVersion() string { diff --git a/service/init.go b/service/init.go index 4edebe7..146abc2 100644 --- a/service/init.go +++ b/service/init.go @@ -136,7 +136,6 @@ func manageGrpcClient(webuiUri string) { if configChannel == nil { configChannel = client.PublishOnConfigChange(true, stream) logger.InitLog.Infoln("PublishOnConfigChange is triggered") - factory.ConfigUpdateDbTrigger = make(chan *factory.UpdateDb, 10) go factory.UdrConfig.UpdateConfig(configChannel, factory.ConfigUpdateDbTrigger) logger.InitLog.Infoln("UDR updateConfig is triggered") }