Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for ClickHouse native protocol #159

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
stats.observe(r.Status.String(), r.Method.String(), 0)
case l7.ProtocolDubbo2:
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolClickhouse:
stats.observe(r.Status.String(), "", r.Duration)
query := l7.ParseClickhouse(r.Payload)
trace.ClickhouseQuery(query, r.Status.Error(), r.Duration)
}
return nil
}
Expand Down
46 changes: 24 additions & 22 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,30 +104,32 @@ var metrics = struct {

var (
L7Requests = map[l7.Protocol]prometheus.CounterOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_requests_total", Help: "Total number of outbound DNS requests"},
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_requests_total", Help: "Total number of outbound DNS requests"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_queries_total", Help: "Total number of outbound ClickHouse queries"},
}
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DNS request"},
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DNS request"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound ClickHouse query"},
}
)

Expand Down
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions ebpftracer/ebpf/l7/clickhouse.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#define CLICKHOUSE_QUERY_ID_SIZE 36

#define CLICKHOUSE_QUERY_KIND_INITIAL 1
#define CLICKHOUSE_QUERY_KIND_SECONDARY 2

#define CLICKHOUSE_CLIENT_CODE_QUERY 1

#define CLICKHOUSE_SERVER_CODE_DATA 1
#define CLICKHOUSE_SERVER_CODE_EXCEPTION 2
#define CLICKHOUSE_SERVER_CODE_END_OF_STREAM 5

static __always_inline
int is_clickhouse_query(char *buf, __u64 buf_size) {
__u8 b[CLICKHOUSE_QUERY_ID_SIZE+3];
if (bpf_probe_read(&b, sizeof(b), (void *)buf) < 0) {
return 0;
}
if (b[0] != CLICKHOUSE_CLIENT_CODE_QUERY) {
return 0;
}
int offset = 0;
if (b[1] == 0) {
offset = 2;
} else if (b[1] == CLICKHOUSE_QUERY_ID_SIZE) {
offset = 2 + CLICKHOUSE_QUERY_ID_SIZE;
} else {
return 0;
}
if (b[offset] != CLICKHOUSE_QUERY_KIND_INITIAL && b[offset] != CLICKHOUSE_QUERY_KIND_SECONDARY) {
return 0;
}
return 1;
}

static __always_inline
int is_clickhouse_response(char *buf, __u32 *status) {
__u8 code = 0;
if (bpf_probe_read(&code, sizeof(code), (void *)buf) < 0) {
return 0;
}
if (code == CLICKHOUSE_SERVER_CODE_DATA || code == CLICKHOUSE_SERVER_CODE_END_OF_STREAM) {
*status = STATUS_OK;
return 1;
}
if (code == CLICKHOUSE_SERVER_CODE_EXCEPTION) {
*status = STATUS_FAILED;
return 1;
}
return 0;
}
79 changes: 26 additions & 53 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
#define PROTOCOL_UNKNOWN 0
#define PROTOCOL_HTTP 1
#define PROTOCOL_POSTGRES 2
#define PROTOCOL_REDIS 3
#define PROTOCOL_MEMCACHED 4
#define PROTOCOL_MYSQL 5
#define PROTOCOL_MONGO 6
#define PROTOCOL_KAFKA 7
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12
#define PROTOCOL_DNS 13
#define PROTOCOL_UNKNOWN 0
#define PROTOCOL_HTTP 1
#define PROTOCOL_POSTGRES 2
#define PROTOCOL_REDIS 3
#define PROTOCOL_MEMCACHED 4
#define PROTOCOL_MYSQL 5
#define PROTOCOL_MONGO 6
#define PROTOCOL_KAFKA 7
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12
#define PROTOCOL_DNS 13
#define PROTOCOL_CLICKHOUSE 14

#define STATUS_UNKNOWN 0
#define STATUS_OK 200
Expand All @@ -25,7 +26,6 @@
#define METHOD_HTTP2_CLIENT_FRAMES 5
#define METHOD_HTTP2_SERVER_FRAMES 6

#define MAX_PAYLOAD_SIZE 1024 // must be power of 2
#define TRUNCATE_PAYLOAD_SIZE(size) ({ \
size = MIN(size, MAX_PAYLOAD_SIZE-1); \
asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1)); \
Expand Down Expand Up @@ -53,6 +53,7 @@
#include "http2.c"
#include "dubbo2.c"
#include "dns.c"
#include "clickhouse.c"

struct l7_event {
__u64 fd;
Expand Down Expand Up @@ -95,37 +96,13 @@ struct {
__uint(max_entries, 10240);
} active_reads SEC(".maps");

struct l7_request_key {
__u64 fd;
__u32 pid;
__u16 is_tls;
__s16 stream_id;
};

struct l7_request {
__u64 ns;
__u8 protocol;
__u8 partial;
__u8 request_type;
__s32 request_id;
__u64 payload_size;
char payload[MAX_PAYLOAD_SIZE];
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, int);
__type(value, struct l7_request);
__uint(max_entries, 1);
} l7_request_heap SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(key_size, sizeof(struct l7_request_key));
__uint(value_size, sizeof(struct l7_request));
__uint(max_entries, 32768);
} active_l7_requests SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, int);
Expand Down Expand Up @@ -315,6 +292,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
COPY_PAYLOAD(e->payload, size, payload);
send_event(ctx, e, cid, conn);
return 0;
} else if (is_clickhouse_query(payload, size)) {
req->protocol = PROTOCOL_CLICKHOUSE;
} else if (is_dubbo2_request(payload, size)) {
req->protocol = PROTOCOL_DUBBO2;
} else if (is_dns_request(payload, size, &k.stream_id)) {
Expand Down Expand Up @@ -464,8 +443,6 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->protocol = req->protocol;
e->payload_size = req->payload_size;
COPY_PAYLOAD(e->payload, req->payload_size, req->payload);

bpf_map_delete_elem(&active_l7_requests, &k);
if (e->protocol == PROTOCOL_HTTP) {
response = is_http_response(payload, &e->status);
} else if (e->protocol == PROTOCOL_POSTGRES) {
Expand All @@ -485,24 +462,20 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
} else if (e->protocol == PROTOCOL_MONGO) {
response = is_mongo_response(payload, ret, req->partial);
if (response == 2) { // partial
struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
if (!r) {
return 0;
}
r->partial = 1;
r->protocol = e->protocol;
r->ns = req->ns;
r->payload_size = req->payload_size;
COPY_PAYLOAD(r->payload, req->payload_size, req->payload);
bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
return 0;
req->partial = 1;
return 0; // keeping the query in the map
}
} else if (e->protocol == PROTOCOL_KAFKA) {
response = is_kafka_response(payload, req->request_id);
} else if (e->protocol == PROTOCOL_CLICKHOUSE) {
response = is_clickhouse_response(payload, &e->status);
if (!response) {
return 0; // keeping the query in the map
}
} else if (e->protocol == PROTOCOL_DUBBO2) {
response = is_dubbo2_response(payload, &e->status);
}

bpf_map_delete_elem(&active_l7_requests, &k);
if (!response) {
return 0;
}
Expand Down
36 changes: 36 additions & 0 deletions ebpftracer/ebpf/tcp/state.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#define MAX_CONNECTIONS 1000000
#define MAX_PAYLOAD_SIZE 1024 // must be power of 2

struct tcp_event {
__u64 fd;
Expand Down Expand Up @@ -82,6 +83,29 @@ struct {
__uint(max_entries, MAX_CONNECTIONS);
} active_connections SEC(".maps");

struct l7_request_key {
__u64 fd;
__u32 pid;
__u16 is_tls;
__s16 stream_id;
};

struct l7_request {
__u64 ns;
__u8 protocol;
__u8 partial;
__u8 request_type;
__s32 request_id;
__u64 payload_size;
char payload[MAX_PAYLOAD_SIZE];
};

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(key_size, sizeof(struct l7_request_key));
__uint(value_size, sizeof(struct l7_request));
__uint(max_entries, 32768);
} active_l7_requests SEC(".maps");

SEC("tracepoint/sock/inet_sock_set_state")
int inet_sock_set_state(void *ctx)
Expand Down Expand Up @@ -214,6 +238,18 @@ int sys_exit_connect(struct trace_event_raw_sys_exit__stub* ctx) {
conn.timestamp = bpf_ktime_get_ns();
bpf_map_update_elem(&active_connections, &cid, &conn, BPF_ANY);
}

struct l7_request_key k = {
.fd = cid.fd,
.pid = cid.pid,
.is_tls = 0,
.stream_id = -1,
};
bpf_map_delete_elem(&active_l7_requests, &k);

k.is_tls = 1;
bpf_map_delete_elem(&active_l7_requests, &k);

bpf_map_delete_elem(&fd_by_pid_tgid, &id);
return 0;
}
Expand Down
59 changes: 59 additions & 0 deletions ebpftracer/l7/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package l7

import (
"bytes"

"github.com/ClickHouse/ch-go/proto"
)

func ParseClickhouse(payload []byte) string {
r := proto.NewReader(bytes.NewReader(payload))
var err error
if _, err = r.Byte(); err != nil {
return ""
}
if _, err = r.Str(); err != nil {
return ""
}
version := int(proto.FeatureServerQueryTimeInProgress)
info := proto.ClientInfo{}
if err = info.DecodeAware(r, version); err != nil {
return ""
}
if info.ProtocolVersion > 0 {
version = info.ProtocolVersion
}
var s proto.Setting

for {
if err = s.Decode(r); err != nil {
return ""
}
if s.Key == "" {
break
}
}
if _, err = r.Str(); err != nil { // inter-server secret
return ""
}
if _, err = r.UVarInt(); err != nil { // stage
return ""
}
if _, err = r.UVarInt(); err != nil { // compression
return ""
}
l, err := r.StrLen()
if err != nil {
return ""
}
query := make([]byte, min(l, 1024))
n, _ := r.Read(query)
query = bytes.TrimSpace(query[:n])
if len(query) == 0 {
return ""
}
if n < l {
query = append(query[:len(query)-1], []byte("...<TRUNCATED>")...)
}
return string(query)
}
Loading
Loading