Skip to content

Commit

Permalink
fix: bring first walk cgroupfs back
Browse files Browse the repository at this point in the history
Signed-off-by: black-desk <[email protected]>
  • Loading branch information
black-desk committed Oct 20, 2023
1 parent 7d18af2 commit abda2b0
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 47 deletions.
18 changes: 3 additions & 15 deletions cmd/cgtproxy/cmd/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,8 @@ type chans struct {
out chan<- types.CGroupEvent
}

func provideChans() chans {
ch := make(chan types.CGroupEvent)

return chans{ch, ch}
}

func provideInputChan(chs chans) <-chan types.CGroupEvent {
return chs.in
}

func provideOutputChan(chs chans) chan<- types.CGroupEvent {
return chs.out
func provideCGroupEventChan(mon interfaces.CGroupMonitor) <-chan types.CGroupEvent {
return mon.Events()
}

func provideNftConn() (ret *nftables.Conn, err error) {
Expand Down Expand Up @@ -130,10 +120,8 @@ var set = wire.NewSet(
provideBypass,
provideCgrougMontior,
provideCgroupRoot,
provideChans,
provideCGTProxy,
provideInputChan,
provideOutputChan,
provideCGroupEventChan,
provideRuleManager,
provideNFTManager,
)
3 changes: 1 addition & 2 deletions cmd/cgtproxy/cmd/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/cgfsmon/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package cgfsmon
import "errors"

var (
ErrContextMissing = errors.New("context is missing.")
ErrCGroupRootNotFound = errors.New("cgroup v2 file system mount point is missing.")
ErrLoggerMissing = errors.New("logger is missing.")
ErrContextMissing = errors.New("context is missing.")
ErrCGroupRootNotFound = errors.New("cgroup v2 file system mount point is missing.")
ErrLoggerMissing = errors.New("logger is missing.")
ErrUnderlingWatcherExited = errors.New("underling file system watcher has exited.")
)
8 changes: 0 additions & 8 deletions pkg/cgfsmon/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ func New(opts ...Opt) (ret *CGroupFSMonitor, err error) {
return
}

err = watcherImpl.RegisterEventHandler(&handle{
log: w.log,
events: w.events,
})
if err != nil {
return
}

ret = w

w.log.Debugw("Create a new filesystem watcher.")
Expand Down
22 changes: 11 additions & 11 deletions pkg/cgfsmon/private.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package cgfsmon

import (
"context"
"errors"
"os"

"github.com/black-desk/cgtproxy/pkg/types"
fsevents "github.com/tywkeene/go-fsevents"
"go.uber.org/zap"
)

type handle struct {
log *zap.SugaredLogger
events chan<- types.CGroupEvent
ctx context.Context
mon *CGroupFSMonitor
}

func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
isDirRemoved := event.IsDirRemoved()
isDirCreated := event.IsDirCreated()
path := event.Path

h.log.Debugw("Handling new filesystem event.",
h.mon.log.Debugw("Handling new filesystem event.",
"event", event,
"isDirRemoved", isDirRemoved,
"isDirCreated", isDirCreated,
Expand All @@ -31,7 +31,7 @@ func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
return
}

h.log.Debugw("Add path to watcher recursively.",
h.mon.log.Debugw("Add path to watcher recursively.",
"path", path,
)

Expand All @@ -40,7 +40,7 @@ func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
fsevents.DirCreatedEvent|fsevents.DirRemovedEvent,
)

h.log.Debugw("Finish add path to watcher recursively.",
h.mon.log.Debugw("Finish add path to watcher recursively.",
"path", path,
)

Expand All @@ -49,11 +49,11 @@ func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
}

if errors.Is(err, os.ErrNotExist) {
h.log.Debugw("Try to add a non-exist path to watcher.",
h.mon.log.Debugw("Try to add a non-exist path to watcher.",
"path", path,
)
} else {
h.log.Errorw("Failed to add path to watcher.",
h.mon.log.Errorw("Failed to add path to watcher.",
"path", path,
"error", err,
)
Expand All @@ -70,7 +70,7 @@ func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
return
}

h.log.Errorw("Failed to remove descriptor from watcher.",
h.mon.log.Errorw("Failed to remove descriptor from watcher.",
"path", path,
"error", err,
)
Expand All @@ -88,10 +88,10 @@ func (h *handle) Handle(w *fsevents.Watcher, event *fsevents.FsEvent) error {
eventType = types.CgroupEventTypeDelete
}

h.events <- types.CGroupEvent{
h.mon.send(h.ctx, &types.CGroupEvent{
Path: path,
EventType: eventType,
}
})
}()

return nil
Expand Down
131 changes: 124 additions & 7 deletions pkg/cgfsmon/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package cgfsmon

import (
"context"
"errors"
"io/fs"
"path/filepath"
"strings"

"github.com/black-desk/cgtproxy/pkg/types"
. "github.com/black-desk/lib/go/errwrap"
Expand All @@ -12,8 +16,90 @@ func (w *CGroupFSMonitor) Events() <-chan types.CGroupEvent {
return w.events
}

func (m *CGroupFSMonitor) walkFn(ctx context.Context) func(path string, d fs.DirEntry, err error) error {
return func(path string, d fs.DirEntry, err error) error {
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
m.log.Debug(
"Cgroup had been removed.",
"path", path,
)
err = nil
}
m.log.Errorw(
"Errors occurred while first time going through cgroupfs.",
"path", path,
"error", err,
)
err = nil
}

if !d.IsDir() {
return nil
}

cgEvent := &types.CGroupEvent{
Path: path,
EventType: types.CgroupEventTypeNew,
}

err = m.send(ctx, cgEvent)
if err != nil {
return err
}

return nil
}
}

func (m *CGroupFSMonitor) walk(ctx context.Context, path string) {
err := filepath.WalkDir(path, m.walkFn(ctx))
if err == nil {
return
}

return
}

func (m *CGroupFSMonitor) send(ctx context.Context, cgEvent *types.CGroupEvent) (err error) {
path := strings.TrimRight(cgEvent.Path, "/")
cgEvent.Path = path

if cgEvent.Path == string(m.root) {
// NOTE: Ignore cgroup root.
return nil
}

m.log.Debugw("New cgroup envent.",
"event", cgEvent,
)

select {
case <-ctx.Done():
err = ctx.Err()
return
case m.events <- *cgEvent:
m.log.Debugw("Cgroup event sent.",
"path", path,
)
}

return
}

func (w *CGroupFSMonitor) Run(ctx context.Context) (err error) {
defer Wrap(&err, "running filesystem watcher.")
defer Wrap(&err, "running filesystem watcher")
defer close(w.events)

ctx, cancel := context.WithCancelCause(ctx)

err = w.watcher.RegisterEventHandler(&handle{
ctx: ctx,
mon: w,
})
if err != nil {
return
}

// FIXME(black_desk):
// This stupid inotify package
Expand All @@ -30,14 +116,45 @@ func (w *CGroupFSMonitor) Run(ctx context.Context) (err error) {
return
}

go w.watcher.WatchAndHandle()
w.log.Info("Going through cgroupfs first time...")
w.walk(ctx, string(w.root))
w.log.Info("Going through cgroupfs first time...Done.")

<-ctx.Done()
err = w.watcher.StopAll()
if err != nil {
return
}
go func() {
var err error
loop:
for {
select {
case <-ctx.Done():
break loop
case err = <-w.watcher.Errors:
w.log.Errorw(
"Underling filesystem watcher error arrives.",
"error", err,
)
}
}
}()

go func() {
w.watcher.WatchAndHandle()
cancel(ErrUnderlingWatcherExited)
}()

defer func() {
var err error
err = w.watcher.StopAll()
if err == nil {
return
}

w.log.Errorw(
"Stopping underling filesystem watcher.",
"error", err,
)
}()

<-ctx.Done()
err = ctx.Err()
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/routeman/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func (m *RouteManager) Run() (err error) {
defer Wrap(&err, "running route manager.")
defer Wrap(&err, "running route manager")

defer m.removeRoute()
err = m.addRoute()
Expand Down

0 comments on commit abda2b0

Please sign in to comment.