diff --git a/cmd/sensor.go b/cmd/sensor.go index 76b081b..5eb10a1 100644 --- a/cmd/sensor.go +++ b/cmd/sensor.go @@ -1,7 +1,11 @@ package cmd import ( + "context" "log" + "os" + "os/signal" + "syscall" "github.com/spf13/cobra" @@ -19,17 +23,20 @@ var sensorCmd = &cobra.Command{ log.Fatalf("Invalid configuration: %v", err) } - mainSignalChannel := make(chan bool) - proto := "tcp" if err := streamer.InitOutput(cfg, proto); err != nil { log.Fatalf("Failed to connect: %v", err) } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + ctx, cancel := context.WithCancel(context.Background()) + log.Println("Start sending") - streamer.StartSensor(cfg, mainSignalChannel) + streamer.StartSensor(ctx, cfg) log.Println("Now waiting in main") - <-mainSignalChannel + <-sigs + cancel() }, } diff --git a/contrib/config/receiver-s3.yaml b/contrib/config/receiver-s3.yaml index 77cb8a1..e48c775 100644 --- a/contrib/config/receiver-s3.yaml +++ b/contrib/config/receiver-s3.yaml @@ -2,8 +2,6 @@ input: address: 0.0.0.0 port: 8081 output: - file: - path: /dev/null plugins: s3: region: eu-west-1 diff --git a/contrib/config/sensor-s3.yaml b/contrib/config/sensor-s3.yaml new file mode 100644 index 0000000..aa69bf2 --- /dev/null +++ b/contrib/config/sensor-s3.yaml @@ -0,0 +1,10 @@ +output: + plugins: + s3: + region: eu-west-1 + bucket: foo-pcap + totalFileSize: 10MB + uploadChunkSize: 5MB + uploadTimeout: 1m + cannedACL: bucket-owner-full-control +pcapMode: all diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 6a80a4f..c423857 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -7,6 +7,8 @@ - [Using with Docker](./quickstart/docker.md) - [Using on Kubernetes](./quickstart/kubernetes.md) - [Using on Vagrant](./quickstart/vagrant.md) +- [Plugins](./plugins/README.md) + - [S3](./plugins/s3.md) - [Using with other tools](./tools/README.md) - [Suricata](./tools/suricata.md) - [Configuration](./configuration.md) diff --git 
a/docs/src/configuration.md b/docs/src/configuration.md index b0a4427..22ed80b 100644 --- a/docs/src/configuration.md +++ b/docs/src/configuration.md @@ -3,29 +3,37 @@ `packetstreamer` is configured using a yaml-formatted configuration file. ```yaml -input: # required in 'receiver' mode +input: # required in 'receiver' mode address: _ip-address_ port: _listen-port_ output: - server: # required in 'sensor' mode + server: # required in 'sensor' mode address: _ip-address_ port: _listen-port_ - file: # required in 'receiver' mode - path: _filename_|stdout # 'stdout' is a reserved name. Receiver will write to stdout -tls: # optional + file: # required in 'receiver' mode + path: _filename_|stdout # 'stdout' is a reserved name. Receiver will write to stdout + plugins: # optional + s3: + bucket: _string_ + region: _string_ + totalFileSize: _file_size_ # optional; default: 10 MB + uploadChunkSize: _file_size_ # optional; default: 5 MB + uploadTimeout: _timeout_ # optional; default: 1m + cannedACL: _acl_ # optional; default: Bucket owner enforced +tls: # optional enable: _true_|_false_ certfile: _filename_ keyfile: _filename_ -auth: # optional; receiver and sensor must use same shared key +auth: # optional; receiver and sensor must use same shared key enable: _true_|_false_ key: _string_ -compressBlockSize: _integer_ # optional; default: 65 -inputPacketLen: _integer_ # optional; default: 65535 -logFilename: _filename_ # optional -pcapMode: _Allow_|_Deny_|_All_ # optional -capturePorts: _list-of-ports_ # optional +compressBlockSize: _integer_ # optional; default: 65 +inputPacketLen: _integer_ # optional; default: 65535 +logFilename: _filename_ # optional +pcapMode: _Allow_|_Deny_|_All_ # optional +capturePorts: _list-of-ports_ # optional captureInterfacesPorts: _map: interface-name:port_ # optional -ignorePorts: _list-of-ports_ # optional +ignorePorts: _list-of-ports_ # optional ``` You can find example configuration files in the 
[`/contrib/config/`](https://github.com/deepfence/PacketStreamer/tree/main/contrib/config) diff --git a/docs/src/plugins/README.md b/docs/src/plugins/README.md new file mode 100644 index 0000000..339cb5b --- /dev/null +++ b/docs/src/plugins/README.md @@ -0,0 +1,15 @@ +# Plugins + +This documentation section is about plugins which allow streaming packets to +various external storage services. + +Plugins can be used both from: + +- **sensor** - in that case, locally captured packets are streamed through the + plugin +- **receiver** - all packets retrieved from (potentially multiple) sensors are + streamed through the plugin + +Currently the plugins are: + +- [S3](./s3.md) diff --git a/docs/src/plugins/s3.md b/docs/src/plugins/s3.md new file mode 100644 index 0000000..e0fc5b0 --- /dev/null +++ b/docs/src/plugins/s3.md @@ -0,0 +1,62 @@ +# S3 + +The S3 plugin allows streaming packets to the given S3 buckets. + +## Configuration + +### AWS credentials + +Before running PacketStreamer, AWS credentials need to be configured in one of +the following ways: + +- `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables +- `~/.aws/config` file - it can be created by `aws configure` + +The first way might be more convenient when running as root (required when +running a sensor).
+ +### Configuration scheme + +S3 plugin configuration has the following syntax: + +```yaml +output: + plugins: # optional + s3: + bucket: _string_ + region: _string_ + totalFileSize: _file_size_ # optional; default: 10 MB + uploadChunkSize: _file_size_ # optional; default: 5 MB + uploadTimeout: _timeout_ # optional; default: 1m + cannedACL: _acl_ # optional; default: Bucket owner enforced +``` + +### Sensor configuration + +If you want to stream locally captured packets from sensor to S3, you can use +the following example configuration from +[contrib/config/sensor-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/sensor-s3.yaml): + +```yaml +{{#rustdoc_include ../../../contrib/config/sensor-s3.yaml}} +``` + +And run PacketStreamer with it: + +```bash +sudo packetstreamer sensor --config ./contrib/config/sensor-s3.yaml +``` + +### Receiver configuration + +If you want to stream packets from receiver to S3, you can use the following +example configuration from +[contrib/config/receiver-s3.yaml](https://raw.githubusercontent.com/deepfence/PacketStreamer/main/contrib/config/receiver-s3.yaml): + +```yaml +{{#rustdoc_include ../../../contrib/config/receiver-s3.yaml}} +``` + +```bash +packetstreamer receiver --config ./contrib/config/receiver-s3.yaml +``` \ No newline at end of file diff --git a/go.mod b/go.mod index e807d31..22d6a6f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/deepfence/PacketStreamer go 1.17 require ( - github.com/aws/aws-sdk-go-v2 v1.16.2 + github.com/aws/aws-sdk-go-v2 v1.16.4 github.com/aws/aws-sdk-go-v2/config v1.15.3 github.com/aws/aws-sdk-go-v2/service/s3 v1.26.3 github.com/confluentinc/confluent-kafka-go v1.8.2 diff --git a/go.sum b/go.sum index 2086277..814f372 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/aws/aws-sdk-go-v2 v1.16.2 h1:fqlCk6Iy3bnCumtrLz9r3mJ/2gUT0pJ0wLFVIdWh+JA= github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= +github.com/aws/aws-sdk-go-v2 v1.16.4 h1:swQTEQUyJF/UkEA94/Ga55miiKFoXmm/Zd67XHgmjSg=
+github.com/aws/aws-sdk-go-v2 v1.16.4/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1 h1:SdK4Ppk5IzLs64ZMvr6MrSficMtjY2oS0WOORXTlxwU= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.1/go.mod h1:n8Bs1ElDD2wJ9kCRTczA83gYbBmjSwZp3umc6zF4EeM= github.com/aws/aws-sdk-go-v2/config v1.15.3 h1:5AlQD0jhVXlGzwo+VORKiUuogkG7pQcLJNzIzK7eodw= diff --git a/pkg/config/sensor.go b/pkg/config/sensor.go index c941176..4032a6c 100644 --- a/pkg/config/sensor.go +++ b/pkg/config/sensor.go @@ -10,7 +10,9 @@ var ( ) func ValidateSensorConfig(config *Config) error { - if config.Output.File == nil && config.Output.Server == nil { + if config.Output.File == nil && config.Output.Server == nil && + (config.Output.Plugins == nil || + (config.Output.Plugins.S3 == nil && config.Output.Plugins.Kafka == nil)) { return ErrNoOutputConfigured } if config.Output.Server != nil && config.Output.Server.Port == nil { diff --git a/pkg/plugins/plugins.go b/pkg/plugins/plugins.go index d9ac253..e09d2b4 100644 --- a/pkg/plugins/plugins.go +++ b/pkg/plugins/plugins.go @@ -3,6 +3,8 @@ package plugins import ( "context" "fmt" + "log" + "github.com/deepfence/PacketStreamer/pkg/config" "github.com/deepfence/PacketStreamer/pkg/plugins/kafka" "github.com/deepfence/PacketStreamer/pkg/plugins/s3" @@ -18,7 +20,8 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { var plugins []chan<- string if config.Output.Plugins.S3 != nil { - s3plugin, err := s3.NewPlugin(ctx, config.Output.Plugins.S3) + log.Println("Starting S3 plugin") + s3plugin, err := s3.NewPlugin(ctx, config) if err != nil { return nil, fmt.Errorf("error starting S3 plugin, %v", err) @@ -29,6 +32,7 @@ func Start(ctx context.Context, config *config.Config) (chan<- string, error) { } if config.Output.Plugins.Kafka != nil { + log.Println("Starting Kafka plugin") kafkaPlugin, err := kafka.NewPlugin(config.Output.Plugins.Kafka) if err != nil { diff --git 
a/pkg/plugins/s3/s3.go b/pkg/plugins/s3/s3.go index 6beb613..49b5c1f 100644 --- a/pkg/plugins/s3/s3.go +++ b/pkg/plugins/s3/s3.go @@ -3,16 +3,18 @@ package s3 import ( "bytes" "context" - "encoding/binary" "fmt" + "log" + "time" + "github.com/aws/aws-sdk-go-v2/aws" awsConfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + "github.com/deepfence/PacketStreamer/pkg/config" - "github.com/deepfence/PacketStreamer/pkg/file" - "log" - "time" ) const ( @@ -23,6 +25,7 @@ type Plugin struct { S3Client *s3.Client Region string Bucket string + InputPacketLen int TotalFileSize uint64 UploadChunkSize uint64 UploadTimeout time.Duration @@ -36,8 +39,9 @@ type MultipartUpload struct { TotalDataSent int } -func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, error) { - awsCfg, err := awsConfig.LoadDefaultConfig(ctx, awsConfig.WithRegion(config.Region)) +func NewPlugin(ctx context.Context, config *config.Config) (*Plugin, error) { + awsCfg, err := awsConfig.LoadDefaultConfig(ctx, + awsConfig.WithRegion(config.Output.Plugins.S3.Region)) if err != nil { return nil, fmt.Errorf("error loading AWS config when creating S3 client, %v", err) @@ -51,12 +55,12 @@ func NewPlugin(ctx context.Context, config *config.S3PluginConfig) (*Plugin, err return &Plugin{ S3Client: s3Client, - Region: config.Region, - Bucket: config.Bucket, - TotalFileSize: uint64(*config.TotalFileSize), - UploadChunkSize: uint64(*config.UploadChunkSize), - UploadTimeout: config.UploadTimeout, - CannedACL: config.CannedACL, + Region: config.Output.Plugins.S3.Region, + Bucket: config.Output.Plugins.S3.Bucket, + TotalFileSize: uint64(*config.Output.Plugins.S3.TotalFileSize), + UploadChunkSize: uint64(*config.Output.Plugins.S3.UploadChunkSize), + UploadTimeout: config.Output.Plugins.S3.UploadTimeout, + CannedACL: config.Output.Plugins.S3.CannedACL, 
}, nil } @@ -78,7 +82,6 @@ func (mpu *MultipartUpload) appendToBuffer(data []byte) { func (p *Plugin) Start(ctx context.Context) chan<- string { inputChan := make(chan string) go func() { - payloadMarker := []byte{0x0, 0x0, 0x0, 0x0} var mpu *MultipartUpload for { @@ -92,18 +95,8 @@ func (p *Plugin) Start(ctx context.Context) chan<- string { log.Printf("error creating multipart upload, stopping... - %v\n", err) return } - - mpu.appendToBuffer(file.Header) - - if err != nil { - log.Printf("error adding header to buffer, stopping... - %v\n", err) - return - } } data := []byte(chunk) - dataLen := len(data) - binary.LittleEndian.PutUint32(payloadMarker[:], uint32(dataLen)) - mpu.appendToBuffer(payloadMarker) mpu.appendToBuffer(data) if uint64(len(mpu.Buffer)) >= p.UploadChunkSize { @@ -230,5 +223,13 @@ func (p *Plugin) createMultipartUpload(ctx context.Context) (*MultipartUpload, e return nil, fmt.Errorf("error creating multipart upload, %v", err) } - return newMultipartUpload(output), nil + mpu := newMultipartUpload(output) + + var pcapBuffer bytes.Buffer + pcapWriter := pcapgo.NewWriter(&pcapBuffer) + pcapWriter.WriteFileHeader(uint32(p.InputPacketLen), layers.LinkTypeEthernet) + + mpu.appendToBuffer(pcapBuffer.Bytes()) + + return mpu, nil } diff --git a/pkg/streamer/common.go b/pkg/streamer/common.go index 91b6769..5720e6a 100644 --- a/pkg/streamer/common.go +++ b/pkg/streamer/common.go @@ -26,7 +26,10 @@ var ( hdrData = [...]byte{0xde, 0xef, 0xec, 0xe0} ) -func writeOutput(config *config.Config, tmpData []byte) int { +func writeOutput(config *config.Config, tmpData []byte) error { + if outputFd == nil { + return nil + } var numAttempts = 0 reconnectAttempt := false @@ -38,15 +41,13 @@ func writeOutput(config *config.Config, tmpData []byte) int { reconnectAttempt = true err := InitOutput(config, "tcp") if err != nil { - log.Printf("Tried to reconnect but got: %v\n", err) - return 1 + return fmt.Errorf("tried to reconnect but got: %w", err) } log.Printf("Tried to 
write for %d times. Reconnecting once. \n", numAttempts) numAttempts = 0 continue } - log.Printf("Tried to write for %d times. Bailing out. \n", numAttempts) - return 1 + return fmt.Errorf("tried to write for %d times", numAttempts) } bytesWritten, err := outputFd.Write(tmpData[totalBytesWritten:]) @@ -63,7 +64,7 @@ func writeOutput(config *config.Config, tmpData []byte) int { continue } - return 0 + return nil } } diff --git a/pkg/streamer/interfaces.go b/pkg/streamer/interfaces.go index 2252ebd..c5a4c23 100644 --- a/pkg/streamer/interfaces.go +++ b/pkg/streamer/interfaces.go @@ -86,7 +86,7 @@ func initAllInterfaces(config *config.Config) ([]*pcap.Handle, error) { return intfPtr, nil } -func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intfPorts { +func grabInterface(ctx context.Context, config *config.Config) chan intfPorts { res := make(chan intfPorts) ticker := time.NewTicker(PROCESS_SCAN_FREQUENCY) go func() { @@ -96,7 +96,7 @@ func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intf err := setupInterfacesAndPortMappings(config) if err != nil { select { - case <-mainSignalChannel: + case <-ctx.Done(): break case <-ticker.C: } @@ -112,7 +112,7 @@ func grabInterface(config *config.Config, mainSignalChannel chan bool) chan intf } } select { - case <-mainSignalChannel: + case <-ctx.Done(): break case <-ticker.C: } diff --git a/pkg/streamer/receiver.go b/pkg/streamer/receiver.go index bb3d8ed..87a1c13 100644 --- a/pkg/streamer/receiver.go +++ b/pkg/streamer/receiver.go @@ -6,19 +6,20 @@ import ( "crypto/tls" "encoding/binary" "fmt" - "github.com/deepfence/PacketStreamer/pkg/plugins" "io" "log" "net" "os" "time" + "github.com/deepfence/PacketStreamer/pkg/plugins" + "github.com/deepfence/PacketStreamer/pkg/config" ) const ( - maxNumPkts = 100 - connTimeout = 60 + maxNumPkts = 100 + connTimeout = 60 ) func readDataFromSocket(hostConn net.Conn, dataBuff []byte, bytesToRead int) error { @@ -107,23 +108,25 @@ func 
readPkts(clientConn net.Conn, config *config.Config, pktUncompressChannel c } func receiverOutput(ctx context.Context, config *config.Config, consolePktOutputChannel chan string, pluginChan chan<- string) { +loop: for { select { case tmpData, chanExitVal := <-consolePktOutputChannel: if !chanExitVal { log.Println("Error while reading from output channel") - break + break loop } if pluginChan != nil { pluginChan <- tmpData } - if writeOutput(config, []byte(tmpData)) == 1 { - break + if err := writeOutput(config, []byte(tmpData)); err != nil { + log.Printf("Error while writing to output: %v\n", err) + break loop } case <-ctx.Done(): - break + break loop } } } diff --git a/pkg/streamer/sensor.go b/pkg/streamer/sensor.go index bf89c8a..ff597bc 100644 --- a/pkg/streamer/sensor.go +++ b/pkg/streamer/sensor.go @@ -1,6 +1,7 @@ package streamer import ( + "context" "encoding/binary" "log" "net" @@ -11,9 +12,10 @@ import ( "github.com/google/gopacket/pcap" "github.com/deepfence/PacketStreamer/pkg/config" + "github.com/deepfence/PacketStreamer/pkg/plugins" ) -func StartSensor(config *config.Config, mainSignalChannel chan bool) { +func StartSensor(ctx context.Context, config *config.Config) { ticker := time.NewTicker(1 * time.Minute) go func() { for { @@ -24,41 +26,54 @@ func StartSensor(config *config.Config, mainSignalChannel chan bool) { } }() agentOutputChan := make(chan string, maxNumPkts) - go sensorOutput(config, agentOutputChan, mainSignalChannel) - go processIntfCapture(config, agentOutputChan, mainSignalChannel) + pluginChan, err := plugins.Start(ctx, config) + if err != nil { + // log but carry on, we still might want to see the receiver output despite the broken plugins + log.Println(err) + } + go sensorOutput(ctx, config, agentOutputChan) + go processIntfCapture(ctx, config, agentOutputChan, pluginChan) } -func sensorOutput(config *config.Config, agentPktOutputChannel chan string, mainSignalChannel chan bool) { +func sensorOutput(ctx context.Context, config 
*config.Config, agentPktOutputChan chan string) { outputErr := 0 payloadMarkerBuff := [...]byte{0x0, 0x0, 0x0, 0x0} dataToSend := make([]byte, config.MaxPayloadLen) copy(dataToSend[0:], hdrData[:]) + +loop: for { if outputErr == maxWriteAttempts { log.Printf("Error while writing %d packets to output. Giving up \n", maxWriteAttempts) break } - tmpData, chanExitVal := <-agentPktOutputChannel - if !chanExitVal { - log.Println("Error while reading from output channel") - break - } - outputData := []byte(tmpData) - outputDataLen := len(outputData) - startIdx := len(hdrData) - binary.LittleEndian.PutUint32(payloadMarkerBuff[:], uint32(outputDataLen)) - copy(dataToSend[startIdx:], payloadMarkerBuff[:]) - startIdx += len(payloadMarkerBuff) - copy(dataToSend[startIdx:], outputData[:]) - startIdx += outputDataLen - if writeOutput(config, dataToSend[0:startIdx]) == 1 { - break + select { + case tmpData, chanExitVal := <-agentPktOutputChan: + if !chanExitVal { + log.Println("Error while reading from output channel") + break loop + } + + outputData := []byte(tmpData) + outputDataLen := len(outputData) + startIdx := len(hdrData) + binary.LittleEndian.PutUint32(payloadMarkerBuff[:], uint32(outputDataLen)) + copy(dataToSend[startIdx:], payloadMarkerBuff[:]) + startIdx += len(payloadMarkerBuff) + copy(dataToSend[startIdx:], outputData[:]) + startIdx += outputDataLen + if err := writeOutput(config, dataToSend[0:startIdx]); err != nil { + log.Printf("Error while writing to output: %s\n", err) + break loop + } + case <-ctx.Done(): + break loop } } - mainSignalChannel <- true } -func gatherPkts(config *config.Config, pktGatherChannel, output chan string) { +func gatherPkts(config *config.Config, pktGatherChannel, compressChan chan string, + pluginChan chan<- string) { var totalLen = 0 var currLen = 0 @@ -75,25 +90,40 @@ func gatherPkts(config *config.Config, pktGatherChannel, output chan string) { tmpPacketData = []byte(tmpChanData) currLen = len(tmpPacketData) if (totalLen + currLen) 
> config.MaxGatherLen { + // NOTE(vadorovsky): Currently we output an uncompressed packet to + // two channels: + // * `compressChan` - to output the compressed packets to an another + // PacketStreamer server + // * `pluginChan` - to output the raw packets to plugins + // TODO(vadorovsky): We eventually want to compress plugin outputs + // as well. But there is no CLI tool for uncompressing S2. Probably + // the best thing to do would be providing a CLI in PacketStreamer + // to read S2-compressed pcap files. + select { + case compressChan <- string(packetData[:totalLen]): + default: + log.Println("Gather compression queue is full. Discarding") + } select { - case output <- string(packetData[0:totalLen]): + case pluginChan <- string(packetData[:totalLen]): default: log.Println("Gather output queue is full. Discarding") } totalLen = 0 } - copy(packetData[totalLen:], tmpPacketData[0:currLen]) + copy(packetData[totalLen:], tmpPacketData[:currLen]) totalLen += currLen } } -func processIntfCapture(config *config.Config, agentPktOutputChannel chan string, mainSignalChannel chan bool) { +func processIntfCapture(ctx context.Context, config *config.Config, + agentPktOutputChannel chan string, pluginChan chan<- string) { pktGatherChannel := make(chan string, maxNumPkts*500) pktCompressChannel := make(chan string, maxNumPkts) var wg sync.WaitGroup - go gatherPkts(config, pktGatherChannel, pktCompressChannel) + go gatherPkts(config, pktGatherChannel, pktCompressChannel, pluginChan) go compressPkts(config, pktCompressChannel, agentPktOutputChannel) if len(config.CapturePorts) == 0 && len(config.CaptureInterfacesPorts) == 0 { @@ -110,12 +140,12 @@ func processIntfCapture(config *config.Config, agentPktOutputChannel chan string } } else { capturing := make(map[string]*pcap.Handle) - toUpdate := grabInterface(config, mainSignalChannel) + toUpdate := grabInterface(ctx, config) for { var intfPorts intfPorts select { case intfPorts = <-toUpdate: - case <-mainSignalChannel: + case 
<-ctx.Done(): break } if capturing[intfPorts.name] == nil { @@ -147,5 +177,4 @@ func processIntfCapture(config *config.Config, agentPktOutputChannel chan string wg.Wait() close(pktGatherChannel) close(pktCompressChannel) - mainSignalChannel <- true }