Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.23] Fix tcp telemetry for service-oriented waypoints #6036

Open
wants to merge 9 commits into
base: release-1.23
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ envoy_cc_library(
"@envoy//envoy/stats:stats_macros",
"@envoy//envoy/stream_info:filter_state_interface",
"@envoy//source/common/http:utility_lib",
"@envoy//source/common/network:filter_state_dst_address_lib",
"@envoy//source/common/network:utility_lib",
"@envoy//source/common/protobuf",
"@envoy//source/common/protobuf:utility_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "source/extensions/filters/network/metadata_exchange/metadata_exchange.h"

#include <cstdint>
#include <memory>
#include <string>

#include "absl/base/internal/endian.h"
Expand All @@ -25,6 +26,8 @@
#include "envoy/stats/scope.h"
#include "source/common/buffer/buffer_impl.h"
#include "source/common/protobuf/utility.h"
#include "source/common/network/utility.h"
#include "source/common/network/filter_state_dst_address.h"
#include "source/extensions/filters/network/metadata_exchange/metadata_exchange_initial_header.h"

namespace Envoy {
Expand Down Expand Up @@ -327,18 +330,98 @@ std::string MetadataExchangeFilter::getMetadataId() { return local_info_.node().

void MetadataExchangeFilter::setMetadataNotFoundFilterState() {
if (config_->metadata_provider_) {
Network::Address::InstanceConstSharedPtr upstream_peer;
const StreamInfo::StreamInfo& info = read_callbacks_->connection().streamInfo();
if (info.upstreamInfo()) {
auto upstream_host = info.upstreamInfo().value().get().upstreamHost();
if (upstream_host) {
const auto address = upstream_host->address();
ENVOY_LOG(debug, "Trying to check upstream host info of host {}", address->asString());
switch (address->type()) {
case Network::Address::Type::Ip:
upstream_peer = upstream_host->address();
break;
case Network::Address::Type::EnvoyInternal:
if (upstream_host->metadata()) {
ENVOY_LOG(debug, "Trying to check filter metadata of host {}",
upstream_host->address()->asString());
const auto& filter_metadata = upstream_host->metadata()->filter_metadata();
const auto& it = filter_metadata.find("envoy.filters.listener.original_dst");
if (it != filter_metadata.end()) {
const auto& destination_it = it->second.fields().find("local");
if (destination_it != it->second.fields().end()) {
upstream_peer = Network::Utility::parseInternetAddressAndPortNoThrow(
destination_it->second.string_value(), /*v6only=*/false);
}
}
}
break;
default:
break;
}
}
}
// Get our metadata differently based on the direction of the filter
auto downstream_peer_address = [&]() -> Network::Address::InstanceConstSharedPtr {
if (upstream_peer) {
// Query upstream peer data and save it in metadata for stats
const auto metadata_object = config_->metadata_provider_->GetMetadata(upstream_peer);
if (metadata_object) {
ENVOY_LOG(debug, "Metadata found for upstream peer address {}",
upstream_peer->asString());
const std::string fb =
Istio::Common::convertWorkloadMetadataToFlatNode(metadata_object.value());
// Filter object captures schema by view, hence the global singleton for the
// prototype.
auto state = std::make_unique<::Envoy::Extensions::Filters::Common::Expr::CelState>(
MetadataExchangeConfig::nodeInfoPrototype());
state->setValue(fb);
read_callbacks_->connection().streamInfo().filterState()->setData(
absl::StrCat(kMetadataPrefix, kUpstreamMetadataKey), std::move(state),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::Connection);

CelStatePrototype prototype(
false, ::Envoy::Extensions::Filters::Common::Expr::CelStateType::String,
absl::string_view(), StreamInfo::FilterState::LifeSpan::Connection);
auto id_state =
std::make_unique<::Envoy::Extensions::Filters::Common::Expr::CelState>(prototype);
id_state->setValue("unknown");
read_callbacks_->connection().streamInfo().filterState()->setData(
absl::StrCat(kMetadataPrefix, kUpstreamMetadataIdKey), std::move(id_state),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::Connection);
config_->stats().metadata_added_.inc();
}
}

// Regardless, return the downstream address for downstream metadata
return read_callbacks_->connection().connectionInfoProvider().remoteAddress();
};

auto upstream_peer_address = [&]() -> Network::Address::InstanceConstSharedPtr {
if (upstream_peer) {
return upstream_peer;
}
ENVOY_LOG(debug, "Upstream peer address is null. Fall back to localAddress");
return read_callbacks_->connection().connectionInfoProvider().localAddress();
};
const Network::Address::InstanceConstSharedPtr peer_address =
read_callbacks_->connection().connectionInfoProvider().remoteAddress();
config_->filter_direction_ == FilterDirection::Downstream ? downstream_peer_address()
: upstream_peer_address();
ENVOY_LOG(debug, "Look up metadata based on peer address {}", peer_address->asString());
const auto metadata_object = config_->metadata_provider_->GetMetadata(peer_address);
if (metadata_object) {
ENVOY_LOG(debug, "Metadata found for peer address {}", peer_address->asString());
updatePeer(Istio::Common::convertWorkloadMetadataToFlatNode(metadata_object.value()));
updatePeerId(config_->filter_direction_ == FilterDirection::Downstream
? kDownstreamMetadataIdKey
: kUpstreamMetadataIdKey,
"unknown");
config_->stats().metadata_added_.inc();
return;
} else {
ENVOY_LOG(debug, "Metadata not found for peer address {}", peer_address->asString());
}
}
updatePeerId(kMetadataNotFoundValue, kMetadataNotFoundValue);
Expand Down
2 changes: 1 addition & 1 deletion test/envoye2e/basic_flow/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestBasicTCPFlow(t *testing.T) {
&driver.Update{
Node: "server",
Version: "0",
Clusters: []string{driver.LoadTestData("testdata/cluster/tcp_server.yaml.tmpl")},
Clusters: []string{driver.LoadTestData("testdata/cluster/tcp_server.yaml.tmpl")},
Listeners: []string{driver.LoadTestData("testdata/listener/tcp_server.yaml.tmpl")},
},
&driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/client.yaml.tmpl")},
Expand Down
1 change: 1 addition & 0 deletions test/envoye2e/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func init() {
"TestStatsEndpointLabels/#00",
"TestStatsServerWaypointProxy",
"TestStatsServerWaypointProxyCONNECT",
"TestTCPStatsServerWaypointProxyCONNECT",
"TestStatsGrpc/#00",
"TestStatsGrpcStream/#00",
"TestStatsParallel/Default",
Expand Down
73 changes: 73 additions & 0 deletions test/envoye2e/stats_plugin/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,79 @@ func TestStatsServerWaypointProxyCONNECT(t *testing.T) {
}
}

func TestTCPStatsServerWaypointProxyCONNECT(t *testing.T) {
params := driver.NewTestParams(t, map[string]string{
"EnableDelta": "true",
"EnableMetadataDiscovery": "true",
"DisableDirectResponse": "true",
"ConnectionCount": "10",
"StatsFilterServerConfig": driver.LoadTestJSON("testdata/stats/server_waypoint_proxy_config.yaml"),
}, envoye2e.ProxyE2ETests)
params.Vars["ServerClusterName"] = "internal_outbound"
params.Vars["ServerInternalAddress"] = "internal_inbound"
params.Vars["ServerMetadata"] = params.LoadTestData("testdata/server_waypoint_proxy_node_metadata.json.tmpl")
params.Vars["ServerNetworkFilters"] = driver.LoadTestData("testdata/filters/mx_waypoint_tcp.yaml.tmpl") + "\n" +
driver.LoadTestData("testdata/filters/stats_inbound.yaml.tmpl")
params.Vars["EnableTunnelEndpointMetadata"] = "true"
params.Vars["EnableOriginalDstPortOverride"] = "true"

if err := (&driver.Scenario{
Steps: []driver.Step{
&driver.XDS{},
&driver.Update{
Node: "client", Version: "0",
Clusters: []string{
driver.LoadTestData("testdata/cluster/internal_outbound.yaml.tmpl"),
driver.LoadTestData("testdata/cluster/original_dst.yaml.tmpl"),
},
Listeners: []string{
driver.LoadTestData("testdata/listener/tcp_client.yaml.tmpl"),
driver.LoadTestData("testdata/listener/internal_outbound.yaml.tmpl"),
},
Secrets: []string{
driver.LoadTestData("testdata/secret/client.yaml.tmpl"),
},
},
&driver.Update{
Node: "server", Version: "0",
Clusters: []string{
driver.LoadTestData("testdata/cluster/internal_inbound.yaml.tmpl"),
},
Listeners: []string{
driver.LoadTestData("testdata/listener/terminate_connect.yaml.tmpl"),
driver.LoadTestData("testdata/listener/tcp_waypoint_server.yaml.tmpl"),
},
Secrets: []string{
driver.LoadTestData("testdata/secret/server.yaml.tmpl"),
},
},
&driver.UpdateWorkloadMetadata{Workloads: []driver.WorkloadMetadata{{
Address: "127.0.0.1",
Metadata: ProductPageMetadata,
}, {
Address: "127.0.0.3",
Metadata: BackendMetadata,
}}},
&driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/client.yaml.tmpl")},
&driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/server.yaml.tmpl")},
&driver.Sleep{Duration: 1 * time.Second},
&driver.TCPServer{Prefix: "hello"},
&driver.Repeat{
N: 10,
Step: &driver.TCPConnection{},
},
&driver.Stats{
AdminPort: params.Ports.ServerAdmin,
Matchers: map[string]driver.StatMatcher{
"istio_tcp_connections_opened_total": &driver.ExactStat{Metric: "testdata/metric/server_waypoint_proxy_connect_connections_opened_total.yaml.tmpl"},
},
},
},
}).Run(params); err != nil {
t.Fatal(err)
}
}

func TestStatsExpiry(t *testing.T) {
params := driver.NewTestParams(t, map[string]string{
"RequestCount": "1",
Expand Down
7 changes: 7 additions & 0 deletions testdata/filters/mx_waypoint_tcp.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- name: tc_mx_inbound{{.N}}
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.tcp.metadataexchange.config.MetadataExchange
value:
protocol: "istio-peer-exchange"
enable_discovery: true
4 changes: 4 additions & 0 deletions testdata/listener/tcp_client.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ filter_chains:
type_url: envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
value:
stat_prefix: outbound_tcp
{{- if .Vars.ServerClusterName }}
cluster: {{ .Vars.ServerClusterName}}
{{- else }}
cluster: outbound|9080|tcp|server.default.svc.cluster.local
{{- end }}
29 changes: 29 additions & 0 deletions testdata/listener/tcp_waypoint_server.yaml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{{- if ne .Vars.ServerListeners "" }}
{{ .Vars.ServerListeners }}
{{- else }}
{{- if ne .Vars.ServerInternalAddress "" }}
name: {{ .Vars.ServerInternalAddress }}
{{- else }}
name: server
{{- end }}
traffic_direction: INBOUND
{{- if ne .Vars.ServerInternalAddress "" }}
internal_listener: {}
{{- else }}
address:
socket_address:
address: 127.0.0.2
port_value: {{ .Ports.ServerPort }}
{{- end }}
filter_chains:
- filters:
{{ .Vars.ServerNetworkFilters | fill | indent 2 }}
- name: tcp_proxy
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
value:
stat_prefix: server_inbound_tcp
cluster: server-inbound-cluster
{{ .Vars.ServerListenerTLSContext | indent 2 }}
{{- end }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: istio_tcp_connections_opened_total
type: COUNTER
metric:
- counter:
value: 10
label:
- name: reporter
value: waypoint
- name: source_workload
value: productpage-v1
- name: source_canonical_service
value: unknown
- name: source_canonical_revision
value: latest
- name: source_workload_namespace
value: default
- name: source_principal
value: spiffe://cluster.local/ns/default/sa/client
- name: source_app
value: unknown
- name: source_version
value: unknown
- name: source_cluster
value: unknown
- name: destination_workload
value: ratings-v1
- name: destination_workload_namespace
value: default
- name: destination_principal
value: spiffe://cluster.global/ns/default/sa/ratings
- name: destination_app
value: ratings
- name: destination_version
value: version-1
- name: destination_service
value: server.default.svc.cluster.local
- name: destination_canonical_service
value: ratings
- name: destination_canonical_revision
value: version-1
- name: destination_service_name
value: server
- name: destination_service_namespace
value: default
- name: destination_cluster
value: server-cluster
- name: request_protocol
value: tcp
- name: response_flags
value: "-"
- name: connection_security_policy
value: mutual_tls
Loading