Skip to content

Commit

Permalink
chore: refactor code to improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
nickytd committed Jun 18, 2024
1 parent d61b094 commit 7983f57
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 167 deletions.
26 changes: 14 additions & 12 deletions cmd/fluent-bit-vali-plugin/out_vali.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

// numeric plugin ID, only used for user-facing purpose (logging, ...)
id := len(plugins)
logger := log.With(newLogger(conf.LogLevel), "ts", log.DefaultTimestampUTC, "id", id)
_logger := log.With(newLogger(conf.LogLevel), "ts", log.DefaultTimestampUTC, "id", id)

level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-vali",
_ = level.Info(_logger).Log(
"[flb-go]", "Starting fluent-bit-go-vali",
"version", version.Get().GitVersion,
"revision", version.Get().GitCommit)
paramLogger := log.With(logger, "[flb-go]", "provided parameter")
"revision", version.Get().GitCommit,
)
paramLogger := log.With(_logger, "[flb-go]", "provided parameter")
level.Info(paramLogger).Log("URL", conf.ClientConfig.CredativValiConfig.URL)
level.Info(paramLogger).Log("TenantID", conf.ClientConfig.CredativValiConfig.TenantID)
level.Info(paramLogger).Log("BatchWait", conf.ClientConfig.CredativValiConfig.BatchWait)
Expand Down Expand Up @@ -186,10 +188,10 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(paramLogger).Log("SendLogsToDefaultClientWhenClusterIsInRestoreState", fmt.Sprintf("%+v", conf.ControllerConfig.DefaultControllerClientConfig.SendLogsWhenIsInRestoreState))
level.Info(paramLogger).Log("SendLogsToDefaultClientWhenClusterIsInMigrationState", fmt.Sprintf("%+v", conf.ControllerConfig.DefaultControllerClientConfig.SendLogsWhenIsInMigrationState))

plugin, err := valiplugin.NewPlugin(informer, conf, logger)
plugin, err := valiplugin.NewPlugin(informer, conf, _logger)
if err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorNewPlugin).Inc()
level.Error(logger).Log("newPlugin", err)
level.Error(_logger).Log("newPlugin", err)
return output.FLB_ERROR
}

Expand Down Expand Up @@ -279,18 +281,18 @@ func FLBPluginExit() int {
}

func newLogger(logLevel logging.Level) log.Logger {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = level.NewFilter(logger, logLevel.Gokit)
logger = log.With(logger, "caller", log.Caller(3))
return logger
_logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
_logger = level.NewFilter(_logger, logLevel.Gokit)
_logger = log.With(_logger, "caller", log.Caller(3))
return _logger
}

func getInclusterKubernetsClient() (gardenerclientsetversioned.Interface, error) {
config, err := rest.InClusterConfig()
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return gardenerclientsetversioned.NewForConfig(config)
return gardenerclientsetversioned.NewForConfig(c)
}

func main() {}
30 changes: 15 additions & 15 deletions pkg/controller/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

gardenercorev1beta1 "github.com/gardener/gardener/pkg/apis/core/v1beta1"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
giterrors "github.com/pkg/errors"
"github.com/prometheus/common/model"

Expand All @@ -31,8 +31,8 @@ func (ctl *controller) GetClient(name string) (client.ValiClient, bool) {
return nil, true
}

if client, ok := ctl.clients[name]; ok {
return client, false
if c, ok := ctl.clients[name]; ok {
return c, false
}

return nil, false
Expand Down Expand Up @@ -61,28 +61,28 @@ func (ctl *controller) newControllerClient(clientConf *config.Config) (Controlle
}

func (ctl *controller) createControllerClient(clusterName string, shoot *gardenercorev1beta1.Shoot) {
clientConf := ctl.getClientConfig(clusterName)
clientConf := ctl.updateClientConfig(clusterName)
if clientConf == nil {
return
}

client, err := ctl.newControllerClient(clientConf)
c, err := ctl.newControllerClient(clientConf)
if err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorFailedToMakeValiClient).Inc()
_ = level.Error(ctl.logger).Log("msg", fmt.Sprintf("failed to make new vali client for cluster %v", clusterName), "error", err.Error())
return
}

ctl.updateControllerClientState(client, shoot)
ctl.updateControllerClientState(c, shoot)

_ = level.Info(ctl.logger).Log("msg", "add client", "cluster", clusterName, "state", client.GetState())
_ = level.Info(ctl.logger).Log("msg", "add client", "cluster", clusterName, "state", c.GetState())
ctl.lock.Lock()
defer ctl.lock.Unlock()

if ctl.isStopped() {
return
}
ctl.clients[clusterName] = client
ctl.clients[clusterName] = c
}

func (ctl *controller) deleteControllerClient(clusterName string) {
Expand All @@ -93,15 +93,15 @@ func (ctl *controller) deleteControllerClient(clusterName string) {
return
}

client, ok := ctl.clients[clusterName]
if ok && client != nil {
c, ok := ctl.clients[clusterName]
if ok && c != nil {
delete(ctl.clients, clusterName)
}

ctl.lock.Unlock()
if ok && client != nil {
if ok && c != nil {
_ = level.Info(ctl.logger).Log("msg", "delete client", "cluster", clusterName)
client.StopWait()
c.StopWait()
}
}

Expand Down Expand Up @@ -157,9 +157,9 @@ func (c *controllerClient) Handle(ls model.LabelSet, t time.Time, s string) erro

if sendToMain {
// because this client does not alter the labels set we don't need to clone
// the it if we don't spread the logs between the two clients. But if we
// it if we don't spread the logs between the two clients. But if we
// are sending the log record to both client we have to pass a copy because
// we are not sure what kind of label set processing will be done in the coresponding
// we are not sure what kind of label set processing will be done in the corresponding
// client which can lead to "concurrent map iteration and map write error".
if err := c.mainClient.Handle(copyLabelSet(ls, sendToDefault), t, s); err != nil {
combineErr = giterrors.Wrap(combineErr, err.Error())
Expand Down
123 changes: 37 additions & 86 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,32 @@ type controller struct {
}

// NewController returns a Controller interface
func NewController(informer cache.SharedIndexInformer, conf *config.Config, defaultClient client.ValiClient,
logger log.Logger) (Controller, error) {
controller := &controller{
func NewController(informer cache.SharedIndexInformer, conf *config.Config, defaultClient client.ValiClient, l log.Logger) (Controller, error) {
ctl := &controller{
clients: make(map[string]ControllerClient, expectedActiveClusters),
conf: conf,
defaultClient: defaultClient,
logger: logger,
logger: l,
}

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addFunc,
DeleteFunc: controller.delFunc,
UpdateFunc: controller.updateFunc,
})
if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctl.addFunc,
DeleteFunc: ctl.delFunc,
UpdateFunc: ctl.updateFunc,
}); err != nil {
return nil, fmt.Errorf("failed to add event handler: %v", err)
}

stopChan := make(chan struct{})
time.AfterFunc(conf.ControllerConfig.CtlSyncTimeout, func() {
close(stopChan)
})

if !cache.WaitForCacheSync(stopChan, informer.HasSynced) {
if !cache.WaitForNamedCacheSync("controller", stopChan, informer.HasSynced) {
return nil, fmt.Errorf("failed to wait for caches to sync")
}

return controller, nil
return ctl, nil
}

func (ctl *controller) Stop() {
Expand All @@ -90,6 +91,7 @@ func (ctl *controller) Stop() {
})
}

// addFunc is the cluster informer callback invoked when a Cluster object is added.
func (ctl *controller) addFunc(obj interface{}) {
cluster, ok := obj.(*extensionsv1alpha1.Cluster)
if !ok {
Expand All @@ -105,7 +107,7 @@ func (ctl *controller) addFunc(obj interface{}) {
return
}

if ctl.matches(shoot) && !ctl.isDeletedShoot(shoot) {
if ctl.isAllowedShoot(shoot) && !ctl.isDeletedShoot(shoot) {
ctl.createControllerClient(cluster.Name, shoot)
}
}
Expand Down Expand Up @@ -139,26 +141,27 @@ func (ctl *controller) updateFunc(oldObj interface{}, newObj interface{}) {

_ = level.Info(ctl.logger).Log("msg", "reconciling", "cluster", newCluster.Name)

client, ok := ctl.clients[newCluster.Name]
//The client exist in the list so we have to update it
_client, ok := ctl.clients[newCluster.Name]
// The client exists in the list, so we need to update it.
if ok {
// The shoot is no longer applicable for logging
if !ctl.matches(shoot) {
if !ctl.isAllowedShoot(shoot) {
ctl.deleteControllerClient(oldCluster.Name)
return
}
// Sanity check
if client == nil {
_ = level.Error(ctl.logger).Log("msg", fmt.Sprintf("The client for cluster %v is NIL. Will try to create new one", oldCluster.Name))
if _client == nil {
_ = level.Error(ctl.logger).Log(
"msg", fmt.Sprintf("The client for cluster %v is NIL. Will try to create new one", oldCluster.Name),
)
ctl.createControllerClient(newCluster.Name, shoot)
return
}

//TODO: replace createControllerClient with updateControllerClientState function once the loki->vali transition is over.
ctl.recreateControllerClient(newCluster.Name, shoot)
ctl.updateControllerClientState(_client, shoot)
} else {
//The client does not exist and we will try to create a new one if the shoot is applicable for logging
if ctl.matches(shoot) {
// The client does not exist. Try to create a new one, if the shoot is applicable for logging.
if ctl.isAllowedShoot(shoot) {
ctl.createControllerClient(newCluster.Name, shoot)
}
}
Expand All @@ -175,88 +178,36 @@ func (ctl *controller) delFunc(obj interface{}) {
ctl.deleteControllerClient(cluster.Name)
}

func (ctl *controller) getClientConfig(namespace string) *config.Config {
// updateClientConfig constructs the target URL and sets it in the client configuration
// together with the queue name
func (ctl *controller) updateClientConfig(clusterName string) *config.Config {
var clientURL flagext.URLValue

suffix := ctl.conf.ControllerConfig.DynamicHostSuffix
// Here we try to check the target logging backend. If we succeed,
// it takes precedence over the DynamicHostSuffix.
//TODO (nickytd) to remove the target logging backend check after the migration from loki to vali
if t := ctl.checkTargetLoggingBackend(ctl.conf.ControllerConfig.DynamicHostPrefix, namespace); len(t) > 0 {
suffix = t
}

url := fmt.Sprintf("%s%s%s", ctl.conf.ControllerConfig.DynamicHostPrefix, namespace, suffix)
_ = level.Info(ctl.logger).Log("msg", "set URL", "url", url, "cluster", namespace)
// Construct the target URL: DynamicHostPrefix + clusterName + DynamicHostSuffix
url := fmt.Sprintf("%s%s%s", ctl.conf.ControllerConfig.DynamicHostPrefix, clusterName, suffix)
_ = level.Info(ctl.logger).Log("msg", "set url", "url", url, "cluster", clusterName)

err := clientURL.Set(url)
if err != nil {
metrics.Errors.WithLabelValues(metrics.ErrorFailedToParseURL).Inc()
_ = level.Error(ctl.logger).Log("msg", fmt.Sprintf("failed to parse client URL for %v", namespace), "error", err.Error())
_ = level.Error(ctl.logger).Log(
"msg",
fmt.Sprintf("failed to parse client URL for %v: %v", clusterName, err.Error()),
)
return nil
}

conf := *ctl.conf
conf.ClientConfig.CredativValiConfig.URL = clientURL
conf.ClientConfig.BufferConfig.DqueConfig.QueueName = namespace
conf.ClientConfig.BufferConfig.DqueConfig.QueueName = clusterName

return &conf
}

// checkTargetLoggingBackend queries the logging backend config endpoint of the
// given namespace and returns the push endpoint matching the backend type.
//
// The request goes to prefix + namespace + loggingBackendConfigEndPoint and is
// wrapped with retry (5 retries, 2 seconds delay in between). The response body
// is scanned line by line for an entry starting with "instance_id":
//   - if its value contains "loki", lokiAPIPushEndPoint is returned;
//   - if its value contains "vali", valiAPIPushEndPoint is returned.
//
// An empty string is returned when the request fails, the status code is not
// 200, the body cannot be read, the instance_id line is malformed, or no
// matching line is found. All failures are logged, never returned.
func (ctl *controller) checkTargetLoggingBackend(prefix string, namespace string) string {
	httpClient := http.Client{
		Timeout: 5 * time.Second,
	}
	// Plain GET, wrapped below so that transient failures are retried.
	get := func(client http.Client, url string) (*http.Response, error) {
		return client.Get(url)
	}
	retriableGet := retry(get, 5, 2*time.Second)

	url := prefix + namespace + loggingBackendConfigEndPoint
	resp, err := retriableGet(httpClient, url)
	if err != nil {
		_ = level.Error(ctl.logger).Log("msg",
			fmt.Errorf("give up, can not connect to the target config endpoint %s after 5 retries", url))
		return ""
	}
	// Release the body on every return path; otherwise the underlying
	// connection is leaked. The close error carries no actionable information
	// for a read-only request, so it is deliberately ignored.
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode != 200 {
		_ = level.Error(ctl.logger).Log("msg", fmt.Errorf("response status code is not expected, got %d, expected 200", resp.StatusCode))
		return ""
	}

	cfg, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		_ = level.Error(ctl.logger).Log("msg", fmt.Sprintf("error reading config from the response for %v", namespace), "error", err.Error())
		return ""
	}

	scanner := bufio.NewScanner(strings.NewReader(string(cfg)))
	scanner.Split(bufio.ScanLines)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if !strings.HasPrefix(line, "instance_id") {
			continue
		}

		instanceId := strings.Split(line, ":")
		if len(instanceId) != 2 {
			// err is always nil at this point (it was checked after ReadAll),
			// so it must not be dereferenced here; only the format problem is logged.
			_ = level.Error(ctl.logger).Log("msg",
				fmt.Sprintf("instance id is not in the expected format %s for %v", instanceId[0], namespace))
			return ""
		}

		switch {
		case strings.Contains(instanceId[1], "loki"):
			return lokiAPIPushEndPoint
		case strings.Contains(instanceId[1], "vali"):
			return valiAPIPushEndPoint
		}
	}
	return ""
}

func (ctl *controller) matches(shoot *gardenercorev1beta1.Shoot) bool {
// isAllowedShoot reports whether the given shoot may be targeted for logging.
// Shoots for which isTestingShoot returns true are excluded.
func (ctl *controller) isAllowedShoot(shoot *gardenercorev1beta1.Shoot) bool {
return !isTestingShoot(shoot)
}

Expand Down
Loading

0 comments on commit 7983f57

Please sign in to comment.