TDL-14887: Respect field selection #67

Open

wants to merge 45 commits into base: master

Commits (45)
da759d3
make keys automatic
Sep 2, 2021
23a93b3
pylint resolve
Sep 2, 2021
f3de6a9
add full replication test case
Sep 2, 2021
d08a706
use transformation
Sep 2, 2021
e71aaf3
resolve pylint
Sep 2, 2021
bf15242
added code change for data extension stream
Sep 3, 2021
6c70f73
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 3, 2021
f25c815
pylint resolve
Sep 3, 2021
42714f7
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 3, 2021
bbf14a3
updated test case for data extension
Sep 3, 2021
af3ea3b
added comment
Sep 7, 2021
237d8b1
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 7, 2021
eef1780
added comments
Sep 7, 2021
9b20b19
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 7, 2021
5f179fc
added comments and optimized that condition
Sep 7, 2021
88a8630
added code change for tranformation function in base file
Sep 7, 2021
bf8b487
pylint resolve
Sep 7, 2021
2b12baf
disabled pylint error
Sep 7, 2021
e5a98cf
test: removed disable pylint code
Sep 8, 2021
fa07e1c
added comment in base file
Sep 8, 2021
b544e52
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 8, 2021
c204136
updated comment
Sep 10, 2021
04ce605
updated the comment for skipping streams
Sep 13, 2021
d344f21
updated discovery test and removed full replication test
Sep 17, 2021
5290186
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 17, 2021
4630c71
added verification of unique records
Sep 17, 2021
087b156
updated start date
Sep 20, 2021
612d1a8
updated the code
Sep 22, 2021
0291021
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Sep 22, 2021
e7d7e09
updated the code
Sep 22, 2021
8ab8353
added a comment explaining subscriber and list subscriber syncing
Sep 23, 2021
684f752
added comments
Oct 7, 2021
a59243d
Merge branch 'automatic-fields' of https://github.com/singer-io/tap-e…
Oct 7, 2021
4abadef
updated comment
Oct 7, 2021
0232535
made separate files for schemas
Oct 8, 2021
ab43995
resolve pylint
Oct 8, 2021
5864acc
resolve integration test
Oct 8, 2021
2a2b20c
corrected typo
Oct 8, 2021
0418198
TDL-14621: Add retry logic to requests and TDL-14622: Retry Connectio…
hpatel41 Oct 13, 2021
a947b16
TDL-14890: Print user friendly error messages (#73)
hpatel41 Oct 13, 2021
1a7909b
TDL-14989: Check best practices (#74)
hpatel41 Oct 13, 2021
ca2ca09
TDL-14889: Keys should be marked automatic and TDL-14891: list_sends …
hpatel41 Oct 13, 2021
3cd7436
resolve merge cconflicts: crest-master branch
Oct 13, 2021
fc9fa00
updated the code
Oct 13, 2021
0824e00
resolved unittest case error
Oct 13, 2021
14 changes: 4 additions & 10 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
@@ -11,7 +11,7 @@ jobs:
python3 -mvenv /usr/local/share/virtualenvs/tap-exacttarget
source /usr/local/share/virtualenvs/tap-exacttarget/bin/activate
pip install -U 'pip<19.2' 'setuptools<51.0.0'
pip install .[dev]
pip install .[test]
- run:
name: 'unittest'
command: |
@@ -26,16 +26,10 @@ jobs:
- run:
name: 'Integration Tests'
command: |
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox dev_env.sh
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-exacttarget \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
tests
run-test --tap=tap-exacttarget tests
workflows:
version: 2
commit:
17 changes: 11 additions & 6 deletions setup.py
@@ -12,22 +12,27 @@
py_modules=['tap_exacttarget'],
install_requires=[
'funcy==1.9.1',
'singer-python==5.9.0',
'singer-python==5.12.1',
'python-dateutil==2.6.0',
'voluptuous==0.10.5',
'Salesforce-FuelSDK==1.3.0'
],
extras_require={
'dev': [
'ipdb==0.11',
'pylint==2.1.1',
'astroid==2.1.0',
'test': [
'pylint==2.10.2',
'astroid==2.7.3',
'nose'
],
'dev': [
'ipdb==0.11'
]
},
entry_points='''
[console_scripts]
tap-exacttarget=tap_exacttarget:main
''',
packages=find_packages()
packages=find_packages(),
package_data={
'tap_exacttarget': ['schemas/*.json']
}
)
15 changes: 12 additions & 3 deletions tap_exacttarget/__init__.py
@@ -3,6 +3,8 @@
import argparse
import json

import sys

import singer
from singer import utils
from singer import metadata
@@ -102,6 +104,13 @@ def do_sync(args):
.format(stream_catalog.get('stream')))
continue

# The 'subscribers' stream is the child stream of 'list_subscribers'.
# When we sync 'list_subscribers', we collect the 'SubscriberKey' values
# returned in its records and pass that list to the 'subscribers' stream,
# so 'subscribers' only syncs records whose keys are present in that list.
# Hence, for different start dates the 'SubscriberKey' list will differ,
# and so will the 'subscribers' records.
if SubscriberDataAccessObject.matches_catalog(stream_catalog):
subscriber_selected = True
subscriber_catalog = stream_catalog
@@ -123,7 +132,7 @@
LOGGER.fatal('Cannot replicate `subscriber` without '
'`list_subscriber`. Please select `list_subscriber` '
'and try again.')
exit(1)
sys.exit(1)

for stream_accessor in stream_accessors:
if isinstance(stream_accessor, ListSubscriberDataAccessObject) and \
@@ -161,10 +170,10 @@ def main():

if success:
LOGGER.info("Completed successfully, exiting.")
exit(0)
sys.exit(0)
else:
LOGGER.info("Run failed, exiting.")
exit(1)
sys.exit(1)

if __name__ == '__main__':
main()
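
A hedged sketch of the parent/child behaviour described in the comment added to do_sync above. The helper names are illustrative only, not the tap's actual functions: the parent stream gathers 'SubscriberKey' values while it syncs, and the child stream is then restricted to those keys.

# Illustrative only, not part of the PR. Shows how a parent stream's
# SubscriberKey values can be used to limit the child stream's records.
def collect_subscriber_keys(list_subscriber_records):
    # one key per list_subscriber record, de-duplicated
    return sorted({rec['SubscriberKey'] for rec in list_subscriber_records})

def filter_subscribers(all_subscribers, subscriber_keys):
    # the child stream only emits subscribers whose key the parent returned
    keys = set(subscriber_keys)
    return [sub for sub in all_subscribers if sub['SubscriberKey'] in keys]
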
6 changes: 4 additions & 2 deletions tap_exacttarget/client.py
@@ -61,7 +61,8 @@ def get_auth_stub(config):
LOGGER.info('Failed to auth using V1 endpoint')
if not config.get('tenant_subdomain'):
LOGGER.warning('No tenant_subdomain found, will not attempt to auth with V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or try adding the \'tenant_subdomain\'."
raise Exception(message) from None

# Next try V2
# Move to OAuth2: https://help.salesforce.com/articleView?id=mc_rn_january_2019_platform_ip_remove_legacy_package_create_ability.htm&type=5
@@ -77,7 +78,8 @@
transport=transport)
except Exception as e:
LOGGER.info('Failed to auth using V2 endpoint')
raise e
message = f"{str(e)}. Please check your \'client_id\', \'client_secret\' or \'tenant_subdomain\'."
raise Exception(message) from None

LOGGER.info("Success.")
return auth_stub
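
A minimal sketch of the re-raise pattern used in get_auth_stub above: keep the original exception text, append a hint about the likely configuration problem, and use 'from None' to suppress the chained traceback. The authenticate callable below is a placeholder, not the FuelSDK API.

# Illustrative only: wrap any auth call that may fail and surface a
# user-friendly message instead of the raw exception chain.
def auth_or_explain(authenticate, config):
    try:
        return authenticate(config)
    except Exception as e:
        message = (str(e) + ". Please check your 'client_id', 'client_secret' "
                   "or 'tenant_subdomain'.")
        raise Exception(message) from None
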
67 changes: 60 additions & 7 deletions tap_exacttarget/dao.py
@@ -1,5 +1,9 @@
import backoff
import socket
import functools
import singer
from singer import metadata
import os
from singer import metadata, Transformer, utils

from funcy import project

@@ -11,6 +15,34 @@
def _get_catalog_schema(catalog):
return catalog.get('schema', {}).get('properties')

def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

# function to load the 'definitions' schema, which contains the shared reference fields
def load_schema_references():
shared_schema_path = get_abs_path('schemas/definitions.json')

refs = {}
# load json from the path
refs["definitions.json"] = utils.load_json(shared_schema_path)

return refs

# function to load schema from json file
def load_schema(stream):
path = get_abs_path('schemas/{}s.json'.format(stream))
# load json from the path
schema = utils.load_json(path)

return schema

# decorator for retrying on error
def exacttarget_error_handling(fnc):
@backoff.on_exception(backoff.expo, (socket.timeout, ConnectionError), max_tries=5, factor=2)
@functools.wraps(fnc)
def wrapper(*args, **kwargs):
return fnc(*args, **kwargs)
return wrapper

class DataAccessObject():

@@ -27,17 +59,30 @@ def matches_catalog(cls, catalog):
def generate_catalog(self):
cls = self.__class__

# get the reference schemas
refs = load_schema_references()
# resolve the schema reference and make final schema
schema = singer.resolve_schema_references(load_schema(cls.TABLE), refs)
mdata = metadata.new()
metadata.write(mdata, (), 'inclusion', 'available')
for prop in cls.SCHEMA['properties']: # pylint:disable=unsubscriptable-object
metadata.write(mdata, ('properties', prop), 'inclusion', 'available')

# use 'get_standard_metadata' with primary key, replication key and replication method
mdata = metadata.get_standard_metadata(schema=schema,
key_properties=self.KEY_PROPERTIES,
valid_replication_keys=self.REPLICATION_KEYS if self.REPLICATION_KEYS else None,
replication_method=self.REPLICATION_METHOD)

mdata_map = metadata.to_map(mdata)

# make 'automatic' inclusion for replication keys
for replication_key in self.REPLICATION_KEYS:
mdata_map[('properties', replication_key)]['inclusion'] = 'automatic'

return [{
'tap_stream_id': cls.TABLE,
'stream': cls.TABLE,
'key_properties': cls.KEY_PROPERTIES,
'schema': cls.SCHEMA,
'metadata': metadata.to_list(mdata)
'schema': schema,
'metadata': metadata.to_list(mdata_map)
}]

def filter_keys_and_parse(self, obj):
@@ -52,6 +97,13 @@ def get_catalog_keys(self):
def parse_object(self, obj):
return project(obj, self.get_catalog_keys())

# a function to write records by applying transformation
@staticmethod
def write_records_with_transform(record, catalog, table):
with Transformer() as transformer:
rec = transformer.transform(record, catalog.get('schema'), metadata.to_map(catalog.get('metadata')))
singer.write_record(table, rec)

def write_schema(self):
singer.write_schema(
self.catalog.get('stream'),
@@ -75,9 +127,10 @@ def sync(self):

# OVERRIDE THESE TO IMPLEMENT A NEW DAO:

SCHEMA = None
TABLE = None
KEY_PROPERTIES = None
REPLICATION_KEYS = []
REPLICATION_METHOD = None

def sync_data(self): # pylint: disable=no-self-use
raise RuntimeError('sync_data is not implemented!')
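
A hedged usage sketch of the exacttarget_error_handling decorator added above: backoff retries the wrapped call on socket.timeout or ConnectionError with exponential waits (factor=2) for up to 5 attempts before re-raising. The wrapped function below is a stand-in for a real network request, assuming this PR's tap_exacttarget.dao module is importable.

# Illustrative only: the decorator comes from this PR; request_fn is any
# callable that performs network I/O and may raise socket.timeout or
# ConnectionError.
from tap_exacttarget.dao import exacttarget_error_handling

@exacttarget_error_handling
def fetch_with_retries(request_fn, *args, **kwargs):
    # backoff retries up to 5 times with exponential waits before re-raising
    return request_fn(*args, **kwargs)
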
34 changes: 7 additions & 27 deletions tap_exacttarget/endpoints/campaigns.py
@@ -1,49 +1,29 @@
import FuelSDK
import copy
import singer

from tap_exacttarget.client import request
from tap_exacttarget.dao import DataAccessObject
from tap_exacttarget.schemas import with_properties
Review comment, @hpatel41 (Contributor, Author), Oct 12, 2021: Removed the unused import as the schema is removed. Similarly for other files too.

from tap_exacttarget.dao import (DataAccessObject, exacttarget_error_handling)

LOGGER = singer.get_logger()


class CampaignDataAccessObject(DataAccessObject):

SCHEMA = with_properties({
Review comment, @hpatel41 (Contributor, Author), Oct 12, 2021: Removed the inline schema. Similarly for other files too.

'id': {
'type': ['null', 'string'],
},
'createdDate': {
'type': ['null', 'string'],
},
'modifiedDate': {
'type': ['null', 'string'],
},
'name': {
'type': ['null', 'string'],
},
'description': {
'type': ['null', 'string'],
},
'campaignCode': {
'type': ['null', 'string'],
},
'color': {
'type': ['null', 'string'],
}
})

TABLE = 'campaign'
KEY_PROPERTIES = ['id']
REPLICATION_METHOD = 'FULL_TABLE'

@exacttarget_error_handling
def sync_data(self):
cursor = request(
'Campaign',
FuelSDK.ET_Campaign,
self.auth_stub)

catalog_copy = copy.deepcopy(self.catalog)
Reviewer: Why do a deep copy of the catalog?

@hpatel41 (Contributor, Author): The transform function (here) reorders the data-type values when the catalog is passed to it, e.g. ["null", "string"] before transform becomes ["string", "null"] after. We make a copy of the catalog to preserve the original, since self.catalog is used for writing the schema.

for campaign in cursor:
campaign = self.filter_keys_and_parse(campaign)

singer.write_records(self.__class__.TABLE, [campaign])
self.write_records_with_transform(campaign, catalog_copy, self.TABLE)
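
A minimal sketch (not part of the PR) of the deep-copy rationale discussed in the review thread above: Transformer.transform can reorder the type lists in the schema it is given (e.g. ["null", "string"] becoming ["string", "null"]), so the catalog is copied first and the original is kept intact for write_schema.

# Illustrative only, assuming singer-python's Transformer and metadata helpers.
import copy

import singer
from singer import metadata, Transformer

def write_transformed_records(records, catalog, table):
    # copy so the original catalog (used when writing the schema) stays untouched
    catalog_copy = copy.deepcopy(catalog)
    schema = catalog_copy.get('schema')
    mdata = metadata.to_map(catalog_copy.get('metadata'))
    with Transformer() as transformer:
        for record in records:
            rec = transformer.transform(record, schema, mdata)
            singer.write_record(table, rec)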