Skip to content

Commit

Permalink
optimized fetching services in exported service controller (#19695)
Browse files Browse the repository at this point in the history
* optimized fetching services in exported service controller

* added aliases for some complex types
  • Loading branch information
aahel authored Nov 21, 2023
1 parent 58cc6ed commit a28f4b7
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,24 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
rt.Logger.Error("error getting partitioned exported services", "error", err)
return err
}
//TODO: we are listing more services than required. this should be optimized
oldComputedExportedService, err := getOldComputedExportedService(ctx, rt, req)
if err != nil {
return err
}
if len(exportedServices) == 0 && len(namespaceExportedServices) == 0 && len(partitionedExportedServices) == 0 {
if oldComputedExportedService.GetResource() != nil {
rt.Logger.Trace("deleting computed exported services")
if err := deleteResource(ctx, rt, oldComputedExportedService.GetResource()); err != nil {
rt.Logger.Error("error deleting computed exported service", "error", err)
return err
}
}
return nil
}
namespace := getNamespaceForServices(exportedServices, namespaceExportedServices, partitionedExportedServices)
services, err := resource.ListDecodedResource[*pbcatalog.Service](ctx, rt.Client, &pbresource.ListRequest{
Tenancy: &pbresource.Tenancy{
Namespace: storage.Wildcard,
Namespace: namespace,
Partition: req.ID.Tenancy.Partition,
PeerName: resource.DefaultPeerName,
},
Expand Down Expand Up @@ -123,20 +137,11 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
builder.track(svc.Id, pes.Data.Consumers)
}
}

oldComputedExportedService, err := getOldComputedExportedService(ctx, rt, req)
if err != nil {
return err
}
newComputedExportedService := builder.build()

if oldComputedExportedService.GetResource() != nil && newComputedExportedService == nil {
rt.Logger.Trace("deleting computed exported services")
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{
Id: oldComputedExportedService.GetResource().GetId(),
Version: oldComputedExportedService.GetResource().GetVersion(),
})
if err != nil {
if err := deleteResource(ctx, rt, oldComputedExportedService.GetResource()); err != nil {
rt.Logger.Error("error deleting computed exported service", "error", err)
return err
}
Expand Down Expand Up @@ -293,3 +298,43 @@ func sortRefValue(m map[resource.ReferenceKey]*serviceExports) []*serviceExports
})
return vals
}

func getNamespaceForServices(exportedServices []*types.DecodedExportedServices, namespaceExportedServices []*types.DecodedNamespaceExportedServices, partitionedExportedServices []*types.DecodedPartitionExportedServices) string {
if len(partitionedExportedServices) > 0 {
return storage.Wildcard
}
resources := []*pbresource.Resource{}
for _, exp := range exportedServices {
resources = append(resources, exp.GetResource())
}
for _, exp := range namespaceExportedServices {
resources = append(resources, exp.GetResource())
}
return getNamespace(resources)
}

func getNamespace(resources []*pbresource.Resource) string {
if len(resources) == 0 {
// We shouldn't ever hit this.
panic("resources cannot be empty")
}

namespace := resources[0].Id.Tenancy.Namespace
for _, res := range resources[1:] {
if res.Id.Tenancy.Namespace != namespace {
return storage.Wildcard
}
}
return namespace
}

func deleteResource(ctx context.Context, rt controller.Runtime, resource *pbresource.Resource) error {
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{
Id: resource.GetId(),
Version: resource.GetVersion(),
})
if err != nil {
return err
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"

svc "github.com/hashicorp/consul/agent/grpc-external/services/resource"
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
Expand Down Expand Up @@ -83,31 +82,19 @@ func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) str
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
}

func removeService(consumers []*pbmulticluster.ComputedExportedService, ref *pbresource.Reference) []*pbmulticluster.ComputedExportedService {
newConsumers := []*pbmulticluster.ComputedExportedService{}
for _, consumer := range consumers {
if !proto.Equal(consumer.TargetRef, ref) {
newConsumers = append(newConsumers, consumer)
}
}
return newConsumers
}

func (suite *controllerSuite) getComputedExportedSvc(id *pbresource.ID) *pbmulticluster.ComputedExportedServices {
computedExportedService := suite.client.RequireResourceExists(suite.T(), id)
decodedComputedExportedService := rtest.MustDecode[*pbmulticluster.ComputedExportedServices](suite.T(), computedExportedService)
return decodedComputedExportedService.Data
}

var svc0, svc2, svc3, svc4, svc5 *pbresource.Resource

func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
id := rtest.Resource(pbmulticluster.ComputedExportedServicesType, "global").WithTenancy(&pbresource.Tenancy{
Partition: tenancy.Partition,
}).ID()
require.NotNil(suite.T(), id)

rtest.Resource(pbcatalog.ServiceType, "svc1").
svc1 := rtest.Resource(pbcatalog.ServiceType, "svc1").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand Down Expand Up @@ -137,7 +124,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
expectedComputedExportedService := getExpectation(tenancy, suite.isEnterprise, 0)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

svc2 = rtest.Resource(pbcatalog.ServiceType, "svc2").
rtest.Resource(pbcatalog.ServiceType, "svc2").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand All @@ -146,7 +133,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
WithTenancy(tenancy).
Write(suite.T(), suite.client)

svc0 = rtest.Resource(pbcatalog.ServiceType, "svc0").
svc0 := rtest.Resource(pbcatalog.ServiceType, "svc0").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand All @@ -172,7 +159,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 1)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

svc3 = rtest.Resource(pbcatalog.ServiceType, "svc3").
svc3 := rtest.Resource(pbcatalog.ServiceType, "svc3").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand Down Expand Up @@ -212,7 +199,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 4)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

svc4 = rtest.Resource(pbcatalog.ServiceType, "svc4").
svc4 := rtest.Resource(pbcatalog.ServiceType, "svc4").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand All @@ -238,7 +225,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 6)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

svc5 = rtest.Resource(pbcatalog.ServiceType, "svc5").
rtest.Resource(pbcatalog.ServiceType, "svc5").
WithData(suite.T(), &pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
Expand Down Expand Up @@ -278,6 +265,37 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {

suite.client.RequireResourceNotFound(suite.T(), id)

nameexpSvc1 := rtest.Resource(pbmulticluster.NamespaceExportedServicesType, "namesvc1").WithData(suite.T(), exportedNamespaceSvcData).WithTenancy(&pbresource.Tenancy{
Partition: tenancy.Partition,
Namespace: "app",
}).Write(suite.T(), suite.client)
require.NotNil(suite.T(), nameexpSvc1)
err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
require.NoError(suite.T(), err)
actualComputedExportedService = suite.getComputedExportedSvc(id)
expectedComputedExportedService = getExpectation(&pbresource.Tenancy{
Partition: tenancy.Partition,
Namespace: "app",
}, suite.isEnterprise, 10)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

expSvc1 := rtest.Resource(pbmulticluster.ExportedServicesType, "expsvc1").WithData(suite.T(), exportedSvcData).WithTenancy(tenancy).Write(suite.T(), suite.client)
require.NotNil(suite.T(), expSvc1)

err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
require.NoError(suite.T(), err)
actualComputedExportedService = suite.getComputedExportedSvc(id)
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 11)
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)

suite.client.MustDelete(suite.T(), svc0.Id)
suite.client.MustDelete(suite.T(), svc1.Id)

err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
require.NoError(suite.T(), err)

suite.client.RequireResourceNotFound(suite.T(), id)

}

func getExpectation(tenancy *pbresource.Tenancy, isEnterprise bool, testCase int) *pbmulticluster.ComputedExportedServices {
Expand Down Expand Up @@ -416,6 +434,15 @@ func getExpectation(tenancy *pbresource.Tenancy, isEnterprise bool, testCase int
return makeCES(
makeConsumer(svc1Ref, peer0Consumer, part0Consumer),
)
case 10:
return makeCES(
makeConsumer(svc0Ref, peer1Consumer),
)
case 11:
return makeCES(
makeConsumer(svc0Ref, peer1Consumer),
makeConsumer(svc1Ref, peer0Consumer, part0Consumer),
)
}

return nil
Expand Down
15 changes: 15 additions & 0 deletions internal/multicluster/internal/types/decoded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package types

import (
"github.com/hashicorp/consul/internal/resource"
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
)

type (
DecodedExportedServices = resource.DecodedResource[*pbmulticluster.ExportedServices]
DecodedNamespaceExportedServices = resource.DecodedResource[*pbmulticluster.NamespaceExportedServices]
DecodedPartitionExportedServices = resource.DecodedResource[*pbmulticluster.PartitionExportedServices]
)

0 comments on commit a28f4b7

Please sign in to comment.