Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): implement delta xDS for envoy config exchange #11296

Draft
wants to merge 31 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3b81728
poc
lukidzi Aug 4, 2024
87cea34
s
lukidzi Aug 29, 2024
a1bb875
cleanup
lukidzi Sep 3, 2024
af6ae16
cleanup
lukidzi Sep 3, 2024
f231364
more cleanup and tests
lukidzi Sep 3, 2024
caadff5
Merge branch 'master' into delta-poc
lukidzi Sep 3, 2024
1cb3103
fix conflict
lukidzi Oct 21, 2024
cfda701
Merge branch 'master' into delta-poc
lukidzi Nov 5, 2024
b9b339b
cleanup
lukidzi Nov 5, 2024
9ce981f
cleanup
lukidzi Nov 5, 2024
f1fcb25
trigger build
lukidzi Nov 5, 2024
5e99aef
added better doc
lukidzi Nov 6, 2024
a414c2b
Merge branch 'master' into delta-poc
lukidzi Nov 6, 2024
e3896ef
Merge branch 'master' into delta-poc
Icarus9913 Nov 7, 2024
811c7be
fix k8s by adding new field in dataplane
lukidzi Nov 8, 2024
316b2dd
Merge remote-tracking branch 'upstream/master' into delta-poc
lukidzi Nov 8, 2024
fe30989
fix and add error
lukidzi Nov 12, 2024
b667262
Merge branch 'master' into delta-poc
Icarus9913 Nov 12, 2024
bd4543b
Merge remote-tracking branch 'upstream/master' into delta-poc
lukidzi Nov 14, 2024
ce21e0f
Merge branch 'master' into delta-poc
lukidzi Nov 18, 2024
cad052d
Merge branch 'master' into delta-poc
lukidzi Nov 19, 2024
361efe0
Merge branch 'delta-poc' of github.com:lukidzi/kuma into delta-poc
lukidzi Nov 20, 2024
9c57a2e
review
lukidzi Nov 20, 2024
4f3f1aa
test eds fix in go-control-plane
lukidzi Nov 22, 2024
b596303
Merge branch 'master' into delta-poc
lukidzi Nov 22, 2024
8047426
Merge branch 'master' into delta-poc
lukidzi Nov 25, 2024
6ea619b
Merge branch 'master' into delta-poc
lukidzi Nov 25, 2024
291685f
Merge branch 'master' into delta-poc
lukidzi Nov 27, 2024
5d5db40
fix
lukidzi Dec 2, 2024
9cdeeca
Merge branch 'master' into delta-poc
lukidzi Dec 4, 2024
e9b519d
fix
lukidzi Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ func (b *remoteBootstrap) requestForBootstrap(ctx context.Context, client *http.
},
SystemCaPath: params.SystemCaPath,
}
switch cfg.DataplaneRuntime.XdsConfigMode {
case "delta":
request.XdsConfigMode = types.DELTA
case "sotw":
request.XdsConfigMode = types.SOTW
default:
request.XdsConfigMode = types.NOT_DEFINED
}
jsonBytes, err := json.MarshalIndent(request, "", " ")
if err != nil {
return nil, errors.Wrap(err, "could not marshal request to json")
Expand Down
1 change: 1 addition & 0 deletions app/kuma-dp/pkg/dataplane/envoy/remote_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ var _ = Describe("Remote Bootstrap", func() {
cfg.Dataplane.Mesh = "demo"
cfg.Dataplane.Name = "sample"
cfg.DataplaneRuntime.Token = "token"
cfg.DataplaneRuntime.XdsConfigMode = "delta"

return testCase{
config: cfg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@
"certPath": "",
"keyPath": ""
},
"systemCaPath": ""
"systemCaPath": "",
"xdsConfigMode": 2
}
2 changes: 2 additions & 0 deletions pkg/config/app/kuma-cp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ type ExperimentalConfig struct {
// If true skips persisted VIPs. Change to true only if generateMeshServices is enabled.
// Do not enable on production.
SkipPersistedVIPs bool `json:"skipPersistedVIPs" envconfig:"KUMA_EXPERIMENTAL_SKIP_PERSISTED_VIPS"`
// If true uses Delta xDS to deliver changes to sidecars.
UseDeltaXds bool `json:"useDeltaXds" envconfig:"KUMA_EXPERIMENTAL_USE_DELTA_XDS"`
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
}

type ExperimentalKDSEventBasedWatchdog struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/app/kuma-dp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ type DataplaneRuntime struct {
DynamicConfiguration DynamicConfiguration `json:"dynamicConfiguration" envconfig:"kuma_dataplane_runtime_dynamic_configuration"`
// SystemCaPath defines path of system provided Ca
SystemCaPath string `json:"systemCaPath,omitempty" envconfig:"kuma_dataplane_runtime_dynamic_system_ca_path"`
// XdsConfigMode defines xDS communication type between Envoy and control-plane.
// Available values are: sotw, delta
// By default it takes configuration provided by the control-plane.
XdsConfigMode string `json:"xdsConfigMode,omitempty" envconfig:"kuma_dataplane_runtime_dynamic_xds_config_mode"`
}

type Metrics struct {
Expand Down Expand Up @@ -335,6 +339,13 @@ func (d *DataplaneRuntime) Validate() error {
if d.BinaryPath == "" {
errs = multierr.Append(errs, errors.Errorf(".BinaryPath must be non-empty"))
}
if d.XdsConfigMode != "" {
switch d.XdsConfigMode {
case "delta", "sotw":
default:
errs = multierr.Append(errs, errors.Errorf(".XdsConfigMode can be one of: delta, sotw"))
}
}
return errs
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/config/app/kuma-dp/testdata/invalid-config.golden.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
parsing configuration from file 'testdata/invalid-config.input.yaml' failed: configuration validation failed: .ControlPlane is not valid: .Retry is not valid: .Backoff must be a positive duration; .Dataplane is not valid: .ProxyType is not valid: not-a-proxy is not a valid proxy type; .Mesh must be non-empty; .Name must be non-empty; .DrainTime must be positive; .DataplaneRuntime is not valid: .BinaryPath must be non-empty
parsing configuration from file 'testdata/invalid-config.input.yaml' failed: configuration validation failed: .ControlPlane is not valid: .Retry is not valid: .Backoff must be a positive duration; .Dataplane is not valid: .ProxyType is not valid: not-a-proxy is not a valid proxy type; .Mesh must be non-empty; .Name must be non-empty; .DrainTime must be positive; .DataplaneRuntime is not valid: .BinaryPath must be non-empty; .XdsConfigMode can be one of: delta, sotw
1 change: 1 addition & 0 deletions pkg/config/app/kuma-dp/testdata/invalid-config.input.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ dataplane:
proxyType: not-a-proxy
dataplaneRuntime:
binaryPath:
xdsConfigMode: a
1 change: 1 addition & 0 deletions pkg/config/app/kuma-dp/testdata/valid-config.input.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ dataplaneRuntime:
binaryPath: envoy.sh
configDir: /var/run/envoy
envoyLogLevel: trace
xdsConfigMode: delta
3 changes: 3 additions & 0 deletions pkg/config/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ var _ = Describe("Config loader", func() {
Expect(cfg.Experimental.SidecarContainers).To(BeTrue())
Expect(cfg.Experimental.SkipPersistedVIPs).To(BeTrue())
Expect(cfg.Experimental.GenerateMeshServices).To(BeTrue())
Expect(cfg.Experimental.UseDeltaXds).To(BeTrue())

Expect(cfg.Proxy.Gateway.GlobalDownstreamMaxConnections).To(BeNumerically("==", 1))
Expect(cfg.EventBus.BufferSize).To(Equal(uint(30)))
Expand Down Expand Up @@ -759,6 +760,7 @@ experimental:
sidecarContainers: true
generateMeshServices: true
skipPersistedVIPs: true
useDeltaXds: true
proxy:
gateway:
globalDownstreamMaxConnections: 1
Expand Down Expand Up @@ -1065,6 +1067,7 @@ meshService:
"KUMA_EXPERIMENTAL_SIDECAR_CONTAINERS": "true",
"KUMA_EXPERIMENTAL_GENERATE_MESH_SERVICES": "true",
"KUMA_EXPERIMENTAL_SKIP_PERSISTED_VIPS": "true",
"KUMA_EXPERIMENTAL_USE_DELTA_XDS": "true",
"KUMA_PROXY_GATEWAY_GLOBAL_DOWNSTREAM_MAX_CONNECTIONS": "1",
"KUMA_TRACING_OPENTELEMETRY_ENDPOINT": "otel-collector:4317",
"KUMA_TRACING_OPENTELEMETRY_ENABLED": "true",
Expand Down
20 changes: 15 additions & 5 deletions pkg/hds/tracker/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type tracker struct {

sync.RWMutex // protects access to the fields below
streamsAssociation map[xds.StreamID]core_model.ResourceKey
dpStreams map[core_model.ResourceKey]streams
// deltaStreamsAssociation map[xds.StreamID]core_model.ResourceKey
dpStreams map[core_model.ResourceKey]streams
}

func NewCallbacks(
Expand All @@ -57,10 +58,11 @@ func NewCallbacks(
return &tracker{
resourceManager: resourceManager,
streamsAssociation: map[xds.StreamID]core_model.ResourceKey{},
dpStreams: map[core_model.ResourceKey]streams{},
config: config,
log: log,
metrics: metrics,
// deltaStreamsAssociation: map[xds.StreamID]core_model.ResourceKey{},
lukidzi marked this conversation as resolved.
Show resolved Hide resolved
dpStreams: map[core_model.ResourceKey]streams{},
config: config,
log: log,
metrics: metrics,
reconciler: &reconciler{
cache: cache,
hasher: hasher,
Expand All @@ -69,6 +71,14 @@ func NewCallbacks(
}
}

func (t *tracker) OnDeltaStreamOpen(ctx context.Context, streamID int64) error {
return t.OnStreamOpen(ctx, streamID)
}

func (t *tracker) OnDeltaStreamClosed(streamID xds.StreamID) {
t.OnStreamClosed(streamID)
}

func (t *tracker) OnStreamOpen(ctx context.Context, streamID int64) error {
t.metrics.StreamsActiveInc()
return nil
Expand Down
46 changes: 25 additions & 21 deletions pkg/util/xds/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,50 @@ package xds
import (
"context"

discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)

// DiscoveryRequest defines interface over real Envoy's DiscoveryRequest.
type DiscoveryRequest interface {
type Request interface {
NodeId() string
// Node returns either a v2 or v3 Node
Node() interface{}
Metadata() *structpb.Struct
VersionInfo() string
GetTypeUrl() string
GetResponseNonce() string
GetResourceNames() []string
GetTypeUrl() string
HasErrors() bool
ErrorMsg() string
VersionInfo() string
GetResourceNames() []string
}

// DiscoveryResponse defines interface over real Envoy's DiscoveryResponse.
type DiscoveryResponse interface {
type Response interface {
GetTypeUrl() string
VersionInfo() string
GetResources() []*anypb.Any
GetNonce() string
VersionInfo() string
GetNumberOfResources() int
}

// DiscoveryRequest defines interface over real Envoy's DiscoveryRequest.
type DiscoveryRequest interface {
Request
}

// DiscoveryResponse defines interface over real Envoy's DiscoveryResponse.
type DiscoveryResponse interface {
Response
}

type DeltaDiscoveryRequest interface {
NodeId() string
// Node returns either a v2 or v3 Node
Node() interface{}
Metadata() *structpb.Struct
GetTypeUrl() string
GetResponseNonce() string
Request
GetResourceNamesSubscribe() []string
GetInitialResourceVersions() map[string]string
HasErrors() bool
ErrorMsg() string
}

// DeltaDiscoveryResponse defines interface over real Envoy's DeltaDiscoveryResponse.
type DeltaDiscoveryResponse interface {
GetTypeUrl() string
GetResources() []*discoveryv3.Resource
Response
GetRemovedResources() []string
GetNonce() string
}

// Callbacks defines Callbacks for xDS streaming requests. The difference over real go-control-plane Callbacks is that it takes an DiscoveryRequest / DiscoveryResponse interface.
Expand Down Expand Up @@ -94,4 +92,10 @@ type RestCallbacks interface {
type MultiCallbacks interface {
Callbacks
RestCallbacks
DeltaCallbacks
}

type MultiXDSCallbacks interface {
Callbacks
DeltaCallbacks
}
29 changes: 29 additions & 0 deletions pkg/util/xds/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_sd "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_xds "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"

"github.com/kumahq/kuma/pkg/util/xds"
Expand Down Expand Up @@ -175,6 +176,10 @@ type discoveryResponse struct {
*envoy_sd.DiscoveryResponse
}

func (d *discoveryResponse) GetNumberOfResources() int {
return len(d.Resources)
}

func (d *discoveryResponse) VersionInfo() string {
return d.GetVersionInfo()
}
Expand Down Expand Up @@ -211,12 +216,36 @@ func (d *deltaDiscoveryRequest) GetInitialResourceVersions() map[string]string {
return d.InitialResourceVersions
}

func (d *deltaDiscoveryRequest) GetResourceNames() []string {
return d.GetResourceNamesSubscribe()
}

func (d *deltaDiscoveryRequest) VersionInfo() string {
return ""
}

var _ xds.DeltaDiscoveryRequest = &deltaDiscoveryRequest{}

type deltaDiscoveryResponse struct {
*envoy_sd.DeltaDiscoveryResponse
}

func (d *deltaDiscoveryResponse) VersionInfo() string {
return d.SystemVersionInfo
}

func (d *deltaDiscoveryResponse) GetResources() []*anypb.Any {
resources := []*anypb.Any{}
for _, res := range d.Resources {
resources = append(resources, res.Resource)
}
return resources
}

func (d *deltaDiscoveryResponse) GetNumberOfResources() int {
return len(d.Resources)
}

var _ xds.DeltaDiscoveryResponse = &deltaDiscoveryResponse{}

func (d *deltaDiscoveryResponse) GetTypeUrl() string {
Expand Down
53 changes: 47 additions & 6 deletions pkg/xds/auth/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/metadata"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
core_manager "github.com/kumahq/kuma/pkg/core/resources/manager"
"github.com/kumahq/kuma/pkg/core/resources/model"
Expand All @@ -30,14 +31,15 @@ type DPNotFoundRetry struct {
MaxTimes uint
}

func NewCallbacks(resManager core_manager.ReadOnlyResourceManager, authenticator Authenticator, dpNotFoundRetry DPNotFoundRetry) util_xds.Callbacks {
func NewCallbacks(resManager core_manager.ReadOnlyResourceManager, authenticator Authenticator, dpNotFoundRetry DPNotFoundRetry) util_xds.MultiXDSCallbacks {
if dpNotFoundRetry.Backoff == 0 { // backoff cannot be 0
dpNotFoundRetry.Backoff = 1 * time.Millisecond
}
return &authCallbacks{
resManager: resManager,
authenticator: authenticator,
streams: map[core_xds.StreamID]stream{},
deltaStreams: map[core_xds.StreamID]stream{},
dpNotFoundRetry: dpNotFoundRetry,
}
}
Expand All @@ -51,6 +53,7 @@ type authCallbacks struct {

sync.RWMutex // protects streams
streams map[core_xds.StreamID]stream
deltaStreams map[core_xds.StreamID]stream
}

type stream struct {
Expand All @@ -62,7 +65,7 @@ type stream struct {
nodeID string
}

var _ util_xds.Callbacks = &authCallbacks{}
var _ util_xds.MultiXDSCallbacks = &authCallbacks{}

func (a *authCallbacks) OnStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error {
a.Lock()
Expand All @@ -82,10 +85,38 @@ func (a *authCallbacks) OnStreamClosed(streamID core_xds.StreamID) {
}

func (a *authCallbacks) OnStreamRequest(streamID core_xds.StreamID, req util_xds.DiscoveryRequest) error {
s, err := a.stream(streamID, req)
return a.onStreamRequest(streamID, req, false)
}

func (a *authCallbacks) OnDeltaStreamOpen(ctx context.Context, streamID core_xds.StreamID, _ string) error {
a.Lock()
defer a.Unlock()

a.deltaStreams[streamID] = stream{
ctx: ctx,
resource: nil,
}

core.Log.V(1).Info("OnDeltaStreamOpen", "streamID", streamID)
return nil
}

func (a *authCallbacks) OnDeltaStreamClosed(streamID int64) {
a.Lock()
delete(a.deltaStreams, streamID)
a.Unlock()
}

func (a *authCallbacks) OnStreamDeltaRequest(streamID core_xds.StreamID, req util_xds.DeltaDiscoveryRequest) error {
return a.onStreamRequest(streamID, req, true)
}

func (a *authCallbacks) onStreamRequest(streamID core_xds.StreamID, req util_xds.Request, isDelta bool) error {
s, err := a.stream(streamID, req, isDelta)
if err != nil {
return err
}
core.Log.V(1).Info("OnStreamDeltaRequest auth", "req", req)

credential, err := ExtractCredential(s.ctx)
if err != nil {
Expand All @@ -95,14 +126,24 @@ func (a *authCallbacks) OnStreamRequest(streamID core_xds.StreamID, req util_xds
return errors.Wrap(err, "authentication failed")
}
a.Lock()
a.streams[streamID] = s
if isDelta {
a.deltaStreams[streamID] = s
} else {
a.streams[streamID] = s
}
a.Unlock()
return nil
}

func (a *authCallbacks) stream(streamID core_xds.StreamID, req util_xds.DiscoveryRequest) (stream, error) {
func (a *authCallbacks) stream(streamID core_xds.StreamID, req util_xds.Request, isDelta bool) (stream, error) {
a.RLock()
s, ok := a.streams[streamID]
var s stream
var ok bool
if isDelta {
s, ok = a.deltaStreams[streamID]
} else {
s, ok = a.streams[streamID]
}
a.RUnlock()
if !ok {
return stream{}, errors.New("stream is not present")
Expand Down
1 change: 1 addition & 0 deletions pkg/xds/bootstrap/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func RegisterBootstrap(rt core_runtime.Runtime) error {
rt.Config().DpServer.Authn.EnableReloadableTokens || rt.Config().Store.Type == store.KubernetesStore,
rt.Config().DpServer.Hds.Enabled,
rt.Config().GetEnvoyAdminPort(),
rt.Config().Experimental.UseDeltaXds,
)
if err != nil {
return err
Expand Down
Loading
Loading