Skip to content

Commit

Permalink
add support for ClickHouse native protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
def committed Dec 19, 2024
1 parent 7f46467 commit 1217e5e
Show file tree
Hide file tree
Showing 12 changed files with 352 additions and 101 deletions.
4 changes: 4 additions & 0 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
stats.observe(r.Status.String(), r.Method.String(), 0)
case l7.ProtocolDubbo2:
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolClickhouse:
stats.observe(r.Status.String(), "", r.Duration)
query := l7.ParseClickhouse(r.Payload)
trace.ClickhouseQuery(query, r.Status.Error(), r.Duration)
}
return nil
}
Expand Down
46 changes: 24 additions & 22 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,30 +104,32 @@ var metrics = struct {

var (
L7Requests = map[l7.Protocol]prometheus.CounterOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_requests_total", Help: "Total number of outbound DNS requests"},
l7.ProtocolHTTP: {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
l7.ProtocolRedis: {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
l7.ProtocolMysql: {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
l7.ProtocolMongo: {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
l7.ProtocolKafka: {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_requests_total", Help: "Total number of outbound DNS requests"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_queries_total", Help: "Total number of outbound ClickHouse queries"},
}
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DNS request"},
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
l7.ProtocolPostgres: {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
l7.ProtocolRedis: {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
l7.ProtocolMysql: {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
l7.ProtocolMongo: {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DNS request"},
l7.ProtocolClickhouse: {Name: "container_clickhouse_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound ClickHouse query"},
}
)

Expand Down
20 changes: 10 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions ebpftracer/ebpf/l7/clickhouse.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Length in bytes of a non-empty query ID accepted by is_clickhouse_query
// (presumably a textual UUID — TODO confirm).
#define CLICKHOUSE_QUERY_ID_SIZE 36

// Query kind values accepted by is_clickhouse_query in the byte that follows
// the query ID.
#define CLICKHOUSE_QUERY_KIND_INITIAL 1
#define CLICKHOUSE_QUERY_KIND_SECONDARY 2

// Client packet code that is_clickhouse_query requires as the first byte.
#define CLICKHOUSE_CLIENT_CODE_QUERY 1

// Server packet codes recognized by is_clickhouse_response: DATA and
// END_OF_STREAM are reported as success, EXCEPTION as failure.
#define CLICKHOUSE_SERVER_CODE_DATA 1
#define CLICKHOUSE_SERVER_CODE_EXCEPTION 2
#define CLICKHOUSE_SERVER_CODE_END_OF_STREAM 5

// is_clickhouse_query reports whether the bytes at buf look like the start of
// a ClickHouse Query packet: a query packet code, followed by a query ID that
// is either empty or exactly CLICKHOUSE_QUERY_ID_SIZE bytes long, followed by
// a known query kind.
// Returns 1 on a match and 0 otherwise (including when the probe read fails).
// NOTE(review): buf_size is not consulted; the read below always covers
// sizeof(b) bytes regardless of how many bytes the caller actually has.
static __always_inline
int is_clickhouse_query(char *buf, __u64 buf_size) {
// Room for: packet code (1) + query ID length (1) + query ID (36) + query kind (1).
__u8 b[CLICKHOUSE_QUERY_ID_SIZE+3];
if (bpf_probe_read(&b, sizeof(b), (void *)buf) < 0) {
return 0;
}
// The first byte must be the Query packet code.
if (b[0] != CLICKHOUSE_CLIENT_CODE_QUERY) {
return 0;
}
// b[1] is the query ID length; only an empty ID or a full-size ID is accepted.
// offset ends up pointing at the byte right after the query ID.
int offset = 0;
if (b[1] == 0) {
offset = 2;
} else if (b[1] == CLICKHOUSE_QUERY_ID_SIZE) {
offset = 2 + CLICKHOUSE_QUERY_ID_SIZE;
} else {
return 0;
}
// The byte after the query ID must be one of the known query kinds.
if (b[offset] != CLICKHOUSE_QUERY_KIND_INITIAL && b[offset] != CLICKHOUSE_QUERY_KIND_SECONDARY) {
return 0;
}
return 1;
}

// is_clickhouse_response inspects the first byte at buf and decides whether it
// is a server packet that completes a query.
// On DATA or END_OF_STREAM it stores STATUS_OK in *status and returns 1; on
// EXCEPTION it stores STATUS_FAILED and returns 1. For any other code, or when
// the byte cannot be read, it returns 0 and leaves *status untouched.
static __always_inline
int is_clickhouse_response(char *buf, __u32 *status) {
    __u8 packet_code = 0;
    if (bpf_probe_read(&packet_code, sizeof(packet_code), (void *)buf) < 0) {
        return 0;
    }
    // Failure is signalled by a single dedicated code, so handle it first.
    if (packet_code == CLICKHOUSE_SERVER_CODE_EXCEPTION) {
        *status = STATUS_FAILED;
        return 1;
    }
    // Anything that is neither DATA nor END_OF_STREAM is not a response we track.
    if (packet_code != CLICKHOUSE_SERVER_CODE_DATA && packet_code != CLICKHOUSE_SERVER_CODE_END_OF_STREAM) {
        return 0;
    }
    *status = STATUS_OK;
    return 1;
}
79 changes: 26 additions & 53 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
#define PROTOCOL_UNKNOWN 0
#define PROTOCOL_HTTP 1
#define PROTOCOL_POSTGRES 2
#define PROTOCOL_REDIS 3
#define PROTOCOL_MEMCACHED 4
#define PROTOCOL_MYSQL 5
#define PROTOCOL_MONGO 6
#define PROTOCOL_KAFKA 7
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12
#define PROTOCOL_DNS 13
#define PROTOCOL_UNKNOWN 0
#define PROTOCOL_HTTP 1
#define PROTOCOL_POSTGRES 2
#define PROTOCOL_REDIS 3
#define PROTOCOL_MEMCACHED 4
#define PROTOCOL_MYSQL 5
#define PROTOCOL_MONGO 6
#define PROTOCOL_KAFKA 7
#define PROTOCOL_CASSANDRA 8
#define PROTOCOL_RABBITMQ 9
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12
#define PROTOCOL_DNS 13
#define PROTOCOL_CLICKHOUSE 14

#define STATUS_UNKNOWN 0
#define STATUS_OK 200
Expand All @@ -25,7 +26,6 @@
#define METHOD_HTTP2_CLIENT_FRAMES 5
#define METHOD_HTTP2_SERVER_FRAMES 6

#define MAX_PAYLOAD_SIZE 1024 // must be power of 2
#define TRUNCATE_PAYLOAD_SIZE(size) ({ \
size = MIN(size, MAX_PAYLOAD_SIZE-1); \
asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1)); \
Expand Down Expand Up @@ -53,6 +53,7 @@
#include "http2.c"
#include "dubbo2.c"
#include "dns.c"
#include "clickhouse.c"

struct l7_event {
__u64 fd;
Expand Down Expand Up @@ -95,37 +96,13 @@ struct {
__uint(max_entries, 10240);
} active_reads SEC(".maps");

struct l7_request_key {
__u64 fd;
__u32 pid;
__u16 is_tls;
__s16 stream_id;
};

struct l7_request {
__u64 ns;
__u8 protocol;
__u8 partial;
__u8 request_type;
__s32 request_id;
__u64 payload_size;
char payload[MAX_PAYLOAD_SIZE];
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, int);
__type(value, struct l7_request);
__uint(max_entries, 1);
} l7_request_heap SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(key_size, sizeof(struct l7_request_key));
__uint(value_size, sizeof(struct l7_request));
__uint(max_entries, 32768);
} active_l7_requests SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, int);
Expand Down Expand Up @@ -315,6 +292,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
COPY_PAYLOAD(e->payload, size, payload);
send_event(ctx, e, cid, conn);
return 0;
} else if (is_clickhouse_query(payload, size)) {
req->protocol = PROTOCOL_CLICKHOUSE;
} else if (is_dubbo2_request(payload, size)) {
req->protocol = PROTOCOL_DUBBO2;
} else if (is_dns_request(payload, size, &k.stream_id)) {
Expand Down Expand Up @@ -464,8 +443,6 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->protocol = req->protocol;
e->payload_size = req->payload_size;
COPY_PAYLOAD(e->payload, req->payload_size, req->payload);

bpf_map_delete_elem(&active_l7_requests, &k);
if (e->protocol == PROTOCOL_HTTP) {
response = is_http_response(payload, &e->status);
} else if (e->protocol == PROTOCOL_POSTGRES) {
Expand All @@ -485,24 +462,20 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
} else if (e->protocol == PROTOCOL_MONGO) {
response = is_mongo_response(payload, ret, req->partial);
if (response == 2) { // partial
struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
if (!r) {
return 0;
}
r->partial = 1;
r->protocol = e->protocol;
r->ns = req->ns;
r->payload_size = req->payload_size;
COPY_PAYLOAD(r->payload, req->payload_size, req->payload);
bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
return 0;
req->partial = 1;
return 0; // keeping the query in the map
}
} else if (e->protocol == PROTOCOL_KAFKA) {
response = is_kafka_response(payload, req->request_id);
} else if (e->protocol == PROTOCOL_CLICKHOUSE) {
response = is_clickhouse_response(payload, &e->status);
if (!response) {
return 0; // keeping the query in the map
}
} else if (e->protocol == PROTOCOL_DUBBO2) {
response = is_dubbo2_response(payload, &e->status);
}

bpf_map_delete_elem(&active_l7_requests, &k);
if (!response) {
return 0;
}
Expand Down
36 changes: 36 additions & 0 deletions ebpftracer/ebpf/tcp/state.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#define MAX_CONNECTIONS 1000000
#define MAX_PAYLOAD_SIZE 1024 // must be power of 2

struct tcp_event {
__u64 fd;
Expand Down Expand Up @@ -82,6 +83,29 @@ struct {
__uint(max_entries, MAX_CONNECTIONS);
} active_connections SEC(".maps");

// Key of the active_l7_requests map: identifies one in-flight L7 request.
struct l7_request_key {
__u64 fd; // file descriptor of the connection
__u32 pid; // process ID owning the descriptor
__u16 is_tls; // 0 or 1; sys_exit_connect clears entries for both values
__s16 stream_id; // -1 when cleared in sys_exit_connect; presumably a per-request ID for multiplexed protocols — TODO confirm
};

// Value stored in the active_l7_requests map: the captured request awaiting a response.
struct l7_request {
__u64 ns; // presumably the request timestamp in nanoseconds — TODO confirm
__u8 protocol; // one of the PROTOCOL_* codes
__u8 partial; // set to 1 while a response has only been partially seen
__u8 request_type;
__s32 request_id; // used when matching responses to requests (e.g. passed to is_kafka_response)
__u64 payload_size; // number of captured bytes in payload
char payload[MAX_PAYLOAD_SIZE]; // captured request bytes, capped at MAX_PAYLOAD_SIZE
};

// In-flight L7 requests, keyed by struct l7_request_key and holding a
// struct l7_request. Declared as an LRU hash with room for 32768 entries.
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(key_size, sizeof(struct l7_request_key));
__uint(value_size, sizeof(struct l7_request));
__uint(max_entries, 32768);
} active_l7_requests SEC(".maps");

SEC("tracepoint/sock/inet_sock_set_state")
int inet_sock_set_state(void *ctx)
Expand Down Expand Up @@ -214,6 +238,18 @@ int sys_exit_connect(struct trace_event_raw_sys_exit__stub* ctx) {
conn.timestamp = bpf_ktime_get_ns();
bpf_map_update_elem(&active_connections, &cid, &conn, BPF_ANY);
}

struct l7_request_key k = {
.fd = cid.fd,
.pid = cid.pid,
.is_tls = 0,
.stream_id = -1,
};
bpf_map_delete_elem(&active_l7_requests, &k);

k.is_tls = 1;
bpf_map_delete_elem(&active_l7_requests, &k);

bpf_map_delete_elem(&fd_by_pid_tgid, &id);
return 0;
}
Expand Down
59 changes: 59 additions & 0 deletions ebpftracer/l7/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package l7

import (
	"bytes"
	"io"

	"github.com/ClickHouse/ch-go/proto"
)

// ParseClickhouse extracts the SQL text from the captured bytes of a
// ClickHouse native-protocol Query packet.
//
// It walks the packet fields in order (packet code, query ID, client info,
// settings, inter-server secret, stage, compression) and returns the query
// body with surrounding whitespace trimmed. At most 1024 bytes of the body
// are returned. If fewer bytes are available than the declared body length,
// the result ends with "...<TRUNCATED>". An empty string is returned when the
// payload cannot be decoded or the query body is empty.
func ParseClickhouse(payload []byte) string {
	const maxQuerySize = 1024

	r := proto.NewReader(bytes.NewReader(payload))
	if _, err := r.Byte(); err != nil { // packet code
		return ""
	}
	if _, err := r.Str(); err != nil { // query ID
		return ""
	}

	// The client info is always decoded using a fixed protocol revision; the
	// decoded value itself is not needed, only the reader position after it.
	var info proto.ClientInfo
	if err := info.DecodeAware(r, int(proto.FeatureServerQueryTimeInProgress)); err != nil {
		return ""
	}

	// Settings form a list terminated by an entry with an empty key.
	var s proto.Setting
	for {
		if err := s.Decode(r); err != nil {
			return ""
		}
		if s.Key == "" {
			break
		}
	}

	if _, err := r.Str(); err != nil { // inter-server secret
		return ""
	}
	if _, err := r.UVarInt(); err != nil { // stage
		return ""
	}
	if _, err := r.UVarInt(); err != nil { // compression
		return ""
	}

	l, err := r.StrLen()
	if err != nil {
		return ""
	}

	query := make([]byte, min(l, maxQuerySize))
	// A single Read on a buffered reader may return fewer bytes than are
	// actually available, which would wrongly mark the query as truncated.
	// io.ReadFull keeps reading until the buffer is full or the payload ends.
	// The error is deliberately ignored: a short read only means the payload
	// was cut off, and that is reported through n < l below.
	n, _ := io.ReadFull(r, query)
	query = bytes.TrimSpace(query[:n])
	if len(query) == 0 {
		return ""
	}
	if n < l {
		// NOTE(review): the last byte is dropped before the marker is added —
		// presumably to discard a possibly cut-off character; confirm intent.
		query = append(query[:len(query)-1], "...<TRUNCATED>"...)
	}
	return string(query)
}
Loading

0 comments on commit 1217e5e

Please sign in to comment.