Skip to content

Commit

Permalink
Merge pull request #1041 from Krishanx92/master
Browse files Browse the repository at this point in the history
Add initial api fetch in agent
  • Loading branch information
Krishanx92 authored Jan 12, 2024
2 parents 6d11752 + 01d0c4b commit d567406
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 90 deletions.
2 changes: 1 addition & 1 deletion apim-apk-agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Run(conf *config.Config) {
eventHubEnabled := conf.ControlPlane.Enabled

// Load initial data from control plane
eventhub.LoadInitialData(conf, nil)
eventhub.LoadInitialData(conf)

if eventHubEnabled {
var connectionURLList = conf.ControlPlane.BrokerConnectionParameters.EventListeningEndpoints
Expand Down
41 changes: 0 additions & 41 deletions apim-apk-agent/internal/common/constants.go

This file was deleted.

188 changes: 174 additions & 14 deletions apim-apk-agent/internal/eventhub/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,27 @@
package eventhub

import (
"archive/zip"
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"strings"
"time"

loggers "github.com/sirupsen/logrus"
"github.com/wso2/product-apim-tooling/apim-apk-agent/config"
common "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/common"

logger "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/loggers"
pkgAuth "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/eventhub/types"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/health"
logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/logging"
sync "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/synchronizer"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils"
)

Expand All @@ -53,6 +58,13 @@ const (
ApisEndpoint string = "apis"
)

const (
// OrganizationID query parameter key.
organizationID string = "organization"
// OrganizationID query parameter value used when the global adapter is enabled and it is a shared gateway.
commonOrganizationIDValue string = "ALL"
)

var (
// This set of variables are used just for Type resolution with reflect.
// Hence no value needs to be assigned.
Expand All @@ -79,6 +91,7 @@ var (
APIListChannel chan response
accessToken string
conf *config.Config
apiUUIDList []string
)

type response struct {
Expand All @@ -100,25 +113,21 @@ func init() {
}

// LoadInitialData loads subscription/application and keymapping data from control-plane
func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string]int) {
loggers.Info("Inside Load 1")
func LoadInitialData(configFile *config.Config) {
conf = configFile
accessToken = pkgAuth.GetBasicAuth(configFile.ControlPlane.Username, configFile.ControlPlane.Password)
loggers.Info("accessToken: " + accessToken)
var responseChannel = make(chan response)
for _, url := range resources {
// Create a local copy of the loop variable
localURL := url
loggers.Info("Inside loop" + localURL.endpoint)

go InvokeService(localURL.endpoint, localURL.responseType, nil, responseChannel, 0)

for {
data := <-responseChannel
logger.LoggerSync.Info("Receiving subscription data for an environment")
if data.Payload != nil {
logger.LoggerSync.Info("Payload data with subscription information recieved")
loggers.Info("Payload data with subscription information recieved" + string(data.Payload))
logger.LoggerSync.Info("Payload data information received")
retrieveDataFromResponseChannel(data)
break
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
Expand All @@ -141,14 +150,61 @@ func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string
}
}
}
// Take the configured labels from the adapter
configuredEnvs := conf.ControlPlane.EnvironmentLabels

// If no environments are configured, default gateway label value is assigned.
if len(configuredEnvs) == 0 {
configuredEnvs = append(configuredEnvs, config.DefaultGatewayName)
}
for _, configuredEnv := range configuredEnvs {
queryParamMap := make(map[string]string, 1)
queryParamMap[GatewayLabelParam] = configuredEnv
queryParamMap[organizationID] = commonOrganizationIDValue
go InvokeService(ApisEndpoint, apiList, queryParamMap, APIListChannel, 0)
for {
data := <-APIListChannel
logger.LoggerSync.Debug("Receiving API information for an environment")
if data.Payload != nil {
loggers.Info("Payload data with API information recieved" + string(data.Payload))
retrieveAPIList(data)
break
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
logger.LoggerSync.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred when retrieving Subscription information from the control plane: %v", data.Error),
Severity: logging.CRITICAL,
ErrorCode: 1600,
})
health.SetControlPlaneRestAPIStatus(false)
} else {
// Keep the iteration going on until a response is recieved.
logger.LoggerSync.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while fetching data from control plane: %v", data.Error),
Severity: logging.MAJOR,
ErrorCode: 1601,
})
go func(d response) {
// Retry fetching from control plane after a configured time interval
if conf.ControlPlane.RetryInterval == 0 {
// Assign default retry interval
conf.ControlPlane.RetryInterval = 5
}
logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second)
time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second)
go InvokeService(ApisEndpoint, apiList, queryParamMap, APIListChannel, 0)
}(data)
}
}
}
FetchAPIsOnStartUp(conf, apiUUIDList)
}

// InvokeService invokes the internal data resource
func InvokeService(endpoint string, responseType interface{}, queryParamMap map[string]string, c chan response,
retryAttempt int) {

serviceURL := conf.ControlPlane.ServiceURL + internalWebAppEP + endpoint
loggers.Info("Payload service URL: " + serviceURL)
// Create the request
req, err := http.NewRequest("GET", serviceURL, nil)
// gatewayLabel will only be required for apis endpoint
Expand All @@ -161,7 +217,6 @@ func InvokeService(endpoint string, responseType interface{}, queryParamMap map[
logger.LoggerSubscription.Errorf("Error occurred while creating an HTTP request for serviceURL: "+serviceURL, err)
return
}
queryParamMap = common.PopulateQueryParamForOrganizationID(queryParamMap)
q := req.URL.Query()

if queryParamMap != nil && len(queryParamMap) > 0 {
Expand All @@ -177,6 +232,7 @@ func InvokeService(endpoint string, responseType interface{}, queryParamMap map[

// Setting authorization header
req.Header.Set(authorizationHeaderDefault, authorizationBasic+accessToken)
req.Header.Set("x-wso2-tenant", "ALL")

// Make the request
//logger.LoggerSubscription.Debug("Sending the request to the control plane over the REST API: " + serviceURL)
Expand Down Expand Up @@ -213,23 +269,127 @@ func retrieveDataFromResponseChannel(response response) {
err := json.Unmarshal(response.Payload, &newResponse)

if err != nil {
logger.LoggerSubscription.Errorf("Error occurred while unmarshalling the response received for: "+response.Endpoint, err)
loggers.Info("Error occurred while unmarshalling the response received for: "+response.Endpoint, err)
} else {
switch t := newResponse.(type) {
case *types.SubscriptionList:
logger.LoggerSubscription.Debug("Received Subscription information.")
loggers.Info("Received Subscription information.")
subList = newResponse.(*types.SubscriptionList)
MarshalMultipleSubscriptions(subList)
case *types.ApplicationList:
logger.LoggerSubscription.Debug("Received Application information.")
loggers.Info("Received Application information.")
appList = newResponse.(*types.ApplicationList)
MarshalMultipleApplications(appList)
case *types.ApplicationKeyMappingList:
logger.LoggerSubscription.Debug("Received Application Key Mapping information.")
loggers.Info("Received Application Key Mapping information.")
appKeyMappingList = newResponse.(*types.ApplicationKeyMappingList)
MarshalMultipleApplicationKeyMappings(appKeyMappingList)
default:
logger.LoggerSubscription.Debugf("Unknown type %T", t)
}
}
}

// FetchAPIsOnStartUp APIs from control plane during the server start up and push them
// to the router and enforcer components.
func FetchAPIsOnStartUp(conf *config.Config, apiUUIDList []string) {
// Populate data from config.
envs := conf.ControlPlane.EnvironmentLabels

// Create a channel for the byte slice (response from the APIs from control plane)
c := make(chan sync.SyncAPIResponse)

var queryParamMap map[string]string
//Get API details.
if apiUUIDList != nil {
GetAPIs(c, nil, envs, sync.APIArtifactEndpoint, true, apiUUIDList, queryParamMap)
}
for i := 0; i < 1; i++ {
data := <-c
logger.LoggerMsg.Info("Receiving data for an environment")
if data.Resp != nil {
// Reading the root zip
zipReader, err := zip.NewReader(bytes.NewReader(data.Resp), int64(len(data.Resp)))

// apiFiles represents zipped API files fetched from API Manager
apiFiles := make(map[string]*zip.File)
// Read the .zip files within the root apis.zip and add apis to apiFiles array.
for _, file := range zipReader.File {
apiFiles[file.Name] = file
loggers.Info("API file found: " + file.Name)
// Todo:
}
health.SetControlPlaneRestAPIStatus(err == nil)

} else if data.ErrorCode == 204 {
logger.LoggerMsg.Infof("No API Artifacts are available in the control plane for the envionments :%s",
strings.Join(envs, ", "))
health.SetControlPlaneRestAPIStatus(true)
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
logger.LoggerMsg.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred when retrieving APIs from control plane(unrecoverable error): %v", data.Err.Error()),
Severity: logging.CRITICAL,
ErrorCode: 1106,
})
isNoAPIArtifacts := data.ErrorCode == 404 && strings.Contains(data.Err.Error(), "No Api artifacts found")
health.SetControlPlaneRestAPIStatus(isNoAPIArtifacts)
} else {
// Keep the iteration still until all the envrionment response properly.
i--
logger.LoggerMsg.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while fetching data from control plane: %v ..retrying..", data.Err),
Severity: logging.MINOR,
ErrorCode: 1107,
})
health.SetControlPlaneRestAPIStatus(false)
sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap)
}
}
logger.LoggerMsg.Info("Fetching APIs at startup is completed...")
}

// GetAPIs function calls the FetchAPIs() with relevant environment labels defined in the config.
func GetAPIs(c chan sync.SyncAPIResponse, id *string, envs []string, endpoint string, sendType bool, apiUUIDList []string,
queryParamMap map[string]string) {
if len(envs) > 0 {
// If the envrionment labels are present, call the controle plane with labels.
logger.LoggerAdapter.Debugf("Environment labels present: %v", envs)
go sync.FetchAPIs(id, envs, c, endpoint, sendType, apiUUIDList, queryParamMap)
} else {
// If the environments are not give, fetch the APIs from default envrionment
logger.LoggerAdapter.Debug("Environments label NOT present. Hence adding \"default\"")
envs = append(envs, "default")
go sync.FetchAPIs(id, nil, c, endpoint, sendType, apiUUIDList, queryParamMap)
}
}

func retrieveAPIList(response response) []string {

responseType := reflect.TypeOf(response.Type).Elem()
newResponse := reflect.New(responseType).Interface()
if response.Error == nil && response.Payload != nil {
err := json.Unmarshal(response.Payload, &newResponse)
if err != nil {
logger.LoggerSubscription.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error occurred while unmarshalling the APIList response received for: %v %v", response.Endpoint, err.Error()),
Severity: logging.MAJOR,
ErrorCode: 1602,
})
} else {
switch t := newResponse.(type) {
case *types.APIList:
apiListResponse := newResponse.(*types.APIList)
if apiListResponse.List != nil {
for _, api := range apiListResponse.List {
apiUUIDList = append(apiUUIDList, api.UUID)
}
}
loggers.Info("Received API information.", apiUUIDList)
return apiUUIDList
default:
logger.LoggerSubscription.Warnf("APIList Type DTO is not recieved. Unknown type %T", t)
}
}
}
return nil
}
40 changes: 8 additions & 32 deletions apim-apk-agent/internal/eventhub/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,6 @@ type ApplicationKeyMappingList struct {
List []ApplicationKeyMapping `json:"list"`
}

// APIs for struct Api
type APIs struct {
APIID int `json:"apiId"`
UUID string `json:"uuid"`
Provider string `json:"provider" json:"apiProvider"`
Name string `json:"name" json:"apiName"`
Version string `json:"version" json:"apiVersion"`
Context string `json:"context" json:"apiContext"`
Policy string `json:"policy"`
APIType string `json:"apiType"`
IsDefaultVersion bool `json:"isDefaultVersion"`
APIStatus string `json:"status"`
TenantID int32 `json:"tenanId,omitempty"`
TenantDomain string `json:"tenanDomain,omitempty"`
TimeStamp int64 `json:"timeStamp,omitempty"`
}

// APIList for struct ApiList
type APIList struct {
List []APIs `json:"list"`
}

// Subscription for struct subscription
type Subscription struct {
SubscriptionID int32 `json:"subscriptionId"`
Expand All @@ -111,8 +89,6 @@ type KeyManager struct {
}

var (
// APIListMap has the following mapping label -> apiUUID -> API (Metadata)
APIListMap map[string]map[string]APIs
// SubscriptionMap contains the subscriptions recieved from API Manager Control Plane
SubscriptionMap map[int32]Subscription
// ApplicationMap contains the applications recieved from API Manager Control Plane
Expand Down Expand Up @@ -245,11 +221,11 @@ func GetApplicationKeyMappingReference(keyMapping *types.ApplicationKeyMapping)

// CheckIfAPIMetadataIsAlreadyAvailable returns true only if the API Metadata for the given API UUID
// is already available
func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool {
if _, labelAvailable := APIListMap[label]; labelAvailable {
if _, apiAvailale := APIListMap[label][apiUUID]; apiAvailale {
return true
}
}
return false
}
// func CheckIfAPIMetadataIsAlreadyAvailable(apiUUID, label string) bool {
// if _, labelAvailable := APIListMap[label]; labelAvailable {
// if _, apiAvailale := APIListMap[label][apiUUID]; apiAvailale {
// return true
// }
// }
// return false
// }
Loading

0 comments on commit d567406

Please sign in to comment.