Skip to content

Commit

Permalink
Fix static check lints on worker
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Nov 1, 2023
1 parent a04ce4a commit 4bef4a5
Show file tree
Hide file tree
Showing 16 changed files with 129 additions and 129 deletions.
5 changes: 4 additions & 1 deletion deepfence_worker/cronjobs/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func CheckAgentUpgrade(ctx context.Context, task *asynq.Task) error {
log.Info().Msg("Start agent version check")

res := []map[string]interface{}{}
getVersionMetadata("https://api.github.com/repos/deepfence/ThreatMapper/tags", &res)
err := getVersionMetadata("https://api.github.com/repos/deepfence/ThreatMapper/tags", &res)
if err != nil {
return err
}

tags_to_ingest := []string{}
for _, tag := range res {
Expand Down
4 changes: 0 additions & 4 deletions deepfence_worker/cronjobs/cloud_compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,6 @@ func CachePostureProviders(ctx context.Context, task *asynq.Task) error {
RETURN count(distinct c)`

} else if postureProviderName == model.PostureProviderAWSOrg || postureProviderName == model.PostureProviderGCPOrg {
postureProviderName := model.PostureProviderGCP
if postureProviderName == model.PostureProviderAWSOrg {
postureProviderName = model.PostureProviderAWS
}
postureProvider.NodeLabel = "Organizations"

account_count_query = `
Expand Down
13 changes: 10 additions & 3 deletions deepfence_worker/cronjobs/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"database/sql"
"encoding/json"
"errors"
reporters_search "github.com/deepfence/ThreatMapper/deepfence_server/reporters/search"
"strconv"
"sync"
"time"

reporters_search "github.com/deepfence/ThreatMapper/deepfence_server/reporters/search"

"github.com/deepfence/ThreatMapper/deepfence_server/model"
"github.com/deepfence/ThreatMapper/deepfence_server/pkg/integration"
"github.com/deepfence/ThreatMapper/deepfence_server/reporters"
Expand Down Expand Up @@ -133,7 +134,10 @@ func SendNotifications(ctx context.Context, task *asynq.Task) error {
},
}
}
pgClient.UpdateIntegrationStatus(ctx, params)
err = pgClient.UpdateIntegrationStatus(ctx, params)
if err != nil {
log.Error().Msg(err.Error())
}
}
}(integrationRow)
}
Expand Down Expand Up @@ -199,7 +203,7 @@ func injectNodeDatamap(results []map[string]interface{}, common model.ScanResult

if _, ok := r["updated_at"]; ok {
flag := integration.IsMessagingFormat(integrationType)
if flag == true {
if flag {
ts := r["updated_at"].(int64)
tm := time.Unix(0, ts*int64(time.Millisecond))
r["updated_at"] = tm
Expand Down Expand Up @@ -275,6 +279,9 @@ func processIntegration[T any](ctx context.Context, task *asynq.Task, integratio
results, common, err := reporters_scan.GetScanResults[T](ctx,
utils.DetectedNodeScanType[integrationRow.Resource], scan.ScanId,
filters.FieldsFilters, model.FetchWindow{})
if err != nil {
return err
}
totalQueryTime = totalQueryTime + time.Since(profileStart).Milliseconds()

if len(results) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions deepfence_worker/cronjobs/reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ func CleanUpReports(ctx context.Context, task *asynq.Task) error {
cleanup(minioReportsPrefix)

// delete the reports which are in failed state
deleteFailedReports(ctx, session)
err = deleteFailedReports(ctx, session)

log.Info().Msg("Complete reports cleanup")

return nil
return err
}

func deleteReport(ctx context.Context, session neo4j.Session, path string) error {
Expand Down
100 changes: 50 additions & 50 deletions deepfence_worker/cronscheduler/init_neo4j.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,75 +22,75 @@ func initNeo4jDatabase(ctx context.Context) error {
session := nc.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()

session.Run("CREATE CONSTRAINT ON (n:CloudProvider) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudRegion) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:AgentVersion) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:KubernetesCluster) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:ContainerImage) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:ImageStub) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:ImageTag) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Node) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Container) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Pod) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Process) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Secret) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:SecretRule) ASSERT n.rule_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Malware) ASSERT n.malware_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:MalwareRule) ASSERT n.rule_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Vulnerability) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:VulnerabilityStub) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:SecurityGroup) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudNode) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudResource) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:RegistryAccount) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:Compliance) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:ComplianceRule) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudCompliance) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:AgentDiagnosticLogs) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudScannerDiagnosticLogs) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudComplianceExecutable) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudComplianceControl) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run("CREATE CONSTRAINT ON (n:CloudComplianceBenchmark) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_SECRET_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_VULNERABILITY_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_COMPLIANCE_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_CLOUD_COMPLIANCE_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_MALWARE_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_SECRET_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_VULNERABILITY_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_COMPLIANCE_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_CLOUD_COMPLIANCE_SCAN), map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_MALWARE_SCAN), map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudProvider) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudRegion) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:AgentVersion) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:KubernetesCluster) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:ContainerImage) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:ImageStub) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:ImageTag) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Node) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Container) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Pod) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Process) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Secret) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:SecretRule) ASSERT n.rule_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Malware) ASSERT n.malware_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:MalwareRule) ASSERT n.rule_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Vulnerability) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:VulnerabilityStub) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:SecurityGroup) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudNode) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudResource) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:RegistryAccount) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:Compliance) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:ComplianceRule) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudCompliance) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:AgentDiagnosticLogs) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudScannerDiagnosticLogs) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudComplianceExecutable) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudComplianceControl) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run("CREATE CONSTRAINT ON (n:CloudComplianceBenchmark) ASSERT n.node_id IS UNIQUE", map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_SECRET_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_VULNERABILITY_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_COMPLIANCE_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_CLOUD_COMPLIANCE_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_MALWARE_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_SECRET_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_VULNERABILITY_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_COMPLIANCE_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_CLOUD_COMPLIANCE_SCAN), map[string]interface{}{})
_, _ = session.Run(fmt.Sprintf("CREATE CONSTRAINT ON (n:Bulk%s) ASSERT n.node_id IS UNIQUE", utils.NEO4J_MALWARE_SCAN), map[string]interface{}{})

session.Run("MERGE (n:Node{node_id:'in-the-internet'}) SET n.node_name='The Internet (Inbound)', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.active=true", map[string]interface{}{})
session.Run("MERGE (n:Node{node_id:'out-the-internet'}) SET n.node_name='The Internet (Outbound)', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.active=true", map[string]interface{}{})
session.Run("MERGE (n:Node{node_id:'"+cronjobs.ConsoleAgentId+"'}) SET n.node_name='Console', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.push_back=COALESCE(n.push_back,1)", map[string]interface{}{})
_, _ = session.Run("MERGE (n:Node{node_id:'in-the-internet'}) SET n.node_name='The Internet (Inbound)', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.active=true", map[string]interface{}{})
_, _ = session.Run("MERGE (n:Node{node_id:'out-the-internet'}) SET n.node_name='The Internet (Outbound)', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.active=true", map[string]interface{}{})
_, _ = session.Run("MERGE (n:Node{node_id:'"+cronjobs.ConsoleAgentId+"'}) SET n.node_name='Console', n.pseudo=true, n.cloud_provider='internet', n.cloud_region='internet', n.depth=0, n.push_back=COALESCE(n.push_back,1)", map[string]interface{}{})

// Indexes for fast searching & ordering
addIndexOnIssuesCount(session, "ContainerImage")
addIndexOnIssuesCount(session, "Container")

session.Run("CREATE INDEX NodeDepth IF NOT EXISTS FOR (n:Node) ON (n.depth)", map[string]interface{}{})
session.Run("CREATE INDEX CloudResourceDepth IF NOT EXISTS FOR (n:CloudResource) ON (n.depth)", map[string]interface{}{})
session.Run("CREATE INDEX CloudResourceLinked IF NOT EXISTS FOR (n:CloudResource) ON (n.linked)", map[string]interface{}{})
_, _ = session.Run("CREATE INDEX NodeDepth IF NOT EXISTS FOR (n:Node) ON (n.depth)", map[string]interface{}{})
_, _ = session.Run("CREATE INDEX CloudResourceDepth IF NOT EXISTS FOR (n:CloudResource) ON (n.depth)", map[string]interface{}{})
_, _ = session.Run("CREATE INDEX CloudResourceLinked IF NOT EXISTS FOR (n:CloudResource) ON (n.linked)", map[string]interface{}{})

return nil
}

func addIndexOnIssuesCount(session neo4j.Session, node_type string) {
session.Run(fmt.Sprintf("CREATE INDEX %sOrderByVulnerabilitiesCount IF NOT EXISTS FOR (n:%s) ON (n.vulnerabilities_count)",
_, _ = session.Run(fmt.Sprintf("CREATE INDEX %sOrderByVulnerabilitiesCount IF NOT EXISTS FOR (n:%s) ON (n.vulnerabilities_count)",
node_type, node_type),
map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE INDEX %sOrderBySecretsCount IF NOT EXISTS FOR (n:%s) ON (n.vulnerabilities_count)",
_, _ = session.Run(fmt.Sprintf("CREATE INDEX %sOrderBySecretsCount IF NOT EXISTS FOR (n:%s) ON (n.vulnerabilities_count)",
node_type, node_type),
map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE INDEX %sOrderByMalwaresCount IF NOT EXISTS FOR (n:%s) ON (n.secrets_count)",
_, _ = session.Run(fmt.Sprintf("CREATE INDEX %sOrderByMalwaresCount IF NOT EXISTS FOR (n:%s) ON (n.secrets_count)",
node_type, node_type),
map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE INDEX %sOrderByCompliancesCount IF NOT EXISTS FOR (n:%s) ON (n.compliances_count)",
_, _ = session.Run(fmt.Sprintf("CREATE INDEX %sOrderByCompliancesCount IF NOT EXISTS FOR (n:%s) ON (n.compliances_count)",
node_type, node_type),
map[string]interface{}{})
session.Run(fmt.Sprintf("CREATE INDEX %sOrderByCloudCompliancesCount IF NOT EXISTS FOR (n:%s) ON (n.cloud_compliances_count)",
_, _ = session.Run(fmt.Sprintf("CREATE INDEX %sOrderByCloudCompliancesCount IF NOT EXISTS FOR (n:%s) ON (n.cloud_compliances_count)",
node_type, node_type),
map[string]interface{}{})
}
5 changes: 4 additions & 1 deletion deepfence_worker/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ func startIngester(cfg config) error {
processors.StartKafkaProcessors(ctx)

// start audit log processor
processors.StartAuditLogProcessor(ctx)
err = processors.StartAuditLogProcessor(ctx)
if err != nil {
log.Error().Msgf("%v", err)
}

// start kafka consumers for all given topics
err = processors.StartKafkaConsumers(
Expand Down
9 changes: 3 additions & 6 deletions deepfence_worker/ingesters/cloud_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ func CommitFuncCloudResource(ns string, cs []ingestersUtil.CloudResource) error
if err != nil {
return err
}
session, err := driver.Session(neo4j.AccessModeWrite)

if err != nil {
return err
}
session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()

batch, hosts, clusters := ResourceToMaps(cs)
Expand Down Expand Up @@ -173,11 +170,11 @@ func ResourceToMaps(ms []ingestersUtil.CloudResource) ([]map[string]interface{},
// TODO: Call somewhere
func LinkNodesWithCloudResources(ctx context.Context) error {
driver, err := directory.Neo4jClient(ctx)
session, err := driver.Session(neo4j.AccessModeWrite)

if err != nil {
return err
}

session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
defer session.Close()

tx, err := session.BeginTransaction(neo4j.WithTxTimeout(30 * time.Second))
Expand Down
8 changes: 3 additions & 5 deletions deepfence_worker/processors/bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ func init() {
neo4j_host := os.Getenv("DEEPFENCE_NEO4J_HOST")
go func() {
for {
select {
case <-wait:
breaker.Lock()
log.Info().Msgf("Breaker opened")
}
<-wait
breaker.Lock()
log.Info().Msgf("Breaker opened")
for {
err := utils.WaitServiceTcpConn(neo4j_host, neo4j_port, time.Second*30)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions deepfence_worker/processors/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,16 @@ func StartKafkaProcessors(ctx context.Context) {
1_000)

for i := range processors {
processors[i].Start(ctx)
err := processors[i].Start(ctx)
if err != nil {
log.Error().Msg(err.Error())
}
}
}

func StopKafkaProcessors() {
for i := range processors {
processors[i].Stop()
_ = processors[i].Stop()
}
}

Expand Down
7 changes: 5 additions & 2 deletions deepfence_worker/tasks/malwarescan/malwarescan.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ func (s MalwareScan) StartMalwareScan(ctx context.Context, task *asynq.Task) err
malwareScanner := malwareScan.New(opts, yaraconfig, yrScanner, params.ScanId)

// send inprogress status
scanCtx.Checkpoint("After initialization")
err = scanCtx.Checkpoint("After initialization")
if err != nil {
return err
}

// get registry credentials
authDir, creds, err := workerUtils.GetConfigFileFromRegistry(ctx, params.RegistryId)
Expand Down Expand Up @@ -199,7 +202,7 @@ func (s MalwareScan) StartMalwareScan(ctx context.Context, task *asynq.Task) err
return err
}

scanCtx.Checkpoint("After skopeo download")
err = scanCtx.Checkpoint("After skopeo download")

if err != nil {
return err
Expand Down
Loading

0 comments on commit 4bef4a5

Please sign in to comment.