Skip to content

Commit

Permalink
Merge pull request #31 from ctrliq/plugin-api-eof-fix
Browse files Browse the repository at this point in the history
Fix request.body rego builtin sporadic unexpected EOF
  • Loading branch information
ikaneshiro authored Jan 15, 2024
2 parents 01976d3 + 3c0282d commit 445099c
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 54 deletions.
12 changes: 8 additions & 4 deletions internal/pkg/beskar/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/distribution/distribution/v3"
"github.com/hashicorp/memberlist"
"github.com/sirupsen/logrus"
"go.ciq.dev/beskar/internal/pkg/config"
"go.ciq.dev/beskar/internal/pkg/gossip"
"go.ciq.dev/beskar/internal/pkg/router"
eventv1 "go.ciq.dev/beskar/pkg/api/event/v1"
Expand Down Expand Up @@ -123,7 +124,7 @@ func (pm *pluginManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pl.ServeHTTP(w, r)
}

func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta) error {
func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta, beskarConfig *config.BeskarConfig) error {
hostport := net.JoinHostPort(node.Addr.String(), strconv.Itoa(int(meta.ServicePort)))
info, err := pm.getPluginInfo(hostport)
if err != nil {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta
logger: pm.logger,
}

if err := pl.initRouter(info); err != nil {
if err := pl.initRouter(info, beskarConfig.Router.BodyLimit); err != nil {
return err
}

Expand All @@ -165,7 +166,7 @@ func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta
pl.version = info.Version
pl.mediaTypes = mediaTypes

if err := pl.initRouter(info); err != nil {
if err := pl.initRouter(info, beskarConfig.Router.BodyLimit); err != nil {
return err
}
}
Expand Down Expand Up @@ -351,7 +352,7 @@ func (p *plugin) sendEvent(ctx context.Context, event *eventv1.EventPayload, nod
}, backoff.WithContext(eb, ctx))
}

func (p *plugin) initRouter(info *pluginv1.Info) error {
func (p *plugin) initRouter(info *pluginv1.Info, bodyLimit int64) error {
var routerOptions []router.RegoRouterOption

if info.Router == nil {
Expand All @@ -360,6 +361,9 @@ func (p *plugin) initRouter(info *pluginv1.Info) error {
if len(info.Router.Data) > 0 {
routerOptions = append(routerOptions, router.WithData(bytes.NewReader(info.Router.Data)))
}
if bodyLimit > 0 {
routerOptions = append(routerOptions, router.WithBodyLimit(bodyLimit))
}
rr, err := router.New(info.Name, string(info.Router.Rego), routerOptions...)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/beskar/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (br *Registry) startGossipWatcher() {
br.logger.Debugf("Added groupcache peer %s", peer)
case gossip.PluginInstance:
br.logger.Infof("Register plugin")
if err := br.pluginManager.register(node, meta); err != nil {
if err := br.pluginManager.register(node, meta, br.beskarConfig); err != nil {
br.logger.Errorf("plugin register error: %s", err)
}
}
Expand All @@ -182,7 +182,7 @@ func (br *Registry) startGossipWatcher() {
br.logger.Debugf("Added groupcache peer %s", peer)
case gossip.PluginInstance:
br.logger.Infof("Register plugin")
if err := br.pluginManager.register(node, meta); err != nil {
if err := br.pluginManager.register(node, meta, br.beskarConfig); err != nil {
br.logger.Errorf("plugin register error: %s", err)
}
}
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/config/beskar.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ type Cache struct {
Size uint32 `yaml:"size"`
}

type Router struct {
BodyLimit int64 `yaml:"bodylimit"`
}

type BeskarConfig struct {
Version string `yaml:"version"`
Profiling bool `yaml:"profiling"`
Hostname string `yaml:"hostname"`
Cache Cache `yaml:"cache"`
Gossip gossip.Config `yaml:"gossip"`
Registry *configuration.Configuration `yaml:"registry"`
Router Router `yaml:"router"`
}

type BeskarConfigV1 BeskarConfig
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/config/default/beskar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ gossip:
key: XD1IOhcp0HWFgZJ/HAaARqMKJwfMWtz284Yj7wxmerA=
peers: []

router:
bodyLimit: 8192

# hostname returned to plugins to access registry service,
# automatically set when deployed on kubernetes
hostname: localhost
Expand Down
65 changes: 22 additions & 43 deletions internal/pkg/router/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"io"
"net/http"
"strings"
"sync"

"github.com/distribution/distribution/v3"
"github.com/distribution/reference"
Expand All @@ -28,6 +27,7 @@ type funcContext struct {
req *http.Request
registry distribution.Namespace
builtinErr error
bodyLimit int64
}

var ociBlobDigestBuiltin = rego.Function3(
Expand Down Expand Up @@ -129,22 +129,6 @@ var ociBlobDigestBuiltin = rego.Function3(
},
)

type bodyCloser struct {
io.Reader
closeFn func() error
}

func (bc bodyCloser) Close() error {
return bc.closeFn()
}

var bufferPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 8192)
return &buffer
},
}

var requestBodyBuiltin = rego.FunctionDyn(
&rego.Function{
Name: "request.body",
Expand All @@ -165,38 +149,33 @@ var requestBodyBuiltin = rego.FunctionDyn(
}
}()

if funcContext.req.Body != nil && funcContext.req.Body != http.NoBody {
buf := bufferPool.Get().(*[]byte)

n, err := io.ReadAtLeast(funcContext.req.Body, *buf, 1)
if err != nil {
return nil, fmt.Errorf("empty body request")
}

bodyReader := bytes.NewReader((*buf)[:n])

v, err := ast.ValueFromReader(bodyReader)
if err != nil {
return nil, err
}
if funcContext.req.Body == nil || funcContext.req.Body == http.NoBody {
v, err := ast.InterfaceToValue(nil)
return ast.NewTerm(v), err
}

_, _ = bodyReader.Seek(0, io.SeekStart)
buf := new(bytes.Buffer)

originalBody := funcContext.req.Body
// plugin API are receiving small JSON objects, so we limit it to 8KB by default
// configurable via beskar router configuration.
_, err := buf.ReadFrom(io.LimitReader(funcContext.req.Body, funcContext.bodyLimit))
if err != nil {
return nil, err
} else if err := funcContext.req.Body.Close(); err != nil {
return nil, err
}

funcContext.req.Body = &bodyCloser{
Reader: bodyReader,
closeFn: func() error {
defer bufferPool.Put(buf)
return originalBody.Close()
},
}
bodyReader := bytes.NewReader(buf.Bytes())

return ast.NewTerm(v), nil
v, err := ast.ValueFromReader(bodyReader)
if err != nil {
return nil, err
} else if _, err = bodyReader.Seek(0, io.SeekStart); err != nil {
return nil, err
}

v, err := ast.InterfaceToValue(nil)
funcContext.req.Body = io.NopCloser(bodyReader)

return ast.NewTerm(v), err
return ast.NewTerm(v), nil
},
)
20 changes: 15 additions & 5 deletions internal/pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type Result struct {
type RegoOption = func(r *rego.Rego)

type RegoRouter struct {
name string
options []RegoOption
peq rego.PreparedEvalQuery
name string
options []RegoOption
peq rego.PreparedEvalQuery
bodyLimit int64
}

type RegoRouterOption func(r *RegoRouter) error
Expand Down Expand Up @@ -59,6 +60,13 @@ func WithOption(option RegoOption) RegoRouterOption {
}
}

func WithBodyLimit(limit int64) RegoRouterOption {
return func(r *RegoRouter) error {
r.bodyLimit = limit
return nil
}
}

func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err error) {
router := &RegoRouter{
name: name,
Expand All @@ -68,6 +76,7 @@ func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err e
ociBlobDigestBuiltin,
requestBodyBuiltin,
},
bodyLimit: 8192,
}

for _, opt := range options {
Expand All @@ -86,8 +95,9 @@ func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err e

func (rr *RegoRouter) Decision(req *http.Request, registry distribution.Namespace) (*Result, error) {
fctx := &funcContext{
req: req,
registry: registry,
req: req,
registry: registry,
bodyLimit: rr.bodyLimit,
}
ctx := context.WithValue(req.Context(), &funcContextKey, fctx)

Expand Down

0 comments on commit 445099c

Please sign in to comment.