From 690b027f6089fc33aaaded65db122e96b66ddb10 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:09:31 +0200 Subject: [PATCH 01/10] plugins(s3): Do not add TLV info to packets For now we are going to send uncompressed packets to S3. Therefore there is no need for TLV info. Signed-off-by: Michal Rostecki --- pkg/plugins/s3/s3.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/plugins/s3/s3.go b/pkg/plugins/s3/s3.go index 6beb613..54a0c4a 100644 --- a/pkg/plugins/s3/s3.go +++ b/pkg/plugins/s3/s3.go @@ -3,16 +3,17 @@ package s3 import ( "bytes" "context" - "encoding/binary" "fmt" + "log" + "time" + "github.com/aws/aws-sdk-go-v2/aws" awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/file" - "log" - "time" ) const ( @@ -78,7 +79,6 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) { func (p *Plugin) Start(ctx context.Context) chan<- string { inputChan := make(chan string) go func() { - payloadMarker := []byte{0x0, 0x0, 0x0, 0x0} var mpu *MultipartUpload for { @@ -101,9 +101,6 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { } } data := []byte(chunk) - dataLen := len(data) - binary.LittleEndian.PutUint32(payloadMarker[:], uint32(dataLen)) - mpu.appendToBuffer(payloadMarker) mpu.appendToBuffer(data) if uint64(len(mpu.Buffer)) >= p.UploadChunkSize { From f212507a95ef15d3455011eec602d75bde22e04f Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:15:36 +0200 Subject: [PATCH 02/10] plugins: Log on the plugin start Log the fact that a plugin is started. 
Signed-off-by: Michal Rostecki --- pkg/plugins/plugins.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index d9ac253..8b1bdcf 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -3,6 +3,8 @@ package plugins import ( "context" "fmt" + "log" + "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/plugins/kafka" "github.com/deepfence/PacketStreamer/pkg/plugins/s3" @@ -18,6 +20,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { var plugins []chan<- string if config.Output.Plugins.S3 != nil { + log.Println("Starting S3 plugin") s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3) if err != nil { @@ -29,6 +32,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { } if config.Output.Plugins.Kafka != nil { + log.Println("Starting Kafka plugin") kafkaPlugin, err := kafka.NewPlugin(config.Output.Plugins.Kafka) if err != nil { From 9527ce43e738ce424f8095ddaf2bad7aac1a5a2a Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:16:29 +0200 Subject: [PATCH 03/10] plugins(s3): Use a pcap header Before this change, we were using some custom file header, not readable by tcpdump or any pcap tool. This change replaces it with a correct pcap header. 
Signed-off-by: Michal Rostecki --- pkg/plugins/plugins.go | 2 +- pkg/plugins/s3/s3.go | 38 +++++++++++++++++++++----------------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index 8b1bdcf..e09d2b4 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -21,7 +21,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { if config.Output.Plugins.S3 != nil { log.Println("Starting S3 plugin") - s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3) + s3plugin, err := s3.NewPlugin(ctx, config) if err != nil { return nil, fmt.Errorf("error starting S3 plugin, %v", err) diff --git a/pkg/plugins/s3/s3.go b/pkg/plugins/s3/s3.go index 54a0c4a..49b5c1f 100644 --- a/pkg/plugins/s3/s3.go +++ b/pkg/plugins/s3/s3.go @@ -11,9 +11,10 @@ import ( awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" "github.com/deepfence/PacketStreamer/pkg/config" - "github.com/deepfence/PacketStreamer/pkg/file" ) const ( @@ -24,6 +25,7 @@ type Plugin struct { S3Client *s3.Client Region string Bucket string + InputPacketLen int TotalFileSize uint64 UploadChunkSize uint64 UploadTimeout time.Duration @@ -37,8 +39,9 @@ type MultipartUpload struct { TotalDataSent int } -func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, error) { - awsCfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion(config.Region)) +func NewPlugin(ctx context.Context, config *config.Config) (*Plugin, error) { + awsCfg, err := awsConfig.LoadDefaultConfig(ctx, + awsConfig.WithRegion(config.Output.Plugins.S3.Region)) if err != nil { return nil, fmt.Errorf("error loading AWS config when creating S3 client, %v", err) @@ -52,12 +55,12 @@ func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, err return &Plugin{ 
S3Client: s3Client, - Region: config.Region, - Bucket: config.Bucket, - TotalFileSize: uint64(*config.TotalFileSize), - UploadChunkSize: uint64(*config.UploadChunkSize), - UploadTimeout: config.UploadTimeout, - CannedACL: config.CannedACL, + Region: config.Output.Plugins.S3.Region, + Bucket: config.Output.Plugins.S3.Bucket, + TotalFileSize: uint64(*config.Output.Plugins.S3.TotalFileSize), + UploadChunkSize: uint64(*config.Output.Plugins.S3.UploadChunkSize), + UploadTimeout: config.Output.Plugins.S3.UploadTimeout, + CannedACL: config.Output.Plugins.S3.CannedACL, }, nil } @@ -92,13 +95,6 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { log.Printf("error creating multipart upload, stopping... - %v\n", err) return } - - mpu.appendToBuffer(file.Header) - - if err != nil { - log.Printf("error adding header to buffer, stopping... - %v\n", err) - return - } } data := []byte(chunk) mpu.appendToBuffer(data) @@ -227,5 +223,13 @@ func (p *Plugin) createMultipartUpload(ctx context.Context) (*MultipartUpload, e return nil, fmt.Errorf("error creating multipart upload, %v", err) } - return newMultipartUpload(output), nil + mpu := newMultipartUpload(output) + + var pcapBuffer bytes.Buffer + pcapWriter := pcapgo.NewWriter(&pcapBuffer) + pcapWriter.WriteFileHeader(uint32(p.InputPacketLen), layers.LinkTypeEthernet) + + mpu.appendToBuffer(pcapBuffer.Bytes()) + + return mpu, nil } From b33a90fffa79d43650153e3842b31157e3588545 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:18:33 +0200 Subject: [PATCH 04/10] streamer(receiver): Fix the output loop break Before this change, breaking the output loop was ineffective. 
Signed-off-by: Michal Rostecki --- pkg/streamer/receiver.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/streamer/receiver.go b/pkg/streamer/receiver.go index bb3d8ed..d55c941 100644 --- a/pkg/streamer/receiver.go +++ b/pkg/streamer/receiver.go @@ -6,19 +6,20 @@ import ( "crypto/tls" "encoding/binary" "fmt" - "github.com/deepfence/PacketStreamer/pkg/plugins" "io" "log" "net" "os" "time" + "github.com/deepfence/PacketStreamer/pkg/plugins" + "github.com/deepfence/PacketStreamer/pkg/config" ) const ( - maxNumPkts = 100 - connTimeout = 60 + maxNumPkts = 100 + connTimeout = 60 ) func readDataFromSocket(hostConn net.Conn, dataBuff []byte, bytesToRead int) error { @@ -107,12 +108,13 @@ func readPkts(clientConn net.Conn, config *config.Config, pktUncompressChannel c } func receiverOutput(ctx context.Context, config *config.Config, consolePktOutputChannel chan string, pluginChan chan<- string) { +loop: for { select { case tmpData, chanExitVal := <-consolePktOutputChannel: if !chanExitVal { log.Println("Error while reading from output channel") - break + break loop } if pluginChan != nil { @@ -120,10 +122,10 @@ func receiverOutput(ctx context.Context, config *config.Config, consolePktOutput } if writeOutput(config, []byte(tmpData)) == 1 { - break + break loop } case <-ctx.Done(): - break + break loop } } } From 8c854c5177922d173a8f1b76426783f5ef9292d1 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:20:44 +0200 Subject: [PATCH 05/10] sensor: Add a possibility to output directly to plugins Before this change, using output plugins like S3 and Kafka was possible only from receiver, which adds additional complexity for users interested only in streaming packets directly to the external cloud storage. Sensors always had to send packets to some receiver first. This change makes it simpler by allowing streaming directly from sensors to plugins. 
Signed-off-by: Michal Rostecki --- cmd/sensor.go | 15 +++++-- pkg/streamer/interfaces.go | 6 +-- pkg/streamer/sensor.go | 84 +++++++++++++++++++++++++------------- 3 files changed, 70 insertions(+), 35 deletions(-) diff --git a/cmd/sensor.go b/cmd/sensor.go index 76b081b..5eb10a1 100644 --- a/cmd/sensor.go +++ b/cmd/sensor.go @@ -1,7 +1,11 @@ package cmd import ( + "context" "log" + "os" + "os/signal" + "syscall" "github.com/spf13/cobra" @@ -19,17 +23,20 @@ var sensorCmd = &cobra.Command{ log.Fatalf("Invalid configuration: %v", err) } - mainSignalChannel := make(chan bool) - proto := "tcp" if err := streamer.InitOutput(cfg, proto); err != nil { log.Fatalf("Failed to connect: %v", err) } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + ctx, cancel := context.WithCancel(context.Background()) + log.Println("Start sending") - streamer.StartSensor(cfg, mainSignalChannel) + streamer.StartSensor(ctx, cfg) log.Println("Now waiting in main") - <-mainSignalChannel + <-sigs + cancel() }, } diff --git a/pkg/streamer/interfaces.go b/pkg/streamer/interfaces.go index 2252ebd..c5a4c23 100644 --- a/pkg/streamer/interfaces.go +++ b/pkg/streamer/interfaces.go @@ -86,7 +86,7 @@ func initAllInterfaces(config *config.Config) ([]*pcap.Handle, error) { return intfPtr, nil } -func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intfPorts { +func grabInterface(ctx context.Context, config *config.Config) chan intfPorts { res := make(chan intfPorts) ticker := time.NewTicker(PROCESS_SCAN_FREQUENCY) go func() { @@ -96,7 +96,7 @@ func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intf err := setupInterfacesAndPortMappings(config) if err != nil { select { - case <-mainSignalChannel: + case <-ctx.Done(): break case <-ticker.C: } @@ -112,7 +112,7 @@ func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intf } } select { - case <-mainSignalChannel: + case <-ctx.Done(): break case 
<-ticker.C: } diff --git a/pkg/streamer/sensor.go b/pkg/streamer/sensor.go index bf89c8a..b9a9570 100644 --- a/pkg/streamer/sensor.go +++ b/pkg/streamer/sensor.go @@ -1,6 +1,7 @@ package streamer import ( + "context" "encoding/binary" "log" "net" @@ -11,9 +12,10 @@ import ( "github.com/google/gopacket/pcap" "github.com/deepfence/PacketStreamer/pkg/config" + "github.com/deepfence/PacketStreamer/pkg/plugins" ) -func StartSensor(config *config.Config, mainSignalChannel chan bool) { +func StartSensor(ctx context.Context, config *config.Config) { ticker := time.NewTicker(1 * time.Minute) go func() { for { @@ -24,41 +26,53 @@ func StartSensor(config *config.Config, mainSignalChannel chan bool) { } }() agentOutputChan := make(chan string, maxNumPkts) - go sensorOutput(config, agentOutputChan, mainSignalChannel) - go processIntfCapture(config, agentOutputChan, mainSignalChannel) + pluginChan, err := plugins.Start(ctx, config) + if err != nil { + // log but carry on, we still might want to see the receiver output despite the broken plugins + log.Println(err) + } + go sensorOutput(ctx, config, agentOutputChan) + go processIntfCapture(ctx, config, agentOutputChan, pluginChan) } -func sensorOutput(config *config.Config, agentPktOutputChannel chan string, mainSignalChannel chan bool) { +func sensorOutput(ctx context.Context, config *config.Config, agentPktOutputChan chan string) { outputErr := 0 payloadMarkerBuff := [...]byte{0x0, 0x0, 0x0, 0x0} dataToSend := make([]byte, config.MaxPayloadLen) copy(dataToSend[0:], hdrData[:]) + +loop: for { if outputErr == maxWriteAttempts { log.Printf("Error while writing %d packets to output. 
Giving up \n", maxWriteAttempts) break } - tmpData, chanExitVal := <-agentPktOutputChannel - if !chanExitVal { - log.Println("Error while reading from output channel") - break - } - outputData := []byte(tmpData) - outputDataLen := len(outputData) - startIdx := len(hdrData) - binary.LittleEndian.PutUint32(payloadMarkerBuff[:], uint32(outputDataLen)) - copy(dataToSend[startIdx:], payloadMarkerBuff[:]) - startIdx += len(payloadMarkerBuff) - copy(dataToSend[startIdx:], outputData[:]) - startIdx += outputDataLen - if writeOutput(config, dataToSend[0:startIdx]) == 1 { - break + select { + case tmpData, chanExitVal := <-agentPktOutputChan: + if !chanExitVal { + log.Println("Error while reading from output channel") + break loop + } + + outputData := []byte(tmpData) + outputDataLen := len(outputData) + startIdx := len(hdrData) + binary.LittleEndian.PutUint32(payloadMarkerBuff[:], uint32(outputDataLen)) + copy(dataToSend[startIdx:], payloadMarkerBuff[:]) + startIdx += len(payloadMarkerBuff) + copy(dataToSend[startIdx:], outputData[:]) + startIdx += outputDataLen + if writeOutput(config, dataToSend[0:startIdx]) == 1 { + break loop + } + case <-ctx.Done(): + break loop } } - mainSignalChannel <- true } -func gatherPkts(config *config.Config, pktGatherChannel, output chan string) { +func gatherPkts(config *config.Config, pktGatherChannel, compressChan chan string, + pluginChan chan<- string) { var totalLen = 0 var currLen = 0 @@ -75,25 +89,40 @@ func gatherPkts(config *config.Config, pktGatherChannel, output chan string) { tmpPacketData = []byte(tmpChanData) currLen = len(tmpPacketData) if (totalLen + currLen) > config.MaxGatherLen { + // NOTE(vadorovsky): Currently we output an uncompressed packet to + // two channels: + // * `compressChan` - to output the compressed packets to an another + // PacketStreamer server + // * `pluginChan` - to output the raw packets to plugins + // TODO(vadorovsky): We eventually want to compress plugin outputs + // as well. 
But there is no CLI tool for uncompressing S2. Probably + // the best thing to do would be providing a CLI in PacketStreamer + // to read S2-compressed pcap files. + select { + case compressChan <- string(packetData[:totalLen]): + default: + log.Println("Gather compression queue is full. Discarding") + } select { - case output <- string(packetData[0:totalLen]): + case pluginChan <- string(packetData[:totalLen]): default: log.Println("Gather output queue is full. Discarding") } totalLen = 0 } - copy(packetData[totalLen:], tmpPacketData[0:currLen]) + copy(packetData[totalLen:], tmpPacketData[:currLen]) totalLen += currLen } } -func processIntfCapture(config *config.Config, agentPktOutputChannel chan string, mainSignalChannel chan bool) { +func processIntfCapture(ctx context.Context, config *config.Config, + agentPktOutputChannel chan string, pluginChan chan<- string) { pktGatherChannel := make(chan string, maxNumPkts*500) pktCompressChannel := make(chan string, maxNumPkts) var wg sync.WaitGroup - go gatherPkts(config, pktGatherChannel, pktCompressChannel) + go gatherPkts(config, pktGatherChannel, pktCompressChannel, pluginChan) go compressPkts(config, pktCompressChannel, agentPktOutputChannel) if len(config.CapturePorts) == 0 && len(config.CaptureInterfacesPorts) == 0 { @@ -110,12 +139,12 @@ func processIntfCapture(config *config.Config, agentPktOutputChannel chan string } } else { capturing := make(map[string]*pcap.Handle) - toUpdate := grabInterface(config, mainSignalChannel) + toUpdate := grabInterface(ctx, config) for { var intfPorts intfPorts select { case intfPorts = <-toUpdate: - case <-mainSignalChannel: + case <-ctx.Done(): break } if capturing[intfPorts.name] == nil { @@ -147,5 +176,4 @@ func processIntfCapture(config *config.Config, agentPktOutputChannel chan string wg.Wait() close(pktGatherChannel) close(pktCompressChannel) - mainSignalChannel <- true } From 56f040ba36988e8527f310cadff74e5c6d0dc764 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 
1 Jun 2022 14:42:01 +0200 Subject: [PATCH 06/10] streamer(common): Don't require an fd when plugin is enabled Before this change, PacketStreamer was always expecting some file descriptor to be available. That was forcing users wanting to output only to plugin(s) to configure a /dev/null file anyway. Then PacketStreamer was performing a wasteful job of writing to /dev/null. This change fixes that by allowing to configure only plugins and not writing to fd when it's not present. Signed-off-by: Michal Rostecki --- pkg/config/sensor.go | 4 +++- pkg/streamer/common.go | 13 +++++++------ pkg/streamer/receiver.go | 3 ++- pkg/streamer/sensor.go | 3 ++- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pkg/config/sensor.go b/pkg/config/sensor.go index c941176..4032a6c 100644 --- a/pkg/config/sensor.go +++ b/pkg/config/sensor.go @@ -10,7 +10,9 @@ var ( ) func ValidateSensorConfig(config *Config) error { - if config.Output.File == nil && config.Output.Server == nil { + if config.Output.File == nil && config.Output.Server == nil && + (config.Output.Plugins == nil || + (config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil)) { return ErrNoOutputConfigured } if config.Output.Server != nil && config.Output.Server.Port == nil { diff --git a/pkg/streamer/common.go b/pkg/streamer/common.go index 91b6769..5720e6a 100644 --- a/pkg/streamer/common.go +++ b/pkg/streamer/common.go @@ -26,7 +26,10 @@ var ( hdrData = [...]byte{0xde, 0xef, 0xec, 0xe0} ) -func writeOutput(config *config.Config, tmpData []byte) int { +func writeOutput(config *config.Config, tmpData []byte) error { + if outputFd == nil { + return nil + } var numAttempts = 0 reconnectAttempt := false @@ -38,15 +41,13 @@ func writeOutput(config *config.Config, tmpData []byte) int { reconnectAttempt = true err := InitOutput(config, "tcp") if err != nil { - log.Printf("Tried to reconnect but got: %v\n", err) - return 1 + return fmt.Errorf("tried to reconnect but got: %w", err) } log.Printf("Tried to 
write for %d times. Reconnecting once. \n", numAttempts) numAttempts = 0 continue } - log.Printf("Tried to write for %d times. Bailing out. \n", numAttempts) - return 1 + return fmt.Errorf("tried to write for %d times", numAttempts) } bytesWritten, err := outputFd.Write(tmpData[totalBytesWritten:]) @@ -63,7 +64,7 @@ func writeOutput(config *config.Config, tmpData []byte) int { continue } - return 0 + return nil } } diff --git a/pkg/streamer/receiver.go b/pkg/streamer/receiver.go index d55c941..87a1c13 100644 --- a/pkg/streamer/receiver.go +++ b/pkg/streamer/receiver.go @@ -121,7 +121,8 @@ loop: pluginChan <- tmpData } - if writeOutput(config, []byte(tmpData)) == 1 { + if err := writeOutput(config, []byte(tmpData)); err != nil { + log.Printf("Error while writing to output: %v\n", err) break loop } case <-ctx.Done(): diff --git a/pkg/streamer/sensor.go b/pkg/streamer/sensor.go index b9a9570..ff597bc 100644 --- a/pkg/streamer/sensor.go +++ b/pkg/streamer/sensor.go @@ -62,7 +62,8 @@ loop: startIdx += len(payloadMarkerBuff) copy(dataToSend[startIdx:], outputData[:]) startIdx += outputDataLen - if writeOutput(config, dataToSend[0:startIdx]) == 1 { + if err := writeOutput(config, dataToSend[0:startIdx]); err != nil { + log.Printf("Error while writing to output: %s\n", err) break loop } case <-ctx.Done(): From 557b97d22ae892987aef43af87415889efdcf9cc Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:45:10 +0200 Subject: [PATCH 07/10] go.mod: Update aws-sdk-go Signed-off-by: Michal Rostecki --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e807d31..22d6a6f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/deepfence/PacketStreamer go 1.17 require ( - github.com/aws/aws-sdk-go-v2 v1.16.2 + github.com/aws/aws-sdk-go-v2 v1.16.4 github.com/aws/aws-sdk-go-v2/config v1.15.3 github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 github.com/confluentinc/confluent-kafka-go v1.8.2 diff 
--git a/go.sum b/go.sum index 2086277..814f372 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA= github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= +github.com/aws/aws-sdk-go-v2 v1.16.4 h1:swQTEQUyJF/UkEA94/Ga55miiKFoXmm/Zd67XHgmjSg= +github.com/aws/aws-sdk-go-v2 v1.16.4/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM= github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw= From 2f2f51da5894f0f6f1a88259ee10744e77fb6207 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 14:46:06 +0200 Subject: [PATCH 08/10] contrib(config): Add an example config for sensor streaming to S3 Signed-off-by: Michal Rostecki --- contrib/config/sensor-s3.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 contrib/config/sensor-s3.yaml diff --git a/contrib/config/sensor-s3.yaml b/contrib/config/sensor-s3.yaml new file mode 100644 index 0000000..aa69bf2 --- /dev/null +++ b/contrib/config/sensor-s3.yaml @@ -0,0 +1,10 @@ +output: + plugins: + s3: + region: eu-west-1 + bucket: foo-pcap + totalFileSize: 10MB + uploadChunkSize: 5MB + uploadTimeout: 1m + cannedACL: bucket-owner-full-control +pcapMode: all From c26e6a69298f368c26c81e24139623864e21a7d4 Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 15:22:27 +0200 Subject: [PATCH 09/10] docs: Add section about S3 plugin Signed-off-by: Michal Rostecki --- docs/src/SUMMARY.md | 2 ++ docs/src/configuration.md | 32 ++++++++++++-------- docs/src/plugins/README.md | 15 +++++++++ docs/src/plugins/s3.md | 62 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 12 deletions(-) create mode 
100644 docs/src/plugins/README.md create mode 100644 docs/src/plugins/s3.md diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 6a80a4f..c423857 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -7,6 +7,8 @@ - [Using with Docker](./quickstart/docker.md) - [Using on Kubernetes](./quickstart/kubernetes.md) - [Using on Vagrant](./quickstart/vagrant.md) +- [Plugins](./plugins/README.md) + - [S3](./plugins/s3.md) - [Using with other tools](./tools/README.md) - [Suricata](./tools/suricata.md) - [Configuration](./configuration.md) diff --git a/docs/src/configuration.md b/docs/src/configuration.md index b0a4427..22ed80b 100644 --- a/docs/src/configuration.md +++ b/docs/src/configuration.md @@ -3,29 +3,37 @@ `packetstreamer` is configured using a yaml-formatted configuration file. ```yaml -input: # required in 'receiver' mode +input: # required in 'receiver' mode address: _ip-address_ port: _listen-port_ output: - server: # required in 'sensor' mode + server: # required in 'sensor' mode address: _ip-address_ port: _listen-port_ - file: # required in 'receiver' mode - path: _filename_|stdout # 'stdout' is a reserved name. Receiver will write to stdout -tls: # optional + file: # required in 'receiver' mode + path: _filename_|stdout # 'stdout' is a reserved name. 
Receiver will write to stdout + plugins: # optional + s3: + bucket: _string_ + region: _string_ + totalFileSize: _file_size_ # optional; default: 10 MB + uploadChunkSize: _file_size_ # optional; default: 5 MB + uploadTimeout: _timeout_ # optional; default: 1m + cannedACL: _acl_ # optional; default: Bucket owner enforced +tls: # optional enable: _true_|_false_ certfile: _filename_ keyfile: _filename_ -auth: # optional; receiver and sensor must use same shared key +auth: # optional; receiver and sensor must use same shared key enable: _true_|_false_ key: _string_ -compressBlockSize: _integer_ # optional; default: 65 -inputPacketLen: _integer_ # optional; default: 65535 -logFilename: _filename_ # optional -pcapMode: _Allow_|_Deny_|_All_ # optional -capturePorts: _list-of-ports_ # optional +compressBlockSize: _integer_ # optional; default: 65 +inputPacketLen: _integer_ # optional; default: 65535 +logFilename: _filename_ # optional +pcapMode: _Allow_|_Deny_|_All_ # optional +capturePorts: _list-of-ports_ # optional captureInterfacesPorts: _map: interface-name:port_ # optional -ignorePorts: _list-of-ports_ # optional +ignorePorts: _list-of-ports_ # optional ``` You can find example configuration files in the [`/contrib/config/`](https://github.com/deepfence/PacketStreamer/tree/main/contrib/config) diff --git a/docs/src/plugins/README.md b/docs/src/plugins/README.md new file mode 100644 index 0000000..339cb5b --- /dev/null +++ b/docs/src/plugins/README.md @@ -0,0 +1,15 @@ +# Plugins + +This documentation section is about plugins which allow to stream packets to +various external storage services. 
+ +Plugins can be used both from: + +- **sensor** - in that case, locally captured packets are streamed through the + plugin +- **receiver** - all packets retrieved from (potentially multiple) sensors are + streamed through the plugin + +Currently the plugins are: + +- [S3](./s3.md) diff --git a/docs/src/plugins/s3.md b/docs/src/plugins/s3.md new file mode 100644 index 0000000..e0fc5b0 --- /dev/null +++ b/docs/src/plugins/s3.md @@ -0,0 +1,62 @@ +# S3 + +The S3 plugins allows to stream packets to the given S3 buckets. + +## Configuration + +### AWS credentials + +Before running PacketStreamer, AWS credentials need to be configured by one of +the following ways: + +- `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables +- `~/.aws/config` file - it can be created by `aws configure` + +The first way might be more convenient when running as root (required when +running a sensor). + +### Configuration scheme + +S3 plugin configuration has the following syntax: + +```yaml +output: + plugins: # optional + s3: + bucket: _string_ + region: _string_ + totalFileSize: _file_size_ # optional; default: 10 MB + uploadChunkSize: _file_size_ # optional; default: 5 MB + uploadTimeout: _timeout_ # optional; default: 1m + cannedACL: _acl_ # optional; default: Bucket owner enforced +``` + +### Sensor configuration + +If you want to stream locally captured packets from sensor to S3, you can use +the following example configuration from +[contrib/config/sensor-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/sensor-s3.yaml): + +```yaml +{{#rustdoc_include ../../../contrib/config/sensor-s3.yaml}} +``` + +And run PacketStreamer with it: + +```bash +sudo packetstreamer sensor --config ./contrib/config/sensor-s3.yaml +``` + +### Receiver configuration + +If you want to stream packets from receiver to S3, you can use the following +example configuration from +[contrib/config/receiver-s3.yaml] + +```yaml +{{#rustdoc_include 
../../../contrib/config/receiver-s3.yaml}} +``` + +```bash +packetstreamer receiver --config ./contrib/config/receiver-s3.yaml +``` \ No newline at end of file From e9dd2141c4caa1d3ee9796a3de98cb4fb3409fff Mon Sep 17 00:00:00 2001 From: Michal Rostecki Date: Wed, 1 Jun 2022 16:07:31 +0200 Subject: [PATCH 10/10] contrib(config): Update receiver S3 config Using /dev/null file is not needed anymore. Signed-off-by: Michal Rostecki --- contrib/config/receiver-s3.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/contrib/config/receiver-s3.yaml b/contrib/config/receiver-s3.yaml index 77cb8a1..e48c775 100644 --- a/contrib/config/receiver-s3.yaml +++ b/contrib/config/receiver-s3.yaml @@ -2,8 +2,6 @@ input: address: 0.0.0.0 port: 8081 output: - file: - path: /dev/null plugins: s3: region: eu-west-1