Skip to content

Commit

Permalink
Merge pull request #69 from MrSupiri/issue-66
Browse files Browse the repository at this point in the history
Changed the peeling mechanism
  • Loading branch information
isala404 authored Aug 19, 2021
2 parents 7d42f59 + f355616 commit 77f2c47
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 29 deletions.
6 changes: 3 additions & 3 deletions config/samples/simulations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ spec:
labels:
loggingplumber.isala.me/test: simulations
filters:
- record_modifier:
records:
- foo: "bar"
- grep:
regexp:
- key: first
pattern: /^5\d\d$/
- record_modifier:
records:
- foo: "bar"
---
apiVersion: logging.banzaicloud.io/v1beta1
kind: Output
Expand Down
17 changes: 12 additions & 5 deletions controllers/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,21 @@ func (r *FlowTestReconciler) cleanUpOutputResources(ctx context.Context) error {
logger := log.FromContext(ctx)

var flowTests loggingplumberv1alpha1.FlowTestList
if err := r.List(ctx, &flowTests, &client.MatchingLabels{"app.kubernetes.io/created-by": "logging-plumber"}); err != nil {
if err := r.List(ctx, &flowTests); err != nil {
logger.Error(err, fmt.Sprintf("failed to get provisioned %s", flowTests.Kind))
return err
}
for _, flowTest := range flowTests.Items {
if flowTest.Status.Status != loggingplumberv1alpha1.Completed {
logger.V(1).Info("unfinished flowtest found, skipping cleanUpOutputResources")
return nil
if len(flowTests.Items) > 1 {
for _, flowTest := range flowTests.Items {
if flowTest.Status.Status != loggingplumberv1alpha1.Completed && !flowTest.ObjectMeta.DeletionTimestamp.IsZero() {
logger.V(1).Info("unfinished flowtest found, skipping cleanup of log-aggregator")
return nil
}
}
}

logger.V(1).Info("no running flowtest found, cleaning up log-aggregator")

matchingLabels := &client.MatchingLabels{"loggingplumber.isala.me/component": "log-aggregator"}
var podList v1.PodList
if err := r.List(ctx, &podList, matchingLabels); client.IgnoreNotFound(err) != nil {
Expand Down Expand Up @@ -152,6 +156,8 @@ func (r *FlowTestReconciler) cleanUpOutputResources(ctx context.Context) error {

func (r *FlowTestReconciler) deleteResources(ctx context.Context, finalizerName string) error {
flowTest := ctx.Value("flowTest").(loggingplumberv1alpha1.FlowTest)
logger := log.FromContext(ctx)

if err := r.cleanUpResources(ctx, flowTest.ObjectMeta.Name); client.IgnoreNotFound(err) != nil {
return err
}
Expand All @@ -161,6 +167,7 @@ func (r *FlowTestReconciler) deleteResources(ctx context.Context, finalizerName
// remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(&flowTest, finalizerName)
if err := r.Update(ctx, &flowTest); err != nil {
logger.Error(err, "failed to remove the finalizer")
return err
}
return nil
Expand Down
25 changes: 20 additions & 5 deletions controllers/flowtest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -67,7 +68,15 @@ func (r *FlowTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
logger.Info("Reconciling")

var flowTest loggingplumberv1alpha1.FlowTest
if err := r.Get(ctx, req.NamespacedName, &flowTest); err != client.IgnoreNotFound(err) {
if err := r.Get(ctx, req.NamespacedName, &flowTest); err != nil {
// all the resources are already deleted
if apierrors.IsNotFound(err) {
// Remove if log aggregator is still running
if err := r.cleanUpOutputResources(ctx); client.IgnoreNotFound(err) != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{Requeue: false}, nil
}
logger.Error(err, "failed to get the flowtest")
return ctrl.Result{Requeue: false}, err
}
Expand All @@ -84,7 +93,12 @@ func (r *FlowTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{Requeue: true}, err
}
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
return ctrl.Result{Requeue: false}, nil
}

if flowTest.ObjectMeta.Name == "" {
logger.V(-1).Info("flowtest without a name queued")
return ctrl.Result{Requeue: false}, nil
}

// Reconcile depending on status
Expand All @@ -95,15 +109,16 @@ func (r *FlowTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// Set the finalizer
controllerutil.AddFinalizer(&flowTest, finalizerName)
if err := r.Update(ctx, &flowTest); err != nil {
logger.Error(err, "failed to update flowtest status")
logger.Error(err, "failed to add finalizer")
return ctrl.Result{Requeue: true}, err
}
// Set the status
flowTest.Status.Status = loggingplumberv1alpha1.Created
if err := r.Status().Update(ctx, &flowTest); err != nil {
logger.Error(err, "failed to update flowtest status")
logger.Error(err, "failed to set status as created")
return ctrl.Result{Requeue: true}, err
}
r.Recorder.Event(&flowTest, v1.EventTypeNormal, EventReasonProvision, "moved to created state")
return ctrl.Result{Requeue: true}, nil

case loggingplumberv1alpha1.Created:
Expand All @@ -121,7 +136,7 @@ func (r *FlowTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if time.Now().After(fiveMinuteAfterCreation) {
flowTest.Status.Status = loggingplumberv1alpha1.Completed
if err := r.Status().Update(ctx, &flowTest); err != nil {
logger.Error(err, "failed to update flowtest status")
logger.Error(err, "failed to set status as completed")
return ctrl.Result{Requeue: true}, nil
}
return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
Expand Down
22 changes: 14 additions & 8 deletions controllers/provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels
flowTest.Status.FilterStatus = make([]bool, len(referenceFlow.Spec.Filters))

i := 0
flowTemplate, outTemplate := r.clusterFlowTemplates(referenceFlow, *flowTest, extraLabels)
for x := 1; x <= len(referenceFlow.Spec.Match); x++ {
flowTemplate, outTemplate := r.clusterFlowTemplates(referenceFlow, *flowTest)
for x := 0; x <= len(referenceFlow.Spec.Match)-1; x++ {
targetFlow := *flowTemplate.DeepCopy()
targetOutput := *outTemplate.DeepCopy()

Expand All @@ -158,7 +158,7 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels

targetFlow.Spec.GlobalOutputRefs = []string{targetOutput.ObjectMeta.Name}

targetFlow.Spec.Match = append(targetFlow.Spec.Match, referenceFlow.Spec.Match[:x]...)
targetFlow.Spec.Match = []flowv1beta1.ClusterMatch{referenceFlow.Spec.Match[x]}

if err = r.Create(ctx, &targetOutput); err != nil {
logger.Error(err, fmt.Sprintf("failed to deploy Flow #%d for %s", i, referenceFlow.ObjectMeta.Name))
Expand Down Expand Up @@ -189,7 +189,10 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels

targetFlow.Spec.GlobalOutputRefs = []string{targetOutput.ObjectMeta.Name}

targetFlow.Spec.Match = nil
// ensure logs are only coming from our simulation pod
targetFlow.Spec.Match = []flowv1beta1.ClusterMatch{{
ClusterSelect: &flowv1beta1.ClusterSelect{Labels: extraLabels},
}}

targetFlow.Spec.Filters = append(targetFlow.Spec.Filters, referenceFlow.Spec.Filters[:x]...)

Expand Down Expand Up @@ -219,9 +222,9 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels
flowTest.Status.FilterStatus = make([]bool, len(referenceFlow.Spec.Filters))

i := 0
flowTemplate, outTemplate := r.flowTemplates(referenceFlow, *flowTest, extraLabels)
flowTemplate, outTemplate := r.flowTemplates(referenceFlow, *flowTest)

for x := 1; x <= len(referenceFlow.Spec.Match); x++ {
for x := 0; x <= len(referenceFlow.Spec.Match)-1; x++ {
targetFlow := *flowTemplate.DeepCopy()
targetOutput := *outTemplate.DeepCopy()

Expand All @@ -236,7 +239,7 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels

targetFlow.Spec.LocalOutputRefs = []string{targetOutput.ObjectMeta.Name}

targetFlow.Spec.Match = append(targetFlow.Spec.Match, referenceFlow.Spec.Match[:x]...)
targetFlow.Spec.Match = []flowv1beta1.Match{referenceFlow.Spec.Match[x]}

if err = r.Create(ctx, &targetOutput); err != nil {
logger.Error(err, fmt.Sprintf("failed to deploy Flow #%d for %s", i, referenceFlow.ObjectMeta.Name))
Expand Down Expand Up @@ -267,7 +270,10 @@ func (r *FlowTestReconciler) deploySlicedFlows(ctx context.Context, extraLabels

targetFlow.Spec.LocalOutputRefs = []string{targetOutput.ObjectMeta.Name}

targetFlow.Spec.Match = nil
// ensure logs are only coming from our simulation pod
targetFlow.Spec.Match = []flowv1beta1.Match{{
Select: &flowv1beta1.Select{Labels: extraLabels},
}}

targetFlow.Spec.Filters = append(targetFlow.Spec.Filters, referenceFlow.Spec.Filters[:x]...)

Expand Down
12 changes: 4 additions & 8 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
EventReasonReconcile = "Reconcile"
)

func (r *FlowTestReconciler) flowTemplates(flow flowv1beta1.Flow, flowTest loggingplumberv1alpha1.FlowTest, extraLabels map[string]string) (flowv1beta1.Flow, flowv1beta1.Output) {
func (r *FlowTestReconciler) flowTemplates(flow flowv1beta1.Flow, flowTest loggingplumberv1alpha1.FlowTest) (flowv1beta1.Flow, flowv1beta1.Output) {
flowTemplate := flowv1beta1.Flow{
TypeMeta: metav1.TypeMeta{
APIVersion: "logging.banzaicloud.io/v1beta1",
Expand All @@ -35,9 +35,7 @@ func (r *FlowTestReconciler) flowTemplates(flow flowv1beta1.Flow, flowTest loggi
},
Spec: flowv1beta1.FlowSpec{
LocalOutputRefs: nil,
Match: []flowv1beta1.Match{{
Select: &flowv1beta1.Select{Labels: extraLabels},
}},
Match: nil,
Filters: []flowv1beta1.Filter{{
Grep: &filters.GrepConfig{
Regexp: []filters.RegexpSection{{
Expand Down Expand Up @@ -73,7 +71,7 @@ func (r *FlowTestReconciler) flowTemplates(flow flowv1beta1.Flow, flowTest loggi
return flowTemplate, outTemplate
}

func (r *FlowTestReconciler) clusterFlowTemplates(flow flowv1beta1.ClusterFlow, flowTest loggingplumberv1alpha1.FlowTest, extraLabels map[string]string) (flowv1beta1.ClusterFlow, flowv1beta1.ClusterOutput) {
func (r *FlowTestReconciler) clusterFlowTemplates(flow flowv1beta1.ClusterFlow, flowTest loggingplumberv1alpha1.FlowTest) (flowv1beta1.ClusterFlow, flowv1beta1.ClusterOutput) {
flowTemplate := flowv1beta1.ClusterFlow{
TypeMeta: metav1.TypeMeta{
APIVersion: "logging.banzaicloud.io/v1beta1",
Expand All @@ -86,9 +84,7 @@ func (r *FlowTestReconciler) clusterFlowTemplates(flow flowv1beta1.ClusterFlow,
},
Spec: flowv1beta1.ClusterFlowSpec{
GlobalOutputRefs: nil,
Match: []flowv1beta1.ClusterMatch{{
ClusterSelect: &flowv1beta1.ClusterSelect{Labels: extraLabels},
}},
Match: nil,
Filters: []flowv1beta1.Filter{{
Grep: &filters.GrepConfig{
Regexp: []filters.RegexpSection{{
Expand Down

0 comments on commit 77f2c47

Please sign in to comment.