diff --git a/machine-learning-box/multi-touch-attribution/LICENSE b/machine-learning-box/multi-touch-attribution/LICENSE deleted file mode 100644 index 532931d6..00000000 --- a/machine-learning-box/multi-touch-attribution/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. 
Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. 
Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2020 Arm Research - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. 
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
diff --git a/machine-learning-box/multi-touch-attribution/README.md b/machine-learning-box/multi-touch-attribution/README.md
deleted file mode 100644
index 5f082997..00000000
--- a/machine-learning-box/multi-touch-attribution/README.md
+++ /dev/null
@@ -1,124 +0,0 @@
-Data-Driven Multi-Touch Attribution
-===
-
-This Box provides a data-driven, machine learning-based, templatized solution for **[Multi-Touch Attribution](https://en.wikipedia.org/wiki/Attribution_(marketing))** (MTA), running on your pageview data stored in Treasure Data.
-
-Unlike traditional rule-based MTA solutions such as the first-touch and last-touch models, our template takes an advanced machine learning-based approach to accurately model customers' paths to conversion and to better understand how and why marketing touchpoints bring your customers to the goal. Ultimately, these insights enable you to optimize your marketing campaigns effectively and efficiently with optimal budget allocation.
-
-The implementation is based on a [state-of-the-art academic paper](https://arxiv.org/abs/1902.00215) employing a Deep Learning technique, and one of the key concepts used in the technique is the [Shapley value](https://en.wikipedia.org/wiki/Shapley_value) calculation. The overall performance of this template has been proven on the Treasure Data platform with some of our real datasets.
-
-The template ultimately generates deeper insights about conversion histories and allows you to build, for example, the following dashboard:
-
-![dashboard](./docs/images/dashboard.png)
-
-## Input
-
-Assume we have the following [`touchpoints`](https://gist.github.com/takuti/c890cdcbae7946f21a0afc3a4d88ec9f) table that collects user behaviors and conversion events with their sources (i.e., marketing channels):
-
-| `time` | `user_id` | `source` | `conversion` |
-|:---:|:---:|:---:|:---:|
-| 1596012307 | yl38g61s2x | sfmc | 0 |
-| 1596012340 | d4dbvpwcyj | instagram | 0 |
-| 1596012427 | egeaf1po46 | facebook | 0 |
-| 1596012553 | gls9vyk2de | google | 1 |
-| 1596012645 | ps6cc25f24 | instagram | 0 |
-| ... | ... | ... | ... |
-
-
-If you have a `pageviews` table collected by [td-js-sdk](https://github.com/treasure-data/td-js-sdk), preprocess the records in advance and parse `td_url` & `td_referrer` to extract the source channel of every single touchpoint. The query snippet below is an example of how to extract a variety of sources from the `pageviews` data:
-
-```sql
-select
-  ${time_col},
-  ${unique_id},
-  (
-    CASE
-      WHEN url_extract_parameter(td_url, 'utm_source') IS NULL THEN (
-        CASE
-          WHEN regexp_like(td_referrer, 'google.co') THEN 'google'
-          WHEN regexp_like(td_referrer, 'instagram') THEN 'instagram'
-          WHEN regexp_like(td_referrer, 'facebook') THEN 'facebook'
-          WHEN regexp_like(td_referrer, 'youtube') THEN 'youtube'
-          WHEN regexp_like(td_referrer, 'twitter') THEN 'twitter'
-          WHEN regexp_like(td_referrer, 'linkedin') THEN 'linkedin'
-          -- ... (as many possible sources as you want)
-          ELSE 'direct & others'
-        END
-      )
-      ELSE url_extract_parameter(td_url, 'utm_source')
-    END
-  ) as ${source_col},
-  -- ... (assign 0 or 1 depending on your definition of "conversion")
-  as ${conversion_col}
-from
-  pageviews
-where
-  TD_INTERVAL(${time_col}, '-30d') -- last 30 days, for example
-```
-
-## Workflow
-
-```sh
-# Push workflow to TD
-$ td wf push mta
-
-# Set secrets from STDIN like: td.apikey=1/xxxxx, td.apiserver=https://api.treasuredata.com
-$ td workflow secrets \
-    --project mta \
-    --set td.apikey \
-    --set td.apiserver
-
-$ td wf start mta mta_shapley --session now
-```
-
-By default, the workflow automatically imports a dummy dataset to `mta_sample.touchpoints`. Edit [`config/params.yml`](./config/params.yml) if you use your own dataset.
-
-Meanwhile, [`config/model.json`](./config/model.json) enables you to further customize the model. See [`docs/more.md`](./docs/more.md) for more information about the advanced config parameters.
-
-## Output
-
-Three tables are derived as a result of a successful workflow execution.
-
-### Table: `metrics`
-
-| `loss` | `rmse` | `val_loss` | `val_rmse` |
-|:---|:---|:---|:---|
-|0.49751710891723633|0.4046436846256256|0.6924436688423157|0.499650239944458|
-|0.42745542526245117|0.37608763575553894|0.6913946270942688|0.4991241991519928|
-|0.3927740454673767|0.3654404878616333|0.6890262365341187|0.4979383051395416|
-
-The table contains evaluation metrics obtained from the training and validation process. A single row corresponds to a single epoch; `loss` / `rmse` and `val_loss` / `val_rmse` represent the values for training and validation, respectively.
-
-Check whether `val_loss` or `val_rmse` stops decreasing and starts increasing; if it does, you might want to stop model training at that epoch next time. You also want to make sure there is not a huge difference between training and validation metrics, because such a gap might point to model overfitting.
-
-### Table: `shapley`
-
-| `instagram` | `google` | `sfmc` | `facebook` | `direct` | `days_before_conversion` |
-|:---|:---|:---|:---|:---|:---|
-|-0.0010403504129499197|-0.004529354628175497|-0.0004913342418149114|-0.004142973572015762|-0.0002199627342633903|2|
-|-0.0007449646363966167|-0.006828240118920803|-0.000607220979873091|-0.00694135669618845|-0.0003867909254040569|1|
-|0.18350449204444885|0.34311071038246155|0.08801353722810745|0.33066612482070923|0.08063764125108719|0|
-
-This is the main table we've all been waiting for. The number of rows is equal to the lookback window (in days) configured in [`config/model.json`](./config/model.json).
-
-In the `days_before_conversion` column, `0` indicates the effect of each channel on conversion events when users were exposed to the channel less than 24 hours before their conversion. Going up the day index, you will see how Shapley values and attribution percentages change at touchpoints farther from the final conversion event.
-
-By visualizing the values as follows, this table gives insights into how different channels perform throughout the customer journey to conversion and which ones are more effective as a first touch vs. a last touch on that journey.
-
-![shapley](./docs/images/shapley_by_day.png)
-
-### Table: `shapley_channel`
-
-| `instagram` | `google` | `sfmc` | `facebook` | `direct` |
-|:---|:---|:---|:---|:---|
-|0.18171918392181396|0.33175310492515564|0.08691497892141342|0.3195818066596985|0.0800308883190155|
-
-This table has the aggregate Shapley values summed across all days of the lookback window.
These values can be considered as the total attribution percentage that each channel should be assigned towards conversions. - -## How this workflow works - -For further reading for algorithm and workflow details, refer [`docs/more.md`](./docs/more.md). - -## Want to learn more & try on your data? - -The model is fully customizable for your data depending on your own definition of conversion. Contact your Customer Success Representative if you are interested in building and testing the advanced MTA solution. \ No newline at end of file diff --git a/machine-learning-box/multi-touch-attribution/config/model.json b/machine-learning-box/multi-touch-attribution/config/model.json deleted file mode 100644 index 3ac2eb96..00000000 --- a/machine-learning-box/multi-touch-attribution/config/model.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "metrics_table": "metrics", - "shapley_table": "shapley", - "if_exists": "overwrite", - "time_column": "time", - "action_column": "source", - "conversion_column": "conversion", - "users_per_tfrecord": 10000, - "lookback_window_days": 3, - "dropout_rate": 0.5, - "model_width": 128, - "train_epochs": 3, - "train_batch_size": 100, - "validation_batch_size": 1000, - "validation_fraction": 0.1, - "shuffle_buffer_size": 10000 -} \ No newline at end of file diff --git a/machine-learning-box/multi-touch-attribution/config/params.yml b/machine-learning-box/multi-touch-attribution/config/params.yml deleted file mode 100644 index a3bcbd8c..00000000 --- a/machine-learning-box/multi-touch-attribution/config/params.yml +++ /dev/null @@ -1,15 +0,0 @@ -# Name of database where model will read and write data to. -in_db: mta_sample - -# Enter name of a touchpoints table. `queries/shuffle.sql` enriches the table -# with a rand attribute specified by `rnd_column` below. -in_table: touchpoints - -# This points to the column that contains unique user IDs, most commonly -# 'canonical_id' since this is typically the output of our ID unification -# algorithm. -unique_id: user_id - -# Name of rand digit attribute for random -# shuffling. -rnd_column: ab_rnd diff --git a/machine-learning-box/multi-touch-attribution/docs/images/dashboard.png b/machine-learning-box/multi-touch-attribution/docs/images/dashboard.png deleted file mode 100644 index 3fb5e5c2..00000000 Binary files a/machine-learning-box/multi-touch-attribution/docs/images/dashboard.png and /dev/null differ diff --git a/machine-learning-box/multi-touch-attribution/docs/images/shapley_by_day.png b/machine-learning-box/multi-touch-attribution/docs/images/shapley_by_day.png deleted file mode 100644 index 1181c39b..00000000 Binary files a/machine-learning-box/multi-touch-attribution/docs/images/shapley_by_day.png and /dev/null differ diff --git a/machine-learning-box/multi-touch-attribution/docs/more.md b/machine-learning-box/multi-touch-attribution/docs/more.md deleted file mode 100644 index 01f79a98..00000000 --- a/machine-learning-box/multi-touch-attribution/docs/more.md +++ /dev/null @@ -1,115 +0,0 @@ -This document focuses on how to execute the SQL functions of the current workflow inside Treasure Data (TD). For more details on configuring the Python model and its hyper-parameters, refer to [`td_mta/README.md`](../td_mta/README.md). - -## What data will you need for building this model? - -Any table in CDP that tracks historical touchpoints of users engaging with the client brand/content can be used. 
The touchpoints correspond to the marketing channels, sources, or campaigns, digital or physical, that the user was exposed to during their journey to conversion.
-
-To model **Multi-Touch Attribution** (MTA), you need to define which touchpoints count as conversion events in the data. See a sample schema of an input table below:
-
-| `time` | `user_id` | `channel` | `source` | `conversion` |
-|:-----:|:--------:|:-------:|:--------:|:----------:|
-| 14563 | GhY5Q3 | Direct | organic | 0 |
-| 56412 | Hu7YYh | Social | facebook | 0 |
-| 16788 | Yu90G3 | Email | SFMC | 1 |
-
-This table is often created by using SQL to parse and join multiple user-activity tables together, which can combine both digital and physical touchpoints, depending on the datasets stored in the CDP and the marketing campaigns you are actively running. More concretely, to meet your use cases and business rules, the preprocessing queries need to properly extract the `utm_source`, `utm_medium`, and channel parameters, as well as define the conversion events.
-
-## How to configure the Python model
-
-The workflow template reads some of the basic configurations from [`config/params.yml`](../config/params.yml) and [`config/model.json`](../config/model.json), and dynamically passes the parameters to [`td_mta/config.py`](../td_mta/config.py), which the Python scripts eventually load for running the machine learning operations.
-
-See more details below:
-
-```python
-db: str = 'DATABASE NAME' # name of the database where the model will read and write data.
-table: str = 'INPUT TABLE' # name of the final touchpoints table with the ab_rnd attribute created by the SQL query `queries/shuffle.sql`.
-metrics_table: str = 'MODEL METRICS' # this will be the name of the table that the model writes to TD with LSTM performance metrics such as RMSE and log loss.
-shapley_table: str = 'SHAPLEY VALUES' # this will be the name of the table with the final Shapley values, broken down by each day of the customer journey. Note: aggregate final Shapley values across the full journey are output in a separate table that has the same name as shapley_table but with a "_channel" suffix.
-if_exists: str = 'overwrite' # this overwrites the metrics and Shapley values tables after each new run. If you prefer to preserve old values and just append new values, then change the parameter to 'append'.
-user_column: str = 'canonical_id' # this points to the column that contains unique user IDs, most commonly 'canonical_id' since this is typically the output of our ID unification algorithm.
-time_column: str = 'time' # name of the column that contains the timestamp for each touchpoint.
-action_column: str = 'channels' # this tells the LSTM model to use that column as the categorical features when predicting probabilities of conversion. In other words, this is the column that you are trying to get the final Shapley values for. Other columns that can be used here are 'source', 'channel_source', 'campaign', depending on what the marketing team wants to measure.
-conversion_column: str = 'conversion' # name of the column that marks conversion events
-user_rnd_column: str = 'ab_rnd' # name of the random-digit attribute used for random shuffling
-users_per_tfrecord: int = 20000 # this is how many unique users will be saved in each individual TensorFlow record. The rule of thumb is to aim for a few percent of your users per record; for example, with a table of 1M users you can set this to 40,000, which draws from the dataset 25 times with random shuffling to create 25 separate TensorFlow records.
-lookback_window_days: int = 3 # the lookback window: how many days before a conversion event the model tracks back to define which marketing touchpoints were part of the user journey.
-positive_tfrecords_dir: str = os.path.join(data_dir, 'positive_tfrecords') # no need to change this, it just tells the model to store TF records in the current project directory.
-negative_tfrecords_dir: str = os.path.join(data_dir, 'negative_tfrecords') # no need to change this, it just tells the model to store TF records in the current project directory.
-downsample_epoch_fraction: float = 0.5 # each epoch uses a randomly subsampled fraction of all the training data, but it's a different fraction every epoch. This reduces the training time, but does not reduce the Shapley value calculation time.
-
-model_dir: str = os.path.join(data_dir, 'model') # no need to change this
-
-# Model hyper-parameters
-dropout_rate: float = 0.5 # regularization parameter for LSTM models aimed at reducing overfitting and improving model performance. Historically, dropout rates between 0.4 and 0.6 have proven to be the preferred range for a variety of LSTM models and hidden layers.
-model_width: int = 128 # means we're using 128-dimensional vectors for the linear transformations of the neural network layers, i.e., 128x128 matrices are used.
-
-# Training hyper-parameters
-train_epochs: int = 3 # how many epochs you want the LSTM to run. Note that for very large datasets a single epoch can take more than 5hrs, so it is advisable to keep that number small at first and increase as you see fit. Typically, you want to watch how val_loss and val_rmse change with each epoch and stop at the epoch after which val_loss stops decreasing.
-train_batch_size: int = 1000 # how many samples go in each training batch. The larger this number, the faster each epoch will run, but this might have a slightly negative effect on performance. It is recommended to test different variations and find a good balance between run speed and model performance.
-validation_batch_size: int = 10000 # the same rules apply as for train_batch_size, except you generally want your validation batch size to be 10x or so larger than train_batch_size, since training has already been done at this point and the validation dataset is only used at the end to estimate evaluation metrics.
-validation_fraction: float = 0.10 # decides what fraction of the total data is held out for validation and how many records go into model training. We recommend 10% as the default, but other fractions can be tested if needed.
-shuffle_buffer_size: int = 10000 # not very important since we are already doing random shuffling in the beginning during TF record random sampling.
-```
-
-You technically do not need to change anything in [`td_mta/`](../td_mta/), as the code has been tested and optimized multiple times by TD. However, if your team wants to inspect the code and try to tweak different things, then contact your Customer Success Representative and we will be happy to provide more detailed technical documentation.
-
-## How to set up the Python Custom Scripting execution
-
-The final task of the sample workflow below instantiates a temporary Python execution environment in TD.
-
-```yaml
-+execute_python_code:
-  docker:
-    image: "digdag/digdag-python:3.9"
-  py>: py_scripts.main.run
-  # ...
-```
-
-The important syntax here is the notation `py_scripts.main.run`; the `py>` operator tells the workflow to look in the `py_scripts` folder, where the task finds `main.py` and executes the `run()` function defined in the script. This is where the full Python code for the Deep Learning/LSTM-based Shapley values model is executed.
-
-It is also important to copy your Master API Key from your TD account and create a `secret` in the workflow called `td.apikey`, where you can paste the API Key to keep it safe. That secret is then interpreted as an environment variable `TD_API_KEY`, which is needed by the Python code to read and write data to TD.
-
-## What to do if the Workflow returns ERROR because it reached its max capacity limit
-
-Depending on the tier determined by your TD contract, the `py>` tasks have a memory limit of 8 or 30GB and 1 or 4 virtual CPUs (not GPUs), which might result in an error if you attempt to run the model on a very large volume of data.
-
-In our experiment, the model ran successfully with the larger tier on 20M rows of data and 2 epochs, which took about 10-12 hours. However, when attempting to run on 30M rows of data and 5 epochs, the model took over 24 hours, which caused TD to automatically kill the Workflow; TD Workflow has a 24-hour limit per single process.
-
-Hence, note that with large volumes of data and a model with higher matrix dimensionality (e.g., longer lookback periods, more distinct channels in `channel_col`), it will take about 5-6 hours per training epoch and another 2-4 hours for TensorFlow record creation, random shuffling, and Shapley value calculations. Thus, you should expect the model to take 24+ hours to complete. In those cases, we advise running only the SQL portion of the workflow first, then downloading the Python code and running it locally or inside an EC2 instance. More instructions on how to do that are described below and can also be found in [`td_mta/README.md`](../td_mta/README.md).
-
-## How to execute Python code locally, independently of workflow functions
-
-As mentioned earlier, if you hit a memory limit error in TD Workflow, you might want to run the model on your local machine or possibly in an EC2 instance with multiple GPUs available.
-
-1. Run SQL code in TD workflow
-    - You need to comment out the `py>` part of the workflow and then run the workflow to only execute the initial SQL data transformation steps and output the final table of touchpoints and conversions.
-2. Download the workflow locally via [TD Toolbelt CLI](https://toolbelt.treasuredata.com/)
-    - Or, just extract the [`td_mta`](../td_mta/) folder from the original project file. The folder contains a `README.md` file that explains the steps you need to take to install `requirements.txt` and configure the `config.py` file.
-3. Follow the steps shared earlier in the doc on how to configure the `config.py` file with the proper database, table names, and hyper-parameters before you execute `main.py`.
-4. In the CLI, navigate to the `td_mta` project folder and first make sure you set your `TD_API_KEY`:
-   ```
-   $ export TD_API_KEY='PASTE MASTER API KEY HERE'
-   ```
-5. Python 3.7 or higher is required. Install the requirements:
-   ```
-   $ pip install -r requirements.txt
-   ```
-6. Lastly, execute the command below in the CLI to run the `main.py` file inside the `td_mta` folder:
-   ```
-   $ python3 -m td_mta.main
-   ```
-   This will trigger the `main.py` file in the `td_mta` folder, and you will be able to see the log messages the Python code prints in the CLI.
-
-The model could take anywhere from 2 to 24+ hours, depending on the volume of data being fed in and the number of training epochs in the `config.py` file (default = 3).
-
-## How to interpret the model output and use its insights to help marketing teams understand the effectiveness of different advertising channels and plan marketing budgets more efficiently
-
-Typically, it is normal to see Shapley values decrease with time. However, some channels may retain decent Shapley ratios even when further away from conversion, which indicates that they are more influential than other channels and better at initiating user interest and possible journeys to conversion. That is, such channels can justify higher marketing budgets and more aggressive spending, especially during direct-response-focused campaigns with high Return On Investment (ROI) / Cost Per Acquisition (CPA) goals.
-
-> **NOTE**:
->
-> In a perfect world, you would not expect to see negative Shapley values, but in reality, we often do. Why? The reason is that some marketing channels have a proclivity to bring in a lot of unqualified traffic. For example, it is common for display ad click-throughs to be accidental, which means that if we see a visitor to our site from a display ad click-through, we can usually predict that the visitor will not convert. The Shapley values pick up on that fact and sometimes turn negative in our matrix as a result.
->
-> Other channels might only have slightly positive values one or two days close to conversion and then drop to near-zero or negative Shapley values after that, which means that they don't seem to have a long-lasting effect on user behavior and are most effective as a final push when the user is further down the customer journey funnel and closer to the final purchase decision. Such channels would not justify constant heavy spending of the marketing budget and might be more effective with a campaign setup of fewer daily impressions on unknown or new users, but a higher impression frequency once the user has shown a higher intent to purchase, such as actively browsing the product page or adding something to the cart recently.
-
-Another effective use of the Shapley attribution ratios is to combine them with specific campaign data such as impression counts and Cost Per Mille (CPM) per channel. For instance, knowing the total number of conversions that occurred during the time interval of the data the MTA model was trained on, we can multiply the Shapley ratio for each channel by the total conversion count to estimate how many conversion events get attributed to each channel. From there we can add data for our marketing budget spend per channel during that same period. Knowing the total spend and number of conversions, we can then calculate CPA for each channel and start understanding where we can shift budgets to optimize ad spend and improve marketing KPIs such as ROMS.
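To make that last step concrete, here is a minimal sketch of the CPA calculation. The channel ratios are copied from the sample `shapley_channel` output shown in the README; the total conversion count and the per-channel spend figures are hypothetical placeholders, so substitute your own campaign numbers.

```python
import pandas as pd

# Aggregate Shapley ratios per channel, e.g. the single row of the `shapley_channel` table.
shapley_ratios = pd.Series({
    'instagram': 0.1817, 'google': 0.3318, 'sfmc': 0.0869,
    'facebook': 0.3196, 'direct': 0.0800,
})

total_conversions = 5000  # hypothetical: total conversions in the training window
spend = pd.Series({       # hypothetical: marketing spend per channel over the same window
    'instagram': 12000.0, 'google': 30000.0, 'sfmc': 2000.0,
    'facebook': 25000.0, 'direct': 0.0,
})

# Conversions attributed to each channel = Shapley ratio x total conversion count
attributed_conversions = shapley_ratios * total_conversions

# CPA per channel = spend / attributed conversions (left as NaN where there is no spend)
cpa = (spend / attributed_conversions).where(spend > 0)

print(pd.DataFrame({
    'attributed_conversions': attributed_conversions.round(1),
    'spend': spend,
    'cpa': cpa.round(2),
}))
```

Comparing the resulting per-channel CPA against your CPA targets is one straightforward way to decide where budget can be shifted.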
diff --git a/machine-learning-box/multi-touch-attribution/mta_shapley.dig b/machine-learning-box/multi-touch-attribution/mta_shapley.dig deleted file mode 100644 index ca810a11..00000000 --- a/machine-learning-box/multi-touch-attribution/mta_shapley.dig +++ /dev/null @@ -1,41 +0,0 @@ -timezone: UTC - -_export: - !include : config/params.yml - td: - engine: presto - database: ${in_db} - -+data: - py>: py_scripts.data.import_sample - database: ${td.database} - table: ${in_table} - docker: - image: 'digdag/digdag-python:3.9' - _env: - TD_API_KEY: ${secret:td.apikey} - TD_API_SERVER: ${secret:td.apiserver} - -# Add rand attribute for Random shuffling of TensorFlow records -+shuffle_random: - td>: queries/shuffle.sql - create_table: ${in_table}_shuffled - engine: hive - engine_version: stable - -##---------------------------- -##---------------------------- -##PLEASE COMMENT CODE BELOW OUT IF DATA VOLUME IS LARGE AND YOU PLAN TO RUN MODEL LOCALLY! -##------------------------------ -##------------------------------ - -+execute_python_code: - docker: - image: "digdag/digdag-python:3.9" - py>: py_scripts.main.run - db: ${in_db} - table: ${in_table}_shuffled - user_column: ${unique_id} - user_rnd_column: ${rnd_column} - _env: - TD_API_KEY: ${secret:td.apikey} diff --git a/machine-learning-box/multi-touch-attribution/py_scripts/data.py b/machine-learning-box/multi-touch-attribution/py_scripts/data.py deleted file mode 100644 index cb8a28fd..00000000 --- a/machine-learning-box/multi-touch-attribution/py_scripts/data.py +++ /dev/null @@ -1,26 +0,0 @@ -import os -import sys -import pandas as pd - -os.system(f"{sys.executable} -m pip install -U pytd==1.0.0") - - -CSV_URL = 'https://gist.githubusercontent.com/takuti/c890cdcbae7946f21a0afc3a4d88ec9f/raw/8dae87e0ba4a258581f3be7ff52d16099eeceadd/touchpoints.csv' - - -def import_sample(database, table): - import pytd - - apikey = os.environ['TD_API_KEY'] - apiserver = os.environ['TD_API_SERVER'] - client = pytd.Client(database=database, apikey=apikey, endpoint=apiserver) - - if client.exists(database, table): - print('Target database and tables exists. 
Skip') - return True - - df = pd.read_csv(CSV_URL) - - print('Upload sample data to Treasure Data') - client.create_database_if_not_exists(database) - client.load_table_from_dataframe(df, table, if_exists="overwrite") diff --git a/machine-learning-box/multi-touch-attribution/py_scripts/main.py b/machine-learning-box/multi-touch-attribution/py_scripts/main.py deleted file mode 100644 index d120f97f..00000000 --- a/machine-learning-box/multi-touch-attribution/py_scripts/main.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -import os -import sys - -os.system(f"{sys.executable} -m pip install -r requirements.txt") - -from td_mta.config import Config -from td_mta.parse_and_save_tfrecords import parse_and_save_tfrecords -from td_mta.td_connector import TDConnector -from td_mta.train_model_from_tfrecords import train_model_from_tfrecords -from td_mta.shapley import calculate_shapley - - -def run(db, table, user_column, user_rnd_column): - common_config = { - 'db': db, - 'table': table, - 'user_column': user_column, - 'user_rnd_column': user_rnd_column - } - - path = os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, 'config', 'model.json') - with open(path) as f: - model_config = json.load(f) - - config = Config(**{**common_config, **model_config}) - - print(f'==Config:') - print(json.dumps(config.__dict__, indent=4)) - with open('config.json', 'w') as f: - f.write(json.dumps(config.__dict__, indent=4)) - - count_positive, count_negative = parse_and_save_tfrecords(config) - - metric_df = train_model_from_tfrecords(config=config, count_positive=count_positive, count_negative=count_negative) - - print("==Saving training metrics to TD") - TDConnector.write(metric_df, db=config.db, table=config.metrics_table, if_exists=config.if_exists) - - print("==Calculating Shapley values") - shapley_df, channel_shapley_df = calculate_shapley(config) - - print("==Saving Shapley values to TD") - TDConnector.write(shapley_df, db=config.db, table=config.shapley_table, if_exists=config.if_exists) - TDConnector.write(channel_shapley_df, db=config.db, table=config.shapley_table + '_channel', - if_exists=config.if_exists) diff --git a/machine-learning-box/multi-touch-attribution/queries/shuffle.sql b/machine-learning-box/multi-touch-attribution/queries/shuffle.sql deleted file mode 100644 index 9b62a2fd..00000000 --- a/machine-learning-box/multi-touch-attribution/queries/shuffle.sql +++ /dev/null @@ -1,25 +0,0 @@ --- Code below assigns a rand attribute to each unique user for the random --- shuffling of TensorFlow record creation -WITH T1 as ( - select - distinct ${unique_id} - from - ${in_table} -), -T2 as ( - select - rand(18) as ${rnd_column}, - ${unique_id} - from - T1 - cluster by - rand(43) -) --- DIGDAG_INSERT_LINE -select - A.*, - T2.${rnd_column} -from - ${in_table} A -join T2 - on A.${unique_id} = T2.${unique_id} diff --git a/machine-learning-box/multi-touch-attribution/requirements.txt b/machine-learning-box/multi-touch-attribution/requirements.txt deleted file mode 100644 index 4d5c38ff..00000000 --- a/machine-learning-box/multi-touch-attribution/requirements.txt +++ /dev/null @@ -1,46 +0,0 @@ -absl-py==0.9.0 -astunparse==1.6.3 -cachetools==4.1.1 -certifi==2020.6.20 -chardet==3.0.4 -click==7.1.2 -gast==0.3.3 -google-auth==1.20.1 -google-auth-oauthlib==0.4.1 -google-pasta==0.2.0 -grpcio==1.31.0 -h5py==2.10.0 -idna==2.10 -importlib-metadata==1.7.0 -Keras-Preprocessing==1.1.2 -Markdown==3.2.2 -more-itertools==8.4.0 -msgpack==1.0.0 -numpy==1.18.5 -oauthlib==3.1.0 -opt-einsum==3.3.0 -pandas==1.1.0 
-presto-python-client==0.7.0 -protobuf==3.12.4 -pyasn1==0.4.8 -pyasn1-modules==0.2.8 -pytd==1.3.0 -python-dateutil==2.8.1 -pytz==2020.1 -PyYAML==5.3.1 -requests==2.24.0 -requests-oauthlib==1.3.0 -rsa==4.6 -scipy==1.4.1 -six==1.15.0 -td-client==1.2.1 -tensorboard==2.3.0 -tensorboard-plugin-wit==1.7.0 -tensorflow==2.11.1 -tensorflow-estimator==2.3.0 -termcolor==1.1.0 -tqdm==4.48.2 -urllib3==1.24.3 -Werkzeug==2.2.3 -wrapt==1.12.1 -zipp==3.1.0 diff --git a/machine-learning-box/multi-touch-attribution/td_mta/README.md b/machine-learning-box/multi-touch-attribution/td_mta/README.md deleted file mode 100644 index 77a947a5..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/README.md +++ /dev/null @@ -1,79 +0,0 @@ -# td-mta - -Multi Touch Attribution for Treasure Data - -## Getting Started - -Requires Python 3.7 or higher. Install requirements: -```shell script -$ pip install -r requirements.txt -``` - -Running on mock data saved under `mta` db in `mock_data` table: - -```shell script -$ python -m td_mta.main -``` - -The above script is split into 3 separate functions: -- `save_tfrecords()`: Reads data from TD and saves tfrecords files to a local directory. -- `train_model()`: Trains the MTA model using the tfrecords files and saves it to a local directory. -- `calculate_shapely()`: Loads the MTA model and calculates the shapley values for positive examples. - -These functions can be run independently in separate containers as long as they run the same version of configuration and code, and have access to the files saved from previous step. The default mock data in this demo is very small and random so the results and model do not make sense. - - -## Configuration -All the configurable parameters in `config.py` via a dataclass. The parameters and their default values are: -```python -@dataclass -class Config: - db: str = 'mta' - table: str = 'mock_data' - metrics_table: str = 'metrics' - shapely_table: str = 'shapley' - if_exists: str = 'overwrite' - user_table: str = 'user_rnd' - user_column: str = 'user_id' - time_column: str = 'time' - action_column: str = 'source' - conversion_column: str = 'conversion' - user_rnd_column: str = 'ab_rnd' - users_per_tfrecord: int = 10000 - lookback_window_days: int = 5 - positive_tfrecords_dir: str = os.path.join(data_dir, 'positive_tfrecords') - negative_tfrecords_dir: str = os.path.join(data_dir, 'negative_tfrecords') - model_dir: str = os.path.join(data_dir, 'model') - # Training hyper-parameters - steps_per_epoch: int = 100 - train_epochs: int = 5 - train_batch_size: int = 1000 - validation_batch_size: int = 1000 - validation_fraction: float = 0.5 - shuffle_buffer_size: int = 100000 - # Network hyper-parameters - dropout_rate: float = 0.5 - model_width: int = 64 -``` - -To change a parameter you can initialize the configuration in the `main.py` accordingly. For example to update the lookback window, database, tables names from environment variables change the configuration initialization: `configuration = Config()` in `main.py` to: - -```python - configuration = Config(db=os.environ['DB'], - table=os.environ['DATA_TABLE'], - lookback_window_days=int(os.environ['LOOKBACK_WINDOW']), - positive_tfrecords_dir='/save_dir/positive_tfrecords', - negative_tfrecords_dir='/save_dir/negative_tfrecords') -``` - -## User table -The script assumes the presence of a table that stores mapping of user ids to a random hex. 
This mapping is used to generate batch queries if the number of users is greater than `users_per_tfrecord` configuration parameter (default is 10000). The name of this table can be specified via `user_table` configuration parameter (default is `usr_rnd`). The sql query to generate this table is: - -``` -select rand(18) as ab_rnd, {user_id_column} -from (select distinct {user_id_column} from {table}) as T1 -cluster by rand(43)''' -``` - -## Batch queries -The number of users queried in a single batch is controlled by the `users_per_tfrecord` configuration parameter (default is 10000). This number should be tuned according to the available memory. If it is too large, it can cause out of memory error. If it is too low, it would slow down the script. \ No newline at end of file diff --git a/machine-learning-box/multi-touch-attribution/td_mta/__init__.py b/machine-learning-box/multi-touch-attribution/td_mta/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/machine-learning-box/multi-touch-attribution/td_mta/config.py b/machine-learning-box/multi-touch-attribution/td_mta/config.py deleted file mode 100644 index 4c0c4020..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/config.py +++ /dev/null @@ -1,40 +0,0 @@ -import os -from dataclasses import dataclass - -from td_mta.data import data_dir -from td_mta.td_connector import TDConnector - - -@dataclass -class Config: - db: str = 'DATABASE NAME' - table: str = 'INPUT UNION_TABLE NAME from SQL QUERIES WITH AB_RND' - metrics_table: str = 'MODEL METRICS TABLE NAME' - shapley_table: str = 'SHAPLEY VALUES TABLE NAME' - if_exists: str = 'overwrite' - user_column: str = 'canonical_id' - time_column: str = 'time' - action_column: str = 'channels' - conversion_column: str = 'conversion' - user_rnd_column: str = 'ab_rnd' - users_per_tfrecord: int = 10000 - lookback_window_days: int = 3 - positive_tfrecords_dir: str = os.path.join(data_dir, 'positive_tfrecords') - negative_tfrecords_dir: str = os.path.join(data_dir, 'negative_tfrecords') - model_dir: str = os.path.join(data_dir, 'model') - - # Model hyper-parameters - dropout_rate: float = 0.5 - model_width: int = 128 - - # Training hyper-parameters - train_epochs: int = 3 - train_batch_size: int = 100 - validation_batch_size: int = 1000 - validation_fraction: float = 0.1 - shuffle_buffer_size: int = 10000 - downsample_epoch_fraction: float = 0.5 - - def __post_init__(self): - self.action_vocab = TDConnector.distinct(self.db, self.table, self.action_column) - print(f'Action vocab: {self.action_vocab}') diff --git a/machine-learning-box/multi-touch-attribution/td_mta/data/__init__.py b/machine-learning-box/multi-touch-attribution/td_mta/data/__init__.py deleted file mode 100644 index 58a806da..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/data/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -import os - -data_dir = os.path.dirname(os.path.abspath(__file__)) \ No newline at end of file diff --git a/machine-learning-box/multi-touch-attribution/td_mta/dataset_from_tfrecords.py b/machine-learning-box/multi-touch-attribution/td_mta/dataset_from_tfrecords.py deleted file mode 100644 index cd9e6769..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/dataset_from_tfrecords.py +++ /dev/null @@ -1,30 +0,0 @@ -import os -import tensorflow as tf -from functools import partial -from typing import Optional, Tuple - - -def dataset_from_tfrecords(tfrecords_directory: str, - dense_shape: Tuple[int, int], - leave_as_sparse: Optional[bool] = False, - ) -> 
tf.data.Dataset: - def listdir(directory): - return [os.path.join(directory, filename) for filename in os.listdir(directory)] - - dataset = tf.data.TFRecordDataset(listdir(tfrecords_directory)) - - def make_parse_function(): - features = dict(features=tf.io.FixedLenFeature(shape=(3,), dtype=tf.dtypes.string), # shape=3 for sparse tensor - labels=tf.io.FixedLenFeature(shape=(), dtype=tf.dtypes.int64)) - return partial(tf.io.parse_single_example, features=features) - - def deserialize_sparse_function(inputs): - features, labels = inputs['features'], inputs['labels'] - features = tf.io.deserialize_many_sparse(features[tf.newaxis, ...], dtype=tf.dtypes.float32) - if not leave_as_sparse: - features = tf.sparse.to_dense(features) - features = tf.squeeze(features, axis=0) - features = tf.ensure_shape(features, shape=dense_shape) - return dict(features=features, labels=labels) - - return dataset.map(make_parse_function()).map(deserialize_sparse_function) diff --git a/machine-learning-box/multi-touch-attribution/td_mta/example_extractor.py b/machine-learning-box/multi-touch-attribution/td_mta/example_extractor.py deleted file mode 100644 index f1dd607f..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/example_extractor.py +++ /dev/null @@ -1,55 +0,0 @@ -import itertools -import more_itertools -from dataclasses import dataclass -from typing import Generator, List, Tuple, Union - -from td_mta.parser import Session, Journey - -ExampleIndices = List[Tuple[int, int]] - - -@dataclass -class ExampleExtractor: - lookback_window_days: int - - def extract_positive(self, journey: Journey) -> Generator[Union[ExampleIndices, None], None, None]: - - segments = more_itertools.split_after(journey, lambda session: session.conversion) - - for segment in segments: - if segment[-1].conversion: - - def in_window(session: Session) -> bool: - return session.date + self.lookback_window_days > segment[-1].date - - def date_action(session: Session) -> Tuple[int, int]: - return session.date - segment[-1].date + self.lookback_window_days - 1, session.action - - yield sorted(set(date_action(session) for session in segment if in_window(session))) - else: - yield - - def extract_negative(self, journey: Journey) -> Generator[Union[ExampleIndices, None], None, None]: - - if any(session.conversion for session in journey): - yield # don't extract negative examples from journeys with conversions - else: - # extract all subsequences of length up to lookback_window_days from regressors - daily_segments = list(more_itertools.split_when(journey, lambda s1, s2: s1.date != s2.date)) - - first, last = 0, 1 - while first < len(daily_segments): - - def date_action(session: Session) -> Tuple[int, int]: - date = session.date - daily_segments[last - 1][0].date + self.lookback_window_days - 1 - return date, session.action - - chained_daily_segments = itertools.chain.from_iterable(daily_segments[first:last]) - yield sorted(set(date_action(session) for session in chained_daily_segments)) - - if last < len(daily_segments): - last += 1 - while daily_segments[first][0].date + self.lookback_window_days <= daily_segments[last - 1][0].date: - first += 1 - else: - first += 1 diff --git a/machine-learning-box/multi-touch-attribution/td_mta/main.py b/machine-learning-box/multi-touch-attribution/td_mta/main.py deleted file mode 100644 index 469e3625..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/main.py +++ /dev/null @@ -1,48 +0,0 @@ -import json - -from td_mta.config import Config -from td_mta.parse_and_save_tfrecords 
import parse_and_save_tfrecords -from td_mta.td_connector import TDConnector -from td_mta.train_model_from_tfrecords import train_model_from_tfrecords -from td_mta.shapley import calculate_shapley - -config = Config() - - -def run(): - print(f'==Config:') - print(json.dumps(config.__dict__, indent=4)) - with open('config.json', 'w') as f: - f.write(json.dumps(config.__dict__, indent=4)) - - count_positive, count_negative = parse_and_save_tfrecords(config) - - metric_df = train_model_from_tfrecords(config=config, count_positive=count_positive, count_negative=count_negative) - - print("==Saving training metrics to TD") - TDConnector.write(metric_df, db=config.db, table=config.metrics_table, if_exists=config.if_exists) - - print("==Calculating Shapley values") - shapley_df, channel_shapley_df = calculate_shapley(config) - - print("==Saving Shapley values to TD") - TDConnector.write(shapley_df, db=config.db, table=config.shapley_table, if_exists=config.if_exists) - TDConnector.write(channel_shapley_df, db=config.db, table=config.shapley_table + '_channel', - if_exists=config.if_exists) - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description='Customer Journey MTA', - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument('--config', '-c', metavar='', help='json file with configuration') - args = parser.parse_args() - - if args.config is not None: - with open(args.config) as f: - config = json.load(f) - print(f"==Read config from {args.config}: {config}") - config = Config(**config) - - run() diff --git a/machine-learning-box/multi-touch-attribution/td_mta/model.py b/machine-learning-box/multi-touch-attribution/td_mta/model.py deleted file mode 100644 index 3817d34b..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/model.py +++ /dev/null @@ -1,49 +0,0 @@ -import tensorflow as tf - -METRICS = [ - tf.keras.metrics.TruePositives(name='tp'), - tf.keras.metrics.FalsePositives(name='fp'), - tf.keras.metrics.TrueNegatives(name='tn'), - tf.keras.metrics.FalseNegatives(name='fn'), - tf.keras.metrics.BinaryAccuracy(name='accuracy'), - tf.keras.metrics.Precision(name='precision'), - tf.keras.metrics.Recall(name='recall'), - tf.keras.metrics.AUC(name='auc'), -] - - -class Model(tf.keras.Model): - - def __init__(self, units: int, dropout_rate: float, mask_value: float, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.masking = tf.keras.layers.Masking(mask_value=mask_value) - self.lstm_1 = tf.keras.layers.Bidirectional( - tf.keras.layers.LSTM(units=units, dropout=dropout_rate, recurrent_dropout=dropout_rate, - return_sequences=True)) - self.lstm_2 = tf.keras.layers.Bidirectional( - tf.keras.layers.LSTM(units=units, dropout=dropout_rate, recurrent_dropout=dropout_rate)) - self.dense = tf.keras.layers.Dense(units=units, activation=tf.keras.activations.relu) - self.sigmoid = tf.keras.layers.Dense(units=1, activation=tf.keras.activations.sigmoid) - - def call(self, inputs, training=None, mask=None): - return self.sigmoid(self.dense(self.lstm_2(self.lstm_1(self.masking(inputs))))) - - -class CalibratedModel(tf.keras.Model): - """ - See: https://ieeexplore.ieee.org/abstract/document/7376606 - """ - def __init__(self, - class_balanced_model: tf.keras.Model, - count_positive: int, - count_negative: int, - *args, **kwargs): - super().__init__(*args, **kwargs) - self.class_balanced_model = class_balanced_model - self.beta = tf.constant(count_positive / count_negative, dtype=tf.dtypes.float32) - assert self.beta <= 1., 
'calibration assumes more negative than positive examples' # FIXME: generalize - - def call(self, inputs, training=None, mask=None): - class_balanced_probability = self.class_balanced_model(inputs, training=training) - return self.beta * class_balanced_probability / (1. - (1. - self.beta) * class_balanced_probability) diff --git a/machine-learning-box/multi-touch-attribution/td_mta/mta_train.py b/machine-learning-box/multi-touch-attribution/td_mta/mta_train.py deleted file mode 100644 index 8fe656bd..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/mta_train.py +++ /dev/null @@ -1,59 +0,0 @@ -import os -import tensorflow as tf -from dataclasses import dataclass -from typing import Optional - -from td_mta.model import Model -from td_mta.config import Config - -_dir = os.path.dirname(os.path.abspath(__file__)) - -METRICS = [ - tf.keras.metrics.RootMeanSquaredError(name='rmse'), -] - - -class MTATrain: - mask_value: Optional[float] = -1. - - @dataclass - class HyperParameters: - units: int = 128 - dropout_rate: float = 0.5 - train_epochs: int = 10 - - @classmethod - def from_config(cls, config: Config): - return cls( - units=config.model_width, - dropout_rate=config.dropout_rate, - train_epochs=config.train_epochs, - ) - - def __init__(self, seq_length: int, hyper_parameters: Optional[HyperParameters] = None): - self._params = MTATrain.HyperParameters() if hyper_parameters is None else hyper_parameters - self.seq_length = seq_length - self.model = None - self.model_train_history = None - - def train(self, dataset_train: tf.data.Dataset, dataset_validate: tf.data.Dataset, filepath: str) -> \ - tf.keras.callbacks.History: - self.model = Model(units=self._params.units, dropout_rate=self._params.dropout_rate, mask_value=self.mask_value) - self.model.compile(optimizer='adam', loss='binary_crossentropy', metrics=METRICS) - - # FIXME: uncomment for saving best model only - # save_model_callback = tf.keras.callbacks.ModelCheckpoint(filepath=filepath, save_best_only=True) - # callbacks = [save_model_callback] - callbacks = [] - - history = self.model.fit(dataset_train, - shuffle=False, # dataset_train assumed shuffled - epochs=self._params.train_epochs, - validation_data=dataset_validate, - callbacks=callbacks, - ) - return history - - def load(self, filepath: str): - # FIXME: save using file name, not prefix - self.model = tf.keras.models.load_model(filepath=filepath) diff --git a/machine-learning-box/multi-touch-attribution/td_mta/parse_and_save_tfrecords.py b/machine-learning-box/multi-touch-attribution/td_mta/parse_and_save_tfrecords.py deleted file mode 100644 index c0e164ac..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/parse_and_save_tfrecords.py +++ /dev/null @@ -1,79 +0,0 @@ -import os -import shutil -import itertools -import tensorflow as tf -from typing import Iterable, List, Tuple - -from td_mta.config import Config -from td_mta.parser import Parser, Journey -from td_mta.example_extractor import ExampleExtractor -from td_mta.td_connector import TDConnector - - -def parse_and_save_tfrecords(config: Config) -> Tuple[int, int]: - for tfrecords_dir in [config.positive_tfrecords_dir, config.negative_tfrecords_dir]: - print(f'==cleaning up tfrecords directory: {tfrecords_dir}') - if os.path.isdir(tfrecords_dir): - shutil.rmtree(tfrecords_dir) - os.mkdir(tfrecords_dir) - - parser = Parser(action_vocab=config.action_vocab, - user_column=config.user_column, - action_column=config.action_column, - time_column=config.time_column, - 
conversion_column=config.conversion_column) - - dataframes = TDConnector.paginate(db=config.db, - table=config.table, - group_by=parser.user_column, - user_rnd_column=config.user_rnd_column, - count_per_page=config.users_per_tfrecord) - - count_positive, count_negative = 0, 0 - - for page_num, dataframe in enumerate(dataframes): - - journeys: Iterable[Journey] = parser.parse(dataframe) - - print(f'==Extracting positive and negative examples') - example_extractor = ExampleExtractor(lookback_window_days=config.lookback_window_days) - - positive_iterator, negative_iterator = itertools.tee(journeys, 2) - del journeys - - # bucketing by day and fix number of days window positive and negative features extraction - positive_examples = map(lambda t: example_extractor.extract_positive(t), positive_iterator) - negative_examples = map(lambda t: example_extractor.extract_negative(t), negative_iterator) - - positive_examples = itertools.chain.from_iterable(positive_examples) - negative_examples = itertools.chain.from_iterable(negative_examples) - - positive_examples = filter(None, positive_examples) - negative_examples = filter(None, negative_examples) - - def serialize_example(indices: List[Tuple[int, int]], label: int) -> bytes: - dense_shape = config.lookback_window_days, len(config.action_vocab) - sparse_tensor = tf.sparse.SparseTensor(indices=indices, values=[1.] * len(indices), dense_shape=dense_shape) - serialized = tf.io.serialize_sparse(sparse_tensor).numpy() - feature = dict(features=tf.train.Feature(bytes_list=tf.train.BytesList(value=serialized)), - labels=tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))) - return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString() - - positive_dataset_file_name = os.path.join(config.positive_tfrecords_dir, f'positive_page{page_num}.tfrecord') - negative_dataset_file_name = os.path.join(config.negative_tfrecords_dir, f'negative_page{page_num}.tfrecord') - - writer = tf.io.TFRecordWriter(positive_dataset_file_name) - for example in positive_examples: - writer.write(serialize_example(example, label=1)) - count_positive += 1 - writer.close() - - writer = tf.io.TFRecordWriter(negative_dataset_file_name) - for example in negative_examples: - writer.write(serialize_example(example, label=0)) - count_negative += 1 - writer.close() - - print(f'==wrote {count_positive} positive examples and {count_negative} negative examples to TFRecord files') - - return count_positive, count_negative diff --git a/machine-learning-box/multi-touch-attribution/td_mta/parser.py b/machine-learning-box/multi-touch-attribution/td_mta/parser.py deleted file mode 100644 index e05f2f67..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/parser.py +++ /dev/null @@ -1,38 +0,0 @@ -import more_itertools -import pandas as pd -from typing import List, Iterable -from dataclasses import dataclass - - -@dataclass -class Session: - action: int - date: int - conversion: bool - - -Journey = List[Session] - - -@dataclass -class Parser: - action_vocab: List[str] - user_column: str - time_column: str - action_column: str - conversion_column: str - - def parse(self, df: pd.DataFrame) -> Iterable[Journey]: - groups = (group for _, group in df.groupby(self.user_column, sort=False)) - groups = more_itertools.random_permutation(groups) # randomly shuffle users - return map(self.extract, groups) - - def extract(self, group: pd.DataFrame) -> Journey: - session_data = group[[self.time_column, self.action_column, self.conversion_column]] - session_data = 
session_data.sort_values(self.time_column).values.tolist() - return [self.session(timestamp, action, conversion) for timestamp, action, conversion in session_data] - - def session(self, timestamp: int, action: str, conversion: bool) -> Session: - return Session(action=self.action_vocab.index(action), - date=timestamp // (24 * 3600), - conversion=conversion) diff --git a/machine-learning-box/multi-touch-attribution/td_mta/shapley.py b/machine-learning-box/multi-touch-attribution/td_mta/shapley.py deleted file mode 100644 index 9ec6f152..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/shapley.py +++ /dev/null @@ -1,140 +0,0 @@ -import pandas as pd -import tensorflow as tf -from typing import Callable, Tuple - -from td_mta.config import Config -from td_mta.dataset_from_tfrecords import dataset_from_tfrecords -from td_mta.mta_train import MTATrain - - -def shapley_inputs_one_sample(sparse_positive_example: tf.sparse.SparseTensor, dense_shape: Tuple[int, int]) -> \ - Tuple[tf.Tensor, tf.Tensor, tf.Tensor]: - ones_like_values = tf.ones_like(sparse_positive_example.values) - tf.debugging.assert_equal(sparse_positive_example.values, ones_like_values), 'assuming non-zero elements equal 1.' - - indices = tf.random.shuffle(sparse_positive_example.indices) - num_indices = tf.shape(indices)[0] - - random_len = tf.random.uniform(shape=(), minval=1, maxval=num_indices + 1, dtype=tf.dtypes.int32) - - ones = tf.ones(shape=(random_len,), dtype=tf.dtypes.bool) - zeros = tf.zeros(shape=(num_indices - random_len + 1,), dtype=tf.dtypes.bool) - retain_inclusive = tf.concat((ones, zeros[:-1]), axis=0) - retain_exclusive = tf.concat((ones[:-1], zeros), axis=0) - toggled_index = indices[random_len - 1] - - sparse_positive_example_shuffled = tf.sparse.SparseTensor(indices=indices, values=ones_like_values, - dense_shape=sparse_positive_example.dense_shape) - - example_inclusive = tf.sparse.retain(sp_input=sparse_positive_example_shuffled, to_retain=retain_inclusive) - example_exclusive = tf.sparse.retain(sp_input=sparse_positive_example_shuffled, to_retain=retain_exclusive) - - example_inclusive = tf.sparse.reorder(example_inclusive) - example_exclusive = tf.sparse.reorder(example_exclusive) - - example_inclusive = tf.sparse.to_dense(example_inclusive) - example_exclusive = tf.sparse.to_dense(example_exclusive) - - example_inclusive = tf.ensure_shape(example_inclusive, shape=dense_shape) - example_exclusive = tf.ensure_shape(example_exclusive, shape=dense_shape) - - return example_exclusive, example_inclusive, toggled_index - - -def make_shapley_inputs(num_samples: int, dense_shape: Tuple[int, int]) -> \ - Callable[[tf.sparse.SparseTensor], Tuple[tf.Tensor, tf.Tensor, tf.Tensor]]: - def shapley_inputs(sparse: tf.sparse.SparseTensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]: - # sparse = tf.sparse.from_dense(tensor) - - examples_exclusive, examples_inclusive, included_excluded_indices = [], [], [] - for _ in range(num_samples): - example_inclusive, example_exclusive, included_excluded_index = \ - shapley_inputs_one_sample(sparse_positive_example=sparse, dense_shape=dense_shape) - examples_exclusive.append(example_inclusive) - examples_inclusive.append(example_exclusive) - included_excluded_indices.append(included_excluded_index) - - examples_exclusive = tf.stack(examples_exclusive) - examples_inclusive = tf.stack(examples_inclusive) - included_excluded_indices = tf.stack(included_excluded_indices) - - return examples_exclusive, examples_inclusive, included_excluded_indices - - return shapley_inputs 
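# Illustrative sketch (not part of the deleted module). make_shapley_inputs /
# make_shapley above estimate Shapley values by sampling: for each journey the
# non-zero (day, channel) cells are shuffled, a random split point is drawn,
# and the model is scored on the coalition with and without the cell at that
# split; the score difference is credited to that cell. The textbook
# Monte-Carlo estimator instead credits every cell of each sampled
# permutation; a minimal plain-Python version of that variant is sketched
# below. `value_fn` is a hypothetical stand-in for the trained model's
# conversion probability on a journey restricted to a given coalition.
import random
from typing import Callable, Dict, FrozenSet, List, Tuple

Cell = Tuple[int, int]  # (day_in_window, channel_index), matching the sparse indices above


def monte_carlo_shapley(cells: List[Cell],
                        value_fn: Callable[[FrozenSet[Cell]], float],
                        num_permutations: int = 10) -> Dict[Cell, float]:
    totals = {cell: 0.0 for cell in cells}
    for _ in range(num_permutations):
        order = random.sample(cells, k=len(cells))   # one random permutation of the touchpoints
        coalition: set = set()
        previous = value_fn(frozenset(coalition))    # score of the empty coalition
        for cell in order:
            coalition.add(cell)
            current = value_fn(frozenset(coalition))
            totals[cell] += current - previous       # marginal contribution of this cell
            previous = current
    return {cell: total / num_permutations for cell, total in totals.items()}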
- - -def make_shapley(model: tf.keras.Model) -> Callable[[tf.Tensor, tf.Tensor, tf.Tensor], tf.Tensor]: - def shapley(example_exclusive: tf.Tensor, examples_inclusive: tf.Tensor, included_excluded_indices: tf.Tensor) \ - -> tf.Tensor: - dense_shape = tf.cast(tf.shape(examples_inclusive), dtype=tf.dtypes.int64) - - diff = model(examples_inclusive) - model(example_exclusive) - - arange = tf.range(start=0, limit=tf.shape(included_excluded_indices)[0], dtype=included_excluded_indices.dtype) - included_excluded_indices = tf.concat((arange[..., tf.newaxis], included_excluded_indices), axis=1) - - diff = tf.squeeze(diff, axis=1) - diff = tf.sparse.SparseTensor(indices=included_excluded_indices, values=diff, dense_shape=dense_shape) - return tf.sparse.reduce_sum(sp_input=diff, axis=0) - - return shapley - - -def squeeze_batch_dim(s: tf.sparse.SparseTensor) -> tf.sparse.SparseTensor: - return tf.sparse.SparseTensor(indices=s.indices[:, 1:], values=s.values, dense_shape=s.dense_shape[1:]) - - -def calculate_shapley(config: Config) -> Tuple[pd.DataFrame, pd.DataFrame]: - hyper_parameters = MTATrain.HyperParameters.from_config(config) - - mta = MTATrain(seq_length=config.lookback_window_days, hyper_parameters=hyper_parameters) - mta.load(filepath=config.model_dir) - - # # FIXME: DEBUG - # import itertools - # import numpy as np - # inputs = np.array(list(map(list, itertools.product((0., 1.), repeat=2))), dtype=np.float32) - # inputs = inputs[:, np.newaxis, :] - # print('inputs:\n', inputs) - # print('outputs:\n', mta.model(inputs).numpy()) - - dense_shape = config.lookback_window_days, len(config.action_vocab) - - positive_dataset = dataset_from_tfrecords(tfrecords_directory=config.positive_tfrecords_dir, - dense_shape=dense_shape, - leave_as_sparse=True) - negative_dataset = dataset_from_tfrecords(tfrecords_directory=config.negative_tfrecords_dir, - dense_shape=dense_shape, - leave_as_sparse=True) - - dataset = positive_dataset.concatenate(negative_dataset) - - features_dataset = dataset.map(lambda row: row['features']) - features_dataset = features_dataset.map(squeeze_batch_dim) - - num_random_permutations = 10 - shapley_inputs = make_shapley_inputs(num_samples=num_random_permutations, dense_shape=dense_shape) - shapley_inputs_dataset = features_dataset.map(shapley_inputs) - - shapley = make_shapley(mta.model) - shapley_outputs_dataset = shapley_inputs_dataset.map(shapley) - - reduced = shapley_outputs_dataset.reduce(initial_state=tf.zeros(shape=dense_shape), reduce_func=tf.math.add) - shapley_values = (reduced / tf.reduce_sum(reduced)).numpy() - - print('shapley values by time:') - for day_in_window, value in enumerate(shapley_values.sum(axis=1)): - print(f'{config.lookback_window_days - 1 - day_in_window} days before conversion: {value}') - print() - - print('shapley values by channel:') - for action, value in zip(config.action_vocab, shapley_values.sum(axis=0)): - print(f'{action}: {value}') - print() - - print('all shapley values:') - print(shapley_values) - - df = pd.DataFrame(shapley_values, columns=config.action_vocab) - df['days_before_conversion'] = range(config.lookback_window_days - 1, -1, -1) - return df, df.sum().to_frame().transpose().drop('days_before_conversion', axis=1) diff --git a/machine-learning-box/multi-touch-attribution/td_mta/td_connector.py b/machine-learning-box/multi-touch-attribution/td_mta/td_connector.py deleted file mode 100644 index 424be2be..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/td_connector.py +++ /dev/null @@ -1,64 +0,0 @@ -import 
pandas as pd -import pytd - - -class TDConnector: - - @staticmethod - def read(db: str, table: str) -> pd.DataFrame: - client = pytd.Client(database=db) - results = client.query(f'select * from {table}') - columns = results['columns'] - data = results['data'] - return pd.DataFrame(data=data, columns=columns) - - @staticmethod - def write_group_map(client: pytd.Client, db: str, table: str, group_by: str, write_to: str): - query = f'''select rand(18) as ab_rnd, {group_by} -from (select distinct {group_by} from {table}) as T1 -cluster by rand(43)''' - - results = client.query(query, engine='hive') - columns = results['columns'] - data = results['data'] - df = pd.DataFrame(data=data, columns=columns) - write_table = pytd.table.Table(client, db, write_to) - writer = pytd.writer.BulkImportWriter() - writer.write_dataframe(df, write_table, if_exists='overwrite') - - return len(df) - - @staticmethod - def batch_query(table: str, start: float, end: float, user_rnd_column: str): - return f'''SELECT * from {table} -WHERE {user_rnd_column} >= {start} AND {user_rnd_column} < {end}''' - - @staticmethod - def paginate(db: str, table: str, group_by: str, user_rnd_column: str, count_per_page: int): - client = pytd.Client(database=db) - - total_count = client.query(f'select COUNT(DISTINCT({group_by})) from {table}')['data'][0][0] - batches = list(range(0, count_per_page * total_count // count_per_page, count_per_page)) + [total_count] - batches = [b / total_count for b in batches] - print(f'==Total users: {total_count}. Splitting into {len(batches)} batches') - - for start, end in zip(batches[:-1], batches[1:]): - query = TDConnector.batch_query(table=table, start=start, end=end, - user_rnd_column=user_rnd_column) - print(f'==Fetching {query}') - results = client.query(query) - columns = results['columns'] - data = results['data'] - yield pd.DataFrame(data=data, columns=columns) - - @staticmethod - def write(df: pd.DataFrame, db: str, table: str, if_exists: str = 'error'): - client = pytd.Client(database=db) - table = pytd.table.Table(client, db, table) - writer = pytd.writer.BulkImportWriter() - writer.write_dataframe(df, table, if_exists=if_exists) - - @staticmethod - def distinct(db: str, table: str, column: str): - client = pytd.Client(database=db) - return [row[0] for row in client.query(f'select DISTINCT({column}) from {table}')['data']] diff --git a/machine-learning-box/multi-touch-attribution/td_mta/train_model_from_tfrecords.py b/machine-learning-box/multi-touch-attribution/td_mta/train_model_from_tfrecords.py deleted file mode 100644 index 820255cd..00000000 --- a/machine-learning-box/multi-touch-attribution/td_mta/train_model_from_tfrecords.py +++ /dev/null @@ -1,85 +0,0 @@ -import pandas as pd -import tensorflow as tf -from typing import Dict, Tuple - -from td_mta.config import Config -from td_mta.mta_train import MTATrain -from td_mta.model import CalibratedModel -from td_mta.dataset_from_tfrecords import dataset_from_tfrecords - - -def train_model_from_tfrecords(config: Config, count_positive: int, count_negative: int) -> pd.DataFrame: - dense_shape = config.lookback_window_days, len(config.action_vocab) - - positive_dataset = dataset_from_tfrecords(tfrecords_directory=config.positive_tfrecords_dir, - dense_shape=dense_shape) - - negative_dataset = dataset_from_tfrecords(tfrecords_directory=config.negative_tfrecords_dir, - dense_shape=dense_shape) - - def dict_to_tuple(row: Dict[str, tf.Tensor]) -> Tuple[tf.Tensor, tf.Tensor]: - return row['features'], row['labels'] - - positive_dataset = 
positive_dataset.map(dict_to_tuple) - negative_dataset = negative_dataset.map(dict_to_tuple) - - num_validate_positive = int(count_positive * config.validation_fraction) - num_validate_negative = int(count_negative * config.validation_fraction) - validate_cardinality = max(num_validate_positive, num_validate_negative) - - num_train_positive = count_positive - num_validate_positive - num_train_negative = count_negative - num_validate_negative - train_cardinality = max(num_train_positive, num_train_negative) - - positive_dataset_train, positive_dataset_validate = \ - positive_dataset.skip(num_validate_positive), positive_dataset.take(num_validate_positive) - - negative_dataset_train, negative_dataset_validate = \ - negative_dataset.skip(num_validate_negative), negative_dataset.take(num_validate_negative) - - def random_downsample_bool(features, labels): - p = config.downsample_epoch_fraction - r = tf.random.categorical(tf.math.log([[1. - p, p]]), 1) - return tf.cast(r[0, 0], dtype=tf.dtypes.bool) - - train_cardinality = int(train_cardinality * config.downsample_epoch_fraction) - - dataset_train = tf.data.experimental.choose_from_datasets( - datasets=[positive_dataset_train.repeat().filter(random_downsample_bool), - negative_dataset_train.repeat().filter(random_downsample_bool)], - choice_dataset=tf.data.Dataset.range(2).repeat(train_cardinality)) - - dataset_train = dataset_train.shuffle(buffer_size=config.shuffle_buffer_size) - dataset_train = dataset_train.batch(batch_size=config.train_batch_size) - - def augment_batch_with_zeros(features, labels): - features_without_last_day_impressions = features[..., :-1, :] - zeroed_last_day_impressions = tf.zeros_like(features[..., -1:, :]) - augmented_features = tf.concat((features_without_last_day_impressions, zeroed_last_day_impressions), axis=-2) - zero_labels = tf.zeros_like(labels) - return tf.concat((features, augmented_features), axis=0), tf.concat((labels, zero_labels), axis=0) - - dataset_train = dataset_train.map(augment_batch_with_zeros) - - dataset_validate = tf.data.experimental.choose_from_datasets( - datasets=[positive_dataset_validate.repeat(), negative_dataset_validate.repeat()], - choice_dataset=tf.data.Dataset.range(2).repeat(validate_cardinality) - ) - - dataset_validate = dataset_validate.batch(config.validation_batch_size) - - hyper_parameters = MTATrain.HyperParameters.from_config(config) - mta = MTATrain(seq_length=config.lookback_window_days, hyper_parameters=hyper_parameters) - training_history = mta.train(dataset_train=dataset_train, - dataset_validate=dataset_validate, - filepath=config.model_dir) - - # FIXME: model can also be saved in tf.keras.Model.fit() via tf.keras.callbacks.ModelCheckpoint() - - calibrated_model = CalibratedModel(class_balanced_model=mta.model, - count_positive=count_positive, - count_negative=count_negative) - _ = calibrated_model.predict(dataset_train.take(1)) # calling once to determine dynamic shapes - calibrated_model.save(filepath=config.model_dir) - - return pd.DataFrame.from_dict(training_history.history)
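# Illustrative sketch (not from the deleted sources). CalibratedModel, applied
# above before saving, corrects for the approximately class-balanced
# interleaving of positive and negative examples used during training: a
# class-balanced probability p is mapped back to the original class prior with
# beta = count_positive / count_negative (beta <= 1 when negatives dominate),
# via
#
#     p_calibrated = beta * p / (1 - (1 - beta) * p)
#
# A standalone check of the same formula, using hypothetical example counts:

def calibrate(p: float, count_positive: int, count_negative: int) -> float:
    """Rescale a class-balanced probability back to the true class prior."""
    beta = count_positive / count_negative
    return beta * p / (1.0 - (1.0 - beta) * p)


if __name__ == "__main__":
    # e.g. 1,000 conversions vs. 99,000 non-conversions -> beta ~= 0.0101
    print(calibrate(0.5, count_positive=1_000, count_negative=99_000))  # ~0.0101
    print(calibrate(0.9, count_positive=1_000, count_negative=99_000))  # ~0.0833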