sdks/python: enable recursive deletion for GCSFileSystem Paths (#33611)
* sdks/python: enable recursive deletion for GCS

In this commit, we enable recursive deletion for
GCS (Google Cloud Storage) paths, covering both
directories (prefixes) and individual blobs.

Changes include:
- Updated the `delete` method to support recursive deletion of GCS
  directories (prefixes).
- If the path points to a directory, all blobs under that prefix are
  deleted.
- Refactored the logic to handle both single-blob and directory
  deletion cases (a usage sketch follows below).
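
A minimal usage sketch of the behavior this commit enables (not part of the change itself; the bucket and object names are hypothetical). A trailing slash marks the path as a directory (prefix), and recursive=True removes every blob beneath it:

from apache_beam.io.gcp.gcsio import GcsIO

gcs = GcsIO()

# Recursively delete every blob under the gs://my-bucket/my-dir/ prefix.
gcs.delete('gs://my-bucket/my-dir/', recursive=True)

# Without the flag, only the single named object is deleted.
gcs.delete('gs://my-bucket/my-file')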

* sdks/python: update delete test case for GCS

In this commit, we update the delete test to verify
recursive deletion of directories (prefixes) in GCS.

Changes include:
- Added test for deleting a GCS directory (prefix) with multiple files.
- Verified that files under a directory are deleted recursively when
  using the `delete` method.

* CHANGES.md: update CHANGES for `2.63.0`

* CHANGES.md: capitalize the word `Enable`
mohamedawnallah authored Feb 5, 2025
1 parent fc43c12 commit 75cf1cb
Showing 4 changed files with 54 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -73,6 +73,7 @@
 * Add BigQuery vector/embedding ingestion and enrichment components to apache_beam.ml.rag (Python) ([#33413](https://github.com/apache/beam/pull/33413)).
 * Upgraded to protobuf 4 (Java) ([#33192](https://github.com/apache/beam/issues/33192)).
 * [GCSIO] Added retry logic to each batch method of the GCS IO (Python) ([#33539](https://github.com/apache/beam/pull/33539))
+* [GCSIO] Enable recursive deletion for GCSFileSystem Paths (Python) ([#33611](https://github.com/apache/beam/pull/33611)).
 * External, Process based Worker Pool support added to the Go SDK container. ([#33572](https://github.com/apache/beam/pull/33572))
   * This is used to enable sidecar containers to run SDK workers for some runners.
   * See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for details.
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -354,7 +354,8 @@ def delete(self, paths):
 
     for path in paths:
       if path.endswith('/'):
-        path_to_use = path + '*'
+        self._gcsIO().delete(path, recursive=True)
+        continue
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
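
For context, a minimal sketch of how this change surfaces through the FileSystems API (the bucket and prefix names are hypothetical): a path ending in '/' is now handed to GcsIO.delete(path, recursive=True) instead of being matched as path + '*'.

from apache_beam.io.filesystems import FileSystems

# A single call now removes every blob under the gs://my-bucket/logs/ prefix.
FileSystems.delete(['gs://my-bucket/logs/'])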
25 changes: 23 additions & 2 deletions sdks/python/apache_beam/io/gcp/gcsio.py
@@ -244,16 +244,37 @@ def open(
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
-  def delete(self, path):
+  def delete(self, path, recursive=False):
     """Deletes the object at the given GCS path.
+
+    If the path is a directory (prefix), it deletes all blobs under that prefix
+    when recursive=True.
 
     Args:
       path: GCS file path pattern in the form gs://<bucket>/<name>.
+      recursive (bool, optional): If True, deletes all objects under the prefix
+        when the path is a directory (default: False).
     """
     bucket_name, blob_name = parse_gcs_path(path)
     bucket = self.client.bucket(bucket_name)
+    if recursive:
+      # List and delete all blobs under the prefix.
+      blobs = bucket.list_blobs(prefix=blob_name)
+      for blob in blobs:
+        self._delete_blob(bucket, blob.name)
+    else:
+      # Delete only the specific blob.
+      self._delete_blob(bucket, blob_name)
+
+  def _delete_blob(self, bucket, blob_name):
+    """Helper method to delete a single blob from GCS.
+
+    Args:
+      bucket: The GCS bucket object.
+      blob_name: The name of the blob to delete under the bucket.
+    """
     if self._use_blob_generation:
       # blob can be None if not found
+      # Fetch blob generation if required.
       blob = bucket.get_blob(blob_name, retry=self._storage_client_retry)
       generation = getattr(blob, "generation", None)
     else:
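
Because GCS has no real directories, recursive deletion here means listing a prefix and deleting each object individually. For context, a standalone sketch of the same pattern using the google-cloud-storage client directly (bucket and prefix names are hypothetical):

from google.cloud import storage

client = storage.Client()
bucket = client.bucket('my-bucket')

# list_blobs(prefix=...) enumerates every object whose name starts with
# 'my-dir/'; each one is then deleted with its own request.
for blob in bucket.list_blobs(prefix='my-dir/'):
  blob.delete()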
29 changes: 28 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -153,6 +153,10 @@ def delete_blob(self, name, **kwargs):
     if name in bucket.blobs:
       del bucket.blobs[name]
 
+  def list_blobs(self, prefix=None, **kwargs):
+    bucket = self._get_canonical_bucket()
+    return self.client.list_blobs(bucket, prefix, **kwargs)
+
 
 class FakeBlob(object):
   def __init__(
@@ -445,20 +449,43 @@ def test_bad_file_modes(self):
       self.gcs.open(file_name, 'r+b')
 
   def test_delete(self):
+    # File path.
     file_name = 'gs://gcsio-test/delete_me'
     file_size = 1024
     bucket_name, blob_name = gcsio.parse_gcs_path(file_name)
 
     # Test deletion of non-existent file.
     bucket = self.client.get_bucket(bucket_name)
     self.gcs.delete(file_name)
 
+    # Insert a random file for testing.
     self._insert_random_file(self.client, file_name, file_size)
     self.assertTrue(blob_name in bucket.blobs)
 
+    # Delete the file.
     self.gcs.delete(file_name)
 
     self.assertFalse(blob_name in bucket.blobs)
 
+    # Now test deleting a directory (prefix) with multiple files.
+    prefix = 'gs://gcsio-test/directory_to_delete/'
+    file_names = [f"{prefix}file1", f"{prefix}file2", f"{prefix}file3"]
+    blobs = [gcsio.parse_gcs_path(file_name) for file_name in file_names]
+
+    # Insert random files under the prefix.
+    for file_name in file_names:
+      self._insert_random_file(self.client, file_name, file_size)
+
+    # Verify the files exist before deletion.
+    for blob in blobs:
+      self.assertTrue(blob[1] in bucket.blobs)
+
+    # Delete the directory (all files under the prefix).
+    self.gcs.delete(prefix, recursive=True)
+
+    # Verify that the files are deleted.
+    for blob in blobs:
+      self.assertFalse(blob[1] in bucket.blobs)
+
   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
