Update Boto client library for S3 to boto3.
PiperOrigin-RevId: 306310032
jerlawson authored and copybara-github committed Apr 13, 2020
1 parent ea574a4 commit 4b0b941
Showing 7 changed files with 108 additions and 59 deletions.
1 change: 0 additions & 1 deletion CHANGES.next.md
@@ -50,7 +50,6 @@
 - Includes GCP implementation with static routes.
 - Add a directory for PKB tutorials.
 - Add two new tutorials: one for beginners and one for network dashboard.
-- Update Boto client library for S3 to boto3.

 ### Enhancements:

@@ -18,4 +18,4 @@
 def Install(vm):
   """Installs the boto package on the VM."""
   vm.Install('pip')
-  vm.RemoteCommand('sudo pip install boto3')
+  vm.RemoteCommand('sudo pip install boto')
9 changes: 4 additions & 5 deletions perfkitbenchmarker/providers/aws/flags.py
@@ -32,11 +32,10 @@
 flags.DEFINE_integer('aws_emr_job_wait_time', 18000,
                      'The time to wait for an EMR job to finish, in seconds')

-flags.DEFINE_string(
-    's3_access_point_name', None,
-    'If given, an access point to use for S3 transfers in place of bucket '
-    'names. This flag represents the *AccessPointName-AccountId* part '
-    'of "*AccessPointName-AccountId*.s3-accesspoint.*Region*.amazonaws.com".')
+flags.DEFINE_string('s3_custom_endpoint', None,
+                    'If given, a custom endpoint to use for S3 transfers. If '
+                    'this flag is not given, use the standard endpoint for the '
+                    'storage region.')
 flags.DEFINE_boolean('aws_spot_instances', False,
                      'Whether to use AWS spot instances for any AWS VMs.')
 flags.DEFINE_float('aws_spot_price', None,
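For context, a minimal sketch of how the replacement flag behaves once defined (the program name and endpoint value are illustrative; this mirrors the APIScriptArgs logic in perfkitbenchmarker/providers/aws/s3.py below):

```python
from absl import flags

FLAGS = flags.FLAGS
flags.DEFINE_string('s3_custom_endpoint', None,
                    'If given, a custom endpoint to use for S3 transfers.')

# Parse a hypothetical command line that supplies a custom endpoint.
FLAGS(['pkb', '--s3_custom_endpoint=s3.us-west-2.amazonaws.com'])
if FLAGS.s3_custom_endpoint:
  print('--host=' + FLAGS.s3_custom_endpoint)  # --host=s3.us-west-2.amazonaws.com
```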
28 changes: 12 additions & 16 deletions perfkitbenchmarker/providers/aws/s3.py
@@ -28,10 +28,10 @@
 AWS_CREDENTIAL_LOCATION = '.aws'
 DEFAULT_AWS_REGION = 'us-east-1'

-# S3 access point hostnames for a given region can be formed by following the
-# `AccessPointName-AccountId` with '.s3-accesspoint.', followed by the region,
-# and suffixed with '.amazonaws.com'.
-AWS_S3_ARN_FORMAT_STRING = '%s.s3-accesspoint.%s.amazonaws.com'
+# S3 endpoints for a given region can be formed by prefixing the region with
+# 's3.' and suffixing it with '.amazonaws.com'.
+AWS_S3_ENDPOINT_PREFIX = 's3.'
+AWS_S3_ENDPOINT_SUFFIX = '.amazonaws.com'


 class S3Service(object_storage_service.ObjectStorageService):
@@ -118,7 +118,7 @@ def EmptyBucket(self, bucket):

   def PrepareVM(self, vm):
     vm.Install('awscli')
-    vm.Install('boto3')
+    vm.Install('boto')

     vm.PushFile(
         object_storage_service.FindCredentialFile('~/' +
@@ -139,20 +139,16 @@ def CLIDownloadBucket(self, vm, bucket, objects, dest):
         'time aws s3 sync s3://%s/ %s' % (bucket, dest))

   def Metadata(self, vm):
-    return {
-        object_storage_service.BOTO_LIB_VERSION:
-            linux_packages.GetPipPackageVersion(vm, 'boto3')
-    }
+    return {object_storage_service.BOTO_LIB_VERSION:
+            linux_packages.GetPipPackageVersion(vm, 'boto')}

   def APIScriptArgs(self):
-    if FLAGS.s3_access_point_name:
-      return [
-          '--access_point_hostname=' + AWS_S3_ARN_FORMAT_STRING %
-          (FLAGS.s3_access_point_name, self.region), '--region=' + self.region
-      ]
+    if FLAGS.s3_custom_endpoint:
+      return ['--host=' + FLAGS.s3_custom_endpoint]
     else:
-      return ['--region=' + self.region]
+      return ['--host=%s%s%s' % (AWS_S3_ENDPOINT_PREFIX, self.region,
+                                 AWS_S3_ENDPOINT_SUFFIX)]

   @classmethod
   def APIScriptFiles(cls):
-    return ['s3.py']
+    return ['boto_service.py', 's3.py']
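A quick sketch of the default endpoint string APIScriptArgs now produces when no custom endpoint is given (the region value is illustrative):

```python
AWS_S3_ENDPOINT_PREFIX = 's3.'
AWS_S3_ENDPOINT_SUFFIX = '.amazonaws.com'

region = 'us-east-1'  # illustrative region
print('--host=%s%s%s' % (AWS_S3_ENDPOINT_PREFIX, region, AWS_S3_ENDPOINT_SUFFIX))
# Prints: --host=s3.us-east-1.amazonaws.com
```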
77 changes: 77 additions & 0 deletions perfkitbenchmarker/scripts/object_storage_api_test_scripts/boto_service.py
@@ -0,0 +1,77 @@
# Copyright 2016 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""An interface to boto-based object storage APIs."""

import logging
import time

import boto

import object_storage_interface


class BotoService(object_storage_interface.ObjectStorageServiceBase):
"""An interface to boto-based object storage APIs."""

def __init__(self, storage_schema, host_to_connect=None):
self.storage_schema = storage_schema
self.host_to_connect = host_to_connect

def _StorageURI(self, bucket, object_name=None):
"""Return a storage_uri for the given resource.
Args:
bucket: the name of a bucket.
object_name: the name of an object, if given.
Returns:
A storage_uri. If object is given, the uri will be for
the bucket-object combination. If object is not given, the uri
will be for the bucket.
"""

if object_name is not None:
path = '%s/%s' % (bucket, object_name)
else:
path = bucket
storage_uri = boto.storage_uri(path, self.storage_schema)
if self.host_to_connect is not None:
storage_uri.connect(host=self.host_to_connect)
return storage_uri

def ListObjects(self, bucket, prefix):
bucket_uri = self._StorageURI(bucket)
return [obj.name for obj in bucket_uri.list_bucket(prefix=prefix)]

def DeleteObjects(self, bucket, objects_to_delete, objects_deleted=None):
for object_name in objects_to_delete:
try:
object_uri = self._StorageURI(bucket, object_name)
object_uri.delete_key()
if objects_deleted is not None:
objects_deleted.append(object_name)
except: # pylint:disable=bare-except
logging.exception('Caught exception while deleting object %s.',
object_name)

# Not implementing WriteObjectFromBuffer because the implementation
# is different for GCS and S3.

def ReadObject(self, bucket, object_name):
start_time = time.time()
object_uri = self._StorageURI(bucket, object_name)
object_uri.new_key().get_contents_as_string()
latency = time.time() - start_time
return start_time, latency
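A hedged usage sketch of the new base class (the bucket name, prefix, and endpoint are hypothetical; assumes boto is installed with AWS credentials configured, and that the script runs from the directory containing boto_service.py):

```python
import boto_service

# Route boto's storage_uri machinery through a specific S3 regional endpoint.
service = boto_service.BotoService('s3',
                                   host_to_connect='s3.us-east-1.amazonaws.com')

names = service.ListObjects('my-pkb-bucket', prefix='data/')  # hypothetical bucket
if names:
  start_time, latency = service.ReadObject('my-pkb-bucket', names[0])
  print('Read %s in %.3f seconds' % (names[0], latency))
```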
44 changes: 13 additions & 31 deletions perfkitbenchmarker/scripts/object_storage_api_test_scripts/s3.py
@@ -19,46 +19,28 @@

 from absl import flags

-import boto3
-
-import object_storage_interface
+import boto_service

 FLAGS = flags.FLAGS


-class S3Service(object_storage_interface.ObjectStorageServiceBase):
-  """An interface to AWS S3, using the boto library."""
+class S3Service(boto_service.BotoService):
+  """An interface to S3, using the boto library."""

   def __init__(self):
-    self.client = boto3.client('s3', region_name=FLAGS.region)
-
-  def ListObjects(self, bucket, prefix):
-    bucket_name = FLAGS.access_point_hostname or bucket
-    return self.client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
-
-  def DeleteObjects(self, bucket, objects_to_delete, objects_deleted=None):
-    bucket_name = FLAGS.access_point_hostname or bucket
-    for object_name in objects_to_delete:
-      response = self.client.delete_object(Bucket=bucket_name, Key=object_name)
-      if response['ResponseMetadata']['DeleteMarker']:
-        if objects_deleted is not None:
-          objects_deleted.append(object_name)
-      else:
-        logging.exception(
-            'Encountered error while deleting object %s. '
-            'Response metadata: %s', object_name, response)
+    if FLAGS.host is not None:
+      logging.info('Will use user-specified host endpoint: %s', FLAGS.host)
+    super(S3Service, self).__init__('s3', host_to_connect=FLAGS.host)

   def WriteObjectFromBuffer(self, bucket, object_name, stream, size):
     start_time = time.time()
-    bucket_name = FLAGS.access_point_hostname or bucket
     stream.seek(0)
-    self.client.upload_fileobj(stream, bucket_name, object_name)
-    latency = time.time() - start_time
-    return start_time, latency
-
-  def ReadObject(self, bucket, object_name):
-    start_time = time.time()
-    bucket_name = FLAGS.access_point_hostname or bucket
-    self.client.get_object(Bucket=bucket_name, Key=object_name)
+    object_uri = self._StorageURI(bucket, object_name)
+    # We need to access the raw key object so we can set its storage
+    # class
+    key = object_uri.new_key()
+    if FLAGS.object_storage_class is not None:
+      key._set_storage_class(FLAGS.object_storage_class)  # pylint:disable=protected-access
+    key.set_contents_from_file(stream, size=size)
     latency = time.time() - start_time
     return start_time, latency
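For illustration, a minimal standalone sketch of the boto write path used above (bucket/object names, payload, and storage class are hypothetical; actually running it requires AWS credentials configured for boto):

```python
import io

import boto

# Build a storage URI for a hypothetical bucket/object and upload a small
# buffer, forcing a non-default storage class as WriteObjectFromBuffer does.
stream = io.BytesIO(b'x' * 1024)
object_uri = boto.storage_uri('my-pkb-bucket/my-object', 's3')
key = object_uri.new_key()
key._set_storage_class('REDUCED_REDUNDANCY')  # pylint:disable=protected-access
stream.seek(0)
key.set_contents_from_file(stream, size=1024)
```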
@@ -16,8 +16,4 @@

 from absl import flags

-flags.DEFINE_string(
-    'access_point_hostname', None,
-    'If given, an access point hostname to use for S3 transfers in place of '
-    'bucket names.')
-flags.DEFINE_string('region', None, 'The S3 region to use.')
+flags.DEFINE_string('host', None, 'The hostname of the storage endpoint.')
