Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Dec 18, 2024
1 parent 43f68a3 commit bd3c61d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
31 changes: 28 additions & 3 deletions pkg/acquisition/modules/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"os"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -29,13 +30,16 @@ import (

func getLocalStackEndpoint() (string, error) {
endpoint := "http://localhost:4566"

if v := os.Getenv("AWS_ENDPOINT_FORCE"); v != "" {
v = strings.TrimPrefix(v, "http://")

_, err := net.Dial("tcp", v)
if err != nil {
return "", fmt.Errorf("while dialing %s: %w: aws endpoint isn't available", v, err)
}
}

return endpoint, nil
}

Expand All @@ -49,13 +53,14 @@ func GenSubObject(t *testing.T, i int) []byte {
LogEvents: []CloudwatchSubscriptionLogEvent{
{
ID: "testid",
Message: fmt.Sprintf("%d", i),
Message: strconv.Itoa(i),
Timestamp: time.Now().UTC().Unix(),
},
},
}
body, err := json.Marshal(r)
require.NoError(t, err)

var b bytes.Buffer
gz := gzip.NewWriter(&b)
gz.Write(body)
Expand All @@ -68,19 +73,24 @@ func GenSubObject(t *testing.T, i int) []byte {
func WriteToStream(t *testing.T, streamName string, count int, shards int, sub bool) {
endpoint, err := getLocalStackEndpoint()
require.NoError(t, err)

sess := session.Must(session.NewSession())
kinesisClient := kinesis.New(sess, aws.NewConfig().WithEndpoint(endpoint).WithRegion("us-east-1"))

for i := range count {
partition := "partition"
if shards != 1 {
partition = fmt.Sprintf("partition-%d", i%shards)
}

var data []byte

if sub {
data = GenSubObject(t, i)
} else {
data = []byte(fmt.Sprintf("%d", i))
data = []byte(strconv.Itoa(i))
}

_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
Data: data,
PartitionKey: aws.String(partition),
Expand All @@ -105,6 +115,7 @@ func TestBadConfiguration(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}

tests := []struct {
config string
expectedErr string
Expand Down Expand Up @@ -136,6 +147,7 @@ stream_arn: arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream`,
}

subLogger := log.WithField("type", "kinesis")

for _, test := range tests {
f := KinesisSource{}
err := f.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
Expand All @@ -145,9 +157,11 @@ stream_arn: arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream`,

func TestReadFromStream(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}

tests := []struct {
config string
count int
Expand All @@ -163,32 +177,38 @@ stream_name: stream-1-shard`,
},
}
endpoint, _ := getLocalStackEndpoint()

for _, test := range tests {
f := KinesisSource{}
config := fmt.Sprintf(test.config, endpoint)
err := f.Configure([]byte(config), log.WithField("type", "kinesis"), configuration.METRICS_NONE)
require.NoError(t, err)

tomb := &tomb.Tomb{}
out := make(chan types.Event)
err = f.StreamingAcquisition(ctx, out, tomb)
require.NoError(t, err)
// Allow the datasource to start listening to the stream
time.Sleep(4 * time.Second)
WriteToStream(t, f.Config.StreamName, test.count, test.shards, false)

for i := range test.count {
e := <-out
assert.Equal(t, fmt.Sprintf("%d", i), e.Line.Raw)
assert.Equal(t, strconv.Itoa(i), e.Line.Raw)
}

tomb.Kill(nil)
tomb.Wait()
}
}

func TestReadFromMultipleShards(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}

tests := []struct {
config string
count int
Expand All @@ -204,6 +224,7 @@ stream_name: stream-2-shards`,
},
}
endpoint, _ := getLocalStackEndpoint()

for _, test := range tests {
f := KinesisSource{}
config := fmt.Sprintf(test.config, endpoint)
Expand All @@ -216,7 +237,9 @@ stream_name: stream-2-shards`,
// Allow the datasource to start listening to the stream
time.Sleep(4 * time.Second)
WriteToStream(t, f.Config.StreamName, test.count, test.shards, false)

c := 0

for range test.count {
<-out
c += 1
Expand All @@ -229,9 +252,11 @@ stream_name: stream-2-shards`,

func TestFromSubscription(t *testing.T) {
ctx := context.Background()

if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}

tests := []struct {
config string
count int
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/parsing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/davecgh/go-spew/spew"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
"github.com/crowdsecurity/crowdsec/pkg/types"
Expand Down

0 comments on commit bd3c61d

Please sign in to comment.