Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⭐️ replace dque with pdque #951

Merged
merged 22 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-plugin v1.5.2 // indirect
github.com/hashicorp/go-version v1.6.0
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a
github.com/jstemmer/go-junit-report/v2 v2.1.0
github.com/mattn/go-isatty v0.0.20
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -322,7 +321,7 @@ require (
github.com/sivchari/tenv v1.7.1 // indirect
github.com/sonatard/noctx v0.0.2 // indirect
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/afero v1.10.0
github.com/spf13/cast v1.5.1 // indirect
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
Expand Down Expand Up @@ -548,8 +547,6 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a h1:sfe532Ipn7GX0V6mHdynBk393rDmqgI0QmjLK7ct7TU=
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -589,7 +586,6 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
28 changes: 12 additions & 16 deletions policy/scan/disk_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import (
"sync"
"time"

"github.com/joncrlsn/dque"
"github.com/rs/zerolog/log"
"go.mondoo.com/cnspec/v9/policy/scan/pdque"
"google.golang.org/protobuf/proto"
)

type diskQueueConfig struct {
dir string
filename string
segmentSize int
sync bool
dir string
filename string
maxSize int
sync bool
}

var defaultDqueConfig = diskQueueConfig{
dir: "/tmp/cnspec-queue", // TODO: consider configurable path
filename: "disk-queue",
segmentSize: 500,
sync: false,
dir: "/tmp/cnspec-queue", // TODO: consider configurable path
filename: "disk-queue",
maxSize: 500,
sync: false,
}

// queueMsg is the being stored in disk queue
Expand All @@ -40,7 +40,7 @@ type queuePayload struct {
}

type diskQueueClient struct {
queue *dque.DQue
queue *pdque.Queue
once sync.Once
wg sync.WaitGroup
entries chan Job
Expand Down Expand Up @@ -68,15 +68,11 @@ func newDqueClient(config diskQueueConfig, handler func(job *Job)) (*diskQueueCl
return nil, fmt.Errorf("cannot create queue directory: %s", err)
}

q.queue, err = dque.NewOrOpen(config.filename, config.dir, config.segmentSize, diskQueueEntryBuilder)
q.queue, err = pdque.NewOrOpen(config.filename, config.dir, config.maxSize, diskQueueEntryBuilder)
if err != nil {
return nil, err
}

if !config.sync {
_ = q.queue.TurboOn()
}

q.entries = make(chan Job)

q.wg.Add(2)
Expand Down Expand Up @@ -127,7 +123,7 @@ func (c *diskQueueClient) popper() {
entry, err := c.queue.DequeueBlock()
if err != nil {
switch err {
case dque.ErrQueueClosed:
case pdque.ErrQueueClosed:
return
default:
log.Error().Err(err).Msg("could not pop job from disk queue")
Expand Down
71 changes: 71 additions & 0 deletions policy/scan/disk_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Mondoo, Inc.
// SPDX-License-Identifier: BUSL-1.1

package scan

import (
"os"
"testing"

"go.mondoo.com/cnquery/v9/providers-sdk/v1/inventory"
)

func TestDiskQueueClient_EnqueueDequeue(t *testing.T) {
mariuskimmina marked this conversation as resolved.
Show resolved Hide resolved
tempDir, err := os.MkdirTemp("", "testdir")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir) // Clean up

// Update the configuration to use the temporary directory
testConfig := defaultDqueConfig
testConfig.dir = tempDir

completionChannel := make(chan struct{}, 50) // Channel to signal job completion

handler := func(job *Job) {
completionChannel <- struct{}{} // Signal completion
}

client, err := newDqueClient(testConfig, handler)
if err != nil {
t.Fatalf("Failed to create diskQueueClient: %v", err)
}
defer client.Stop()

// Test Enqueue
testJob := &Job{
Inventory: &inventory.Inventory{
Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{
{
Connections: []*inventory.Config{
{
Type: "k8s",
Options: map[string]string{
"path": "./testdata/2pods.yaml",
},
Discover: &inventory.Discovery{
Targets: []string{"auto"},
},
},
},
ManagedBy: "mondoo-operator-123",
},
},
},
},
}
for i := 0; i < 50; i++ {
client.Channel() <- *testJob
}

for i := 0; i < 50; i++ {
<-completionChannel
}

// Verify that all jobs have been processed
if len(completionChannel) != 0 {
t.Errorf("Expected handler to be called 50 times, but was called %d times", 50-len(completionChannel))
}
}
Loading