From 265d292d57a70b5f50716f5752d779a7af8303b6 Mon Sep 17 00:00:00 2001 From: CrowleyRajapakse Date: Tue, 9 Jan 2024 11:51:33 +0530 Subject: [PATCH] adding retrieving API Project Zip for JMS event --- apim-apk-agent/conf/config.toml | 9 + apim-apk-agent/conf/log_config.toml | 37 ++ apim-apk-agent/go.mod | 10 +- apim-apk-agent/go.sum | 17 +- apim-apk-agent/internal/agent/agent.go | 2 +- .../internal/eventhub/dataloader.go | 32 +- .../messaging/notification_listener.go | 11 +- .../internal/notifier/deployment_notifier.go | 172 +++++++++ apim-apk-agent/internal/notifier/types.go | 38 ++ .../internal/synchronizer/apis_fetcher.go | 111 ++++++ apim-apk-agent/internal/synchronizer/types.go | 53 +++ apim-apk-agent/pkg/loggers/logger.go | 12 +- apim-apk-agent/pkg/logging/log_constants.go | 10 - apim-apk-agent/pkg/logging/log_format.go | 2 +- apim-apk-agent/pkg/logging/loggin_utils.go | 32 ++ .../pkg/logging/logging_constant.go | 215 ++++++++++++ apim-apk-agent/pkg/logging/package_logs.go | 2 +- apim-apk-agent/pkg/logging/root_logs.go | 2 +- .../pkg/synchronizer/apis_fetcher.go | 326 ++++++++++++++++++ .../pkg/synchronizer/client_pool.go | 147 ++++++++ apim-apk-agent/pkg/synchronizer/types.go | 78 +++++ 21 files changed, 1260 insertions(+), 58 deletions(-) create mode 100644 apim-apk-agent/conf/config.toml create mode 100644 apim-apk-agent/conf/log_config.toml create mode 100644 apim-apk-agent/internal/notifier/deployment_notifier.go create mode 100644 apim-apk-agent/internal/notifier/types.go create mode 100644 apim-apk-agent/internal/synchronizer/apis_fetcher.go create mode 100644 apim-apk-agent/internal/synchronizer/types.go create mode 100644 apim-apk-agent/pkg/logging/loggin_utils.go create mode 100644 apim-apk-agent/pkg/logging/logging_constant.go create mode 100644 apim-apk-agent/pkg/synchronizer/apis_fetcher.go create mode 100644 apim-apk-agent/pkg/synchronizer/client_pool.go create mode 100644 apim-apk-agent/pkg/synchronizer/types.go diff --git a/apim-apk-agent/conf/config.toml b/apim-apk-agent/conf/config.toml new file mode 100644 index 00000000..ee00a9f5 --- /dev/null +++ b/apim-apk-agent/conf/config.toml @@ -0,0 +1,9 @@ +[controlPlane] + enabled = true + serviceURL = "https://wso2apim:9443/" + username = "admin" + password = "admin" + environmentLabels = ["Default"] + skipSSLVerification = true + [controlPlane.brokerConnectionParameters] + eventListeningEndpoints = ["amqp://admin:admin@wso2apim:5672?retries='10'&connectdelay='30'"] \ No newline at end of file diff --git a/apim-apk-agent/conf/log_config.toml b/apim-apk-agent/conf/log_config.toml new file mode 100644 index 00000000..9519d2ef --- /dev/null +++ b/apim-apk-agent/conf/log_config.toml @@ -0,0 +1,37 @@ +# The logging configuration for Adapter + +## Adapter root Level configurations + +logLevel = "INFO" # LogLevels can be "DEBG", "FATL", "ERRO", "WARN", "INFO", "PANC" +LogFormat = "TEXT" # Values can be "JSON", "TEXT" + +[rotation] +MaxSize = 10 # In MegaBytes (MB) +MaxBackups = 3 +MaxAge = 2 # In days +Compress = true + +## Adapter package Level configurations + +[[pkg]] +name = "github.com/wso2/apk/adapter/internal/adapter" +logLevel = "INFO" # LogLevels can be "DEBG", "FATL", "ERRO", "WARN", "INFO", "PANC" + +[[pkg]] +name = "github.com/wso2/apk/adapter/internal/oasparser" +logLevel = "INFO" + + +# The logging configuration for Router + +[accessLogs] +enable = false +format = "[%START_TIME%] '%REQ(:METHOD)% %DYNAMIC_METADATA(envoy.filters.http.ext_authz:originalPath)% %REQ(:PATH)% %PROTOCOL%' %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% '%REQ(X-FORWARDED-FOR)%' '%REQ(USER-AGENT)%' '%REQ(X-REQUEST-ID)%' '%REQ(:AUTHORITY)%' '%UPSTREAM_HOST%'\n" + +[wireLogs] +enable = false +include = ["Headers", "Body", "Trailers"] + +# [[pkg]] +# name = "github.com/wso2/apk/Adapter/pkg/xds" +# logLevel = "INFO" diff --git a/apim-apk-agent/go.mod b/apim-apk-agent/go.mod index eeb2e54e..42f25263 100644 --- a/apim-apk-agent/go.mod +++ b/apim-apk-agent/go.mod @@ -11,17 +11,15 @@ require ( require ( github.com/BurntSushi/toml v1.3.2 // indirect - github.com/kr/pretty v0.3.1 // indirect - github.com/rogpeppe/go-internal v1.11.0 // indirect - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) require ( - github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.7.0 - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 github.com/streadway/amqp v1.1.0 github.com/wso2/apk/adapter v0.0.0-20231218081229-c5b096fc616f golang.org/x/sys v0.15.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/apim-apk-agent/go.sum b/apim-apk-agent/go.sum index 6e723fee..0d5b9ff4 100644 --- a/apim-apk-agent/go.sum +++ b/apim-apk-agent/go.sum @@ -1,26 +1,16 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= @@ -34,9 +24,8 @@ github.com/wso2/apk/adapter v0.0.0-20231218081229-c5b096fc616f/go.mod h1:4SnI4e8 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/apim-apk-agent/internal/agent/agent.go b/apim-apk-agent/internal/agent/agent.go index dc104d53..123d764d 100644 --- a/apim-apk-agent/internal/agent/agent.go +++ b/apim-apk-agent/internal/agent/agent.go @@ -27,7 +27,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/wso2/product-apim-tooling/apim-apk-agent/config" - eventhub "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/eventhub" + "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/eventhub" logger "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/loggers" logging "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/logging" "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/messaging" diff --git a/apim-apk-agent/internal/eventhub/dataloader.go b/apim-apk-agent/internal/eventhub/dataloader.go index f6fe9d9e..448a70cd 100644 --- a/apim-apk-agent/internal/eventhub/dataloader.go +++ b/apim-apk-agent/internal/eventhub/dataloader.go @@ -100,27 +100,33 @@ func init() { // LoadInitialData loads subscription/application and keymapping data from control-plane func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string]int) { + loggers.Info("Inside Load 1") conf = configFile accessToken = pkgAuth.GetBasicAuth(configFile.ControlPlane.Username, configFile.ControlPlane.Password) - + loggers.Info("accessToken: " + accessToken) var responseChannel = make(chan response) for _, url := range resources { - go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0) + // Create a local copy of the loop variable + localURL := url + loggers.Info("Inside loop" + localURL.endpoint) + + go InvokeService(localURL.endpoint, localURL.responseType, nil, responseChannel, 0) + for { data := <-responseChannel - logger.LoggerSync.Debug("Receiving subscription data for an environment") + logger.LoggerSync.Info("Receiving subscription data for an environment") if data.Payload != nil { - logger.LoggerSync.Info("Payload data with subscription information recieved") - loggers.Info("Payload data with subscription information recieved" + string(data.Payload)) + logger.LoggerSync.Info("Payload data with subscription information received") + loggers.Info("Payload data with subscription information received" + string(data.Payload)) //retrieveSubscriptionDataFromChannel(data) break } else if data.ErrorCode >= 400 && data.ErrorCode < 500 { //Error handle health.SetControlPlaneRestAPIStatus(false) } else { - // Keep the iteration going on until a response is recieved. - //Error handle - go func(d response) { + // Keep the iteration going on until a response is received. + // Error handle + go func(d response, endpoint string, responseType interface{}) { // Retry fetching from control plane after a configured time interval if conf.ControlPlane.RetryInterval == 0 { // Assign default retry interval @@ -129,8 +135,8 @@ func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second) time.Sleep(conf.ControlPlane.RetryInterval * time.Second) logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second) - go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0) - }(data) + go InvokeService(endpoint, responseType, nil, responseChannel, 0) + }(data, localURL.endpoint, localURL.responseType) } } } @@ -181,7 +187,7 @@ func InvokeService(endpoint string, responseType interface{}, queryParamMap map[ } else { c <- response{err, nil, 0, endpoint, gatewayLabel, responseType} } - logger.LoggerSubscription.Errorf("Error occurred while calling the REST API: "+serviceURL, err) + loggers.Info("Error occurred while calling the REST API: "+serviceURL, err) return } @@ -189,13 +195,13 @@ func InvokeService(endpoint string, responseType interface{}, queryParamMap map[ if resp.StatusCode == http.StatusOK { if err != nil { c <- response{err, nil, resp.StatusCode, endpoint, gatewayLabel, responseType} - logger.LoggerSubscription.Errorf("Error occurred while reading the response received for: "+serviceURL, err) + loggers.Info("Error occurred while reading the response received for: "+serviceURL, err) return } c <- response{nil, responseBytes, resp.StatusCode, endpoint, gatewayLabel, responseType} } else { c <- response{errors.New(string(responseBytes)), nil, resp.StatusCode, endpoint, gatewayLabel, responseType} - logger.LoggerSubscription.Errorf("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode), + loggers.Info("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode), err) } } diff --git a/apim-apk-agent/internal/messaging/notification_listener.go b/apim-apk-agent/internal/messaging/notification_listener.go index 543c2c08..e5bad3a6 100644 --- a/apim-apk-agent/internal/messaging/notification_listener.go +++ b/apim-apk-agent/internal/messaging/notification_listener.go @@ -24,10 +24,11 @@ import ( "fmt" "strings" - "github.com/wso2/apk/adapter/pkg/logging" "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/synchronizer" "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/eventhub/types" logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/logging" msg "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/messaging" ) @@ -180,10 +181,10 @@ func handleAPIEvents(data []byte, eventType string) { return } - // Per each revision, synchronization should happen. - // if strings.EqualFold(deployAPIToGateway, apiEvent.Event.Type) { - //go synchronizer.FetchAPIsFromControlPlane(apiEvent.UUID, apiEvent.GatewayLabels) - // } + //Per each revision, synchronization should happen. + if strings.EqualFold(deployAPIToGateway, apiEvent.Event.Type) { + go synchronizer.FetchAPIsFromControlPlane(apiEvent.UUID, apiEvent.GatewayLabels) + } for _, env := range apiEvent.GatewayLabels { if isLaterEvent(apiListTimeStampMap, apiEvent.UUID+":"+env, currentTimeStamp) { diff --git a/apim-apk-agent/internal/notifier/deployment_notifier.go b/apim-apk-agent/internal/notifier/deployment_notifier.go new file mode 100644 index 00000000..55e46cc5 --- /dev/null +++ b/apim-apk-agent/internal/notifier/deployment_notifier.go @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package notifier + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/wso2/apk/adapter/pkg/logging" + "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + logger "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/loggers" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils" +) + +const ( + deployedRevisionEP string = "internal/data/v1/apis/deployed-revisions" + unDeployedRevisionEP string = "internal/data/v1/apis/undeployed-revision" + authBasic string = "Basic " + authHeader string = "Authorization" + contentTypeHeader string = "Content-Type" +) + +// UpdateDeployedRevisions create the DeployedAPIRevision object +func UpdateDeployedRevisions(apiID string, revisionID int, envs []string, vhost string) *DeployedAPIRevision { + revisions := &DeployedAPIRevision{ + APIID: apiID, + RevisionID: revisionID, + EnvInfo: []DeployedEnvInfo{}, + } + for _, env := range envs { + info := DeployedEnvInfo{ + Name: env, + VHost: vhost, + } + revisions.EnvInfo = append(revisions.EnvInfo, info) + } + return revisions +} + +// SendRevisionUpdateAck sends succeeded revision deployment acknowledgement to the control plane +func SendRevisionUpdateAck(deployedRevisionList []*DeployedAPIRevision) { + conf, _ := config.ReadConfigs() + cpConfigs := conf.ControlPlane + + if len(deployedRevisionList) < 1 || !cpConfigs.Enabled || !cpConfigs.SendRevisionUpdate { + return + } + + logger.LoggerNotifier.Debugf("Revision deployed message is sending to Control plane") + + revisionEP := cpConfigs.ServiceURL + if strings.HasSuffix(revisionEP, "/") { + revisionEP += deployedRevisionEP + } else { + revisionEP += "/" + deployedRevisionEP + } + + jsonValue, _ := json.Marshal(deployedRevisionList) + + // Setting authorization header + basicAuth := authBasic + auth.GetBasicAuth(cpConfigs.Username, cpConfigs.Password) + + logger.LoggerNotifier.Debugf("Revision deployed message sending to Control plane: %v", string(jsonValue)) + + // Adding 3 retries for revision update sending + retries := 0 + for retries < 3 { + retries++ + + req, _ := http.NewRequest("PATCH", revisionEP, bytes.NewBuffer(jsonValue)) + req.Header.Set(authHeader, basicAuth) + req.Header.Set(contentTypeHeader, "application/json") + resp, err := tlsutils.InvokeControlPlane(req, cpConfigs.SkipSSLVerification) + + success := true + if err != nil { + logger.LoggerNotifier.ErrorC(logging.ErrorDetails{ + Message: fmt.Sprintf("Error response from %v for attempt %v : %v", revisionEP, retries, err.Error()), + Severity: logging.MAJOR, + ErrorCode: 2100, + }) + success = false + } + if resp != nil && resp.StatusCode != http.StatusOK { + logger.LoggerNotifier.ErrorC(logging.ErrorDetails{ + Message: fmt.Sprintf("Error response status code %v from %v for attempt %v", resp.StatusCode, revisionEP, retries), + Severity: logging.MINOR, + ErrorCode: 2101, + }) + success = false + } + if success { + logger.LoggerNotifier.Infof("Revision deployed message sent to Control plane for attempt %v", retries) + break + } + } +} + +// SendRevisionUndeployAck - send the undeployed revision acknowledgement to control plane +func SendRevisionUndeployAck(apiUUID string, revisionUUID string, environment string) { + conf, _ := config.ReadConfigs() + cpConfigs := conf.ControlPlane + if apiUUID == "" || revisionUUID == "" || environment == "" || !cpConfigs.Enabled || !cpConfigs.SendRevisionUpdate { + return + } + revisionEP := cpConfigs.ServiceURL + if strings.HasSuffix(revisionEP, "/") { + revisionEP += unDeployedRevisionEP + } else { + revisionEP += "/" + unDeployedRevisionEP + } + + removedRevision := UnDeployedAPIRevision{ + APIUUID: apiUUID, + RevisionUUID: revisionUUID, + Environment: environment, + } + + jsonValue, _ := json.Marshal(removedRevision) + basicAuth := authBasic + auth.GetBasicAuth(cpConfigs.Username, cpConfigs.Password) + retries := 0 + for retries < 3 { + retries++ + req, _ := http.NewRequest("POST", revisionEP, bytes.NewBuffer(jsonValue)) + req.Header.Set(authHeader, basicAuth) + req.Header.Set(contentTypeHeader, "application/json") + resp, err := tlsutils.InvokeControlPlane(req, cpConfigs.SkipSSLVerification) + + success := true + if err != nil { + logger.LoggerNotifier.ErrorC(logging.ErrorDetails{ + Message: fmt.Sprintf("Error response from %s for attempt %d : %v", revisionEP, retries, err.Error()), + Severity: logging.MAJOR, + ErrorCode: 2100, + }) + success = false + } + if resp != nil && resp.StatusCode != http.StatusOK { + logger.LoggerNotifier.ErrorC(logging.ErrorDetails{ + Message: fmt.Sprintf("Error response status code %v from %s for attempt %d", resp.StatusCode, revisionEP, retries), + Severity: logging.MINOR, + ErrorCode: 2101, + }) + success = false + } + if success { + logger.LoggerNotifier.Infof("Revision un-deployed message sent to Control plane for attempt %d", retries) + break + } + time.Sleep(2 * time.Second) + } +} diff --git a/apim-apk-agent/internal/notifier/types.go b/apim-apk-agent/internal/notifier/types.go new file mode 100644 index 00000000..b38beff9 --- /dev/null +++ b/apim-apk-agent/internal/notifier/types.go @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package notifier + +// DeployedAPIRevision represents Information of deployed API revision data +type DeployedAPIRevision struct { + APIID string `json:"apiId"` + RevisionID int `json:"revisionId"` + EnvInfo []DeployedEnvInfo `json:"envInfo"` +} + +// DeployedEnvInfo represents env Information of deployed API revision +type DeployedEnvInfo struct { + Name string `json:"name"` + VHost string `json:"vhost"` +} + +// UnDeployedAPIRevision info +type UnDeployedAPIRevision struct { + APIUUID string `json:"apiUUID"` + RevisionUUID string `json:"revisionUUID"` + Environment string `json:"environment"` +} diff --git a/apim-apk-agent/internal/synchronizer/apis_fetcher.go b/apim-apk-agent/internal/synchronizer/apis_fetcher.go new file mode 100644 index 00000000..1b8d705c --- /dev/null +++ b/apim-apk-agent/internal/synchronizer/apis_fetcher.go @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Package "synchronizer" contains artifacts relate to fetching APIs and + * API related updates from the control plane event-hub. + * This file contains functions to retrieve APIs and API updates. + */ + +package synchronizer + +import ( + "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/common" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/health" + + logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers" + sync "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/synchronizer" +) + +const ( + zipExt string = ".zip" + defaultCertPath string = "/home/wso2/security/controlplane.pem" +) + +func init() { + conf, _ := config.ReadConfigs() + sync.InitializeWorkerPool(conf.ControlPlane.RequestWorkerPool.PoolSize, conf.ControlPlane.RequestWorkerPool.QueueSizePerPool, + conf.ControlPlane.RequestWorkerPool.PauseTimeAfterFailure, conf.Adapter.Truststore.Location, + conf.ControlPlane.SkipSSLVerification, conf.ControlPlane.HTTPClient.RequestTimeOut, conf.ControlPlane.RetryInterval, + conf.ControlPlane.ServiceURL, conf.ControlPlane.Username, conf.ControlPlane.Password) +} + +// FetchAPIsFromControlPlane method pulls API data for a given APIs according to a +// given API ID and a list of environments that API has been deployed to. +// updatedAPIID is the corresponding ID of the API in the form of an UUID +// updatedEnvs contains the list of environments the API deployed to. +func FetchAPIsFromControlPlane(updatedAPIID string, updatedEnvs []string) { + // Read configurations and derive the eventHub details + conf, errReadConfig := config.ReadConfigs() + if errReadConfig != nil { + // This has to be error. For debugging purpose info + logger.LoggerSync.Errorf("Error reading configs: %v", errReadConfig) + } + // Populate data from config. + configuredEnvs := conf.ControlPlane.EnvironmentLabels + //finalEnvs contains the actual envrionments that the adapter should update + var finalEnvs []string + if len(configuredEnvs) > 0 { + // If the configuration file contains environment list, then check if then check if the + // affected environments are present in the provided configs. If so, add that environment + // to the finalEnvs slice + for _, updatedEnv := range updatedEnvs { + for _, configuredEnv := range configuredEnvs { + if updatedEnv == configuredEnv { + finalEnvs = append(finalEnvs, updatedEnv) + } + } + } + } else { + // If the labels are not configured, publish the APIS to the default environment + finalEnvs = []string{config.DefaultGatewayName} + } + + if len(finalEnvs) == 0 { + // If the finalEnvs is empty -> it means, the configured envrionments does not contains the affected/updated + // environments. If that's the case, then APIs should not be fetched from the adapter. + return + } + + c := make(chan sync.SyncAPIResponse) + logger.LoggerSync.Infof("API %s is added/updated to APIList for label %v", updatedAPIID, updatedEnvs) + var queryParamMap map[string]string + queryParamMap = common.PopulateQueryParamForOrganizationID(queryParamMap) + go sync.FetchAPIs(&updatedAPIID, finalEnvs, c, sync.RuntimeArtifactEndpoint, true, nil, queryParamMap) + for { + data := <-c + logger.LoggerSync.Infof("Receiving data for the API: %q", updatedAPIID) + if data.Resp != nil { + // For successfull fetches, data.Resp would return a byte slice with API project(s) + logger.LoggerSync.Infof("API Project %q", data.Resp) + // err := PushAPIProjects(data.Resp, finalEnvs) + // if err != nil { + // logger.LoggerSync.Errorf("Error occurred while pushing API data for the API %q: %v ", updatedAPIID, err) + // } + break + } else if data.ErrorCode >= 400 && data.ErrorCode < 500 { + logger.LoggerSync.Errorf("Error occurred when retrieving API %q from control plane: %v", updatedAPIID, data.Err) + health.SetControlPlaneRestAPIStatus(false) + } else { + // Keep the iteration still until all the envrionment response properly. + logger.LoggerSync.Errorf("Error occurred while fetching data from control plane for the API %q: %v. Hence retrying..", updatedAPIID, data.Err) + sync.RetryFetchingAPIs(c, data, sync.RuntimeArtifactEndpoint, true, queryParamMap) + } + } + +} diff --git a/apim-apk-agent/internal/synchronizer/types.go b/apim-apk-agent/internal/synchronizer/types.go new file mode 100644 index 00000000..8f52c899 --- /dev/null +++ b/apim-apk-agent/internal/synchronizer/types.go @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Package "synchronizer" contains artifacts relate to fetching APIs and + * API related updates from the control plane event-hub. + * This file contains types to retrieve APIs and API updates. + */ + +package synchronizer + +// RevokedToken contains the JWT and the expirty time of the +// revoked JWT token. +type RevokedToken struct { + JWT string `json:"jwt_signature"` + ExpiryTime int64 `json:"expiry_time"` +} + +// BlockConditions defines a blocking condition retrieved from traffic manager +type BlockConditions struct { + API []string `json:"api"` + Application []string `json:"application"` + User []string `json:"user"` + Subscription []string `json:"subscription"` + Custom []string `json:"custom"` + IP []IPCondition `json:"ip"` +} + +// IPCondition defines a IP condition +type IPCondition struct { + Type string + ID int32 + FixedIP string + StartingIP string + EndingIP string + Invert bool + TenantDomain string + State string +} diff --git a/apim-apk-agent/pkg/loggers/logger.go b/apim-apk-agent/pkg/loggers/logger.go index 8ecff413..e71fff25 100644 --- a/apim-apk-agent/pkg/loggers/logger.go +++ b/apim-apk-agent/pkg/loggers/logger.go @@ -21,7 +21,7 @@ package loggers import ( "github.com/sirupsen/logrus" - "github.com/wso2/apk/adapter/pkg/logging" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/logging" ) /* loggers should be initiated only for the main packages @@ -32,12 +32,12 @@ When you add a new logger instance add the related package name as a constant // package name constants const ( - pkgAuth = "github.com/wso2/apk/adapter/pkg/auth" - pkgMsg = "github.com/wso2/apk/adapter/pkg/messaging" - pkgHealth = "github.com/wso2/apk/adapter/pkg/health" - pkgTLSUtils = "github.com/wso2/apk/adapter/pkg/tlsutils" + pkgAuth = "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth" + pkgMsg = "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/messaging" + pkgHealth = "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/health" + pkgTLSUtils = "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils" pkgAdapter = "github.com/wso2/apk/adapter/pkg/adapter" - pkgSync = "github.com/wso2/apk/adapter/pkg/synchronizer" + pkgSync = "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/synchronizer" pkgSoapUtils = "github.com/wso2/apk/adapter/pkg/soaputils" ) diff --git a/apim-apk-agent/pkg/logging/log_constants.go b/apim-apk-agent/pkg/logging/log_constants.go index d343b359..63ea3a5d 100644 --- a/apim-apk-agent/pkg/logging/log_constants.go +++ b/apim-apk-agent/pkg/logging/log_constants.go @@ -36,16 +36,6 @@ const ( JSON = "JSON" ) -// Log (Error) severity level constants -const ( - BLOCKER = "Blocker" - CRITICAL = "Critical" - MAJOR = "Major" - MINOR = "Minor" - TRIVIAL = "Trivial" - DEFAULT = "Default" -) - // Error Log attribute name constants const ( SEVERITY = "severity" diff --git a/apim-apk-agent/pkg/logging/log_format.go b/apim-apk-agent/pkg/logging/log_format.go index adfe67a4..c6b90e06 100644 --- a/apim-apk-agent/pkg/logging/log_format.go +++ b/apim-apk-agent/pkg/logging/log_format.go @@ -25,7 +25,7 @@ import ( "strings" logrus "github.com/sirupsen/logrus" - "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + "github.com/wso2/apk/adapter/pkg/config" ) type plainFormatter struct { diff --git a/apim-apk-agent/pkg/logging/loggin_utils.go b/apim-apk-agent/pkg/logging/loggin_utils.go new file mode 100644 index 00000000..e7615e91 --- /dev/null +++ b/apim-apk-agent/pkg/logging/loggin_utils.go @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package logging + +import ( + "fmt" +) + +// PrintError prints the error details +func PrintError(code int, severity string, message string, args ...interface{}) ErrorDetails { + errorLog := ErrorDetails{ + ErrorCode: code, + Message: fmt.Sprintf(message, args...), + Severity: severity, + } + return errorLog +} diff --git a/apim-apk-agent/pkg/logging/logging_constant.go b/apim-apk-agent/pkg/logging/logging_constant.go new file mode 100644 index 00000000..281a6f9c --- /dev/null +++ b/apim-apk-agent/pkg/logging/logging_constant.go @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package logging + +// Log (Error) severity level constants +const ( + BLOCKER = "Blocker" + CRITICAL = "Critical" + MAJOR = "Major" + MINOR = "Minor" + TRIVIAL = "Trivial" + DEFAULT = "Default" +) + +// Error Log Internal Configuration(1000-1099) Config Constants +// - loggerConfig +const ( + Error1000 = 1000 + Error1001 = 1001 + Error1002 = 1002 +) + +// Error Log Internal Adapter(1100-1199) Constants +// - LoggerAPK +const ( + Error1100 = 1100 + Error1101 = 1101 + Error1102 = 1102 + Error1103 = 1103 + Error1104 = 1104 + Error1105 = 1105 +) + +// Error Log Internal discovery(1400-1499) Config Constants +// - LoggerXds +const ( + Error1400 = 1400 + Error1401 = 1401 + Error1402 = 1402 + Error1403 = 1403 + Error1410 = 1410 + Error1411 = 1411 + Error1413 = 1413 + Error1414 = 1414 +) + +// Error Log Internal XDS(1700-1799) Config Constants +// - LoggerXds +const ( + Error1700 = 1700 + Error1701 = 1701 + Error1702 = 1702 + Error1703 = 1703 + Error1704 = 1704 + Error1705 = 1705 + Error1706 = 1706 + Error1707 = 1707 + Error1709 = 1709 + Error1710 = 1710 + Error1711 = 1711 + Error1712 = 1712 + Error1713 = 1713 + Error1714 = 1714 + Error1715 = 1715 + Error1716 = 1716 + Error1717 = 1717 + Error1718 = 1718 + Error1719 = 1719 + Error1720 = 1720 + Error1721 = 1721 + Error1722 = 1722 + Error1723 = 1723 + Error1724 = 1724 +) + +// Error Log RateLimiter callbacks(2300-2399) Config Constants +// - LoggerEnforcerXdsCallbacks +const ( + Error2300 = 2300 +) + +// Error Log Pkg operator(2600-2699) Config Constants +// - LoggerAPKOperator +const ( + Error2600 = 2600 + Error2601 = 2601 + Error2602 = 2602 + Error2603 = 2603 + Error2604 = 2604 + Error2605 = 2605 + Error2606 = 2606 + Error2607 = 2607 + Error2608 = 2608 + Error2609 = 2609 + Error2610 = 2610 + Error2611 = 2611 + Error2612 = 2612 + Error2613 = 2613 + Error2614 = 2614 + Error2615 = 2615 + Error2616 = 2616 + Error2617 = 2617 + Error2618 = 2618 + Error2619 = 2619 + Error2620 = 2620 + Error2621 = 2621 + Error2622 = 2622 + Error2623 = 2623 + Error2625 = 2625 + Error2626 = 2626 + Error2627 = 2627 + Error2628 = 2628 + Error2629 = 2629 + Error2630 = 2630 + Error2631 = 2631 + Error2632 = 2632 + Error2633 = 2633 + Error2634 = 2634 + Error2635 = 2635 + Error2636 = 2636 + Error2637 = 2637 + Error2638 = 2638 + Error2639 = 2639 + Error2640 = 2640 + Error2641 = 2641 + Error2642 = 2642 + Error2643 = 2643 + Error2644 = 2644 + Error2645 = 2645 + Error2646 = 2646 + Error2647 = 2647 + Error2648 = 2648 + Error2649 = 2649 + Error2650 = 2650 + Error2651 = 2651 + Error2652 = 2652 + Error2653 = 2653 + Error2654 = 2654 + Error2655 = 2655 + Error2656 = 2656 + Error2657 = 2657 + Error2658 = 2658 + Error2659 = 2659 + Error2660 = 2660 + Error2661 = 2661 + Error2662 = 2662 + Error2663 = 2663 + Error2664 = 2664 + Error2665 = 2665 + Error2666 = 2666 + Error2667 = 2667 +) + +// Error Log Pkg auth(3001-3099) Config Constants +const ( + Error3001 = 3001 + Error3002 = 3002 +) + +// Error codes gateway controller (3100-3199) +const ( + Error3100 = 3100 + Error3101 = 3101 + Error3102 = 3102 + Error3103 = 3103 + Error3104 = 3104 + Error3105 = 3105 + Error3106 = 3106 + Error3107 = 3107 + Error3108 = 3108 + Error3109 = 3109 + Error3110 = 3110 + Error3111 = 3111 + Error3112 = 3112 + Error3113 = 3113 + Error3114 = 3114 + Error3115 = 3115 + Error3116 = 3116 + Error3117 = 3117 + Error3118 = 3118 + Error3119 = 3119 + Error3120 = 3120 + Error3121 = 3121 + Error3122 = 3122 + Error3123 = 3123 + Error3124 = 3124 + Error3125 = 3125 + Error3126 = 3126 +) + +// Error codes api (3200-3299) +const ( + Error3200 = 3200 + Error3201 = 3201 + Error3202 = 3202 + Error3203 = 3203 + Error3204 = 3204 + Error3205 = 3205 + Error3206 = 3206 +) diff --git a/apim-apk-agent/pkg/logging/package_logs.go b/apim-apk-agent/pkg/logging/package_logs.go index e3d3ee36..a80a1e54 100644 --- a/apim-apk-agent/pkg/logging/package_logs.go +++ b/apim-apk-agent/pkg/logging/package_logs.go @@ -23,7 +23,7 @@ import ( "os" "github.com/sirupsen/logrus" - "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + "github.com/wso2/apk/adapter/pkg/config" ) // ErrorDetails used to keep error details for error logs diff --git a/apim-apk-agent/pkg/logging/root_logs.go b/apim-apk-agent/pkg/logging/root_logs.go index e44c1eb5..23314831 100644 --- a/apim-apk-agent/pkg/logging/root_logs.go +++ b/apim-apk-agent/pkg/logging/root_logs.go @@ -22,7 +22,7 @@ import ( "os" "github.com/sirupsen/logrus" - "github.com/wso2/product-apim-tooling/apim-apk-agent/config" + "github.com/wso2/apk/adapter/pkg/config" lumberjack "gopkg.in/natefinch/lumberjack.v2" ) diff --git a/apim-apk-agent/pkg/synchronizer/apis_fetcher.go b/apim-apk-agent/pkg/synchronizer/apis_fetcher.go new file mode 100644 index 00000000..d9b061cf --- /dev/null +++ b/apim-apk-agent/pkg/synchronizer/apis_fetcher.go @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Package "synchronizer" contains artifacts relate to fetching APIs and + * API related updates from the control plane event-hub. + * This file contains functions to retrieve APIs and API updates. + */ + +package synchronizer + +import ( + "archive/zip" + "bytes" + "encoding/base64" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "strings" + "time" + + parser "github.com/mitchellh/mapstructure" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth" + logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers" +) + +const ( + apiID string = "apiId" + gatewayLabel string = "gatewayLabel" + gwType string = "type" + envoy string = "Envoy" + // Authorization represent the authorization header string. + Authorization string = "Authorization" + deploymentDescriptorFile string = "deployments.json" + envPropsFile string = "env_properties.json" + // RuntimeArtifactEndpoint represents the /runtime-artifacts endpoint. + RuntimeArtifactEndpoint string = "internal/data/v1/runtime-artifacts" + // APIArtifactEndpoint represents the /retrieve-api-artifacts endpoint. + APIArtifactEndpoint string = "internal/data/v1/retrieve-api-artifacts" + // httpTimeout is for connection timeout of httpClient in seconds + httpTimeout time.Duration = 30 +) + +// FetchAPIs submits the control plane http request to the thread pool. The thread pool would process it and return +// the http response to the channel which contains a zip file. +func FetchAPIs(id *string, gwLabel []string, c chan SyncAPIResponse, resourceEndpoint string, sendType bool, + apiUUIDList []string, queryParamMap map[string]string) { + if id != nil { + logger.LoggerSync.Infof("Fetching API from Control Plane for Id %q.", *id) + } else { + logger.LoggerSync.Info("Fetching APIs from Control Plane") + } + + req := ConstructControlPlaneRequest(id, gwLabel, workerPool.controlPlaneParams, resourceEndpoint, sendType, apiUUIDList, + queryParamMap) + workerReq := workerRequest{ + Req: *req, + APIUUID: id, + SyncAPIRespChannel: c, + } + if gwLabel != nil { + workerReq.labels = gwLabel + } + + if workerPool == nil { + logger.LoggerSync.Fatal("WorkerPool is not inititated due to an internal error.") + } + // If adding task to the pool cannot be done, the whole thread hangs here. + workerPool.Enqueue(workerReq) +} + +// SendRequestToControlPlane is the function triggered to send the request to the control plane. +// It returns true if a response is received from the api manager. +func SendRequestToControlPlane(req *http.Request, apiID *string, gwLabels []string, c chan SyncAPIResponse, + client *http.Client) bool { + // Make the request + if apiID != nil { + logger.LoggerSync.Debugf("Sending the control plane request for the API: %q", *apiID) + } else { + logger.LoggerSync.Debug("Sending the control plane request") + } + resp, err := client.Do(req) + + respSyncAPI := SyncAPIResponse{} + + if apiID != nil { + respSyncAPI.APIUUID = *apiID + } + if len(gwLabels) > 0 { + respSyncAPI.GatewayLabels = gwLabels + } + + // In the event of a connection error, the error would not be nil, then return the error + // If the error is not null, proceed + if err != nil { + logger.LoggerSync.Errorf("Error occurred while retrieving APIs from API manager: %v", err) + respSyncAPI.Err = err + respSyncAPI.Resp = nil + c <- respSyncAPI + return false + } + + // get the response in the form of a byte slice + respBytes, err := ioutil.ReadAll(resp.Body) + + // If the reading response gives an error + if err != nil { + logger.LoggerSync.Errorf("Error occurred while reading the response: %v", err) + respSyncAPI.Err = err + respSyncAPI.ErrorCode = resp.StatusCode + respSyncAPI.Resp = nil + c <- respSyncAPI + return false + } + // For successful response, return the byte slice and nil as error + if resp.StatusCode == http.StatusOK { + respSyncAPI.Err = nil + respSyncAPI.Resp = respBytes + c <- respSyncAPI + return true + } + // If the response is not successful, create a new error with the response and log it and return + // Ex: for 401 scenarios, 403 scenarios. + logger.LoggerSync.Errorf("Failure response from control plane: %v", string(respBytes)) + respSyncAPI.Err = errors.New(string(respBytes)) + respSyncAPI.Resp = nil + respSyncAPI.ErrorCode = resp.StatusCode + c <- respSyncAPI + return true +} + +// ConstructControlPlaneRequest constructs the http Request used to send to the control plane +func ConstructControlPlaneRequest(id *string, gwLabel []string, controlPlaneParams controlPlaneParameters, + resourceEndpoint string, sendType bool, apiUUIDList []string, queryParamMap map[string]string) *http.Request { + var ( + req *http.Request + err error + bodyJSON []byte + ) + + serviceURL := controlPlaneParams.serviceURL + userName := controlPlaneParams.username + password := controlPlaneParams.password + // postData contains the API UUID list in the payload of the post request. + type postData struct { + Uuids []string `json:"uuids"` + } + // NOTE: Getting resourceEndpoint as a parameter since GA and LA use different endpoints. + if strings.HasSuffix(serviceURL, "/") { + serviceURL += resourceEndpoint + } else { + serviceURL += "/" + resourceEndpoint + } + logger.LoggerSync.Debugf("Fetching APIs from the URL %v: ", serviceURL) + + // Populating the payload body with API UUID list + if apiUUIDList != nil { + body := postData{ + Uuids: apiUUIDList, + } + bodyJSON, err = json.Marshal(body) + if err != nil { + logger.LoggerSync.Errorf("Error marshaling the uuid List: %v", err) + } + } + // Create a HTTP request + if apiUUIDList == nil { + req, err = http.NewRequest("GET", serviceURL, nil) + } else { + req, err = http.NewRequest("POST", serviceURL, bytes.NewBuffer(bodyJSON)) + } + if err != nil { + logger.LoggerSync.Fatalf("Error while creating the HTTP request: %v", err) + } + // Making necessary query parameters for the request + q := req.URL.Query() + + if queryParamMap != nil && len(queryParamMap) > 0 { + // Making necessary query parameters for the request + for queryParamKey, queryParamValue := range queryParamMap { + q.Add(queryParamKey, queryParamValue) + } + } + + // If an API ID is present, make a query parameter + if id != nil { + q.Add(apiID, *id) + } + // If the gateway label is present, make a query parameter + if len(gwLabel) > 0 { + logger.LoggerSync.Debugf("Gateway Label: %v", gwLabel) + gatewaysQStr := strings.Join(gwLabel, "|") + q.Add(gatewayLabel, base64.StdEncoding.EncodeToString([]byte(gatewaysQStr))) + } + // NOTE: GA does not send this query parameter. + if sendType { + // Default "type" query parameter for adapter is "Envoy" + q.Add(gwType, envoy) + } + req.URL.RawQuery = q.Encode() + + // Setting authorization header + basicAuth := "Basic " + auth.GetBasicAuth(userName, password) + req.Header.Set(Authorization, basicAuth) + // If API UUID list is present, set the content-type header + if apiUUIDList != nil { + req.Header.Set("Content-Type", "application/json") + } + return req +} + +// RetryFetchingAPIs function keeps retrying to fetch APIs from runtime-artifact endpoint. +func RetryFetchingAPIs(c chan SyncAPIResponse, data SyncAPIResponse, endpoint string, sendType bool, + queryParamMap map[string]string) { + retryInterval := workerPool.controlPlaneParams.retryInterval + + // Retry fetching from control plane after a configured time interval + if retryInterval == 0 { + // Assign default retry interval + retryInterval = 5 + } + logger.LoggerSync.Debugf("Time Duration for retrying: %v", retryInterval*time.Second) + time.Sleep(retryInterval * time.Second) + logger.LoggerSync.Infof("Retrying to fetch API data from control plane for the API %q.", data.APIUUID) + channelFillPercentage := float64(len(workerPool.internalQueue)) / float64(cap(workerPool.internalQueue)) * 100 + logger.LoggerSync.Infof("Workerpool channel size as a percentage is : %f", channelFillPercentage) + FetchAPIs(&data.APIUUID, data.GatewayLabels, c, endpoint, sendType, nil, queryParamMap) +} + +// ReadRootFiles function reads following files inside the root zip +// deployment.json +// env_properties.json +func ReadRootFiles(reader *zip.Reader) (*DeploymentDescriptor, map[string]map[string]APIEnvProps, error) { + deploymentDescriptor := &DeploymentDescriptor{} + apiEnvProps := make(map[string]map[string]APIEnvProps) + // Read the .zip files within the root apis.zip + for _, file := range reader.File { + // Open deployment descriptor file + if strings.EqualFold(file.Name, deploymentDescriptorFile) { + logger.LoggerSync.Debugf("Start reading %v file", deploymentDescriptorFile) + f, err := file.Open() + if err != nil { + logger.LoggerSync.Error("Error reading deployment descriptor: ", err) + return deploymentDescriptor, apiEnvProps, err + } + data, err := ioutil.ReadAll(f) + _ = f.Close() // Close the file here (without defer) + if err != nil { + logger.LoggerSync.Error("Error reading deployment descriptor: ", err) + return deploymentDescriptor, apiEnvProps, err + } + logger.LoggerSync.Debugf("Parsing content of deployment descriptor, content: %s", string(data)) + if err = json.Unmarshal(data, deploymentDescriptor); err != nil { + logger.LoggerSync.Error("Error parsing JSON content of deployment descriptor: ", err) + return deploymentDescriptor, apiEnvProps, err + } + } else if strings.EqualFold(file.Name, envPropsFile) { + logger.LoggerSync.Debugf("Start reading %v file", envPropsFile) + f, err := file.Open() + if err != nil { + logger.LoggerSync.Error("Error reading environment specific properties: ", err) + return deploymentDescriptor, apiEnvProps, err + } + data, err := ioutil.ReadAll(f) + _ = f.Close() // Close the file here (without defer) + if err != nil { + logger.LoggerSync.Error("Error reading environment specific properties: ", err) + return deploymentDescriptor, apiEnvProps, err + } + logger.LoggerSync.Debugf("Parsing content of environment specific properties, content: %s", string(data)) + if apiEnvProps, err = parseEnvProps(data); err != nil { + logger.LoggerSync.Errorf("Error occurred while parsing environment specific properties : %v : %v", + file.Name, err.Error()) + return deploymentDescriptor, apiEnvProps, err + } + } + } + return deploymentDescriptor, apiEnvProps, nil +} + +func parseEnvProps(data []byte) (map[string]map[string]APIEnvProps, error) { + var envPropsFile map[string]interface{} + envPropsMap := make(map[string]map[string]APIEnvProps) + if err := json.Unmarshal(data, &envPropsFile); err != nil { + logger.LoggerSync.Error("Error parsing Environment specific values: ", err) + return nil, err + } + + if apisData, found := envPropsFile["apis"]; found { + if apis, ok := apisData.(map[string]interface{}); ok { + for apiUUID, apiData := range apis { + apiProps := make(map[string]APIEnvProps) + if api, ok := apiData.(map[string]interface{}); ok { + var envProps APIEnvProps + for envLabel, envData := range api { + if err := parser.Decode(envData, &envProps); err != nil { + logger.LoggerSync.Error("Error parsing environment specific values: ", err) + return nil, err + } + apiProps[envLabel] = envProps + } + } + envPropsMap[apiUUID] = apiProps + } + return envPropsMap, nil + } + logger.LoggerSync.Error("Wrong format given for parsing environment specific values") + return nil, errors.New("wrong format given for parsing environment specific values") + } + return nil, nil +} diff --git a/apim-apk-agent/pkg/synchronizer/client_pool.go b/apim-apk-agent/pkg/synchronizer/client_pool.go new file mode 100644 index 00000000..ab6274db --- /dev/null +++ b/apim-apk-agent/pkg/synchronizer/client_pool.go @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package synchronizer + +import ( + "crypto/tls" + "net/http" + "sync" + "time" + + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers" + "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils" +) + +type worker struct { + id int + internalQueue <-chan workerRequest + processFunc processHTTPRequest + delayAfterFault time.Duration +} + +// workerRequest is the task which can be submitted to the pool. +type workerRequest struct { + Req http.Request + APIUUID *string + labels []string + SyncAPIRespChannel chan SyncAPIResponse +} + +// pool is the worker pool which is handling +type pool struct { + internalQueue chan workerRequest + workers []*worker + client http.Client + controlPlaneParams controlPlaneParameters +} + +type controlPlaneParameters struct { + serviceURL string + username string + password string + retryInterval time.Duration +} + +type processHTTPRequest func(*http.Request, *string, []string, chan SyncAPIResponse, *http.Client) bool + +func (w *worker) ProcessFunction() { + for workerReq := range w.internalQueue { + responseReceived := w.processFunc(&workerReq.Req, workerReq.APIUUID, workerReq.labels, workerReq.SyncAPIRespChannel, + &workerPool.client) + if !responseReceived { + time.Sleep(w.delayAfterFault) + } + } +} + +var ( + // WorkerPool is the thread pool responsible for sending the control plane request to fetch APIs + workerPool *pool + oncePoolInitiated sync.Once +) + +// InitializeWorkerPool creates the Worker Pool used for the Control Plane Rest API invocations. +// maxWorkers indicate the maximum number of parallel workers sending requests to the control plane. +// jobQueueCapacity indicate the maximum number of requests can kept inside a single worker's queue. +// delayForFaultRequests indicate the delay a worker enforce (in seconds) when a fault response is received. +func InitializeWorkerPool(maxWorkers, jobQueueCapacity int, delayForFaultRequests time.Duration, trustStoreLocation string, + skipSSL bool, requestTimeout, retryInterval time.Duration, serviceURL, username, password string) { + oncePoolInitiated.Do(func() { + workerPool = newWorkerPool(maxWorkers, jobQueueCapacity, delayForFaultRequests) + workerPool.controlPlaneParams = controlPlaneParameters{ + serviceURL: serviceURL, + username: username, + password: password, + retryInterval: retryInterval, + } + var tr *http.Transport + if !skipSSL { + caCertPool := tlsutils.GetTrustedCertPool(trustStoreLocation) + tr = &http.Transport{ + TLSClientConfig: &tls.Config{RootCAs: caCertPool}, + } + } else { + tr = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + // Configure Connection Level Parameters since it is reused over and over + tr.MaxConnsPerHost = maxWorkers * 2 + tr.MaxIdleConns = maxWorkers * 2 + tr.MaxIdleConnsPerHost = maxWorkers * 2 + workerPool.client = http.Client{ + Transport: tr, + Timeout: requestTimeout * time.Second, + } + }) +} + +func newWorkerPool(maxWorkers, jobQueueCapacity int, delayForFaultRequests time.Duration) *pool { + if jobQueueCapacity <= 0 { + jobQueueCapacity = 100 + } + requestChannel := make(chan workerRequest, jobQueueCapacity) + workers := make([]*worker, maxWorkers) + + // create workers + for i := 0; i < maxWorkers; i++ { + workers[i] = &worker{ + id: i, + internalQueue: requestChannel, + processFunc: SendRequestToControlPlane, + delayAfterFault: delayForFaultRequests, + } + go workers[i].ProcessFunction() + loggers.LoggerSync.Infof("ControlPlane processing worker %d spawned.", i) + } + + return &pool{ + internalQueue: requestChannel, + workers: workers, + } +} + +// Enqueue Tries to enqueue but fails if queue is full +func (q *pool) Enqueue(req workerRequest) bool { + select { + case q.internalQueue <- req: + return true + default: + return false + } +} diff --git a/apim-apk-agent/pkg/synchronizer/types.go b/apim-apk-agent/pkg/synchronizer/types.go new file mode 100644 index 00000000..7b569c16 --- /dev/null +++ b/apim-apk-agent/pkg/synchronizer/types.go @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Package "synchronizer" contains artifacts relate to fetching APIs and + * API related updates from the control plane event-hub. + * This file contains types to retrieve APIs and API updates. + */ + +package synchronizer + +// SyncAPIResponse struct contains information related to +// response of the API pulling/fetching from control plane +// along with the apiId and the gateway label that the call +// was made with. +type SyncAPIResponse struct { + Resp []byte + Err error + ErrorCode int + APIUUID string + GatewayLabels []string +} + +// DeploymentDescriptor represents deployment descriptor file contains in Artifact +// received from control plane +type DeploymentDescriptor struct { + Type string `json:"type"` + Version string `json:"version"` + Data DeploymentData `json:"data"` +} + +// DeploymentData contains list of APIDeployment to be deployed to the gateway +type DeploymentData struct { + Deployments []APIDeployment `json:"deployments"` +} + +// APIDeployment represents an API project that contains zip file name and +// gateway environments (labels) that the project to be deployed +type APIDeployment struct { + APIFile string `json:"apiFile"` + Environments []GatewayLabel `json:"environments"` + // These properties are used by global Adapter + OrganizationID string `json:"organizationId"` + APIContext string `json:"apiContext"` + Version string `json:"version"` +} + +// GatewayLabel represents gateway environment name and VHost of an API project +type GatewayLabel struct { + Name string `json:"name"` + Vhost string `json:"vhost"` +} + +// APIConfigs represents env properties belongs to the API +type APIConfigs struct { + ProductionEndpoint string `mapstructure:"productionEndpoint,omitempty"` + SandBoxEndpoint string `mapstructure:"sandboxEndpoint,omitempty"` +} + +// APIEnvProps represents env properties +type APIEnvProps struct { + EnvID string `mapstructure:"envId,omitempty"` + APIConfigs APIConfigs `mapstructure:"configs,omitempty"` +}