add: LIFO option to dedupe processor (#148)
Signed-off-by: Jem Davies <[email protected]>
gregfurman authored and jem-davies committed Nov 7, 2024
1 parent bf03539 commit dc98f1c
Showing 4 changed files with 158 additions and 9 deletions.
38 changes: 38 additions & 0 deletions config/test/deduplicate_lifo.yaml
@@ -0,0 +1,38 @@
pipeline:
processors:
- dedupe:
cache: local
key: ${! json("user_uuid") }:${! json("month") }
strategy: LIFO

cache_resources:
- label: local
memory:
default_ttl: 1m

tests:
- name: de-duplicate LIFO across batches
input_batches:
-
- content: '{"json":"data-1","month":"2024-01-01","user_uuid":"id-1"}'
- content: '{"json":"data-1","month":"2024-01-01","user_uuid":"id-2"}'
- content: '{"json":"data-1","month":"2024-01-01","user_uuid":"id-3"}'
- content: '{"json":"data-1","month":"2024-01-01","user_uuid":"id-4"}'
- content: '{"json":"updated-data-1","month":"2024-01-01","user_uuid":"id-1"}'
-
- content: '{"json":"data-2","month":"2024-02-01","user_uuid":"id-1"}'
- content: '{"json":"data-2","month":"2024-02-01","user_uuid":"id-2"}'
- content: '{"json":"data-2","month":"2024-02-01","user_uuid":"id-3"}'
- content: '{"json":"data-2","month":"2024-02-01","user_uuid":"id-4"}'
- content: '{"json":"updated-data-2","month":"2024-02-01","user_uuid":"id-1"}'
output_batches:
-
- json_equals: {"user_uuid": "id-2", "month": "2024-01-01", "json": "data-1"}
- json_equals: {"user_uuid": "id-3", "month": "2024-01-01", "json": "data-1"}
- json_equals: {"user_uuid": "id-4", "month": "2024-01-01", "json": "data-1"}
- json_equals: {"user_uuid": "id-1", "month": "2024-01-01", "json": "updated-data-1"}
-
- json_equals: {"user_uuid": "id-2", "month": "2024-02-01", "json": "data-2"}
- json_equals: {"user_uuid": "id-3", "month": "2024-02-01", "json": "data-2"}
- json_equals: {"user_uuid": "id-4", "month": "2024-02-01", "json": "data-2"}
- json_equals: {"user_uuid": "id-1", "month": "2024-02-01", "json": "updated-data-2"}
50 changes: 43 additions & 7 deletions internal/impl/pure/processor_dedupe.go
@@ -20,6 +20,7 @@ const (
dedupFieldCache = "cache"
dedupFieldKey = "key"
dedupFieldDropOnCacheErr = "drop_on_err"
dedupFieldStrategy = "strategy"
)

func dedupeProcSpec() *service.ConfigSpec {
@@ -40,7 +41,7 @@ This processor enacts on individual messages only, in order to perform a dedupli
Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Bento pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Bento instance (or a server crash, etc).
This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Bento pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implement idempotent behaviour at the edge of your stream pipelines.`).
This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Bento pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implementing idempotent behaviour at the edge of your stream pipelines.`).
Example(
"Deduplicate based on Kafka key",
"The following configuration demonstrates a pipeline that deduplicates messages based on the Kafka key.",
@@ -66,6 +67,12 @@ cache_resources:
service.NewBoolField(dedupFieldDropOnCacheErr).
Description("Whether messages should be dropped when the cache returns a general error such as a network issue.").
Default(true),
service.NewStringAnnotatedEnumField(dedupFieldStrategy, map[string]string{
"FIFO": "Keeps the first value seen for each key.",
"LIFO": "Keeps the last value seen for each key.",
}).
Description("Controls how to handle duplicate values.").
Default("FIFO").Advanced(),
)
}

@@ -88,8 +95,21 @@ func init() {
return nil, err
}

dedupeStrategy, err := conf.FieldString(dedupFieldStrategy)
if err != nil {
return nil, err
}

var isFifo bool
switch dedupeStrategy {
case "FIFO":
isFifo = true
case "LIFO":
isFifo = false
}

mgr := interop.UnwrapManagement(res)
p, err := newDedupe(cache, keyStr, dropOnErr, mgr)
p, err := newDedupe(cache, keyStr, dropOnErr, isFifo, mgr)
if err != nil {
return nil, err
}
@@ -100,16 +120,20 @@ func init() {
}
}

// type cacheOperation func(cache cache.V1, ctx context.Context, key string, value []byte, ttl *time.Duration) error

type dedupeProc struct {
log log.Modular

dropOnErr bool
key *field.Expression
mgr bundle.NewManagement
cacheName string
isFifo bool
// cacheOp cacheOperation
}

func newDedupe(cache, keyStr string, dropOnErr bool, mgr bundle.NewManagement) (*dedupeProc, error) {
func newDedupe(cacheStr, keyStr string, dropOnErr, isFifo bool, mgr bundle.NewManagement) (*dedupeProc, error) {
if keyStr == "" {
return nil, errors.New("dedupe key must not be empty")
}
@@ -118,22 +142,28 @@ func newDedupe(cache, keyStr string, dropOnErr bool, mgr bundle.NewManagement) (
return nil, fmt.Errorf("failed to parse key expression: %v", err)
}

if !mgr.ProbeCache(cache) {
return nil, fmt.Errorf("cache resource '%v' was not found", cache)
if !mgr.ProbeCache(cacheStr) {
return nil, fmt.Errorf("cache resource '%v' was not found", cacheStr)
}

return &dedupeProc{
log: mgr.Logger(),
dropOnErr: dropOnErr,
isFifo: isFifo,
key: key,
mgr: mgr,
cacheName: cache,
cacheName: cacheStr,
}, nil
}

func (d *dedupeProc) ProcessBatch(ctx *processor.BatchProcContext, batch message.Batch) ([]message.Batch, error) {
newBatch := message.QuickBatch(nil)
_ = batch.Iter(func(i int, p *message.Part) error {
if !d.isFifo {
// reverse order if LIFO
i = len(batch) - i - 1
p = batch[i]
}
key, err := d.key.String(i, batch)
if err != nil {
err = fmt.Errorf("key interpolation error: %w", err)
@@ -161,7 +191,13 @@ func (d *dedupeProc) ProcessBatch(ctx *processor.BatchProcContext, batch message
ctx.OnError(err, i, p)
}

newBatch = append(newBatch, p)
if d.isFifo {
// append if FIFO
newBatch = append(newBatch, p)
} else {
// prepend if LIFO since we are reversing the order
newBatch = append([]*message.Part{p}, newBatch...)
}
return nil
})

37 changes: 37 additions & 0 deletions internal/impl/pure/processor_dedupe_test.go
@@ -57,6 +57,43 @@ dedupe:
assert.Equal(t, 2, msgOut[0].Len())
}

func TestDedupeLIFO(t *testing.T) {
mgr := mock.NewManager()
mgr.Caches["foocache"] = map[string]mock.CacheItem{}

conf, err := testutil.ProcessorFromYAML(`
dedupe:
cache: foocache
key: ${!json("key")}
strategy: LIFO
`)
require.NoError(t, err)

proc, err := mgr.NewProcessor(conf)
require.NoError(t, err)

ctx := context.Background()

msgIn := message.QuickBatch([][]byte{
[]byte(`{"key":"1","value":"foo 1"}`),
[]byte(`{"key":"2","value":"foo 2"}`),
[]byte(`{"key":"1","value":"updated foo"}`),
})

expectedOut := [][]byte{
[]byte(`{"key":"2","value":"foo 2"}`),
[]byte(`{"key":"1","value":"updated foo"}`),
}

msgOut, err := proc.ProcessBatch(ctx, msgIn)
require.NoError(t, err)
require.Len(t, msgOut, 1)
require.Len(t, msgOut[0], 2)

require.Equal(t, expectedOut[0], msgOut[0].Get(0).AsBytes())
require.Equal(t, expectedOut[1], msgOut[0].Get(1).AsBytes())
}

func TestDedupeBadCache(t *testing.T) {
conf, err := testutil.ProcessorFromYAML(`
dedupe:
42 changes: 40 additions & 2 deletions website/docs/components/processors/dedupe.md
@@ -17,15 +17,39 @@ import TabItem from '@theme/TabItem';

Deduplicates messages by storing a key value in a cache using the `add` operator. If the key already exists within the cache it is dropped.


<Tabs defaultValue="common" values={[
{ label: 'Common', value: 'common', },
{ label: 'Advanced', value: 'advanced', },
]}>

<TabItem value="common">

```yml
# Common config fields, showing default values
label: ""
dedupe:
cache: "" # No default (required)
key: ${! meta("kafka_key") } # No default (required)
drop_on_err: true
```
</TabItem>
<TabItem value="advanced">
```yml
# Config fields, showing default values
# All config fields, showing default values
label: ""
dedupe:
cache: "" # No default (required)
key: ${! meta("kafka_key") } # No default (required)
drop_on_err: true
strategy: FIFO
```
</TabItem>
</Tabs>
Caches must be configured as resources, for more information check out the [cache documentation here](/docs/components/caches/about).
When using this processor with an output target that might fail you should always wrap the output within an indefinite [`retry`](/docs/components/outputs/retry) block. This ensures that during outages your messages aren't reprocessed after failures, which would result in messages being dropped.
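For example, a minimal sketch of that retry wrapping might look like the following; the `http_client` output and its URL are illustrative placeholders rather than part of this commit:

```yml
output:
  retry:
    # Zero means no discrete retry limit, i.e. keep retrying indefinitely
    # so a failed send never discards a message whose signature is
    # already stored in the deduplication cache.
    max_retries: 0
    output:
      http_client:
        url: https://example.com/ingest # hypothetical sink
        verb: POST
```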
@@ -38,7 +62,7 @@ This processor enacts on individual messages only, in order to perform a dedupli

Performing deduplication on a stream using a distributed cache voids any at-least-once guarantees that it previously had. This is because the cache will preserve message signatures even if the message fails to leave the Bento pipeline, which would cause message loss in the event of an outage at the output sink followed by a restart of the Bento instance (or a server crash, etc).

This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Bento pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implement idempotent behaviour at the edge of your stream pipelines.
This problem can be mitigated by using an in-memory cache and distributing messages to horizontally scaled Bento pipelines partitioned by the deduplication key. However, in situations where at-least-once delivery guarantees are important it is worth avoiding deduplication in favour of implementing idempotent behaviour at the edge of your stream pipelines.
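As a hedged sketch of that mitigation, the upstream producer could key its output on the same expression used for deduplication, so that every message sharing a key reaches the same pipeline instance (the broker address and topic below are assumptions for illustration):

```yml
output:
  kafka:
    addresses: [ localhost:9092 ] # assumed broker address
    topic: user-events # assumed topic
    # Partition by the deduplication key so each key is handled by a
    # single horizontally scaled pipeline with its own in-memory cache.
    key: ${! json("user_uuid") }:${! json("month") }
```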

## Fields

@@ -73,6 +97,20 @@ Whether messages should be dropped when the cache returns a general error such a
Type: `bool`
Default: `true`

### `strategy`

Controls how to handle duplicate values.


Type: `string`
Default: `"FIFO"`

| Option | Summary |
|---|---|
| `FIFO` | Keeps the first value seen for each key. |
| `LIFO` | Keeps the last value seen for each key. |
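
For instance, a minimal config sketch using the `LIFO` strategy, modelled on the test config added in this commit (the cache label, key expression, and TTL are illustrative):

```yml
pipeline:
  processors:
    - dedupe:
        cache: local
        key: ${! json("user_uuid") }
        strategy: LIFO # keep the most recently seen value for each key

cache_resources:
  - label: local
    memory:
      default_ttl: 5m
```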


## Examples

<Tabs defaultValue="Deduplicate based on Kafka key" values={[
