TDL-14887: Respect field selection #67

Open
wants to merge 45 commits into base: master
Changes from 34 commits
Commits (45)
da759d3
make keys automatic
Sep 2, 2021
23a93b3
pylint resolve
Sep 2, 2021
f3de6a9
add full replication test case
Sep 2, 2021
d08a706
use transformation
Sep 2, 2021
e71aaf3
resolve pylint
Sep 2, 2021
bf15242
added code change for data extension stream
Sep 3, 2021
6c70f73
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 3, 2021
f25c815
pylint resolve
Sep 3, 2021
42714f7
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 3, 2021
bbf14a3
updated test case for data extension
Sep 3, 2021
af3ea3b
added comment
Sep 7, 2021
237d8b1
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 7, 2021
eef1780
added comments
Sep 7, 2021
9b20b19
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 7, 2021
5f179fc
added comments and optimized that condition
Sep 7, 2021
88a8630
added code change for transformation function in base file
Sep 7, 2021
bf8b487
pylint resolve
Sep 7, 2021
2b12baf
disabled pylint error
Sep 7, 2021
e5a98cf
test: removed disable pylint code
Sep 8, 2021
fa07e1c
added comment in base file
Sep 8, 2021
b544e52
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 8, 2021
c204136
updated comment
Sep 10, 2021
04ce605
updated the comment for skipping streams
Sep 13, 2021
d344f21
updated discovery test and removed full replication test
Sep 17, 2021
5290186
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 17, 2021
4630c71
added verification of unique records
Sep 17, 2021
087b156
updated start date
Sep 20, 2021
612d1a8
updated the code
Sep 22, 2021
0291021
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 22, 2021
e7d7e09
updated the code
Sep 22, 2021
8ab8353
added a comment explaining subscriber and list subscriber syncing
Sep 23, 2021
684f752
added comments
Oct 7, 2021
a59243d
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Oct 7, 2021
4abadef
updated comment
Oct 7, 2021
0232535
made separate files for schemas
Oct 8, 2021
ab43995
resolve pylint
Oct 8, 2021
5864acc
resolve integration test
Oct 8, 2021
2a2b20c
corrected typo
Oct 8, 2021
0418198
TDL-14621: Add retry logic to requests and TDL-14622: Retry Connectio…
hpatel41 Oct 13, 2021
a947b16
TDL-14890: Print user friendly error messages (#73)
hpatel41 Oct 13, 2021
1a7909b
TDL-14989: Check best practices (#74)
hpatel41 Oct 13, 2021
ca2ca09
TDL-14889: Keys should be marked automatic and TDL-14891: list_sends …
hpatel41 Oct 13, 2021
3cd7436
resolve merge conflicts: crest-master branch
Oct 13, 2021
fc9fa00
updated the code
Oct 13, 2021
0824e00
resolved unittest case error
Oct 13, 2021
7 changes: 7 additions & 0 deletions tap_exacttarget/__init__.py
@@ -102,6 +102,13 @@ def do_sync(args):
.format(stream_catalog.get('stream')))
continue

# The 'subscribers' stream is the child stream of 'list_subscribers'.
# When we sync 'list_subscribers', we collect the 'SubscriberKey' values
# returned in the 'list_subscribers' records and pass that list to the
# 'subscribers' stream, which then only syncs the subscribers present in
# the list. Hence, for different start dates the 'SubscriberKey' list will
# differ, and thus the 'subscribers' records will differ as well.
if SubscriberDataAccessObject.matches_catalog(stream_catalog):
subscriber_selected = True
subscriber_catalog = stream_catalog
28 changes: 23 additions & 5 deletions tap_exacttarget/dao.py
@@ -1,5 +1,5 @@
import singer
from singer import metadata
from singer import metadata, Transformer

from funcy import project

@@ -28,16 +28,25 @@ def generate_catalog(self):
cls = self.__class__

mdata = metadata.new()
metadata.write(mdata, (), 'inclusion', 'available')
for prop in cls.SCHEMA['properties']: # pylint:disable=unsubscriptable-object
metadata.write(mdata, ('properties', prop), 'inclusion', 'available')

# use 'get_standard_metadata' with primary key, replication key and replication method
mdata = metadata.get_standard_metadata(schema=self.SCHEMA,
key_properties=self.KEY_PROPERTIES,
valid_replication_keys=self.REPLICATION_KEYS if self.REPLICATION_KEYS else None,
replication_method=self.REPLICATION_METHOD)

mdata_map = metadata.to_map(mdata)

# mark inclusion as 'automatic' for replication keys
for replication_key in self.REPLICATION_KEYS:
mdata_map[('properties', replication_key)]['inclusion'] = 'automatic'

return [{
'tap_stream_id': cls.TABLE,
'stream': cls.TABLE,
'key_properties': cls.KEY_PROPERTIES,
'schema': cls.SCHEMA,
'metadata': metadata.to_list(mdata)
'metadata': metadata.to_list(mdata_map)
}]

def filter_keys_and_parse(self, obj):
@@ -52,6 +61,13 @@ def get_catalog_keys(self):
def parse_object(self, obj):
return project(obj, self.get_catalog_keys())

# a helper that applies the singer transformation to a record and writes it
@staticmethod
def write_records_with_transform(record, catalog, table):
with Transformer() as transformer:
rec = transformer.transform(record, catalog.get('schema'), metadata.to_map(catalog.get('metadata')))
singer.write_record(table, rec)

def write_schema(self):
singer.write_schema(
self.catalog.get('stream'),
@@ -78,6 +94,8 @@ def sync(self):
SCHEMA = None
TABLE = None
KEY_PROPERTIES = None
REPLICATION_KEYS = []
REPLICATION_METHOD = None

def sync_data(self): # pylint: disable=no-self-use
raise RuntimeError('sync_data is not implemented!')
6 changes: 5 additions & 1 deletion tap_exacttarget/endpoints/campaigns.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
@@ -36,14 +37,17 @@ class CampaignDataAccessObject(DataAccessObject):

TABLE = 'campaign'
KEY_PROPERTIES = ['id']
REPLICATION_METHOD = 'FULL_TABLE'

def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)

catalog_copy = copy.deepcopy(self.catalog)
Review comment:
Why do a deep copy of the catalog?

Contributor (author) reply:

The transform function behaves in such a way (here) that when we pass the catalog to it, it reorders the data-type values in the schema: for example, ["null", "string"] before the transform becomes ["string", "null"] after it. So we made a copy of the catalog to preserve the original, since self.catalog is still used for writing the schema.
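
To make the mutation concern concrete, here is a minimal sketch (not from the PR; the stream name, field, and catalog literal are hypothetical) showing how the deep copy keeps the original catalog's type lists intact:

import copy
import singer
from singer import metadata, Transformer

# Hypothetical catalog with one field whose type list starts as ['null', 'string'].
catalog = {
    'stream': 'campaign',
    'schema': {'type': 'object',
               'properties': {'name': {'type': ['null', 'string']}}},
    'metadata': []
}

# Deep-copy before transforming, as the PR does, so any in-place reordering
# performed by transform() only touches the copy.
catalog_copy = copy.deepcopy(catalog)

with Transformer() as transformer:
    rec = transformer.transform({'name': 'Spring launch'},
                                catalog_copy.get('schema'),
                                metadata.to_map(catalog_copy.get('metadata')))
    singer.write_record('campaign', rec)

# The original catalog still reads ['null', 'string'] and is safe for write_schema().
assert catalog['schema']['properties']['name']['type'] == ['null', 'string']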


for campaign in cursor:
campaign = self.filter_keys_and_parse(campaign)

singer.write_records(self.__class__.TABLE, [campaign])
self.write_records_with_transform(campaign, catalog_copy, self.TABLE)
7 changes: 6 additions & 1 deletion tap_exacttarget/endpoints/content_areas.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
@@ -104,6 +105,8 @@ class ContentAreaDataAccessObject(DataAccessObject):

TABLE = 'content_area'
KEY_PROPERTIES = ['ID']
REPLICATION_METHOD = 'INCREMENTAL'
REPLICATION_KEYS = ['ModifiedDate']

def sync_data(self):
table = self.__class__.TABLE
@@ -124,6 +127,8 @@ def sync_data(self):
self.auth_stub,
search_filter)

catalog_copy = copy.deepcopy(self.catalog)

for content_area in stream:
content_area = self.filter_keys_and_parse(content_area)

@@ -132,6 +137,6 @@ def sync_data(self):
'ModifiedDate',
content_area.get('ModifiedDate'))

singer.write_records(table, [content_area])
self.write_records_with_transform(content_area, catalog_copy, table)

save_state(self.state)
107 changes: 95 additions & 12 deletions tap_exacttarget/endpoints/data_extensions.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from funcy import set_in, update_in, merge
@@ -79,34 +80,62 @@ def _get_extensions(self):
}
}
},
'metadata': [{'breadcrumb': (), 'metadata': {'inclusion':'available'}},
{'breadcrumb': ('properties', '_CustomObjectKey'),
'metadata': {'inclusion':'available'}},
{'breadcrumb': ('properties', 'CategoryID'),
'metadata': {'inclusion':'available'}}]
'metadata': [
{
'breadcrumb': (),
'metadata': {
'inclusion':'available',
'forced-replication-method': 'FULL_TABLE',
'table-key-properties': ['_CustomObjectKey'],
'valid-replication-keys': []
}
},
{
'breadcrumb': ('properties', '_CustomObjectKey'),
'metadata': {'inclusion':'automatic'}
},
{
'breadcrumb': ('properties', 'CategoryID'),
'metadata': {'inclusion':'available'}
}
]
}

return to_return

def _get_fields(self, extensions):
def _get_fields(self, extensions): # pylint: disable=too-many-branches
to_return = extensions.copy()

result = request(
'DataExtensionField',
FuelSDK.ET_DataExtension_Column,
self.auth_stub)

# Iterate through all the fields, determine whether each is a primary key
# or a replication key, and update the catalog accordingly:
# is_primary_key:
#   append the field to 'table-key-properties'
# is_replication_key:
#   set 'forced-replication-method' to INCREMENTAL
#   append the field to 'valid-replication-keys'
# mark inclusion as 'automatic' for both primary and replication keys
for field in result:
is_replication_key = False
is_primary_key = False
extension_id = field.DataExtension.CustomerKey
field = sudsobj_to_dict(field)
field_name = field['Name']

if field.get('IsPrimaryKey'):
is_primary_key = True
to_return = _merge_in(
to_return,
[extension_id, 'key_properties'],
field_name)

if field_name in ['ModifiedDate', 'JoinDate']:
is_replication_key = True

field_schema = {
'type': [
'null',
@@ -120,13 +149,65 @@ def _get_fields(self, extensions):
[extension_id, 'schema', 'properties', field_name],
field_schema)

# add the primary key to 'table-key-properties'
if is_primary_key:
for mdata in to_return[extension_id]['metadata']:
if not mdata.get('breadcrumb'):
mdata.get('metadata').get('table-key-properties').append(field_name)

# add the replication key to 'valid-replication-keys'
# and change 'forced-replication-method' to INCREMENTAL
if is_replication_key:
for mdata in to_return[extension_id]['metadata']:
if not mdata.get('breadcrumb'):
mdata.get('metadata')['forced-replication-method'] = "INCREMENTAL"
mdata.get('metadata').get('valid-replication-keys').append(field_name)

# These fields are defaulted into the schema; do not add them to the metadata again.
if field_name not in {'_CustomObjectKey', 'CategoryID'}:
to_return[extension_id]['metadata'].append({
'breadcrumb': ('properties', field_name),
'metadata': {'inclusion': 'available'}
})

# if primary or replication key, mark it as automatic
if is_primary_key or is_replication_key:
to_return[extension_id]['metadata'].append({
'breadcrumb': ('properties', field_name),
'metadata': {'inclusion': 'automatic'}
})
else:
to_return[extension_id]['metadata'].append({
'breadcrumb': ('properties', field_name),
'metadata': {'inclusion': 'available'}
})

# the structure of 'to_return' is like:
# {
# 'de1': {
# 'tap_stream_id': 'data_extension.de1',
# 'stream': 'data_extension.de1',
# 'key_properties': ['_CustomObjectKey'],
# 'schema': {
# 'type': 'object',
# 'properties': {...}
# },
# 'metadata': [...]
# },
# 'de2': {
# 'tap_stream_id': 'data_extension.de2',
# 'stream': 'data_extension.de2',
# 'key_properties': ['_CustomObjectKey'],
# 'schema': {
# 'type': 'object',
# 'properties': {...}
# },
# 'metadata': [...]
# }
# }

# loop through all the data extension catalogs in 'to_return'
# and remove any empty 'valid-replication-keys' entry from the metadata
for catalog in to_return.values():
for mdata in catalog.get('metadata'):
if not mdata.get('breadcrumb'):
if not mdata.get('metadata').get('valid-replication-keys'):
del mdata.get('metadata')['valid-replication-keys']
return to_return

def generate_catalog(self):
@@ -206,6 +287,8 @@ def _replicate(self, customer_key, keys,
result = request_from_cursor('DataExtensionObject', cursor,
batch_size=batch_size)

catalog_copy = copy.deepcopy(self.catalog)

for row in result:
row = self.filter_keys_and_parse(row)
row['CategoryID'] = parent_category_id
Expand All @@ -215,7 +298,7 @@ def _replicate(self, customer_key, keys,
replication_key,
row.get(replication_key))

singer.write_records(table, [row])
self.write_records_with_transform(row, catalog_copy, table)

if partial:
self.state = incorporate(self.state,
7 changes: 6 additions & 1 deletion tap_exacttarget/endpoints/emails.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
@@ -107,6 +108,8 @@ class EmailDataAccessObject(DataAccessObject):

TABLE = 'email'
KEY_PROPERTIES = ['ID']
REPLICATION_METHOD = 'INCREMENTAL'
REPLICATION_KEYS = ['ModifiedDate']

def parse_object(self, obj):
to_return = obj.copy()
@@ -139,6 +142,8 @@ def sync_data(self):
self.auth_stub,
search_filter)

catalog_copy = copy.deepcopy(self.catalog)

for email in stream:
email = self.filter_keys_and_parse(email)

@@ -147,6 +152,6 @@ def sync_data(self):
'ModifiedDate',
email.get('ModifiedDate'))

singer.write_records(table, [email])
self.write_records_with_transform(email, catalog_copy, table)

save_state(self.state)
7 changes: 6 additions & 1 deletion tap_exacttarget/endpoints/events.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
@@ -45,6 +46,8 @@ class EventDataAccessObject(DataAccessObject):

TABLE = 'event'
KEY_PROPERTIES = ['SendID', 'EventType', 'SubscriberKey', 'EventDate']
REPLICATION_METHOD = 'INCREMENTAL'
REPLICATION_KEYS = ['EventDate']

def sync_data(self):
table = self.__class__.TABLE
@@ -87,6 +90,8 @@ def sync_data(self):
self.auth_stub,
search_filter)

catalog_copy = copy.deepcopy(self.catalog)

for event in stream:
event = self.filter_keys_and_parse(event)

@@ -102,7 +107,7 @@ def sync_data(self):
event.get('EventDate')))
continue

singer.write_records(table, [event])
self.write_records_with_transform(event, catalog_copy, table)

self.state = incorporate(self.state,
event_name,
7 changes: 6 additions & 1 deletion tap_exacttarget/endpoints/folders.py
@@ -1,4 +1,5 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
@@ -52,6 +53,8 @@ class FolderDataAccessObject(DataAccessObject):

TABLE = 'folder'
KEY_PROPERTIES = ['ID']
REPLICATION_METHOD = 'INCREMENTAL'
REPLICATION_KEYS = ['ModifiedDate']

def parse_object(self, obj):
to_return = obj.copy()
@@ -80,6 +83,8 @@ def sync_data(self):
self.auth_stub,
search_filter)

catalog_copy = copy.deepcopy(self.catalog)

for folder in stream:
folder = self.filter_keys_and_parse(folder)

@@ -88,6 +93,6 @@ def sync_data(self):
'ModifiedDate',
folder.get('ModifiedDate'))

singer.write_records(table, [folder])
self.write_records_with_transform(folder, catalog_copy, table)

save_state(self.state)