Skip to content

Commit

Permalink
Cloud scanner - start scan after cloud resources refresh is complete
Browse files Browse the repository at this point in the history
  • Loading branch information
ramanan-ravi committed Jul 1, 2024
1 parent 2d0d3c6 commit a473c71
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 167 deletions.
16 changes: 10 additions & 6 deletions deepfence_agent/Dockerfile.cloud-agent
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ WORKDIR /opt/steampipe

USER deepfence

ENV DF_INSTALL_DIR=/home/deepfence
ENV DF_INSTALL_DIR=/home/deepfence \
STEAMPIPE_AWS_PLUGIN_VERSION=0.118.1 \
STEAMPIPE_GCP_PLUGIN_VERSION=0.43.0 \
STEAMPIPE_AZURE_PLUGIN_VERSION=0.49.0 \
STEAMPIPE_AZURE_AD_PLUGIN_VERSION=0.12.0

COPY supervisord-cloud.conf /home/deepfence/supervisord.conf
COPY --from=steampipe /usr/local/bin/steampipe /usr/local/bin/steampipe

RUN steampipe service start \
&& steampipe plugin install steampipe \
# plugin version should be in sync with Deepfence fork https://github.com/deepfence/steampipe-plugin-aws
&& steampipe plugin install aws@0.118.1 gcp@0.43.0 azure@0.49.0 azuread@0.12.0 \
&& steampipe plugin install aws@${STEAMPIPE_AWS_PLUGIN_VERSION} gcp@${STEAMPIPE_GCP_PLUGIN_VERSION} azure@${STEAMPIPE_AZURE_PLUGIN_VERSION} azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION} \
&& git clone https://github.com/turbot/steampipe-mod-aws-compliance.git --branch v0.79 --depth 1 \
&& git clone https://github.com/turbot/steampipe-mod-gcp-compliance.git --branch v0.21 --depth 1 \
&& git clone https://github.com/turbot/steampipe-mod-azure-compliance.git --branch v0.35 --depth 1 \
Expand All @@ -56,10 +60,10 @@ ENV PUBLISH_CLOUD_RESOURCES_INTERVAL_MINUTES=5 \

EXPOSE 8080

COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@latest/steampipe-plugin-aws.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@latest/steampipe-plugin-gcp.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@latest/steampipe-plugin-azure.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@latest/steampipe-plugin-azuread.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-aws.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/aws@${STEAMPIPE_AWS_PLUGIN_VERSION}/steampipe-plugin-aws.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-gcp.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/gcp@${STEAMPIPE_GCP_PLUGIN_VERSION}/steampipe-plugin-gcp.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azure.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azure@${STEAMPIPE_AZURE_PLUGIN_VERSION}/steampipe-plugin-azure.plugin
COPY --from=steampipe /usr/local/bin/steampipe-plugin-azuread.plugin /home/deepfence/.steampipe/plugins/hub.steampipe.io/plugins/turbot/azuread@${STEAMPIPE_AZURE_AD_PLUGIN_VERSION}/steampipe-plugin-azuread.plugin

COPY plugins/cloud-scanner/cloud_scanner /home/deepfence/bin/cloud_scanner

Expand Down
50 changes: 2 additions & 48 deletions deepfence_bootstrapper/router/cloud_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func RefreshResources(req ctl.RefreshResourcesRequest) error {
return nil
}

func GetCloudScannerJobCount() int32 {
func GetCloudScannerJobCount(action ctl.ActionID) int32 {
conn, err := net.Dial("unix", CloudScannerSocketPath)
if err != nil {
log.Error().Err(err).Msgf("GetCloudScannerJobCount: error in creating cloud compliance scanner client with socket %s", CloudScannerSocketPath)
Expand All @@ -98,7 +98,7 @@ func GetCloudScannerJobCount() int32 {
defer conn.Close()

jobCountReq := map[string]interface{}{
"action": ctl.CloudScannerJobCount,
"action": action,
}
jobCountReqBytes, err := json.Marshal(jobCountReq)
if err != nil {
Expand Down Expand Up @@ -128,49 +128,3 @@ func GetCloudScannerJobCount() int32 {
return jobCount
}
}

// GetCloudNodeID asks the cloud scanner, over its unix socket at
// CloudScannerSocketPath, for a cloud node ID.
//
// It writes the JSON request {"args": {"GetCloudNodeID": true}} and then
// performs a single read (bounded by a 10 second deadline) of up to 1024
// bytes. The first whitespace-delimited token of that reply is returned as
// the node ID.
//
// On any failure (dial, marshal, write, deadline, read or parse) it returns
// an empty string together with the underlying error.
func GetCloudNodeID() (string, error) {
	conn, err := net.Dial("unix", CloudScannerSocketPath)
	if err != nil {
		log.Error().Err(err).Msgf("Error creating cloud scanner client with socket %s", CloudScannerSocketPath)
		return "", err
	}
	defer conn.Close()

	cloudNodeIDReq := map[string]interface{}{
		"args": map[string]interface{}{"GetCloudNodeID": true},
	}
	cloudNodeIDReqBytes, err := json.Marshal(cloudNodeIDReq)
	if err != nil {
		log.Error().Err(err).Msg("Error in converting request into valid json")
		return "", err
	}

	if _, err = conn.Write(cloudNodeIDReqBytes); err != nil {
		log.Error().Err(err).Msgf("Error in writing data to unix socket %s", CloudScannerSocketPath)
		return "", err
	}

	// Without a deadline a silent scanner would block this call forever, so a
	// failure to arm it is treated as an error instead of being ignored.
	const responseTimeout = 10 * time.Second
	if err = conn.SetReadDeadline(time.Now().Add(responseTimeout)); err != nil {
		log.Error().Err(err).Msgf("Error in setting read deadline on unix socket %s", CloudScannerSocketPath)
		return "", err
	}

	// The reply is consumed with exactly one read; the previous loop always
	// exited after its first iteration.
	buf := make([]byte, 1024)
	n, err := conn.Read(buf)
	if err != nil {
		log.Error().Err(err).Msg("Error in read")
		return "", err
	}

	var cloudNodeID string
	if _, err = fmt.Sscan(string(buf[:n]), &cloudNodeID); err != nil {
		return "", err
	}
	return cloudNodeID, nil
}
24 changes: 14 additions & 10 deletions deepfence_bootstrapper/router/openapi_client_controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,29 @@ func (ct *OpenapiClient) StartControlsWatching(nodeID string,

const (
MaxAgentWorkload = 2
MaxCloudAgentWorkload = 1
MaxCloudAgentWorkload = 2
)

func GetScannersWorkloads(nodeType string) int32 {
res := int32(0)
var secret, malware, vuln, cloud int32
if nodeType == ctl.CLOUD_AGENT {
cloud = GetCloudScannerJobCount()
var cloudPostureScan, cloudResourceRefreshCount int32

cloudPostureScan = GetCloudScannerJobCount(ctl.CloudScannerJobCount)
cloudResourceRefreshCount = GetCloudScannerJobCount(ctl.CloudScannerResourceRefreshCount)

log.Info().Msgf("workloads = cloud posture: %d, cloud resource refresh: %d", cloudPostureScan, cloudResourceRefreshCount)
return cloudPostureScan + cloudResourceRefreshCount
} else {
var secret, malware, vuln int32

secret = GetSecretScannerJobCount()
malware = GetMalwareScannerJobCount()
vuln = GetPackageScannerJobCount()
}

//TODO: Add more scanners workload
log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d, cloud: %d",
vuln, secret, malware, cloud)
res = secret + malware + vuln + cloud
return res
//TODO: Add more scanners workload
log.Info().Msgf("workloads = vuln: %d, secret: %d, malware: %d", vuln, secret, malware)
return secret + malware + vuln
}
}

var upgrade atomic.Bool
Expand Down
41 changes: 27 additions & 14 deletions deepfence_server/controls/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func GetAgentActions(ctx context.Context, agentID model.AgentID, consoleURL stri
actions = append(actions, upgradeActions...)
}

scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, workNumToExtract)
scanActions, scanErr := ExtractStartingAgentScans(ctx, nodeID, agentType, workNumToExtract)
workNumToExtract -= len(scanActions)
if scanErr == nil {
actions = append(actions, scanActions...)
Expand Down Expand Up @@ -312,7 +312,7 @@ func hasPendingAgentScans(ctx context.Context, client neo4j.DriverWithContext, n
return len(records) != 0, err
}

func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int) ([]controls.Action, error) {
func ExtractStartingAgentScans(ctx context.Context, nodeID string, agentType string, maxWork int) ([]controls.Action, error) {

ctx, span := telemetry.NewSpan(ctx, "control", "extract-starting-agent-scans")
defer span.End()
Expand Down Expand Up @@ -340,14 +340,27 @@ func ExtractStartingAgentScans(ctx context.Context, nodeID string, maxWork int)
}
defer tx.Close(ctx)

r, err := tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id})
var r neo4j.ResultWithContext
if agentType == controls.CLOUD_AGENT {
r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode)
WHERE s.status = '`+utils.ScanStatusStarting+`'
AND c.refresh_status = 'COMPLETE'
AND s.retries < 3
WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work
SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP()
WITH s
RETURN s.trigger_action`,
map[string]interface{}{"id": nodeID, "max_work": maxWork})
map[string]interface{}{"id": nodeID, "max_work": maxWork})
} else {
r, err = tx.Run(ctx, `MATCH (s) -[:SCHEDULED]-> (n:Node{node_id:$id})
WHERE s.status = '`+utils.ScanStatusStarting+`'
AND s.retries < 3
WITH s ORDER BY s.is_priority DESC, s.updated_at ASC LIMIT $max_work
SET s.status = '`+utils.ScanStatusInProgress+`', s.updated_at = TIMESTAMP()
WITH s
RETURN s.trigger_action`,
map[string]interface{}{"id": nodeID, "max_work": maxWork})
}

if err != nil {
return res, err
Expand Down Expand Up @@ -659,15 +672,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string,
defer tx.Close(ctx)

r, err := tx.Run(ctx, `
MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (c:CloudNode)
MATCH(r:CloudNodeRefresh{node_id:c.node_id})
WHERE r.refresh=true
WITH HEAD(collect(r)) AS rnode
WHERE rnode IS NOT NULL
WITH rnode, rnode.node_id AS node_id
DETACH DELETE rnode
RETURN node_id`,
map[string]interface{}{"id": nodeID})
MATCH(n:Node{node_id:$id}) -[:HOSTS]-> (r:CloudNode)
WHERE r.refresh_status = '`+utils.ScanStatusQueued+`'
AND NOT COALESCE(r.cloud_compliance_scan_status, '') IN ['`+utils.ScanStatusStarting+`', '`+utils.ScanStatusInProgress+`']
WITH r LIMIT $max_work
SET r.refresh_status = '`+utils.ScanStatusStarting+`', r.refresh_message = ''
WITH r
RETURN r.node_id, r.node_name AS account_id`,
map[string]interface{}{"id": nodeID, "max_work": 1}) // Maximum one account can be refreshed at a time

if err != nil {
return res, err
Expand All @@ -683,13 +695,14 @@ func ExtractRefreshResourceAction(ctx context.Context, nodeID string,

for _, record := range records {
var action controls.Action
if record.Values[0] == nil {
if record.Values[0] == nil || record.Values[1] == nil {
log.Error().Msgf("Invalid CloudNode ID, skipping")
continue
}

req := controls.RefreshResourcesRequest{}
req.NodeId = record.Values[0].(string)
req.AccountID = record.Values[1].(string)
req.NodeType = controls.CloudAccount

reqBytes, err := json.Marshal(req)
Expand Down
12 changes: 11 additions & 1 deletion deepfence_server/handler/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"

"github.com/deepfence/ThreatMapper/deepfence_server/model"
"github.com/deepfence/ThreatMapper/deepfence_server/reporters"
reporters_scan "github.com/deepfence/ThreatMapper/deepfence_server/reporters/scan"
ctl "github.com/deepfence/ThreatMapper/deepfence_utils/controls"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
Expand All @@ -19,6 +20,9 @@ import (

var (
// cloudAccountNodeType is the string form of the ctl.CloudAccount resource
// type, as produced by ctl.ResourceTypeToString.
cloudAccountNodeType = ctl.ResourceTypeToString(ctl.CloudAccount)
// refreshAccountFilter is the filter RefreshCloudAccountHandler passes to
// GetCloudAccountIDs. It lists the refresh_status values "COMPLETE" and
// "ERROR" in a ContainsFilter.
// NOTE(review): presumably this keeps only accounts whose refresh has
// finished (successfully or not), so accounts with a refresh already queued
// or running are not re-queued - confirm against reporters.ContainsFilter.
refreshAccountFilter = reporters.FieldsFilters{
ContainsFilter: reporters.ContainsFilter{FieldsValues: map[string][]interface{}{"refresh_status": {"COMPLETE", "ERROR"}}},
}
)

func (h *Handler) RegisterCloudNodeAccountHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -124,13 +128,19 @@ func (h *Handler) RefreshCloudAccountHandler(w http.ResponseWriter, r *http.Requ
nodeIdentifiers[i] = model.NodeIdentifier{NodeID: id, NodeType: cloudAccountNodeType}
}

cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers)
cloudNodeIds, err := reporters_scan.GetCloudAccountIDs(r.Context(), nodeIdentifiers, &refreshAccountFilter)
if err != nil {
log.Error().Msgf(err.Error())
h.respondError(&BadDecoding{err}, w)
return
}

if len(cloudNodeIds) == 0 {
// Refresh already in progress for all requested cloud accounts
w.WriteHeader(http.StatusNoContent)
return
}

resolvedRequest := model.CloudAccountRefreshReq{NodeIDs: make([]string, len(cloudNodeIds))}
for i, id := range cloudNodeIds {
resolvedRequest.NodeIDs[i] = id.NodeID
Expand Down
2 changes: 1 addition & 1 deletion deepfence_server/handler/scan_reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (h *Handler) StartComplianceScanHandler(w http.ResponseWriter, r *http.Requ

regular, k8s, _, _ := extractBulksNodes(reqs.NodeIDs)

cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular)
cloudNodeIds, err := reportersScan.GetCloudAccountIDs(ctx, regular, nil)
if err != nil {
h.respondError(err, w)
return
Expand Down
50 changes: 3 additions & 47 deletions deepfence_server/model/cloud_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type CloudNodeAccountInfo struct {
LastScanID string `json:"last_scan_id"`
LastScanStatus string `json:"last_scan_status"`
RefreshMessage string `json:"refresh_message"`
RefreshStatus string `json:"refresh_status" enum:"STARTING,IN_PROGRESS,ERROR,COMPLETE"`
RefreshStatus string `json:"refresh_status" enum:"QUEUED,STARTING,IN_PROGRESS,ERROR,COMPLETE"`
ScanStatusMap map[string]int64 `json:"scan_status_map"`
Version string `json:"version"`
HostNodeID string `json:"host_node_id"`
Expand Down Expand Up @@ -532,8 +532,8 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err

if _, err = tx.Run(ctx, `
UNWIND $batch as cloudNode
MERGE (n:CloudNodeRefresh{node_id: cloudNode})
SET n.refresh = true, n.updated_at = TIMESTAMP()`,
MATCH (m:CloudNode{node_id: cloudNode})
SET m.refresh_status = '`+utils.ScanStatusQueued+`', m.refresh_message = ''`,
map[string]interface{}{
"batch": c.NodeIDs,
}); err != nil {
Expand All @@ -542,50 +542,6 @@ func (c *CloudAccountRefreshReq) SetCloudAccountRefresh(ctx context.Context) err
return tx.Commit(ctx)
}

// GetCloudAccountRefresh consumes pending refresh markers for the node IDs in
// c.NodeIDs.
//
// Inside a single write transaction (30 second timeout) it matches every
// CloudNodeRefresh node whose node_id is in c.NodeIDs and whose refresh
// property is true, deletes those nodes, and returns their node IDs. If no
// marker matched, the returned slice is nil. The slice is returned alongside
// the commit error, if any.
func (c *CloudAccountRefreshReq) GetCloudAccountRefresh(ctx context.Context) ([]string, error) {
	ctx, span := telemetry.NewSpan(ctx, "model", "get-cloud-account-refresh")
	defer span.End()

	neo4jDriver, err := directory.Neo4jClient(ctx)
	if err != nil {
		return nil, err
	}

	sess := neo4jDriver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer sess.Close(ctx)

	txn, err := sess.BeginTransaction(ctx, neo4j.WithTxTimeout(30*time.Second))
	if err != nil {
		return nil, err
	}
	defer txn.Close(ctx)

	// Query text is kept verbatim; the marker's node_id is captured before the
	// DELETE so it can still be returned.
	params := map[string]interface{}{
		"batch": c.NodeIDs,
	}
	result, err := txn.Run(ctx, `
UNWIND $batch as cloudNode
MATCH (n:CloudNodeRefresh{node_id: cloudNode})
WHERE n.refresh=true
WITH n, n.node_id as deletedNodeID
DELETE n
RETURN deletedNodeID`, params)
	if err != nil {
		return nil, err
	}

	records, err := result.Collect(ctx)
	if err != nil {
		return nil, err
	}

	var deletedIDs []string
	for i := range records {
		deletedIDs = append(deletedIDs, records[i].Values[0].(string))
	}

	if err := txn.Commit(ctx); err != nil {
		return deletedIDs, err
	}
	return deletedIDs, nil
}

// CloudAccountDeleteReq is a request payload carrying a list of node IDs.
// NOTE(review): going by the type name these are the cloud accounts to
// delete - confirm against the handler that consumes it.
type CloudAccountDeleteReq struct {
// NodeIDs is (de)serialized as "node_ids". Its tags mark it as required, and
// the validate tag additionally carries a gt=0 constraint.
NodeIDs []string `json:"node_ids" validate:"required,gt=0" required:"true"`
}
Loading

0 comments on commit a473c71

Please sign in to comment.