Skip to content

Commit

Permalink
Merge pull request #1038 from CrowleyRajapakse/master
Browse files Browse the repository at this point in the history
Adding retrieving API Project Zip for JMS event
  • Loading branch information
CrowleyRajapakse authored Jan 9, 2024
2 parents 391c6aa + 265d292 commit 9515f5d
Show file tree
Hide file tree
Showing 21 changed files with 1,260 additions and 58 deletions.
9 changes: 9 additions & 0 deletions apim-apk-agent/conf/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[controlPlane]
enabled = true
serviceURL = "https://wso2apim:9443/"
username = "admin"
password = "admin"
environmentLabels = ["Default"]
skipSSLVerification = true
[controlPlane.brokerConnectionParameters]
eventListeningEndpoints = ["amqp://admin:admin@wso2apim:5672?retries='10'&connectdelay='30'"]
37 changes: 37 additions & 0 deletions apim-apk-agent/conf/log_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# The logging configuration for Adapter

## Adapter root Level configurations

logLevel = "INFO" # LogLevels can be "DEBG", "FATL", "ERRO", "WARN", "INFO", "PANC"
LogFormat = "TEXT" # Values can be "JSON", "TEXT"

[rotation]
MaxSize = 10 # In MegaBytes (MB)
MaxBackups = 3
MaxAge = 2 # In days
Compress = true

## Adapter package Level configurations

[[pkg]]
name = "github.com/wso2/apk/adapter/internal/adapter"
logLevel = "INFO" # LogLevels can be "DEBG", "FATL", "ERRO", "WARN", "INFO", "PANC"

[[pkg]]
name = "github.com/wso2/apk/adapter/internal/oasparser"
logLevel = "INFO"


# The logging configuration for Router

[accessLogs]
enable = false
format = "[%START_TIME%] '%REQ(:METHOD)% %DYNAMIC_METADATA(envoy.filters.http.ext_authz:originalPath)% %REQ(:PATH)% %PROTOCOL%' %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% '%REQ(X-FORWARDED-FOR)%' '%REQ(USER-AGENT)%' '%REQ(X-REQUEST-ID)%' '%REQ(:AUTHORITY)%' '%UPSTREAM_HOST%'\n"

[wireLogs]
enable = false
include = ["Headers", "Body", "Trailers"]

# [[pkg]]
# name = "github.com/wso2/apk/Adapter/pkg/xds"
# logLevel = "INFO"
10 changes: 4 additions & 6 deletions apim-apk-agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ require (

require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.7.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/streadway/amqp v1.1.0
github.com/wso2/apk/adapter v0.0.0-20231218081229-c5b096fc616f
golang.org/x/sys v0.15.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
17 changes: 3 additions & 14 deletions apim-apk-agent/go.sum
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
Expand All @@ -34,9 +24,8 @@ github.com/wso2/apk/adapter v0.0.0-20231218081229-c5b096fc616f/go.mod h1:4SnI4e8
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
Expand Down
2 changes: 1 addition & 1 deletion apim-apk-agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/fsnotify/fsnotify"
"github.com/wso2/product-apim-tooling/apim-apk-agent/config"
eventhub "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/eventhub"
"github.com/wso2/product-apim-tooling/apim-apk-agent/internal/eventhub"
logger "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/loggers"
logging "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/logging"
"github.com/wso2/product-apim-tooling/apim-apk-agent/internal/messaging"
Expand Down
32 changes: 19 additions & 13 deletions apim-apk-agent/internal/eventhub/dataloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,33 @@ func init() {

// LoadInitialData loads subscription/application and keymapping data from control-plane
func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string]int) {
loggers.Info("Inside Load 1")
conf = configFile
accessToken = pkgAuth.GetBasicAuth(configFile.ControlPlane.Username, configFile.ControlPlane.Password)

loggers.Info("accessToken: " + accessToken)
var responseChannel = make(chan response)
for _, url := range resources {
go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0)
// Create a local copy of the loop variable
localURL := url
loggers.Info("Inside loop" + localURL.endpoint)

go InvokeService(localURL.endpoint, localURL.responseType, nil, responseChannel, 0)

for {
data := <-responseChannel
logger.LoggerSync.Debug("Receiving subscription data for an environment")
logger.LoggerSync.Info("Receiving subscription data for an environment")
if data.Payload != nil {
logger.LoggerSync.Info("Payload data with subscription information recieved")
loggers.Info("Payload data with subscription information recieved" + string(data.Payload))
logger.LoggerSync.Info("Payload data with subscription information received")
loggers.Info("Payload data with subscription information received" + string(data.Payload))
//retrieveSubscriptionDataFromChannel(data)
break
} else if data.ErrorCode >= 400 && data.ErrorCode < 500 {
//Error handle
health.SetControlPlaneRestAPIStatus(false)
} else {
// Keep the iteration going on until a response is recieved.
//Error handle
go func(d response) {
// Keep the iteration going on until a response is received.
// Error handle
go func(d response, endpoint string, responseType interface{}) {
// Retry fetching from control plane after a configured time interval
if conf.ControlPlane.RetryInterval == 0 {
// Assign default retry interval
Expand All @@ -129,8 +135,8 @@ func LoadInitialData(configFile *config.Config, initialAPIUUIDListMap map[string
logger.LoggerSync.Debugf("Time Duration for retrying: %v", conf.ControlPlane.RetryInterval*time.Second)
time.Sleep(conf.ControlPlane.RetryInterval * time.Second)
logger.LoggerSync.Infof("Retrying to fetch APIs from control plane. Time Duration for the next retry: %v", conf.ControlPlane.RetryInterval*time.Second)
go InvokeService(url.endpoint, url.responseType, nil, responseChannel, 0)
}(data)
go InvokeService(endpoint, responseType, nil, responseChannel, 0)
}(data, localURL.endpoint, localURL.responseType)
}
}
}
Expand Down Expand Up @@ -181,21 +187,21 @@ func InvokeService(endpoint string, responseType interface{}, queryParamMap map[
} else {
c <- response{err, nil, 0, endpoint, gatewayLabel, responseType}
}
logger.LoggerSubscription.Errorf("Error occurred while calling the REST API: "+serviceURL, err)
loggers.Info("Error occurred while calling the REST API: "+serviceURL, err)
return
}

responseBytes, err := ioutil.ReadAll(resp.Body)
if resp.StatusCode == http.StatusOK {
if err != nil {
c <- response{err, nil, resp.StatusCode, endpoint, gatewayLabel, responseType}
logger.LoggerSubscription.Errorf("Error occurred while reading the response received for: "+serviceURL, err)
loggers.Info("Error occurred while reading the response received for: "+serviceURL, err)
return
}
c <- response{nil, responseBytes, resp.StatusCode, endpoint, gatewayLabel, responseType}
} else {
c <- response{errors.New(string(responseBytes)), nil, resp.StatusCode, endpoint, gatewayLabel, responseType}
logger.LoggerSubscription.Errorf("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode),
loggers.Info("Failed to fetch data! "+serviceURL+" responded with "+strconv.Itoa(resp.StatusCode),
err)
}
}
11 changes: 6 additions & 5 deletions apim-apk-agent/internal/messaging/notification_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"fmt"
"strings"

"github.com/wso2/apk/adapter/pkg/logging"
"github.com/wso2/product-apim-tooling/apim-apk-agent/config"
"github.com/wso2/product-apim-tooling/apim-apk-agent/internal/synchronizer"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/eventhub/types"
logger "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/loggers"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/logging"
msg "github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/messaging"
)

Expand Down Expand Up @@ -180,10 +181,10 @@ func handleAPIEvents(data []byte, eventType string) {
return
}

// Per each revision, synchronization should happen.
// if strings.EqualFold(deployAPIToGateway, apiEvent.Event.Type) {
//go synchronizer.FetchAPIsFromControlPlane(apiEvent.UUID, apiEvent.GatewayLabels)
// }
//Per each revision, synchronization should happen.
if strings.EqualFold(deployAPIToGateway, apiEvent.Event.Type) {
go synchronizer.FetchAPIsFromControlPlane(apiEvent.UUID, apiEvent.GatewayLabels)
}

for _, env := range apiEvent.GatewayLabels {
if isLaterEvent(apiListTimeStampMap, apiEvent.UUID+":"+env, currentTimeStamp) {
Expand Down
172 changes: 172 additions & 0 deletions apim-apk-agent/internal/notifier/deployment_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2024, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package notifier

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"github.com/wso2/apk/adapter/pkg/logging"
"github.com/wso2/product-apim-tooling/apim-apk-agent/config"
logger "github.com/wso2/product-apim-tooling/apim-apk-agent/internal/loggers"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/auth"
"github.com/wso2/product-apim-tooling/apim-apk-agent/pkg/tlsutils"
)

const (
deployedRevisionEP string = "internal/data/v1/apis/deployed-revisions"
unDeployedRevisionEP string = "internal/data/v1/apis/undeployed-revision"
authBasic string = "Basic "
authHeader string = "Authorization"
contentTypeHeader string = "Content-Type"
)

// UpdateDeployedRevisions create the DeployedAPIRevision object
func UpdateDeployedRevisions(apiID string, revisionID int, envs []string, vhost string) *DeployedAPIRevision {
revisions := &DeployedAPIRevision{
APIID: apiID,
RevisionID: revisionID,
EnvInfo: []DeployedEnvInfo{},
}
for _, env := range envs {
info := DeployedEnvInfo{
Name: env,
VHost: vhost,
}
revisions.EnvInfo = append(revisions.EnvInfo, info)
}
return revisions
}

// SendRevisionUpdateAck sends succeeded revision deployment acknowledgement to the control plane
func SendRevisionUpdateAck(deployedRevisionList []*DeployedAPIRevision) {
conf, _ := config.ReadConfigs()
cpConfigs := conf.ControlPlane

if len(deployedRevisionList) < 1 || !cpConfigs.Enabled || !cpConfigs.SendRevisionUpdate {
return
}

logger.LoggerNotifier.Debugf("Revision deployed message is sending to Control plane")

revisionEP := cpConfigs.ServiceURL
if strings.HasSuffix(revisionEP, "/") {
revisionEP += deployedRevisionEP
} else {
revisionEP += "/" + deployedRevisionEP
}

jsonValue, _ := json.Marshal(deployedRevisionList)

// Setting authorization header
basicAuth := authBasic + auth.GetBasicAuth(cpConfigs.Username, cpConfigs.Password)

logger.LoggerNotifier.Debugf("Revision deployed message sending to Control plane: %v", string(jsonValue))

// Adding 3 retries for revision update sending
retries := 0
for retries < 3 {
retries++

req, _ := http.NewRequest("PATCH", revisionEP, bytes.NewBuffer(jsonValue))
req.Header.Set(authHeader, basicAuth)
req.Header.Set(contentTypeHeader, "application/json")
resp, err := tlsutils.InvokeControlPlane(req, cpConfigs.SkipSSLVerification)

success := true
if err != nil {
logger.LoggerNotifier.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error response from %v for attempt %v : %v", revisionEP, retries, err.Error()),
Severity: logging.MAJOR,
ErrorCode: 2100,
})
success = false
}
if resp != nil && resp.StatusCode != http.StatusOK {
logger.LoggerNotifier.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error response status code %v from %v for attempt %v", resp.StatusCode, revisionEP, retries),
Severity: logging.MINOR,
ErrorCode: 2101,
})
success = false
}
if success {
logger.LoggerNotifier.Infof("Revision deployed message sent to Control plane for attempt %v", retries)
break
}
}
}

// SendRevisionUndeployAck - send the undeployed revision acknowledgement to control plane
func SendRevisionUndeployAck(apiUUID string, revisionUUID string, environment string) {
conf, _ := config.ReadConfigs()
cpConfigs := conf.ControlPlane
if apiUUID == "" || revisionUUID == "" || environment == "" || !cpConfigs.Enabled || !cpConfigs.SendRevisionUpdate {
return
}
revisionEP := cpConfigs.ServiceURL
if strings.HasSuffix(revisionEP, "/") {
revisionEP += unDeployedRevisionEP
} else {
revisionEP += "/" + unDeployedRevisionEP
}

removedRevision := UnDeployedAPIRevision{
APIUUID: apiUUID,
RevisionUUID: revisionUUID,
Environment: environment,
}

jsonValue, _ := json.Marshal(removedRevision)
basicAuth := authBasic + auth.GetBasicAuth(cpConfigs.Username, cpConfigs.Password)
retries := 0
for retries < 3 {
retries++
req, _ := http.NewRequest("POST", revisionEP, bytes.NewBuffer(jsonValue))
req.Header.Set(authHeader, basicAuth)
req.Header.Set(contentTypeHeader, "application/json")
resp, err := tlsutils.InvokeControlPlane(req, cpConfigs.SkipSSLVerification)

success := true
if err != nil {
logger.LoggerNotifier.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error response from %s for attempt %d : %v", revisionEP, retries, err.Error()),
Severity: logging.MAJOR,
ErrorCode: 2100,
})
success = false
}
if resp != nil && resp.StatusCode != http.StatusOK {
logger.LoggerNotifier.ErrorC(logging.ErrorDetails{
Message: fmt.Sprintf("Error response status code %v from %s for attempt %d", resp.StatusCode, revisionEP, retries),
Severity: logging.MINOR,
ErrorCode: 2101,
})
success = false
}
if success {
logger.LoggerNotifier.Infof("Revision un-deployed message sent to Control plane for attempt %d", retries)
break
}
time.Sleep(2 * time.Second)
}
}
Loading

0 comments on commit 9515f5d

Please sign in to comment.