From f6e731c8096805463a54ab7654425233ec60b35d Mon Sep 17 00:00:00 2001 From: Eric Lin Date: Fri, 21 Jun 2024 15:29:32 +0000 Subject: [PATCH] cri: get pid count from container metrics This reduces latency of calling ListPodSandboxStats() by avoiding calling shim API Task(). Signed-off-by: Eric Lin --- internal/cri/server/container_stats.go | 2 +- internal/cri/server/container_stats_list.go | 88 +++++++++++++------ .../cri/server/container_stats_list_test.go | 6 +- internal/cri/server/sandbox_stats_linux.go | 36 ++------ 4 files changed, 73 insertions(+), 59 deletions(-) diff --git a/internal/cri/server/container_stats.go b/internal/cri/server/container_stats.go index e2ca5f1ace64..1600238b240d 100644 --- a/internal/cri/server/container_stats.go +++ b/internal/cri/server/container_stats.go @@ -49,5 +49,5 @@ func (c *criService) ContainerStats(ctx context.Context, in *runtime.ContainerSt if err != nil { return nil, fmt.Errorf("failed to decode container metrics: %w", err) } - return &runtime.ContainerStatsResponse{Stats: cs}, nil + return &runtime.ContainerStatsResponse{Stats: cs.stats}, nil } diff --git a/internal/cri/server/container_stats_list.go b/internal/cri/server/container_stats_list.go index 2c776b25304a..476d4b999e05 100644 --- a/internal/cri/server/container_stats_list.go +++ b/internal/cri/server/container_stats_list.go @@ -43,6 +43,17 @@ func (c *criService) ListContainerStats( ctx context.Context, in *runtime.ListContainerStatsRequest, ) (*runtime.ListContainerStatsResponse, error) { + css, err := c.listContainerStats(ctx, in) + if err != nil { + return nil, fmt.Errorf("failed to fetch containers and stats: %w", err) + } + return c.toCRIContainerStats(css), nil +} + +func (c *criService) listContainerStats( + ctx context.Context, + in *runtime.ListContainerStatsRequest, +) ([]containerStats, error) { request, containers, err := c.buildTaskMetricsRequest(in) if err != nil { return nil, fmt.Errorf("failed to build metrics request: %w", err) @@ -51,14 +62,20 @@ func (c *criService) ListContainerStats( if err != nil { return nil, fmt.Errorf("failed to fetch metrics for tasks: %w", err) } - criStats, err := c.toCRIContainerStats(ctx, resp.Metrics, containers) + css, err := c.toContainerStats(ctx, resp.Metrics, containers) if err != nil { return nil, fmt.Errorf("failed to convert to cri containerd stats format: %w", err) } - return criStats, nil + return css, nil } -type metricsHandler func(containerstore.Metadata, *types.Metric) (*runtime.ContainerStats, error) +type containerStats struct { + stats *runtime.ContainerStats + // pids is only valid in linux platform + pids uint64 +} + +type metricsHandler func(containerstore.Metadata, *types.Metric) (containerStats, error) // Returns a function to be used for transforming container metrics into the right format. // Uses the platform the given sandbox advertises to implement its logic. If the platform is @@ -86,11 +103,11 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m switch p.OS { case "windows": - return func(meta containerstore.Metadata, stats *types.Metric) (*runtime.ContainerStats, error) { + return func(meta containerstore.Metadata, stats *types.Metric) (containerStats, error) { return c.windowsContainerMetrics(meta, stats, snapshotter) }, nil case "linux": - return func(meta containerstore.Metadata, stats *types.Metric) (*runtime.ContainerStats, error) { + return func(meta containerstore.Metadata, stats *types.Metric) (containerStats, error) { return c.linuxContainerMetrics(meta, stats, snapshotter) }, nil default: @@ -98,16 +115,16 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m } } -func (c *criService) toCRIContainerStats( +func (c *criService) toContainerStats( ctx context.Context, stats []*types.Metric, containers []containerstore.Container, -) (*runtime.ListContainerStatsResponse, error) { +) ([]containerStats, error) { statsMap := make(map[string]*types.Metric) for _, stat := range stats { statsMap[stat.ID] = stat } - containerStats := new(runtime.ListContainerStatsResponse) + css := []containerStats{} // Unfortunately if no filter was passed we're asking for every containers stats which // generally belong to multiple different pods, who all might have different platforms. @@ -143,17 +160,25 @@ func (c *criService) toCRIContainerStats( return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err) } - if cs.Cpu != nil && cs.Cpu.UsageCoreNanoSeconds != nil { + if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil { // this is a calculated value and should be computed for all OSes - nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.Cpu.Timestamp)) + nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp)) if err != nil { return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err) } - cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage} + cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage} } - containerStats.Stats = append(containerStats.Stats, cs) + css = append(css, cs) } - return containerStats, nil + return css, nil +} + +func (c *criService) toCRIContainerStats(css []containerStats) *runtime.ListContainerStatsResponse { + containerStats := new(runtime.ListContainerStatsResponse) + for _, cs := range css { + containerStats.Stats = append(containerStats.Stats, cs.stats) + } + return containerStats } func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) { @@ -275,7 +300,7 @@ func (c *criService) windowsContainerMetrics( meta containerstore.Metadata, stats *types.Metric, snapshotter string, -) (*runtime.ContainerStats, error) { +) (containerStats, error) { var cs runtime.ContainerStats var usedBytes, inodesUsed uint64 sn, err := c.GetSnapshot(meta.ID, snapshotter) @@ -303,11 +328,11 @@ func (c *criService) windowsContainerMetrics( if stats != nil { s, err := typeurl.UnmarshalAny(stats.Data) if err != nil { - return nil, fmt.Errorf("failed to extract container metrics: %w", err) + return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) } wstats := s.(*wstats.Statistics).GetWindows() if wstats == nil { - return nil, errors.New("windows stats is empty") + return containerStats{}, errors.New("windows stats is empty") } if wstats.Processor != nil { cs.Cpu = &runtime.CpuUsage{ @@ -324,16 +349,16 @@ func (c *criService) windowsContainerMetrics( } } } - return &cs, nil + return containerStats{&cs, 0}, nil } func (c *criService) linuxContainerMetrics( meta containerstore.Metadata, stats *types.Metric, snapshotter string, -) (*runtime.ContainerStats, error) { +) (containerStats, error) { var cs runtime.ContainerStats - var usedBytes, inodesUsed uint64 + var usedBytes, inodesUsed, pids uint64 sn, err := c.GetSnapshot(meta.ID, snapshotter) // If snapshotstore doesn't have cached snapshot information // set WritableLayer usage to zero @@ -361,32 +386,37 @@ func (c *criService) linuxContainerMetrics( switch { case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)): data = &cg1.Metrics{} + if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { + return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) + } + pids = data.(*cg1.Metrics).GetPids().GetCurrent() case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)): data = &cg2.Metrics{} - case typeurl.Is(stats.Data, (*wstats.Statistics)(nil)): - data = &wstats.Statistics{} + if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { + return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err) + } + pids = data.(*cg2.Metrics).GetPids().GetCurrent() default: - return nil, errors.New("cannot convert metric data to cgroups.Metrics or windows.Statistics") - } - - if err := typeurl.UnmarshalTo(stats.Data, data); err != nil { - return nil, fmt.Errorf("failed to extract container metrics: %w", err) + return containerStats{}, errors.New("cannot convert metric data to cgroups.Metrics") } cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, data, protobuf.FromTimestamp(stats.Timestamp)) if err != nil { - return nil, fmt.Errorf("failed to obtain cpu stats: %w", err) + return containerStats{}, fmt.Errorf("failed to obtain cpu stats: %w", err) } cs.Cpu = cpuStats memoryStats, err := c.memoryContainerStats(meta.ID, data, protobuf.FromTimestamp(stats.Timestamp)) if err != nil { - return nil, fmt.Errorf("failed to obtain memory stats: %w", err) + return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err) } cs.Memory = memoryStats + if err != nil { + return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err) + } } - return &cs, nil + return containerStats{&cs, pids}, nil } // getWorkingSet calculates workingset memory from cgroup memory stats. diff --git a/internal/cri/server/container_stats_list_test.go b/internal/cri/server/container_stats_list_test.go index fad9c35b5012..5154d1183d3a 100644 --- a/internal/cri/server/container_stats_list_test.go +++ b/internal/cri/server/container_stats_list_test.go @@ -420,7 +420,7 @@ func TestListContainerStats(t *testing.T) { if tt.before != nil { tt.before() } - got, err := c.toCRIContainerStats(tt.args.ctx, tt.args.stats, tt.args.containers) + css, err := c.toContainerStats(tt.args.ctx, tt.args.stats, tt.args.containers) if tt.after != nil { tt.after() } @@ -428,6 +428,10 @@ func TestListContainerStats(t *testing.T) { t.Errorf("ListContainerStats() error = %v, wantErr %v", err, tt.wantErr) return } + var got *runtime.ListContainerStatsResponse + if err == nil { + got = c.toCRIContainerStats(css) + } if !reflect.DeepEqual(got, tt.want) { t.Errorf("ListContainerStats() = %v, want %v", got, tt.want) } diff --git a/internal/cri/server/sandbox_stats_linux.go b/internal/cri/server/sandbox_stats_linux.go index e1e07d99d286..e5ab75f0b733 100644 --- a/internal/cri/server/sandbox_stats_linux.go +++ b/internal/cri/server/sandbox_stats_linux.go @@ -84,40 +84,20 @@ func (c *criService) podSandboxStats( } } + listContainerStatsRequest := &runtime.ListContainerStatsRequest{Filter: &runtime.ContainerStatsFilter{PodSandboxId: meta.ID}} + css, err := c.listContainerStats(ctx, listContainerStatsRequest) + if err != nil { + return nil, fmt.Errorf("failed to obtain container stats during podSandboxStats call: %w", err) + } var pidCount uint64 - for _, cntr := range c.containerStore.List() { - if cntr.SandboxID != sandbox.ID { - continue - } - - state := cntr.Status.Get().State() - if state != runtime.ContainerState_CONTAINER_RUNNING { - continue - } - - task, err := cntr.Container.Task(ctx, nil) - if err != nil { - return nil, err - } - - processes, err := task.Pids(ctx) - if err != nil { - return nil, err - } - pidCount += uint64(len(processes)) - + for _, cs := range css { + pidCount += cs.pids + podSandboxStats.Linux.Containers = append(podSandboxStats.Linux.Containers, cs.stats) } podSandboxStats.Linux.Process = &runtime.ProcessUsage{ Timestamp: timestamp.UnixNano(), ProcessCount: &runtime.UInt64Value{Value: pidCount}, } - - listContainerStatsRequest := &runtime.ListContainerStatsRequest{Filter: &runtime.ContainerStatsFilter{PodSandboxId: meta.ID}} - resp, err := c.ListContainerStats(ctx, listContainerStatsRequest) - if err != nil { - return nil, fmt.Errorf("failed to obtain container stats during podSandboxStats call: %w", err) - } - podSandboxStats.Linux.Containers = resp.GetStats() } return podSandboxStats, nil