diff --git a/Dockerfile b/Dockerfile index 1045605..5a5e38e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN CGO_ENABLED=1 go build -mod=readonly -ldflags "-X main.version=$VERSION" -o FROM debian:bullseye -RUN apt update && apt install -y ca-certificates && apt clean +RUN apt update && apt install -y ca-certificates COPY --from=builder /tmp/src/coroot-node-agent /usr/bin/coroot-node-agent diff --git a/containers/container.go b/containers/container.go index 07dcfa6..d2df3fb 100644 --- a/containers/container.go +++ b/containers/container.go @@ -766,12 +766,6 @@ func (c *Container) onL7Request(pid uint32, fd uint64, connectionTimestamp uint6 return nil } -func (c *Container) onL7Response(pid uint32, fd uint64, timestamp uint64, duration time.Duration, TgidReqSs, TgidRespSs uint64) { - klog.Infof("[deb] get tcp_event_ss: %d, %d\n", TgidReqSs, TgidRespSs) - // todo directly write to OLAP - -} - func (c *Container) onRetransmission(srcDst AddrPair) bool { c.lock.Lock() defer c.lock.Unlock() diff --git a/containers/registry.go b/containers/registry.go index cf89639..8aef89f 100644 --- a/containers/registry.go +++ b/containers/registry.go @@ -2,7 +2,10 @@ package containers import ( "bytes" + "context" "fmt" + "github.com/ClickHouse/ch-go" + "github.com/coroot/coroot-node-agent/tracing" "os" "regexp" "strconv" @@ -57,6 +60,8 @@ type Registry struct { trafficStatsLastUpdated time.Time trafficStatsLock sync.Mutex trafficStatsUpdateCh chan *TrafficStatsUpdate + + sseBatcher *tracing.SSEventBatcher } // NewRegistry 综合了各个功能模块 @@ -103,6 +108,18 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh return nil, err } + opts := ch.Options{ + Address: flags.GetString(flags.ClickhouseEndpoint), + User: flags.GetString(flags.ClickhouseUser), + Password: flags.GetString(flags.ClickhousePassword), + Compression: ch.CompressionLZ4, + DialTimeout: 10 * time.Second, + } + chClient, err := ch.Dial(context.Background(), opts) + if err != nil { + return nil, err + } + r := &Registry{ reg: reg, events: make(chan ebpftracer.Event, 10000), // 创建 eBPF 消息队列。达到 size 后的行为是阻塞而不是溢出。 @@ -119,10 +136,13 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing), trafficStatsUpdateCh: make(chan *TrafficStatsUpdate), + + sseBatcher: tracing.NewSSEventBatcher(tracing.SSEBatchLimit, tracing.SSEBatchTimeout, chClient), } if err = reg.Register(r); err != nil { return nil, err } + // 分别启动 eBPF 事件的消费者与生产者。 go r.handleEvents(r.events) if err = r.tracer.Run(r.events); err != nil { @@ -303,7 +323,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) { } case ebpftracer.EventTypeL7Response: if c := r.containersByPid[e.Pid]; c != nil { - c.onL7Response(e.Pid, e.Fd, e.Timestamp, e.Duration, e.TgidReqSs, e.TgidRespSs) + r.sseBatcher.Append(e.Timestamp, e.Duration, e.TgidReqSs, e.TgidRespSs) } case ebpftracer.EventTypePythonThreadLock: if c := r.containersByPid[e.Pid]; c != nil { diff --git a/ebpftracer/Makefile b/ebpftracer/Makefile index a8b51b7..cc77e05 100644 --- a/ebpftracer/Makefile +++ b/ebpftracer/Makefile @@ -1,4 +1,4 @@ -ebpf.go: $(wildcard ebpf/**.c) +ebpf.go: $(wildcard ./ebpf/**.c) @echo ===BUILDING ebpf.go=== docker rmi -f ebpftracer docker build -t ebpftracer --progress plain . diff --git a/ebpftracer/ebpf/l7/l7.c b/ebpftracer/ebpf/l7/l7.c index 4f56ab6..32219ed 100644 --- a/ebpftracer/ebpf/l7/l7.c +++ b/ebpftracer/ebpf/l7/l7.c @@ -63,10 +63,8 @@ struct l7_event { __u64 fd; __u64 connection_timestamp; // connection timestamp instead of the span start timestamp, actually unused from the GoLang part __u32 pid; - __u64 tgid_req_cs; - __u64 tgid_req_ss; - __u64 tgid_resp_ss; - __u64 tgid_resp_cs; + __u64 tgid_write; // tgid who sends the request + __u64 tgid_read; // tgid who receives the response __u32 status; __u64 duration; __u8 protocol; @@ -114,7 +112,7 @@ struct l7_request_key { struct l7_request { // more like concept `flow`, maybe request, maybe response __u64 ns; // timestamp when sends the request __u64 tgid_send; // tgid who sends the request - __u64 tgid_recv; // tgid who receives the response + __u64 tgid_recv; // tgid who receives the response, unused actually __u8 protocol; __u8 partial; __u8 request_type; @@ -181,7 +179,6 @@ void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct e->fd = cid.fd; e->pid = cid.pid; bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e)); -// bpf_printk("output l7_events, pid %u", e->pid); } static inline __attribute__((__always_inline__)) @@ -272,14 +269,15 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, if (is_http_request(payload)) { req->protocol = PROTOCOL_HTTP; } else if (is_postgres_query(payload, size, &req->request_type)) { - if (req->request_type == POSTGRES_FRAME_CLOSE) { // protocol is pipelined, and req is partially Response + if (req->request_type == POSTGRES_FRAME_CLOSE) { // this request is the end of the pg stream query struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero); if (!e) { return 0; } e->protocol = PROTOCOL_POSTGRES; e->method = METHOD_STATEMENT_CLOSE; - // todo tgid + e->tgid_write = write_tgid; + e->tgid_read = write_tgid; // maybe the pg_server is single thread, for networking e->payload_size = size; COPY_PAYLOAD(e->payload, size, payload); send_event(ctx, e, cid, conn); @@ -291,14 +289,15 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, } else if (is_memcached_query(payload, size)) { req->protocol = PROTOCOL_MEMCACHED; } else if (is_mysql_query(payload, size, &req->request_type)) { - if (req->request_type == MYSQL_COM_STMT_CLOSE) { // protocol is pipelined, and req is partially Response + if (req->request_type == MYSQL_COM_STMT_CLOSE) { // this request is the end of the mysql stream query struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero); if (!e) { return 0; } e->protocol = PROTOCOL_MYSQL; e->method = METHOD_STATEMENT_CLOSE; - // todo tgid + e->tgid_write = write_tgid; + e->tgid_read = write_tgid; // maybe the mysqld is single thread, for networking e->payload_size = size; COPY_PAYLOAD(e->payload, size, payload); send_event(ctx, e, cid, conn); @@ -314,6 +313,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, } e->protocol = PROTOCOL_RABBITMQ; e->method = METHOD_PRODUCE; + e->tgid_write = write_tgid; + e->tgid_read = write_tgid; // checkme send_event(ctx, e, cid, conn); return 0; } else if (nats_method(payload, size) == METHOD_PRODUCE) { @@ -323,6 +324,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, } e->protocol = PROTOCOL_NATS; e->method = METHOD_PRODUCE; + e->tgid_write = write_tgid; + e->tgid_read = write_tgid; send_event(ctx, e, cid, conn); return 0; } else if (is_cassandra_request(payload, size, &k.stream_id)) { @@ -340,6 +343,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, } e->protocol = PROTOCOL_HTTP2; e->method = METHOD_HTTP2_CLIENT_FRAMES; + // todo client-side tgid for writing http2 e->duration = write_ns; e->payload_size = size; COPY_PAYLOAD(e->payload, size, payload); @@ -350,14 +354,14 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, } else if (is_dns_request(payload, size, &k.stream_id)) { req->protocol = PROTOCOL_DNS; } + if (req->protocol == PROTOCOL_UNKNOWN) { return 0; } - // req is real REQUEST req->ns = write_ns; req->tgid_send = write_tgid; - req->tgid_recv = 0; // todo how to tgid other side? + req->tgid_recv = 0; COPY_PAYLOAD(req->payload, size, payload); @@ -462,20 +466,20 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) e->method = METHOD_UNKNOWN; e->statement_id = 0; e->payload_size = 0; - e->tgid_req_cs = 0; - e->tgid_req_ss = 0; - e->tgid_resp_ss = 0; - e->tgid_resp_cs = 0; + e->tgid_write = 0; + e->tgid_read = 0; if (is_rabbitmq_consume(payload, ret)) { e->protocol = PROTOCOL_RABBITMQ; e->method = METHOD_CONSUME; + // todo tgid send_event(ctx, e, cid, conn); return 0; } if (nats_method(payload, ret) == METHOD_CONSUME) { e->protocol = PROTOCOL_NATS; e->method = METHOD_CONSUME; + // todo tgid send_event(ctx, e, cid, conn); return 0; } @@ -489,7 +493,6 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) return 0; } e->protocol = PROTOCOL_DNS; - // todo tgid e->duration = read_ns - req->ns; e->payload_size = ret; COPY_PAYLOAD(e->payload, ret, payload); @@ -508,6 +511,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) e->duration = read_ns; e->payload_size = ret; COPY_PAYLOAD(e->payload, ret, payload); + // todo client-side tgid for reading http2 send_event(ctx, e, cid, conn); return 0; } else { @@ -561,10 +565,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) return 0; } - e->tgid_req_cs = req->tgid_send; - e->tgid_req_ss = req->tgid_recv; // todo tgid server-side - e->tgid_resp_ss = 0; // todo tgid server-side - e->tgid_resp_cs = read_tgid; + e->tgid_write = req->tgid_send; + e->tgid_read = read_tgid; e->duration = read_ns - req->ns; send_event(ctx, e, cid, conn); return 0; diff --git a/ebpftracer/tracer.go b/ebpftracer/tracer.go index 2d4740e..adeaf32 100644 --- a/ebpftracer/tracer.go +++ b/ebpftracer/tracer.go @@ -367,10 +367,8 @@ type l7Event struct { Fd uint64 ConnectionTimestamp uint64 Pid uint32 - TgidReqCs uint64 - TgidReqSs uint64 - TgidRespSs uint64 - TgidRespCs uint64 + TgidWrite uint64 + TgidRead uint64 Status uint32 Duration uint64 Protocol uint8 @@ -438,10 +436,8 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy event = Event{ Type: EventTypeL7Request, Pid: l7Event.Pid, - TgidReqCs: l7Event.TgidReqCs, - TgidReqSs: l7Event.TgidReqSs, - TgidRespSs: l7Event.TgidRespSs, - TgidRespCs: l7Event.TgidRespCs, + TgidReqCs: l7Event.TgidWrite, + TgidRespCs: l7Event.TgidRead, Fd: l7Event.Fd, //Timestamp: l7Event.ConnectionTimestamp, // shouldn't use this kernel timestamp Duration: time.Duration(l7Event.Duration), diff --git a/flags/flags.go b/flags/flags.go index 4518277..fb33a49 100644 --- a/flags/flags.go +++ b/flags/flags.go @@ -39,6 +39,10 @@ var ( ScrapeInterval = kingpin.Flag("scrape-interval", "How often to gather metrics from the agent").Default("15s").Envar("SCRAPE_INTERVAL").Duration() WalDir = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String() + + ClickhouseEndpoint = kingpin.Flag("clickhouse-endpoint", "Clickhouse endpoint").Envar("CLICKHOUSE_ENDPOINT").Default("default").String() + ClickhouseUser = kingpin.Flag("clickhouse-user", "Clickhouse user").Envar("CLICKHOUSE_USER").Default("default").String() + ClickhousePassword = kingpin.Flag("clickhouse-password", "Clickhouse password").Envar("CLICKHOUSE_PASSWORD").String() ) func GetString(fl *string) string { diff --git a/go.mod b/go.mod index 29eb9a2..353fd6b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( cloud.google.com/go/compute/metadata v0.2.3 github.com/ClickHouse/ch-go v0.61.5 - github.com/ClickHouse/clickhouse-go/v2 v2.30.0 github.com/agoda-com/opentelemetry-logs-go v0.4.1 github.com/cilium/cilium v1.13.7 github.com/cilium/ebpf v0.11.0 @@ -57,7 +56,6 @@ require ( github.com/Microsoft/hcsshim v0.9.10 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect - github.com/andybalholm/brotli v1.1.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/avvmoto/buf-readerat v0.0.0-20171115124131-a17c8cb89270 // indirect github.com/aws/aws-sdk-go v1.50.0 // indirect @@ -141,7 +139,6 @@ require ( github.com/opencontainers/selinux v1.10.1 // indirect github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect github.com/pascaldekloe/name v1.0.1 // indirect - github.com/paulmach/orb v0.11.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect @@ -157,7 +154,6 @@ require ( github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil/v3 v3.23.12 // indirect - github.com/shopspring/decimal v1.4.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect diff --git a/go.sum b/go.sum index e04d478..ea808a0 100644 --- a/go.sum +++ b/go.sum @@ -53,9 +53,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5 v5.4 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v5 v5.4.0/go.mod h1:uYt4CfhkJA9o0FN7jfE5minm/i4nUE4MjGUJkzB6Zs8= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0 h1:bXwSugBiSbgtz7rOtbfGf+woewp4f06orW9OP5BjHLA= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0/go.mod h1:Y/HgrePTmGy9HjdSGTqZNa+apUpTVIEVKXJyARP2lrk= +github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= -github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= @@ -72,8 +71,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= -github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo= -github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo= github.com/Code-Hex/go-generics-cache v1.3.1 h1:i8rLwyhoyhaerr7JpjtYjJZUcCbWOdiYO3fZXLiEC4g= github.com/Code-Hex/go-generics-cache v1.3.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -116,8 +113,6 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs= github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:CgnQgUtFrFz9mxFNtED3jI5tLDjKlOM+oUF/sTk6ps0= -github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= -github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= @@ -831,8 +826,8 @@ github.com/moby/sys/signal v0.6.0 h1:aDpY94H8VlhTGa9sNYUFCFsMZIUh5wm0B6XkIoJj/iY github.com/moby/sys/signal v0.6.0/go.mod h1:GQ6ObYZfqacOwTtlXvcmh9A26dVRul/hbOZn88Kg8Tg= github.com/moby/sys/symlink v0.1.0/go.mod h1:GGDODQmbFOjFsXvfLVn3+ZRxkch54RkSiGqsZeMYowQ= github.com/moby/term v0.0.0-20200312100748-672ec06f55cd/go.mod h1:DdlQx2hp0Ss5/fLikoLlEeIYiATotOjgB//nb973jeo= -github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= -github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc= +github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -910,9 +905,6 @@ github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0= github.com/ovh/go-ovh v1.4.3/go.mod h1:AkPXVtgwB6xlKblMjRKJJmjRp+ogrE7fz2lVgcQY8SY= github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= -github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU= -github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= -github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.8.1/go.mod h1:T2/BmBdy8dvIRq1a/8aqjN41wvWlN4lrapLU/GW4pbc= @@ -1008,8 +1000,6 @@ github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11 github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= -github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= -github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= @@ -1118,8 +1108,6 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xin053/hsperfdata v0.2.3 h1:AG95n8Ktml6pO1rkUAnuBghdgtDkMj04xbTA1sXFQSU= github.com/xin053/hsperfdata v0.2.3/go.mod h1:zYsee+7zNKcRF+SLyf19UoqPDxyr/N86O4VTQUHaKpE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= -github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1140,7 +1128,6 @@ go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3C go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8= -go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk= go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= diff --git a/tracing/ss_event.go b/tracing/ss_event.go new file mode 100644 index 0000000..25d4907 --- /dev/null +++ b/tracing/ss_event.go @@ -0,0 +1,106 @@ +// Uploads server-side events (response related events) + +package tracing + +import ( + "context" + "k8s.io/klog/v2" + "sync" + "time" + + "github.com/ClickHouse/ch-go" + chproto "github.com/ClickHouse/ch-go/proto" +) + +const ( + SSEBatchLimit = 50 // l7_event_ss processing batch size + SSEBatchTimeout = 5 * time.Second +) + +type SSEventBatcher struct { + limit int + client *ch.Client + + lock sync.Mutex + done chan struct{} + + Timestamp *chproto.ColDateTime64 + Duration *chproto.ColUInt64 + TgidRead *chproto.ColUInt64 + TgidWrite *chproto.ColUInt64 + StatementID *chproto.ColUInt32 +} + +func NewSSEventBatcher(limit int, timeout time.Duration, client *ch.Client) *SSEventBatcher { + b := &SSEventBatcher{ + limit: limit, + client: client, + + done: make(chan struct{}), + + Timestamp: new(chproto.ColDateTime64).WithPrecision(chproto.PrecisionNano), + Duration: new(chproto.ColUInt64), + TgidRead: new(chproto.ColUInt64), + TgidWrite: new(chproto.ColUInt64), + StatementID: new(chproto.ColUInt32), + } + + go func() { + ticker := time.NewTicker(timeout) + defer ticker.Stop() + for { + select { + case <-b.done: + return + case <-ticker.C: + b.lock.Lock() + b.save() + b.lock.Unlock() + } + } + }() + + return b +} + +func (b *SSEventBatcher) Append(timestamp uint64, duration time.Duration, TgidReqSs, TgidRespSs uint64) { + b.Timestamp.Append(time.Unix(0, int64(timestamp))) + b.Duration.Append(uint64(duration)) + b.TgidRead.Append(TgidReqSs) + b.TgidWrite.Append(TgidRespSs) + b.StatementID.Append(0) // todo support something like x-request-id + + if b.Timestamp.Rows() < b.limit { + return + } + b.save() +} + +func (b *SSEventBatcher) Close() { + b.done <- struct{}{} + b.lock.Lock() + b.save() + b.lock.Unlock() +} + +func (b *SSEventBatcher) save() { + if b.Timestamp.Rows() == 0 { + return + } + + input := chproto.Input{ + {Name: "Timestamp", Data: b.Timestamp}, + {Name: "Duration", Data: b.Duration}, + {Name: "StatementId", Data: b.StatementID}, + {Name: "TgidRead", Data: b.TgidRead}, + {Name: "TgidWrite", Data: b.TgidWrite}, + } + query := ch.Query{Body: input.Into("l7_events_ss"), Input: input} + err := b.client.Do(context.Background(), query) + if err != nil { + klog.Errorln(err) + } + for _, i := range input { + i.Data.(chproto.Resettable).Reset() + } +} diff --git a/tracing/tracing.go b/tracing/tracing.go index a7f22ac..0b88d53 100644 --- a/tracing/tracing.go +++ b/tracing/tracing.go @@ -89,8 +89,6 @@ func NewSpanBuilder(containerId string, destination netaddr.IPPort, rawEvent *eb semconv.NetPeerName(destination.IP().String()), semconv.NetPeerPort(int(destination.Port())), attribute.String("tgid_req_cs", strconv.FormatUint(rawEvent.TgidReqCs, 10)), - attribute.String("tgid_req_ss", strconv.FormatUint(rawEvent.TgidReqSs, 10)), - attribute.String("tgid_resp_ss", strconv.FormatUint(rawEvent.TgidRespSs, 10)), attribute.String("tgid_resp_cs", strconv.FormatUint(rawEvent.TgidRespCs, 10)), }} }