Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix request.body rego builtin sporadic unexpected EOF #31

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/pkg/beskar/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/distribution/distribution/v3"
"github.com/hashicorp/memberlist"
"github.com/sirupsen/logrus"
"go.ciq.dev/beskar/internal/pkg/config"
"go.ciq.dev/beskar/internal/pkg/gossip"
"go.ciq.dev/beskar/internal/pkg/router"
eventv1 "go.ciq.dev/beskar/pkg/api/event/v1"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (pm *pluginManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pl.ServeHTTP(w, r)
}

func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta) error {
func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta, beskarConfig *config.BeskarConfig) error {
hostport := net.JoinHostPort(node.Addr.String(), strconv.Itoa(int(meta.ServicePort)))
info, err := pm.getPluginInfo(hostport)
if err != nil {
Expand Down Expand Up @@ -146,7 +147,7 @@ func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta
logger: pm.logger,
}

if err := pl.initRouter(info); err != nil {
if err := pl.initRouter(info, beskarConfig.Router.BodyLimit); err != nil {
return err
}

Expand All @@ -160,7 +161,7 @@ func (pm *pluginManager) register(node *memberlist.Node, meta *gossip.BeskarMeta
pl.version = info.Version
pl.mediaTypes = mediaTypes

if err := pl.initRouter(info); err != nil {
if err := pl.initRouter(info, beskarConfig.Router.BodyLimit); err != nil {
return err
}
}
Expand Down Expand Up @@ -343,7 +344,7 @@ func (p *plugin) sendEvent(ctx context.Context, event *eventv1.EventPayload, nod
}, backoff.WithContext(eb, ctx))
}

func (p *plugin) initRouter(info *pluginv1.Info) error {
func (p *plugin) initRouter(info *pluginv1.Info, bodyLimit int64) error {
var routerOptions []router.RegoRouterOption

if info.Router == nil {
Expand All @@ -352,6 +353,9 @@ func (p *plugin) initRouter(info *pluginv1.Info) error {
if len(info.Router.Data) > 0 {
routerOptions = append(routerOptions, router.WithData(bytes.NewReader(info.Router.Data)))
}
if bodyLimit > 0 {
routerOptions = append(routerOptions, router.WithBodyLimit(bodyLimit))
}
rr, err := router.New(info.Name, string(info.Router.Rego), routerOptions...)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/beskar/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (br *Registry) startGossipWatcher() {
br.logger.Debugf("Added groupcache peer %s", peer)
case gossip.PluginInstance:
br.logger.Infof("Register plugin")
if err := br.pluginManager.register(node, meta); err != nil {
if err := br.pluginManager.register(node, meta, br.beskarConfig); err != nil {
br.logger.Errorf("plugin register error: %s", err)
}
}
Expand All @@ -182,7 +182,7 @@ func (br *Registry) startGossipWatcher() {
br.logger.Debugf("Added groupcache peer %s", peer)
case gossip.PluginInstance:
br.logger.Infof("Register plugin")
if err := br.pluginManager.register(node, meta); err != nil {
if err := br.pluginManager.register(node, meta, br.beskarConfig); err != nil {
br.logger.Errorf("plugin register error: %s", err)
}
}
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/config/beskar.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ type Cache struct {
Size uint32 `yaml:"size"`
}

type Router struct {
BodyLimit int64 `yaml:"bodylimit"`
}

type BeskarConfig struct {
Version string `yaml:"version"`
Profiling bool `yaml:"profiling"`
Hostname string `yaml:"hostname"`
Cache Cache `yaml:"cache"`
Gossip gossip.Config `yaml:"gossip"`
Registry *configuration.Configuration `yaml:"registry"`
Router Router `yaml:"router"`
}

type BeskarConfigV1 BeskarConfig
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/config/default/beskar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ gossip:
key: XD1IOhcp0HWFgZJ/HAaARqMKJwfMWtz284Yj7wxmerA=
peers: []

router:
bodyLimit: 8192

# hostname returned to plugins to access registry service,
# automatically set when deployed on kubernetes
hostname: localhost
Expand Down
65 changes: 22 additions & 43 deletions internal/pkg/router/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"io"
"net/http"
"strings"
"sync"

"github.com/distribution/distribution/v3"
"github.com/distribution/reference"
Expand All @@ -28,6 +27,7 @@ type funcContext struct {
req *http.Request
registry distribution.Namespace
builtinErr error
bodyLimit int64
}

var ociBlobDigestBuiltin = rego.Function3(
Expand Down Expand Up @@ -129,22 +129,6 @@ var ociBlobDigestBuiltin = rego.Function3(
},
)

type bodyCloser struct {
io.Reader
closeFn func() error
}

func (bc bodyCloser) Close() error {
return bc.closeFn()
}

var bufferPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 8192)
return &buffer
},
}

var requestBodyBuiltin = rego.FunctionDyn(
&rego.Function{
Name: "request.body",
Expand All @@ -165,38 +149,33 @@ var requestBodyBuiltin = rego.FunctionDyn(
}
}()

if funcContext.req.Body != nil && funcContext.req.Body != http.NoBody {
buf := bufferPool.Get().(*[]byte)

n, err := io.ReadAtLeast(funcContext.req.Body, *buf, 1)
if err != nil {
return nil, fmt.Errorf("empty body request")
}

bodyReader := bytes.NewReader((*buf)[:n])

v, err := ast.ValueFromReader(bodyReader)
if err != nil {
return nil, err
}
if funcContext.req.Body == nil || funcContext.req.Body == http.NoBody {
v, err := ast.InterfaceToValue(nil)
return ast.NewTerm(v), err
}

_, _ = bodyReader.Seek(0, io.SeekStart)
buf := new(bytes.Buffer)

originalBody := funcContext.req.Body
// plugin API are receiving small JSON objects, so we limit it to 8KB by default
// configurable via beskar router configuration.
_, err := buf.ReadFrom(io.LimitReader(funcContext.req.Body, funcContext.bodyLimit))
if err != nil {
return nil, err
} else if err := funcContext.req.Body.Close(); err != nil {
return nil, err
}

funcContext.req.Body = &bodyCloser{
Reader: bodyReader,
closeFn: func() error {
defer bufferPool.Put(buf)
return originalBody.Close()
},
}
bodyReader := bytes.NewReader(buf.Bytes())

return ast.NewTerm(v), nil
v, err := ast.ValueFromReader(bodyReader)
if err != nil {
return nil, err
} else if _, err = bodyReader.Seek(0, io.SeekStart); err != nil {
return nil, err
}

v, err := ast.InterfaceToValue(nil)
funcContext.req.Body = io.NopCloser(bodyReader)

return ast.NewTerm(v), err
return ast.NewTerm(v), nil
},
)
20 changes: 15 additions & 5 deletions internal/pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type Result struct {
type RegoOption = func(r *rego.Rego)

type RegoRouter struct {
name string
options []RegoOption
peq rego.PreparedEvalQuery
name string
options []RegoOption
peq rego.PreparedEvalQuery
bodyLimit int64
}

type RegoRouterOption func(r *RegoRouter) error
Expand Down Expand Up @@ -59,6 +60,13 @@ func WithOption(option RegoOption) RegoRouterOption {
}
}

func WithBodyLimit(limit int64) RegoRouterOption {
return func(r *RegoRouter) error {
r.bodyLimit = limit
return nil
}
}

func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err error) {
router := &RegoRouter{
name: name,
Expand All @@ -68,6 +76,7 @@ func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err e
ociBlobDigestBuiltin,
requestBodyBuiltin,
},
bodyLimit: 8192,
}

for _, opt := range options {
Expand All @@ -86,8 +95,9 @@ func New(name, module string, options ...RegoRouterOption) (_ *RegoRouter, err e

func (rr *RegoRouter) Decision(req *http.Request, registry distribution.Namespace) (*Result, error) {
fctx := &funcContext{
req: req,
registry: registry,
req: req,
registry: registry,
bodyLimit: rr.bodyLimit,
}
ctx := context.WithValue(req.Context(), &funcContextKey, fctx)

Expand Down