Skip to content

Commit

Permalink
#4 - Feature: Add Index to existing table
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers committed Oct 28, 2019
1 parent f61c34d commit 62da8e3
Show file tree
Hide file tree
Showing 22 changed files with 766 additions and 108 deletions.
34 changes: 14 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,27 @@ This library will help you do just that.
from migrator.dynamodb_migrator import Migrator
migrator = Migrator()
@migrator.version(1)
@migrator.create(AttributeDefinitions=[{
'AttributeName': 'hash_key',
'AttributeType': 'N'
}],
TableName='my_new_table',
KeySchema=[{
'AttributeName': 'hash_key',
'KeyType': 'HASH'
}],
BillingMode='PAY_PER_REQUEST')
@migrator.create(AttributeDefinitions=[{'AttributeName': 'hash_key', 'AttributeType': 'N'}],
TableName='my_new_table',
KeySchema=[{'AttributeName': 'hash_key', 'KeyType': 'HASH'}],
BillingMode='PAY_PER_REQUEST')
def v1(created_table):
print("Table created using the kwargs provided")
print("Note that the keyword-args are passed onto boto as is")
print(created_table)


@NotYetImplemented
@migrator.version(2)
@migrator.add_index("secondary_index_we_forgot_about")
def v2(migrate):
print("About to:")
print(" - Create new table (first_table_v2) with the appropriate index")
print(" - Create DynamoDB Stream on 'first_table' that copies changes into the new table")
print(" - Execute a script that automatically updates all existing data")
print(" (This will trigger all data in 'first_table' to be copied into the new table")
migrate()
print("Table with new index is ready to use")
@migrator.add_indexes(AttributeDefinitions=[{'AttributeName': 'postcode', 'AttributeType': 'S'}],
LocalSecondaryIndexes=[{'IndexName': 'string',
'KeySchema': [{'AttributeName': 'customer_nr', 'KeyType': 'HASH'},
{'AttributeName': 'postcode', 'KeyType': 'RANGE'}],
'Projection': {'ProjectionType': 'ALL'}}])
def v2(created_table):
print("Created a new table with the new index")
print("Created a DynamoDB stream that sends all updates to the old table to a custom Lambda-function")
print("The custom Lambda-function sends all updates to the new table")
print(created_table)


@NotYetImplemented
Expand Down
2 changes: 1 addition & 1 deletion requirements-to-freeze.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ flake8-print==3.1.0
pytest-mock==1.10.4
pytest-pythonpath==0.7.3

moto==1.3.14.dev250
moto==1.3.14.dev466
34 changes: 19 additions & 15 deletions src/migrator/dynamodb_migrator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
import os
from functools import wraps
from migrator.exceptions.MigratorScriptException import MigratorScriptException
from migrator.steps.BaseStep import BaseStep
from migrator.steps.CreateTableStep import CreateTableStep
from migrator.steps.AddIndexStep import AddIndexStep


class Migrator():
Expand All @@ -16,17 +16,18 @@ def __init__(self, identifier = None):
self._ch.setFormatter(self._formatter)
self._logger.addHandler(self._ch)
self._logger.setLevel(logging.DEBUG)
self._functions = []
self._steps = []
self._steps.append(BaseStep())
self._version = None
self._current_identifier = identifier if identifier else os.path.basename(__file__)
self._table_created = False
BaseStep().execute()

def version(self, version_number):
def inner_function(function):
@wraps(function)
def wrapper(*args, **kwargs):
self.function(*args, **kwargs)
return wrapper
self._version = version_number

def inner_function(func):
pass
return inner_function

def create(self, **kwargs):
Expand All @@ -37,14 +38,17 @@ def create(self, **kwargs):
raise MigratorScriptException("Unable to create multiple tables per script")

def inner_function(function):
self._steps.append(CreateTableStep(identifier=self._current_identifier,
properties=kwargs,
func=function))
created_table = CreateTableStep(identifier=self._current_identifier,
version=self._version,
properties=kwargs).execute()
self._table_created = True
return function(created_table)
return inner_function

def migrate(self):
if not self._steps:
self._logger.warning("No migration-steps have been found")
for step in self._steps:
step.execute()
def add_indexes(self, **kwargs):
    """Return a decorator that runs an AddIndexStep built from ``kwargs``.

    The keyword arguments are handed to AddIndexStep unchanged as its
    ``properties``, together with this migrator's current identifier and
    the version most recently stored on the migrator.

    The step is executed when the returned decorator is applied: the
    decorated function is called once with the value returned by
    ``AddIndexStep.execute()``, and the decorator returns whatever that
    call returns.
    """
    def apply_step(callback):
        step = AddIndexStep(identifier=self._current_identifier,
                            version=self._version,
                            properties=kwargs)
        return callback(step.execute())
    return apply_step
32 changes: 32 additions & 0 deletions src/migrator/iam_roles/lambda_stream_role.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"dynamodb:PutItem",
"dynamodb:DeleteItem",
"dynamodb:UpdateItem",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:dynamodb:*:486285699788:table/testtable2",
"arn:aws:logs:us-west-2:486285699788:log-group:/aws/lambda/testfunction:*"
]
}, {
"Effect": "Allow",
"Action": [
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams",
"dynamodb:GetRecords"
],
"Resource": "*"
}, {
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:us-west-2:486285699788:*"
}
]
}
176 changes: 176 additions & 0 deletions src/migrator/steps/AddIndexStep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import io
import zipfile
from migrator.steps.Step import Step
from string import Template
from time import sleep


class AddIndexStep(Step):
    """Migration step that adds secondary indexes by building a new table.

    ``execute`` looks up the table registered for the previous version,
    creates a copy of it named ``<previous>_V<version>`` that additionally
    carries the indexes passed in ``properties``, and then wires the
    previous table's stream to a generated Lambda function that replays
    every change into the new table. The names/ARNs of everything that was
    created are recorded in the metadata table under the new version.
    """

    # Keys of a describe_table() response that create_table() also accepts.
    _accepted_table_properties = ['AttributeDefinitions',
                                  'TableName',
                                  'KeySchema',
                                  'LocalSecondaryIndexes', 'GlobalSecondaryIndexes',
                                  'BillingMode', 'ProvisionedThroughput',
                                  'StreamSpecification',
                                  'SSESpecification',
                                  'Tags']
    # Keys of a described index that may be sent back to create_table().
    _accepted_index_properties = ["IndexName", "KeySchema", "Projection"]
    # describe_table() adds read-only bookkeeping fields to the throughput
    # (e.g. NumberOfDecreasesToday); only these two are valid on creation.
    _accepted_throughput_properties = ["ReadCapacityUnits", "WriteCapacityUnits"]

    # Pauses, in seconds, between the individual provisioning calls.
    _poll_interval = 1
    _role_creation_delay = 15
    _policy_attachment_delay = 10
    _stream_warmup_delay = 120

    _lambda_stream_policy = Template("""{
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": [
            "dynamodb:PutItem",
            "dynamodb:DeleteItem",
            "dynamodb:UpdateItem"
        ],
        "Resource": "$newtable"
    }, {
        "Effect": "Allow",
        "Action": [
            "dynamodb:GetShardIterator",
            "dynamodb:DescribeStream",
            "dynamodb:ListStreams",
            "dynamodb:GetRecords"
        ],
        "Resource": "$oldtable"
    }, {
        "Effect": "Allow",
        "Action": "logs:*",
        "Resource": "*"
    }
]}""")

    _lambda_stream_assume_role = """{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}"""

    # Source of the Lambda handler; it is shipped as text, so the
    # indentation inside this literal has to be valid Python on its own.
    _lambda_code = Template("""import boto3
import json
dynamodb = boto3.client('dynamodb')
table_name = "$newtable"
def copy(event, context):
    print(event)
    for record in event['Records']:
        if record['eventName'] == 'REMOVE':
            response = dynamodb.delete_item(TableName=table_name,
                                            Key=record['dynamodb']['Keys'])
        if record['eventName'] == 'INSERT' or record['eventName'] == 'MODIFY':
            response = dynamodb.put_item(TableName=table_name,
                                         Item=record['dynamodb']['NewImage'])
    return {
        'statusCode': 200
    }
""")

    def __init__(self, identifier, version, properties):
        """Store the migration identifier, target version and index kwargs.

        :param identifier: key of this migration in the metadata table
        :param version: integer version this step migrates to
        :param properties: keyword arguments describing the new indexes
            (``AttributeDefinitions``, ``LocalSecondaryIndexes`` and/or
            ``GlobalSecondaryIndexes``)
        """
        self._identifier = identifier
        self._version = version
        self._properties = properties
        super().__init__()

    def execute(self):
        """Create the new table, the copy pipeline and the metadata entry.

        :return: the ``Table`` description of the newly created table, as
            returned by ``describe_table`` once the table is ACTIVE
        """
        self._logger.debug(f"Adding Index with properties '{self._properties}'")
        # TODO: Check whether table already exists
        previous_version = self._version - 1
        metadata = self._get_metadata()
        previous_table_name = self._table_name_from_metadata(metadata[str(previous_version)])
        new_table_name = f"{previous_table_name}_V{self._version}"
        previous_table = self._dynamodb.describe_table(TableName=previous_table_name)['Table']

        # CREATE table based on old table
        new_table = self._build_table_definition(previous_table, new_table_name)
        self._logger.debug(f"Creating new table with properties: {new_table}")
        self._dynamodb.create_table(**new_table)
        created_table = self._wait_until_table_active(new_table_name)

        stream_arn = previous_table['LatestStreamArn']
        created_policy, created_role = self._create_role(previous_table_name,
                                                         new_table_name,
                                                         stream_arn,
                                                         created_table['TableArn'])
        func = self._lambda.create_function(FunctionName='dynamodb_migrator_' + previous_table_name,
                                            Runtime='python3.7',
                                            Role=created_role['Role']['Arn'],
                                            Handler='lambda_stream.copy',
                                            Code={'ZipFile': self._zip_lambda_code(created_table['TableName'])})
        mapping = self._create_stream_mapping(stream_arn, func['FunctionArn'])
        self._logger.info(f"Created stream: {mapping}")
        sleep(self._stream_warmup_delay)
        self._logger.info(f"Created table {new_table_name}")

        # Update metadata table
        self._dynamodb.update_item(
            TableName=self._metadata_table_name,
            Key={
                'identifier': {'S': self._identifier}
            },
            UpdateExpression="set #attr = :val",
            ExpressionAttributeNames={'#attr': str(self._version)},
            ExpressionAttributeValues={':val': {'M': {'table': {'S': new_table_name},
                                                      'policy': {'S': created_policy['Policy']['Arn']},
                                                      'role': {'S': created_role['Role']['Arn']},
                                                      'role_name': {'S': created_role['Role']['RoleName']},
                                                      'stream': {'S': stream_arn},
                                                      'mapping': {'S': mapping['UUID']},
                                                      'lambda': {'S': func['FunctionArn']}}}}
        )
        return created_table

    @staticmethod
    def _table_name_from_metadata(entry):
        """Extract the table name from one version entry of the metadata item.

        An entry is either a plain string attribute (``{'S': name}``) or the
        map this step itself writes (``{'M': {'table': {'S': name}, ...}}``).
        Accepting both lets one add-index migration follow another.
        """
        if 'S' in entry:
            return entry['S']
        return entry['M']['table']['S']

    def _copy_throughput(self, throughput):
        """Return ``throughput`` reduced to the keys create_table accepts."""
        return {key: value for key, value in throughput.items()
                if key in self._accepted_throughput_properties}

    def _copy_indexes(self, indexes, keep_throughput):
        """Strip read-only fields from described index definitions.

        :param indexes: index descriptions taken from ``describe_table``
        :param keep_throughput: whether to carry over each index's
            (sanitised) ``ProvisionedThroughput``
        """
        copied = []
        for index in indexes:
            definition = {key: value for key, value in index.items()
                          if key in self._accepted_index_properties}
            if keep_throughput and 'ProvisionedThroughput' in index:
                definition['ProvisionedThroughput'] = self._copy_throughput(index['ProvisionedThroughput'])
            copied.append(definition)
        return copied

    def _build_table_definition(self, previous_table, new_table_name):
        """Build the create_table kwargs for the new table.

        Starts from the described previous table, renames it, merges in the
        attribute definitions and indexes from ``self._properties`` and
        carries over the billing configuration.
        """
        new_table = {key: value for key, value in previous_table.items()
                     if key in self._accepted_table_properties}
        new_table['TableName'] = new_table_name

        billing_mode = previous_table.get('BillingModeSummary', {}).get('BillingMode')
        pay_per_request = billing_mode == 'PAY_PER_REQUEST'
        if billing_mode:
            new_table['BillingMode'] = billing_mode
        if pay_per_request:
            # On-demand tables must not specify a throughput.
            new_table.pop('ProvisionedThroughput', None)
        elif 'ProvisionedThroughput' in new_table:
            new_table['ProvisionedThroughput'] = self._copy_throughput(new_table['ProvisionedThroughput'])

        # Copy the list so the described table is not mutated, and skip
        # definitions that are already present verbatim.
        definitions = list(new_table.get('AttributeDefinitions', []))
        for definition in self._properties.get('AttributeDefinitions', []):
            if definition not in definitions:
                definitions.append(definition)
        new_table['AttributeDefinitions'] = definitions

        for index_type in ('LocalSecondaryIndexes', 'GlobalSecondaryIndexes'):
            # Only global indexes on provisioned tables have a throughput.
            keep_throughput = index_type == 'GlobalSecondaryIndexes' and not pay_per_request
            indexes = self._copy_indexes(previous_table.get(index_type, []), keep_throughput)
            indexes.extend(self._properties.get(index_type, []))
            if indexes:
                new_table[index_type] = indexes
            else:
                # An empty index list must not be sent to create_table.
                new_table.pop(index_type, None)
        return new_table

    def _wait_until_table_active(self, table_name):
        """Poll describe_table until the table is ACTIVE; return its description."""
        while True:
            table = self._dynamodb.describe_table(TableName=table_name)['Table']
            if table['TableStatus'] == 'ACTIVE':
                return table
            sleep(self._poll_interval)

    def _create_role(self, previous_table_name, new_table_name, stream_arn, new_table_arn):
        """Create the IAM policy and role used by the copy Lambda.

        :return: tuple of the ``create_policy`` and ``create_role`` responses
        """
        policy_document = self._lambda_stream_policy.substitute(oldtable=stream_arn,
                                                                newtable=new_table_arn)
        desc = ' created by dynamodb_migrator, migrating data from ' + previous_table_name + ' to ' + new_table_name
        resource_name = 'dynamodb_migrator_' + previous_table_name
        created_policy = self._iam.create_policy(PolicyName=resource_name,
                                                 PolicyDocument=policy_document,
                                                 Description='Policy' + desc)
        created_role = self._iam.create_role(RoleName=resource_name,
                                             AssumeRolePolicyDocument=self._lambda_stream_assume_role,
                                             Description='Role' + desc)
        sleep(self._role_creation_delay)
        self._iam.attach_role_policy(PolicyArn=created_policy['Policy']['Arn'],
                                     RoleName=created_role['Role']['RoleName'])
        sleep(self._policy_attachment_delay)
        return created_policy, created_role

    def _zip_lambda_code(self, new_table_name):
        """Return a zip archive (bytes) holding the rendered Lambda handler."""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
            info = zipfile.ZipInfo('lambda_stream.py')
            info.external_attr = 0o777 << 16  # give full access to included file
            archive.writestr(info, self._lambda_code.substitute(newtable=new_table_name))
        return buffer.getvalue()

    def _create_stream_mapping(self, stream_arn, function_arn):
        """Connect the stream to the Lambda and wait until the mapping is Enabled."""
        mapping = self._lambda.create_event_source_mapping(EventSourceArn=stream_arn,
                                                           FunctionName=function_arn,
                                                           Enabled=True,
                                                           BatchSize=10,
                                                           MaximumBatchingWindowInSeconds=5,
                                                           StartingPosition='TRIM_HORIZON')
        while mapping['State'] != 'Enabled':
            sleep(self._poll_interval)
            mapping = self._lambda.get_event_source_mapping(UUID=mapping['UUID'])
        return mapping

    def _get_metadata(self):
        """Fetch this migration's item from the metadata table."""
        return self._dynamodb.get_item(TableName=self._metadata_table_name,
                                       Key={'identifier': {'S': self._identifier}})['Item']
15 changes: 9 additions & 6 deletions src/migrator/steps/CreateTableStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,34 @@

class CreateTableStep(Step):

def __init__(self, identifier, properties, func):
def __init__(self, identifier, version, properties):
self._identifier = identifier
self._version = version
self._properties = properties
self._func = func

def execute(self):
self._logger.debug(f"Creating table '{self._properties['TableName']}'")
table_name = self._properties['TableName']
self._logger.debug(f"Creating table '{table_name}'")
if self._table_exists():
self._logger.debug(f"Table with identifier '{self._identifier}' has already been created")
created_table = self._dynamodb.describe_table(TableName=table_name)['Table']
else:
self._properties['StreamSpecification'] = {'StreamEnabled': True,
'StreamViewType': 'NEW_AND_OLD_IMAGES'}
created_table = self._dynamodb.create_table(**self._properties)
self._dynamodb.put_item(
TableName=self._metadata_table_name,
Item={
'identifier': {'S': self._identifier},
'version': {'N': "1"}
str(self._version): {'S': table_name}
})
status = 'CREATING'
while status != 'ACTIVE':
created_table = self._dynamodb.describe_table(TableName=table_name)['Table']
status = created_table['TableStatus']
sleep(1)
self._logger.info(f"Created table '{self._properties['TableName']}'")
self._func(created_table)
self._logger.info(f"Created table '{table_name}'")
return created_table

def _table_exists(self):
curr_item = self._dynamodb.get_item(TableName=self._metadata_table_name,
Expand Down
8 changes: 8 additions & 0 deletions src/migrator/steps/Step.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
class Step:
_dynamodb = boto3.client('dynamodb')
_metadata_table_name = 'dynamodb_migrator_metadata'
_lambda = boto3.client('lambda')
_iam = boto3.client('iam')
_account_id = None
_sts = boto3.client('sts')
_ch = logging.StreamHandler()
_formatter = logging.Formatter('%(asctime)s %(levelname)8s %(name)s | %(message)s')
_logger = logging.getLogger('dynamodb_migrator_library')
Expand All @@ -16,3 +20,7 @@ def __init__(self):

def execute(self):
pass

def get_region(self):
    """Return the ``region_name`` of a newly created boto3 session."""
    return boto3.session.Session().region_name
File renamed without changes.
Loading

0 comments on commit 62da8e3

Please sign in to comment.