Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aether Scaling #161

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion consumer/nf_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
"context"
"fmt"
"net/http"
"sort"
"strconv"
"strings"

"github.com/antihax/optional"
"github.com/omec-project/openapi/Nnrf_NFDiscovery"
Expand Down Expand Up @@ -96,15 +99,37 @@
logger.Consumerlog.Error(err.Error())
return ""
}
nfInstanceIds := make([]string, 0, len(result.NfInstances))
for _, profile := range result.NfInstances {
nfInstanceIds = append(nfInstanceIds, profile.NfInstanceId)
}
sort.Strings(nfInstanceIds)

nfInstanceIdIndexMap := make(map[string]int)
for index, value := range nfInstanceIds {
nfInstanceIdIndexMap[value] = index
}

nfInstanceIndex := 0
if pcfContext.PCF_Self().EnableScaling && len(result.NfInstances) > 0 {
parts := strings.Split(id, "-")
imsiNumber, _ := strconv.Atoi(parts[1])

Check failure on line 116 in consumer/nf_discovery.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `strconv.Atoi` is not checked (errcheck)
nfInstanceIndex = imsiNumber % len(result.NfInstances)
}

for _, profile := range result.NfInstances {
if nfInstanceIndex != nfInstanceIdIndexMap[profile.NfInstanceId] {
continue
}
if uri := util.SearchNFServiceUri(profile, models.ServiceName_NUDR_DR, models.NfServiceStatus_REGISTERED); uri != "" {
logger.ConsumerLog.Warnln("for Ue: ", id, " nfInstanceIndex: ", nfInstanceIndex, " for targetNfType ", string(targetNfType), " uri:", uri, " NF is: ", profile)
return uri
}
}
return ""
}

func SendNFInstancesAMF(nrfUri string, guami models.Guami, serviceName models.ServiceName) string {
func SendNFInstancesAMF(nrfUri string, guami models.Guami, serviceName models.ServiceName, ipAddress string) string {
targetNfType := models.NfType_AMF
requestNfType := models.NfType_PCF

Expand All @@ -126,6 +151,10 @@
return ""
}
for _, profile := range result.NfInstances {
//logger.ConsumerLog.Infof("Looking for ip match %v %v %d", profile.Ipv4Addresses[0], ipAddress, len(result.NfInstances))
if profile.Ipv4Addresses[0] != ipAddress {
continue
}
return util.SearchNFServiceUri(profile, serviceName, models.NfServiceStatus_REGISTERED)
}
return ""
Expand Down
1 change: 1 addition & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
pcfCtx.PcfSubscriberPolicyData = make(map[string]*PcfSubscriberPolicyData)
}

type PCFContext struct {

Check failure on line 40 in context/context.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct of size 480 could be 472 (govet)
NfId string
Name string
UriScheme models.UriScheme
Expand Down Expand Up @@ -71,6 +71,7 @@
DefaultUdrURILock sync.RWMutex
EnableNrfCaching bool
NrfCacheEvictionInterval time.Duration
EnableScaling bool
}

type SessionPolicy struct {
Expand Down
1 change: 1 addition & 0 deletions factory/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
PCF_DEFAULT_PORT_INT = 8000
)

type Configuration struct {

Check failure on line 39 in factory/config.go

View workflow job for this annotation

GitHub Actions / lint

fieldalignment: struct of size 176 could be 168 (govet)
PcfName string `yaml:"pcfName,omitempty"`
Sbi *Sbi `yaml:"sbi,omitempty"`
TimeFormat string `yaml:"timeFormat,omitempty"`
Expand All @@ -52,6 +52,7 @@
PlmnList []PlmnSupportItem `yaml:"plmnList,omitempty"`
EnableNrfCaching bool `yaml:"enableNrfCaching"`
NrfCacheEvictionInterval int `yaml:"nrfCacheEvictionInterval,omitempty"`
EnableScaling bool `yaml:"enableScaling,omitempty"`
}

type Service struct {
Expand Down
66 changes: 66 additions & 0 deletions httpcallback/nf_subscribe_notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// SPDX-FileCopyrightText: 2021 Open Networking Foundation <[email protected]>
// Copyright 2019 free5GC.org
//
// SPDX-License-Identifier: Apache-2.0
//

package httpcallback

import (
"net/http"

"github.com/gin-gonic/gin"

"github.com/omec-project/openapi"
"github.com/omec-project/openapi/models"
"github.com/omec-project/pcf/logger"
"github.com/omec-project/pcf/producer"
"github.com/omec-project/util/httpwrapper"
)

func HTTPNfSubscriptionStatusNotify(c *gin.Context) {
var nfSubscriptionStatusNotification models.NotificationData

requestBody, err := c.GetRawData()
if err != nil {
logger.CallbackLog.Errorf("Get Request Body error: %+v", err)
problemDetail := models.ProblemDetails{
Title: "System failure",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Cause: "SYSTEM_FAILURE",
}
c.JSON(http.StatusInternalServerError, problemDetail)
return
}

err = openapi.Deserialize(&nfSubscriptionStatusNotification, requestBody, "application/json")
if err != nil {
problemDetail := "[Request Body] " + err.Error()
rsp := models.ProblemDetails{
Title: "Malformed request syntax",
Status: http.StatusBadRequest,
Detail: problemDetail,
}
logger.CallbackLog.Errorln(problemDetail)
c.JSON(http.StatusBadRequest, rsp)
return
}

req := httpwrapper.NewRequest(c.Request, nfSubscriptionStatusNotification)

rsp := producer.HandleNfSubscriptionStatusNotify(req)

responseBody, err := openapi.Serialize(rsp.Body, "application/json")
if err != nil {
logger.CallbackLog.Errorln(err)
problemDetails := models.ProblemDetails{
Status: http.StatusInternalServerError,
Cause: "SYSTEM_FAILURE",
Detail: err.Error(),
}
c.JSON(http.StatusInternalServerError, problemDetails)
} else if rsp.Body != nil {
c.Data(rsp.Status, "application/json", responseBody)
}
}
4 changes: 4 additions & 0 deletions httpcallback/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ var routes = Routes{
Name: "HTTPAmfStatusChangeNotify", Method: "POST",
Pattern: "/amfstatus", HandlerFunc: HTTPAmfStatusChangeNotify,
},
{
Name: "NfStatusNotify", Method: "POST",
Pattern: "/nf-status-notify", HandlerFunc: HTTPNfSubscriptionStatusNotify,
},
}
8 changes: 7 additions & 1 deletion producer/ampolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net/http"
"reflect"
"strings"

"github.com/mohae/deepcopy"
"github.com/omec-project/openapi"
Expand Down Expand Up @@ -301,7 +302,9 @@ func PostPoliciesProcedure(polAssoId string,

if needSubscribe {
logger.AMpolicylog.Debugf("subscribe AMF status change[GUAMI: %+v]", *policyAssociationRequest.Guami)
amfUri := consumer.SendNFInstancesAMF(pcfSelf.NrfUri, *policyAssociationRequest.Guami, models.ServiceName_NAMF_COMM)
ipAddress := strings.SplitN(strings.TrimPrefix(policyAssociationRequest.NotificationUri,
"http://"), ":", 2)[0]
amfUri := consumer.SendNFInstancesAMF(pcfSelf.NrfUri, *policyAssociationRequest.Guami, models.ServiceName_NAMF_COMM, ipAddress)
if amfUri != "" {
problemDetails, err := consumer.AmfStatusChangeSubscribe(amfUri, []models.Guami{*policyAssociationRequest.Guami})
if err != nil {
Expand Down Expand Up @@ -419,5 +422,8 @@ func SendAMPolicyTerminationRequestNotification(ue *pcf_context.UeContext,

// returns UDR Uri of Ue, if ue.UdrUri dose not exist, query NRF to get supported Udr Uri
func getUdrUri(ue *pcf_context.UeContext) string {
if ue.UdrUri != "" {
return ue.UdrUri
}
return consumer.SendNFInstancesUDR(pcf_context.PCF_Self().NrfUri, ue.Supi)
}
3 changes: 3 additions & 0 deletions producer/bdtpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ func createBDTPolicyContextProcedure(request *models.BdtReqData) (
func getDefaultUdrUri(context *pcf_context.PCFContext) string {
context.DefaultUdrURILock.RLock()
defer context.DefaultUdrURILock.RUnlock()
if context.DefaultUdrURI != "" {
return context.DefaultUdrURI
}
param := Nnrf_NFDiscovery.SearchNFInstancesParamOpts{
ServiceNames: optional.NewInterface([]models.ServiceName{models.ServiceName_NUDR_DR}),
}
Expand Down
17 changes: 16 additions & 1 deletion service/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,22 @@ func (pcf *PCF) Terminate() {
} else {
logger.InitLog.Infoln("deregister from NRF successfully")
}
logger.InitLog.Infoln("PCF terminated")
pcfSelf := context.PCF_Self()
pcfSelf.NfStatusSubscriptions.Range(func(nfInstanceId, v interface{}) bool {
if subscriptionId, ok := pcfSelf.NfStatusSubscriptions.Load(nfInstanceId); ok {
logger.InitLog.Debugf("SubscriptionId is %v", subscriptionId.(string))
problemDetails, err := consumer.SendRemoveSubscription(subscriptionId.(string))
if problemDetails != nil {
logger.InitLog.Errorf("Remove NF Subscription Failed Problem[%+v]", problemDetails)
} else if err != nil {
logger.InitLog.Errorf("Remove NF Subscription Error[%+v]", err)
} else {
logger.InitLog.Infoln("[PCF] Remove NF Subscription successful")
}
}
return true
})
logger.InitLog.Infof("PCF terminated")
}

func (pcf *PCF) BuildAndSendRegisterNFInstance() (models.NfProfile, error) {
Expand Down
Loading