Skip to content

Commit

Permalink
feat(filter): determine processing of body in header decode/encode
Browse files Browse the repository at this point in the history
  • Loading branch information
qlonik committed Mar 6, 2024
1 parent a2f04fb commit 36babae
Showing 1 changed file with 65 additions and 49 deletions.
114 changes: 65 additions & 49 deletions privacy-profile-composer/pkg/envoyfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type Filter struct {
// Runtime state of the filter
parentSpanContext model.SpanContext
headerMetadata common.HeaderMetadata
processDecodeBody bool
decodeDataBuffer string
processEncodeBody bool
encodeDataBuffer string
}

Expand All @@ -72,12 +74,47 @@ func (f *Filter) DecodeHeaders(header api.RequestHeaderMap, endStream bool) api.

common.LogDecodeHeaderData(header)

// when `endStream` is true, we have a header-only request
if !endStream {
return api.StopAndBuffer
} else {
if endStream {
// here we have a header-only request
return api.Continue
}

switch f.config.direction {
case common.Inbound:
// If it is an inbound sidecar, then do process the body
// run PII Analysis + OPA directly
f.processDecodeBody = true

case common.Outbound:
// If it is an outbound sidecar, then check if it's a request to a third party
// and only process the body in this case
destinationAddress, err := f.callbacks.GetProperty("destination.address")
if err != nil {
log.Println(err)
return api.Continue
}

isInternalDestination, err := f.checkInternalAddress(destinationAddress)
if err != nil {
log.Println(err)
return api.Continue
}

if isInternalDestination {
log.Printf("outbound sidecar processed a request to another sidecar in the mesh" +
"Prose will process it through the inbound decode function\n")
return api.Continue
}

//thirdPartyURL := header.Host()
f.processDecodeBody = true

default:
log.Printf("unexpected filter direction: %s\n", f.config.direction)
return api.Continue
}

return api.StopAndBuffer
}

func (f *Filter) DecodeData(buffer api.BufferInstance, endStream bool) api.StatusType {
Expand All @@ -103,41 +140,7 @@ func (f *Filter) DecodeData(buffer api.BufferInstance, endStream bool) api.Statu

span.Tag("buffer-value", f.decodeDataBuffer)

// TODO: move the entire decision to process body or not into DecodeHeaders/EncodeHeaders
processBody := false
// If it is an inbound sidecar, then do process the body
// run PII Analysis + OPA directly
if f.config.direction == common.Inbound {
processBody = true
}

// If it is an outbound sidecar, then check if it's a request to a third party
// and only process the body in this case
if f.config.direction == common.Outbound {
destinationAddress, err := f.callbacks.GetProperty("destination.address")
if err != nil {
log.Println(err)
return api.Continue
}

isInternalDestination, err := f.checkInternalAddress(destinationAddress)
if err != nil {
log.Println(err)
return api.Continue
}

if isInternalDestination {
log.Printf("outbound sidecar processed a request to another sidecar in the mesh" +
"Prose will process it through the inbound decode function\n")
return api.Continue
}

// this can be obtained in DecodeHeader()
// thirdPartyURL := header.Host()
processBody = true
}

if processBody {
if f.processDecodeBody {
sendLocalReply, err, proseTags := f.processBody(ctx, f.decodeDataBuffer, true)
// Some of these tags may include error info,
// so need to add them irrespective of the error
Expand Down Expand Up @@ -175,11 +178,30 @@ func (f *Filter) EncodeHeaders(header api.ResponseHeaderMap, endStream bool) api
span := f.tracer.StartSpan("test span in encode headers", zipkin.Parent(f.parentSpanContext))
defer span.Finish()

if !endStream {
return api.StopAndBuffer
} else {
if endStream {
// here we have a header-only request
return api.Continue
}

switch f.config.direction {
case common.Inbound:
// if inbound then ignore
// we will just address them in the inbound call to the caller svc
f.processEncodeBody = false

case common.Outbound:
// if outbound then indirect purpose of use violation
// TODO: This is usually data obtained from another service
// but it could also be data obtained from a third party. I.e. a kind of join violation.
// Not sure if we'll run into those cases in the examples we look at.
f.processEncodeBody = true

default:
log.Printf("unexpected filter direction: %s\n", f.config.direction)
return api.Continue
}

return api.StopAndBuffer
}

// Callbacks which are called in response path
Expand All @@ -206,11 +228,7 @@ func (f *Filter) EncodeData(buffer api.BufferInstance, endStream bool) api.Statu

span.Tag("buffer-value", f.encodeDataBuffer)

// if outbound then indirect purpose of use violation
// TODO: This is usually data obtained from another service
// but it could also be data obtained from a third party. I.e. a kind of join violation.
// Not sure if we'll run into those cases in the examples we look at.
if f.config.direction == common.Outbound {
if f.processEncodeBody {
sendLocalReply, err, proseTags := f.processBody(ctx, f.encodeDataBuffer, false)
for k, v := range proseTags {
span.Tag(k, v)
Expand All @@ -229,8 +247,6 @@ func (f *Filter) EncodeData(buffer api.BufferInstance, endStream bool) api.Statu
}
}

// if inbound then ignore
// we will just address them in the inbound call to the caller svc
return api.Continue
}

Expand Down

0 comments on commit 36babae

Please sign in to comment.