Skip to content

Commit

Permalink
Add support for cross bucket replication feature (#12562)
Browse files Browse the repository at this point in the history
  • Loading branch information
googlyrahman authored Dec 26, 2024
1 parent ed32cb0 commit bcbb098
Show file tree
Hide file tree
Showing 3 changed files with 363 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,35 @@ import (
)

var (
objectConditionsKeys = []string{
transferSpecObjectConditionsKeys = []string{
"transfer_spec.0.object_conditions.0.min_time_elapsed_since_last_modification",
"transfer_spec.0.object_conditions.0.max_time_elapsed_since_last_modification",
"transfer_spec.0.object_conditions.0.include_prefixes",
"transfer_spec.0.object_conditions.0.exclude_prefixes",
"transfer_spec.0.object_conditions.0.last_modified_since",
"transfer_spec.0.object_conditions.0.last_modified_before",
}
replicationSpecObjectConditionsKeys = []string{
"replication_spec.0.object_conditions.0.min_time_elapsed_since_last_modification",
"replication_spec.0.object_conditions.0.max_time_elapsed_since_last_modification",
"replication_spec.0.object_conditions.0.include_prefixes",
"replication_spec.0.object_conditions.0.exclude_prefixes",
"replication_spec.0.object_conditions.0.last_modified_since",
"replication_spec.0.object_conditions.0.last_modified_before",
}

transferOptionsKeys = []string{
transferSpecTransferOptionsKeys = []string{
"transfer_spec.0.transfer_options.0.overwrite_objects_already_existing_in_sink",
"transfer_spec.0.transfer_options.0.delete_objects_unique_in_sink",
"transfer_spec.0.transfer_options.0.delete_objects_from_source_after_transfer",
"transfer_spec.0.transfer_options.0.overwrite_when",
}
replicationSpecTransferOptionsKeys = []string{
"replication_spec.0.transfer_options.0.overwrite_objects_already_existing_in_sink",
"replication_spec.0.transfer_options.0.delete_objects_unique_in_sink",
"replication_spec.0.transfer_options.0.delete_objects_from_source_after_transfer",
"replication_spec.0.transfer_options.0.overwrite_when",
}

transferSpecDataSourceKeys = []string{
"transfer_spec.0.gcs_data_source",
Expand All @@ -47,6 +61,14 @@ var (
"transfer_spec.0.gcs_data_sink",
"transfer_spec.0.posix_data_sink",
}

replicationSpecDataSourceKeys = []string{
"replication_spec.0.gcs_data_source",
}
replicationSpecDataSinkKeys = []string{
"replication_spec.0.gcs_data_sink",
}

awsS3AuthKeys = []string{
"transfer_spec.0.aws_s3_data_source.0.aws_access_key",
"transfer_spec.0.aws_s3_data_source.0.role_arn",
Expand Down Expand Up @@ -98,6 +120,7 @@ func ResourceStorageTransferJob() *schema.Resource {
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"schedule"},
DiffSuppressFunc: diffSuppressEventStream,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {
Expand All @@ -120,14 +143,46 @@ func ResourceStorageTransferJob() *schema.Resource {
},
},
},
"replication_spec": {
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
ConflictsWith: []string{"transfer_spec", "schedule"},
ExactlyOneOf: []string{"transfer_spec", "replication_spec"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"object_conditions": objectConditionsSchema(replicationSpecObjectConditionsKeys),
"transfer_options": transferOptionsSchema(replicationSpecTransferOptionsKeys),
"gcs_data_sink": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: gcsDataSchema(),
ExactlyOneOf: replicationSpecDataSinkKeys,
Description: `A Google Cloud Storage data sink.`,
},
"gcs_data_source": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: gcsDataSchema(),
ExactlyOneOf: replicationSpecDataSourceKeys,
Description: `A Google Cloud Storage data source.`,
},
},
},
Description: `Replication specification.`,
},
"transfer_spec": {
Type: schema.TypeList,
Required: true,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"replication_spec"},
ExactlyOneOf: []string{"transfer_spec", "replication_spec"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"object_conditions": objectConditionsSchema(),
"transfer_options": transferOptionsSchema(),
"object_conditions": objectConditionsSchema(transferSpecObjectConditionsKeys),
"transfer_options": transferOptionsSchema(transferSpecTransferOptionsKeys),
"source_agent_pool_name": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -307,7 +362,7 @@ func ResourceStorageTransferJob() *schema.Resource {
}
}

func objectConditionsSchema() *schema.Schema {
func objectConditionsSchema(objectConditionsKeys []string) *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -368,7 +423,7 @@ func objectConditionsSchema() *schema.Schema {
}
}

func transferOptionsSchema() *schema.Schema {
func transferOptionsSchema(transferOptionsKeys []string) *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -642,6 +697,7 @@ func resourceStorageTransferJobCreate(d *schema.ResourceData, meta interface{})
Schedule: expandTransferSchedules(d.Get("schedule").([]interface{})),
EventStream: expandEventStream(d.Get("event_stream").([]interface{})),
TransferSpec: expandTransferSpecs(d.Get("transfer_spec").([]interface{})),
ReplicationSpec: expandReplicationSpecs(d.Get("replication_spec").([]interface{})),
NotificationConfig: expandTransferJobNotificationConfig(d.Get("notification_config").([]interface{})),
}

Expand Down Expand Up @@ -726,6 +782,11 @@ func resourceStorageTransferJobRead(d *schema.ResourceData, meta interface{}) er
return err
}

err = d.Set("replication_spec", flattenReplicationSpec(res.ReplicationSpec))
if err != nil {
return err
}

err = d.Set("notification_config", flattenTransferJobNotificationConfig(res.NotificationConfig))
if err != nil {
return err
Expand Down Expand Up @@ -784,6 +845,13 @@ func resourceStorageTransferJobUpdate(d *schema.ResourceData, meta interface{})
}
}

if d.HasChange("replication_spec") {
fieldMask = append(fieldMask, "replication_spec")
if v, ok := d.GetOk("replication_spec"); ok {
transferJob.ReplicationSpec = expandReplicationSpecs(v.([]interface{}))
}
}

if d.HasChange("notification_config") {
fieldMask = append(fieldMask, "notification_config")
if v, ok := d.GetOk("notification_config"); ok {
Expand Down Expand Up @@ -1277,6 +1345,9 @@ func expandTransferSpecs(transferSpecs []interface{}) *storagetransfer.TransferS
}

func flattenTransferSpec(transferSpec *storagetransfer.TransferSpec, d *schema.ResourceData) []map[string]interface{} {
if transferSpec == nil || reflect.DeepEqual(transferSpec, &storagetransfer.TransferSpec{}) {
return nil
}

data := map[string]interface{}{}

Expand Down Expand Up @@ -1354,3 +1425,44 @@ func flattenTransferJobNotificationConfig(notificationConfig *storagetransfer.No

return []map[string]interface{}{data}
}

func diffSuppressEventStream(k, old, new string, d *schema.ResourceData) bool {
// Check if it's a replication job.
_, is_replication := d.GetOk("replication_spec")
return is_replication
}

func expandReplicationSpecs(replicationSpecs []interface{}) *storagetransfer.ReplicationSpec {
if len(replicationSpecs) == 0 || replicationSpecs[0] == nil {
return nil
}

replicationSpec := replicationSpecs[0].(map[string]interface{})
return &storagetransfer.ReplicationSpec{
GcsDataSink: expandGcsData(replicationSpec["gcs_data_sink"].([]interface{})),
ObjectConditions: expandObjectConditions(replicationSpec["object_conditions"].([]interface{})),
TransferOptions: expandTransferOptions(replicationSpec["transfer_options"].([]interface{})),
GcsDataSource: expandGcsData(replicationSpec["gcs_data_source"].([]interface{})),
}
}

func flattenReplicationSpec(replicationSpec *storagetransfer.ReplicationSpec) []map[string]interface{} {
if replicationSpec == nil || reflect.DeepEqual(replicationSpec, &storagetransfer.ReplicationSpec{}) {
return nil
}

data := map[string]interface{}{}
if replicationSpec.GcsDataSink != nil {
data["gcs_data_sink"] = flattenGcsData(replicationSpec.GcsDataSink)
}
if replicationSpec.GcsDataSource != nil {
data["gcs_data_source"] = flattenGcsData(replicationSpec.GcsDataSource)
}
if replicationSpec.ObjectConditions != nil {
data["object_conditions"] = flattenObjectCondition(replicationSpec.ObjectConditions)
}
if replicationSpec.TransferOptions != nil {
data["transfer_options"] = flattenTransferOption(replicationSpec.TransferOptions)
}
return []map[string]interface{}{data}
}
Loading

0 comments on commit bcbb098

Please sign in to comment.