Skip to content

Commit

Permalink
Merge pull request #23 from imperva/matt/SR-2139-sync-state
Browse files Browse the repository at this point in the history
Wait for remote sync state [SR-2139]
  • Loading branch information
mattJsonar authored Nov 18, 2024
2 parents 8750314 + cb41280 commit 7d659cc
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 31 deletions.
32 changes: 30 additions & 2 deletions dsfhub/resource_cloud_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func resourceCloudAccount() *schema.Resource {
}

func resourceCloudAccountCreateContext(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
var diags diag.Diagnostics
client := m.(*Client)
if isOk, err := checkResourceRequiredFields(requiredCloudAccountJson, ignoreCloudAccountParamsByServerType, d); !isOk {
return diag.FromErr(err)
Expand All @@ -516,14 +517,27 @@ func resourceCloudAccountCreateContext(ctx context.Context, d *schema.ResourceDa
return diag.FromErr(err)
}

// get asset_id
assetId := d.Get("asset_id").(string)

// wait for remoteSyncState
err = waitForRemoteSyncState(ctx, dsfCloudAccountResourceType, assetId, m)
if err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Error while waiting for remoteSyncState = \"SYNCED\" for asset: %s", assetId),
Detail: fmt.Sprintf("Error: %s\n", err),
})
}

// set ID
cloudAccountId := createCloudAccountResponse.Data.AssetData.AssetID
d.SetId(cloudAccountId)

// Set the rest of the state from the resource read
resourceCloudAccountReadContext(ctx, d, m)

return nil
return diags
}

func resourceCloudAccountReadContext(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
Expand Down Expand Up @@ -658,6 +672,7 @@ func resourceCloudAccountReadContext(ctx context.Context, d *schema.ResourceData
}

func resourceCloudAccountUpdateContext(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
var diags diag.Diagnostics
client := m.(*Client)

// check provided fields against schema
Expand All @@ -679,13 +694,26 @@ func resourceCloudAccountUpdateContext(ctx context.Context, d *schema.ResourceDa
return diag.FromErr(err)
}

// get asset_id
assetId := d.Get("asset_id").(string)

// wait for remoteSyncState
err = waitForRemoteSyncState(ctx, dsfCloudAccountResourceType, assetId, m)
if err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Error while waiting for remoteSyncState = \"SYNCED\" for asset: %s", assetId),
Detail: fmt.Sprintf("Error: %s\n", err),
})
}

// set ID
d.SetId(cloudAccountId)

// Set the rest of the state from the resource read
resourceCloudAccountReadContext(ctx, d, m)

return nil
return diags
}

func resourceCloudAccountDeleteContext(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
Expand Down
148 changes: 126 additions & 22 deletions dsfhub/resource_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,46 @@ func contains(l []string, x string) bool {
return false
}

// readAsset reads an asset of any resource type
func readAsset(client Client, resourceType string, assetId string) (*ResourceWrapper, error) {
var result *ResourceWrapper
var err error

switch resourceType {
case dsfDataSourceResourceType:
{
log.Printf("[INFO] reading data_source asset %v", assetId)
result, err = client.ReadDSFDataSource(assetId)
}
case dsfLogAggregatorResourceType:
{
log.Printf("[INFO] reading log_aggregator asset %v", assetId)
result, err = client.ReadLogAggregator(assetId)
}
case dsfCloudAccountResourceType:
{
log.Printf("[INFO] reading cloud_account asset %v", assetId)
result, err = client.ReadSecretManager(assetId)
}
case dsfSecretManagerResourceType:
{
log.Printf("[INFO] reading secret_manager asset %v", assetId)
result, err = client.ReadLogAggregator(assetId)
}
default:
{
return nil, fmt.Errorf("invalid resourceType: %v", resourceType)
}
}

if err != nil {
return result, err
}

return result, nil
}

// waitUntilAuditState reads an asset periodically to check the status of audit_pull_enabled
func waitUntilAuditState(ctx context.Context, desiredState bool, resourceType string, assetId string, m interface{}) error {
client := m.(*Client)

Expand All @@ -426,7 +466,7 @@ func waitUntilAuditState(ctx context.Context, desiredState bool, resourceType st
targetState,
},
Refresh: auditStateRefreshFunc(*client, resourceType, assetId),
Timeout: 8 * time.Minute,
Timeout: 5 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 5 * time.Second,
}
Expand All @@ -440,27 +480,13 @@ func waitUntilAuditState(ctx context.Context, desiredState bool, resourceType st
return nil
}

// auditStateRefreshFunc reads an asset to check the status of audit_pull_enabled
func auditStateRefreshFunc(client Client, resourceType string, assetId string) retry.StateRefreshFunc {
return func() (any, string, error) {
var result *ResourceWrapper
var err error

switch resourceType {
case dsfDataSourceResourceType:
{
log.Printf("[INFO] checking audit state for data_source asset %v", assetId)
result, err = client.ReadDSFDataSource(assetId)
}
case dsfLogAggregatorResourceType:
{
log.Printf("[INFO] checking audit state for log_aggregator asset %v", assetId)
result, err = client.ReadLogAggregator(assetId)
}
default:
{
return nil, "", fmt.Errorf("invalid resourceType: %v", resourceType)
}
}
result, err = readAsset(client, resourceType, assetId)
if err != nil {
return 0, "", err
}
Expand All @@ -469,6 +495,67 @@ func auditStateRefreshFunc(client Client, resourceType string, assetId string) r
}
}

// waitForRemoteSyncState reads an asset periodically to check the status of remoteSyncState becomes "SYNCED"
// posible values for remoteSyncState = ["SYNCED", "NOT_SYNCED", "UNKNOWN"]
func waitForRemoteSyncState(ctx context.Context, resourceType string, assetId string, m interface{}) error {
client := m.(*Client)

stateChangeConf := &retry.StateChangeConf{
Pending: []string{
"NOT_SYNCED",
"UNKNOWN",
},
Target: []string{
"SYNCED",
},
Refresh: remoteSyncStateRefreshFunc(*client, resourceType, assetId),
Timeout: 5 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 5 * time.Second,
}

_, err := stateChangeConf.WaitForStateContext(ctx)
if err != nil {
log.Printf("[ERROR] error while waiting for remoteSyncState = \"SYNCED\" for asset %v", assetId)
return err
}

return nil
}

// remoteSyncStateRefreshFunc reads an asset to check the status of remoteSyncState
func remoteSyncStateRefreshFunc(client Client, resourceType string, assetId string) retry.StateRefreshFunc {
return func() (any, string, error) {
var result *ResourceWrapper
var err error

result, err = readAsset(client, resourceType, assetId)
if err != nil {
return 0, "", err
}

return result, result.Data.RemoteSyncState, nil
}
}

// checkAuditState reads an asset to check the status of audit_pull_enabled
func checkAuditState(ctx context.Context, m interface{}, assetId string, resourceType string) (bool, error) {
client := m.(*Client)
var auditPullState bool
var result *ResourceWrapper
var err error

result, err = readAsset(*client, resourceType, assetId)
if err != nil {
return false, err
}

auditPullState = result.Data.AssetData.AuditPullEnabled

return auditPullState, nil
}

// connectDisconnectGateway determines whether an asset should be connected to or disconnected from gateway
func connectDisconnectGateway(ctx context.Context, d *schema.ResourceData, resourceType string, m interface{}) error {
assetId := d.Get("asset_id").(string)
auditPullEnabled := d.Get("audit_pull_enabled").(bool)
Expand Down Expand Up @@ -511,6 +598,7 @@ func connectDisconnectGateway(ctx context.Context, d *schema.ResourceData, resou
return nil
}

// connectGateway connects an asset to gateway
func connectGateway(ctx context.Context, m interface{}, assetId string, resourceType string) error {
client := m.(*Client)
_, err := client.EnableAuditDSFDataSource(assetId)
Expand All @@ -519,14 +607,22 @@ func connectGateway(ctx context.Context, m interface{}, assetId string, resource
return err
}

err2 := waitUntilAuditState(ctx, true, resourceType, assetId, m)
if err2 != nil {
return err2
// ensure asset is synced to gateway
err = waitForRemoteSyncState(ctx, resourceType, assetId, m)
if err != nil {
return err
}

// confirm asset is connected to gateway
isAuditPullEnabled, err := checkAuditState(ctx, m, assetId, resourceType)
if err != nil || !isAuditPullEnabled {
return err
}

return nil
}

// disconnectGateway disconnects an asset from gateway
func disconnectGateway(ctx context.Context, m interface{}, assetId string, resourceType string) error {
client := m.(*Client)
_, err := client.DisableAuditDSFDataSource(assetId)
Expand All @@ -535,15 +631,23 @@ func disconnectGateway(ctx context.Context, m interface{}, assetId string, resou
return err
}

err = waitUntilAuditState(ctx, false, resourceType, assetId, m)
// ensure asset is synced to gateway
err = waitForRemoteSyncState(ctx, resourceType, assetId, m)
if err != nil {
log.Printf("[INFO] Error while waiting for audit state to update for assetId: %s\n", assetId)
return err
}

// confirm asset is disconnected from gateway
isAuditPullEnabled, err := checkAuditState(ctx, m, assetId, resourceType)
if err != nil || isAuditPullEnabled {
return err
}

return nil
}

// reconnectGateway first disconnects and then reconnects an asset to gateway
func reconnectGateway(ctx context.Context, m interface{}, assetId string, resourceType string) error {
log.Printf("[INFO] Re-enabling audit for assetId: %s\n", assetId)

Expand Down Expand Up @@ -636,7 +740,7 @@ func resourceConnectionDataOauthParametersHash(v interface{}) int {
return PositiveHash(buf.String())
}

// AssetData resource hash functions
// AssetData resource hash functions
func resourceAssetDataAuditInfoHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})
Expand Down
30 changes: 28 additions & 2 deletions dsfhub/resource_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,12 +1349,25 @@ func resourceDSFDataSourceCreateContext(ctx context.Context, d *schema.ResourceD
return diag.FromErr(err)
}

// get asset_id
assetId := d.Get("asset_id").(string)

// wait for remoteSyncState
err = waitForRemoteSyncState(ctx, dsfDataSourceResourceType, assetId, m)
if err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Error while waiting for remoteSyncState = \"SYNCED\" for asset: %s", assetId),
Detail: fmt.Sprintf("Error: %s\n", err),
})
}

// Connect/disconnect asset to gateway
err = connectDisconnectGateway(ctx, d, dsfDataSourceResourceType, m)
if err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Error while updating audit state for asset: %s", d.Get("asset_id")),
Summary: fmt.Sprintf("Error while updating audit state for asset: %s", assetId),
Detail: fmt.Sprintf("Error: %s\n", err),
})
}
Expand Down Expand Up @@ -1693,9 +1706,22 @@ func resourceDSFDataSourceUpdateContext(ctx context.Context, d *schema.ResourceD
auditPullEnabled, _ := d.GetChange("audit_pull_enabled")
dsfDataSource.Data.AssetData.AuditPullEnabled = auditPullEnabled.(bool)

// get asset_id
assetId := d.Get("asset_id").(string)

// wait for remoteSyncState
err := waitForRemoteSyncState(ctx, dsfDataSourceResourceType, assetId, m)
if err != nil {
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Error while waiting for remoteSyncState = \"SYNCED\" for asset: %s", assetId),
Detail: fmt.Sprintf("Error: %s\n", err),
})
}

// update resource
log.Printf("[INFO] Updating DSF data source for serverType: %s and gatewayId: %s assetId: %s\n", dsfDataSource.Data.ServerType, dsfDataSource.Data.GatewayID, dsfDataSource.Data.AssetData.AssetID)
_, err := client.UpdateDSFDataSource(dsfDataSourceId, dsfDataSource)
_, err = client.UpdateDSFDataSource(dsfDataSourceId, dsfDataSource)
if err != nil {
log.Printf("[ERROR] Updating data source for serverType: %s and gatewayId: %s assetId: %s | err:%s\n", dsfDataSource.Data.ServerType, dsfDataSource.Data.GatewayID, dsfDataSource.Data.AssetData.AssetID, err)
return diag.FromErr(err)
Expand Down
Loading

0 comments on commit 7d659cc

Please sign in to comment.