Skip to content

Commit

Permalink
Support inserting into the l7_events_ss table. Remove l7_request.tgid_ss.
Browse files Browse the repository at this point in the history
  • Loading branch information
StLeoX committed Nov 5, 2024
1 parent b1b246e commit 05c9470
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN CGO_ENABLED=1 go build -mod=readonly -ldflags "-X main.version=$VERSION" -o


FROM debian:bullseye
RUN apt update && apt install -y ca-certificates && apt clean
RUN apt update && apt install -y ca-certificates

COPY --from=builder /tmp/src/coroot-node-agent /usr/bin/coroot-node-agent

Expand Down
6 changes: 0 additions & 6 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,12 +766,6 @@ func (c *Container) onL7Request(pid uint32, fd uint64, connectionTimestamp uint6
return nil
}

// onL7Response is a debug stub for L7 response events. It only logs the two
// TGID values it is handed (TgidReqSs and TgidRespSs) at info level; pid, fd,
// timestamp and duration are accepted but not used yet.
func (c *Container) onL7Response(pid uint32, fd uint64, timestamp uint64, duration time.Duration, TgidReqSs, TgidRespSs uint64) {
	// TODO: write the event directly to the OLAP store instead of logging it.
	const debugFormat = "[deb] get tcp_event_ss: %d, %d\n"
	klog.Infof(debugFormat, TgidReqSs, TgidRespSs)
}

func (c *Container) onRetransmission(srcDst AddrPair) bool {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down
22 changes: 21 additions & 1 deletion containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package containers

import (
"bytes"
"context"
"fmt"
"github.com/ClickHouse/ch-go"
"github.com/coroot/coroot-node-agent/tracing"
"os"
"regexp"
"strconv"
Expand Down Expand Up @@ -57,6 +60,8 @@ type Registry struct {
trafficStatsLastUpdated time.Time
trafficStatsLock sync.Mutex
trafficStatsUpdateCh chan *TrafficStatsUpdate

sseBatcher *tracing.SSEventBatcher
}

// NewRegistry brings together the various functional modules.
Expand Down Expand Up @@ -103,6 +108,18 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
return nil, err
}

opts := ch.Options{
Address: flags.GetString(flags.ClickhouseEndpoint),
User: flags.GetString(flags.ClickhouseUser),
Password: flags.GetString(flags.ClickhousePassword),
Compression: ch.CompressionLZ4,
DialTimeout: 10 * time.Second,
}
chClient, err := ch.Dial(context.Background(), opts)
if err != nil {
return nil, err
}

r := &Registry{
reg: reg,
events: make(chan ebpftracer.Event, 10000), // 创建 eBPF 消息队列。达到 size 后的行为是阻塞而不是溢出。
Expand All @@ -119,10 +136,13 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),

trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),

sseBatcher: tracing.NewSSEventBatcher(tracing.SSEBatchLimit, tracing.SSEBatchTimeout, chClient),
}
if err = reg.Register(r); err != nil {
return nil, err
}

// 分别启动 eBPF 事件的消费者与生产者。
go r.handleEvents(r.events)
if err = r.tracer.Run(r.events); err != nil {
Expand Down Expand Up @@ -303,7 +323,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
}
case ebpftracer.EventTypeL7Response:
if c := r.containersByPid[e.Pid]; c != nil {
c.onL7Response(e.Pid, e.Fd, e.Timestamp, e.Duration, e.TgidReqSs, e.TgidRespSs)
r.sseBatcher.Append(e.Timestamp, e.Duration, e.TgidReqSs, e.TgidRespSs)
}
case ebpftracer.EventTypePythonThreadLock:
if c := r.containersByPid[e.Pid]; c != nil {
Expand Down
2 changes: 1 addition & 1 deletion ebpftracer/Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ebpf.go: $(wildcard ebpf/**.c)
ebpf.go: $(wildcard ./ebpf/**.c)
@echo ===BUILDING ebpf.go===
docker rmi -f ebpftracer
docker build -t ebpftracer --progress plain .
Expand Down
44 changes: 23 additions & 21 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ struct l7_event {
__u64 fd;
__u64 connection_timestamp; // connection timestamp instead of the span start timestamp, actually unused from the GoLang part
__u32 pid;
__u64 tgid_req_cs;
__u64 tgid_req_ss;
__u64 tgid_resp_ss;
__u64 tgid_resp_cs;
__u64 tgid_write; // tgid who sends the request
__u64 tgid_read; // tgid who receives the response
__u32 status;
__u64 duration;
__u8 protocol;
Expand Down Expand Up @@ -114,7 +112,7 @@ struct l7_request_key {
struct l7_request { // more like concept `flow`, maybe request, maybe response
__u64 ns; // timestamp when sends the request
__u64 tgid_send; // tgid who sends the request
__u64 tgid_recv; // tgid who receives the response
__u64 tgid_recv; // tgid who receives the response, unused actually
__u8 protocol;
__u8 partial;
__u8 request_type;
Expand Down Expand Up @@ -181,7 +179,6 @@ void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct
e->fd = cid.fd;
e->pid = cid.pid;
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
// bpf_printk("output l7_events, pid %u", e->pid);
}

static inline __attribute__((__always_inline__))
Expand Down Expand Up @@ -272,14 +269,15 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
if (is_http_request(payload)) {
req->protocol = PROTOCOL_HTTP;
} else if (is_postgres_query(payload, size, &req->request_type)) {
if (req->request_type == POSTGRES_FRAME_CLOSE) { // protocol is pipelined, and req is partially Response
if (req->request_type == POSTGRES_FRAME_CLOSE) { // this request is the end of the pg stream query
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_POSTGRES;
e->method = METHOD_STATEMENT_CLOSE;
// todo tgid
e->tgid_write = write_tgid;
e->tgid_read = write_tgid; // maybe the pg_server is single thread, for networking
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
send_event(ctx, e, cid, conn);
Expand All @@ -291,14 +289,15 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
} else if (is_memcached_query(payload, size)) {
req->protocol = PROTOCOL_MEMCACHED;
} else if (is_mysql_query(payload, size, &req->request_type)) {
if (req->request_type == MYSQL_COM_STMT_CLOSE) { // protocol is pipelined, and req is partially Response
if (req->request_type == MYSQL_COM_STMT_CLOSE) { // this request is the end of the mysql stream query
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_MYSQL;
e->method = METHOD_STATEMENT_CLOSE;
// todo tgid
e->tgid_write = write_tgid;
e->tgid_read = write_tgid; // maybe the mysqld is single thread, for networking
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
send_event(ctx, e, cid, conn);
Expand All @@ -314,6 +313,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
}
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_PRODUCE;
e->tgid_write = write_tgid;
e->tgid_read = write_tgid; // checkme
send_event(ctx, e, cid, conn);
return 0;
} else if (nats_method(payload, size) == METHOD_PRODUCE) {
Expand All @@ -323,6 +324,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
}
e->protocol = PROTOCOL_NATS;
e->method = METHOD_PRODUCE;
e->tgid_write = write_tgid;
e->tgid_read = write_tgid;
send_event(ctx, e, cid, conn);
return 0;
} else if (is_cassandra_request(payload, size, &k.stream_id)) {
Expand All @@ -340,6 +343,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
}
e->protocol = PROTOCOL_HTTP2;
e->method = METHOD_HTTP2_CLIENT_FRAMES;
// todo client-side tgid for writing http2
e->duration = write_ns;
e->payload_size = size;
COPY_PAYLOAD(e->payload, size, payload);
Expand All @@ -350,14 +354,14 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
} else if (is_dns_request(payload, size, &k.stream_id)) {
req->protocol = PROTOCOL_DNS;
}

if (req->protocol == PROTOCOL_UNKNOWN) {
return 0;
}

// req is real REQUEST
req->ns = write_ns;
req->tgid_send = write_tgid;
req->tgid_recv = 0; // todo how to tgid other side?
req->tgid_recv = 0;

COPY_PAYLOAD(req->payload, size, payload);

Expand Down Expand Up @@ -462,20 +466,20 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->method = METHOD_UNKNOWN;
e->statement_id = 0;
e->payload_size = 0;
e->tgid_req_cs = 0;
e->tgid_req_ss = 0;
e->tgid_resp_ss = 0;
e->tgid_resp_cs = 0;
e->tgid_write = 0;
e->tgid_read = 0;

if (is_rabbitmq_consume(payload, ret)) {
e->protocol = PROTOCOL_RABBITMQ;
e->method = METHOD_CONSUME;
// todo tgid
send_event(ctx, e, cid, conn);
return 0;
}
if (nats_method(payload, ret) == METHOD_CONSUME) {
e->protocol = PROTOCOL_NATS;
e->method = METHOD_CONSUME;
// todo tgid
send_event(ctx, e, cid, conn);
return 0;
}
Expand All @@ -489,7 +493,6 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
return 0;
}
e->protocol = PROTOCOL_DNS;
// todo tgid
e->duration = read_ns - req->ns;
e->payload_size = ret;
COPY_PAYLOAD(e->payload, ret, payload);
Expand All @@ -508,6 +511,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
e->duration = read_ns;
e->payload_size = ret;
COPY_PAYLOAD(e->payload, ret, payload);
// todo client-side tgid for reading http2
send_event(ctx, e, cid, conn);
return 0;
} else {
Expand Down Expand Up @@ -561,10 +565,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
return 0;
}

e->tgid_req_cs = req->tgid_send;
e->tgid_req_ss = req->tgid_recv; // todo tgid server-side
e->tgid_resp_ss = 0; // todo tgid server-side
e->tgid_resp_cs = read_tgid;
e->tgid_write = req->tgid_send;
e->tgid_read = read_tgid;
e->duration = read_ns - req->ns;
send_event(ctx, e, cid, conn);
return 0;
Expand Down
12 changes: 4 additions & 8 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,8 @@ type l7Event struct {
Fd uint64
ConnectionTimestamp uint64
Pid uint32
TgidReqCs uint64
TgidReqSs uint64
TgidRespSs uint64
TgidRespCs uint64
TgidWrite uint64
TgidRead uint64
Status uint32
Duration uint64
Protocol uint8
Expand Down Expand Up @@ -438,10 +436,8 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
event = Event{
Type: EventTypeL7Request,
Pid: l7Event.Pid,
TgidReqCs: l7Event.TgidReqCs,
TgidReqSs: l7Event.TgidReqSs,
TgidRespSs: l7Event.TgidRespSs,
TgidRespCs: l7Event.TgidRespCs,
TgidReqCs: l7Event.TgidWrite,
TgidRespCs: l7Event.TgidRead,
Fd: l7Event.Fd,
//Timestamp: l7Event.ConnectionTimestamp, // shouldn't use this kernel timestamp
Duration: time.Duration(l7Event.Duration),
Expand Down
4 changes: 4 additions & 0 deletions flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (

ScrapeInterval = kingpin.Flag("scrape-interval", "How often to gather metrics from the agent").Default("15s").Envar("SCRAPE_INTERVAL").Duration()
WalDir = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()

ClickhouseEndpoint = kingpin.Flag("clickhouse-endpoint", "Clickhouse endpoint").Envar("CLICKHOUSE_ENDPOINT").Default("default").String()
ClickhouseUser = kingpin.Flag("clickhouse-user", "Clickhouse user").Envar("CLICKHOUSE_USER").Default("default").String()
ClickhousePassword = kingpin.Flag("clickhouse-password", "Clickhouse password").Envar("CLICKHOUSE_PASSWORD").String()
)

func GetString(fl *string) string {
Expand Down
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
require (
cloud.google.com/go/compute/metadata v0.2.3
github.com/ClickHouse/ch-go v0.61.5
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/agoda-com/opentelemetry-logs-go v0.4.1
github.com/cilium/cilium v1.13.7
github.com/cilium/ebpf v0.11.0
Expand Down Expand Up @@ -57,7 +56,6 @@ require (
github.com/Microsoft/hcsshim v0.9.10 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270 // indirect
github.com/aws/aws-sdk-go v1.50.0 // indirect
Expand Down Expand Up @@ -141,7 +139,6 @@ require (
github.com/opencontainers/selinux v1.10.1 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/pascaldekloe/name v1.0.1 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
Expand All @@ -157,7 +154,6 @@ require (
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
Expand Down
Loading

0 comments on commit 05c9470

Please sign in to comment.