diff --git a/scenarios/cdp_campaign_management/README.md b/scenarios/cdp_campaign_management/README.md index 8b137891..bff38d82 100644 --- a/scenarios/cdp_campaign_management/README.md +++ b/scenarios/cdp_campaign_management/README.md @@ -1 +1,568 @@ +# CDP Campaign Management +## Overview + +This workflow (WF) measures campaign effectiveness for each activation performed on Journey Orchestration. The revenue generated by each activation is calculated using Multi Touch Attribution, which makes it possible to calculate ROI. + +![How to measure the effectiveness of TD campaigns](docs/images/slide_top.png) +Please read the slides [How to measure the effectiveness of TD campaigns](https://docs.google.com/presentation/d/e/2PACX-1vRTQ2YhgXuEKQ7SDn-l-xAHCMW6L4NF2AhXHlP6ADz1vU7hRDPLwxk8hajFGObkq_t08WyMHLqNPgRk/embed) before proceeding with this document. + +## Requirements + +### Monitoring WFs + +This WF assumes that the Monitoring WFs have already been installed. The Monitoring WFs consist of the following 3 WF groups, all of which need to be installed. + +- [basic_monitoring](https://github.com/treasure-data/treasure-boxes/tree/master/scenarios/monitoring/basic_monitoring) +- [cdp_monitoring](https://github.com/treasure-data/treasure-boxes/tree/master/scenarios/monitoring/cdp_monitoring) +- [workflow_monitoring](https://github.com/treasure-data/treasure-boxes/tree/master/scenarios/monitoring/workflow_monitoring) + +### Required CDP Features + +#### Journey Orchestration + +(required) +Currently, only activations performed on Journey Orchestration are tracked. In other words, activations from segments are not tracked. + +#### activation_log + +(optional) +activation_log is an optional feature that records in TD a history of all activations (who, when, and where). When this history is available, the activation history for each user is easy to look up. 
+ +If this table is not available, you can still scan the journey table to create an activation history, but you will need Custom Scripts. + +## Main Output Tables + +The `cdp_campaigns_${ps_id}` database is created automatically, and the following tables are output to it. + +| Table | Utilized in AS? | Description | +| --- | --- | --- | +| activations | Y | Activation History | +| clicks | Y | Click History | +| conversions | Y | Conversion History | +| conversion_journeys | | activations+clicks+conversions | +| mta_conversion_journeys | | Acquired revenue calculation results per campaign by Multi Touch Attribution | +| daily_activations | | | +| daily_activations_info | | Execution history information of activations in the Journey | +| daily_clicks | | | +| daily_conversions | | | +| daily_mta_conversion_journeys | | | +| existing_campaigns | | All utm parameters present in the Click table | +| journeys | | Journey listings within parent segment | +| master_activations | | List of activations within a journey | + +### activations + +This table holds the activation history from Journey Orchestration. It is produced as part of effectiveness measurement, but it can also be set up and used as a behavior table in Audience Studio. + +#### table example + +| time | *td_client_id | activation_step_id | syndication_id | activation_type | activation_name | cv_name | utm_campaign | utm_medium | utm_source | utm_content | utm_term | utm_connector | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |--- | +| 1701705887 | ffdae365-24b7-4d69-ae54-05340ee5c57f | 50659 | 237814 | journeyActivationStep | to_td | PURCHASE | td_plazma12 | email | treasuredatajp | control | | mailchimp | + +\* The user identifier set as `user_id` is used. Here it is set as td_client_id. + +### clicks + +This table is the campaign click history. 
This table is output in the process of effectiveness measurement, but it can be set up and used as a behavior table in Audience Studio. + +#### table example + +| time | db_name | table_name | *td_client_id | activation_step_id | cv_name | utm_campaign | utm_medium | utm_source | utm_content | utm_term | utm_connector | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| 1695535786 | cdp_audience_507568 | behavior_behv_website | e05c10e0-1943-4afe-b799-92ef9f24ed9f | 21 | SUBSCRIBE | td_plazma2022summerinv_link1 | email | nikkei_bpdmp | welcome_mail | nikkei | marketo | + +\* The user identifier set as `user_id` is used. Here it is set as td_client_id. + +### conversions + +This table is the conversion history. This table is output in the process of effectiveness measurement, but it can be set up and used as a behavior table in Audience Studio. + +#### table example +| time | db_name | table_name | *td_client_id | val | revenue | cv_name | +| --- | --- | --- | --- | --- | --- | --- | +| 1701481555 | cdp_audience_507568 | behavior_behv_website | f80a229c-149a-4f0d-a0a3-9dbb09e3fa28 | 1 | 20000 | DOWNLOAD | +| 1702531838 | cdp_audience_507568 | behavior_behv_orders | 5502ace7-2e4c-4c39-8f26-1f96b6a3ef4a | 8129 | 8129 | PURCHASE | + +\* The user identifier set as `user_id` is used. Here it is set as td_client_id. + +### conversion_journeys + +We can union activations and clicks and conversions to create a conversion journey. + + +| table_name | | | | | | | | | | | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| activations | time | user_id | | activation_step_id | | | | | | | +| clicks | time | user_id | cv_name | activation_step_id | utm_campaign | utm_medium | utm_source | utm_content | utm_connector | utm_term | +| conversions | time | user_id | cv_name | | | | | | | | + +- activations and clicks table are unioned by `user_id` and `activation_step_id`. 
+- clicks and conversions table are unioned by `user_id` and `cv_name`. + +Hence, we can create an `activation -> click -> conversion` journey for each user and each conversion. + +#### table example +| time | type | td_client_id | activation_step_id | cv_name | utm_campaign | utm_medium | utm_source | utm_content | utm_connector | cv_flg | val | revenue | time_hour_from_activation | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| 1700605034 | Activation | 007a5c3d-1355-4352-af1d-440f2d803f90 | 1928 | DOWNLOAD | td_14947 | push | pushcode | | | 0 | 0 | 0 | | +| 1700606147 | Click | 007a5c3d-1355-4352-af1d-440f2d803f90 | 1928 | DOWNLOAD | td_14947 | push | pushcode | | | 0 | 0 | 0 | 46.3 | +| 1700821234 | Activation | 007a5c3d-1355-4352-af1d-440f2d803f90 | 51418 | DOWNLOAD | td_plazma15 | email | diamondrm | | | 0 | 0 | 0 | | +| 1700822271 | Click | 007a5c3d-1355-4352-af1d-440f2d803f90 | 51418 | DOWNLOAD | td_plazma15 | email | diamondrm | | | 0 | 0 | 0 | 43.2 | +| 1700995234 | Activation | 007a5c3d-1355-4352-af1d-440f2d803f90 | 51397 | DOWNLOAD | td_newsletter20210330 | email | treasuredatajp | | | 0 | 0 | 0 | | +| 1700995634 | Click | 007a5c3d-1355-4352-af1d-440f2d803f90 | 51397 | DOWNLOAD | td_newsletter20210330 | email | treasuredatajp | | | 0 | 0 | 0 | 16.7 | +| 1701167880 | Conversion | 007a5c3d-1355-4352-af1d-440f2d803f90 | | DOWNLOAD | | | | | | 1 | 1 | 20000 | | + +![example of conversion journeys](docs/images/ex_conversion_journey.png) + +### mta_conversion_journeys + +This table is the result of the calculation of acquired revenue per campaign by Multi Touch Attribution. 
+ +#### table example +| time | date | cv_time | *td_client_id | cv_id | position | time_hour_to_cv | time_hour_to_next | time_hour_from_activation | type | click_type | activation_step_id | utm_source | utm_medium | utm_campaign | utm_content | utm_connector | cv_name | size_journey | size_cv_session | size_middle_click | is_within_cv_session | revenue | acquired_person_last_click_model | acquired_revenue_last_click_model | acquired_person_first_click_model | acquired_revenue_first_click_model | acquired_person_session_model | acquired_revenue_session_model | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| 1700606147 | 2023-11-22 | 1702167880 | 007a5c3d-1355-4352-af1d-440f2d803f90 | 02a0ca2ff7412557b1a617d5855de2a1 | 1 | 433 | 337 | | Click | First Click | 1928 | pushcode | push | td_14947 | | | DOWNLOAD | 3 | 2 | 1 | 0 | 0 | 0 | 0 | 1 | 20000 | 0 | 0 | +| 1701822271 | 2023-12-06 | 1702167880 | 007a5c3d-1355-4352-af1d-440f2d803f90 | 02a0ca2ff7412557b1a617d5855de2a1 | 2 | 96 | 48 | | Click | Middle Click | 51418 | diamondrm | email | td_plazma15 | | | DOWNLOAD | 3 | 2 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | 0.5 | 10000 | +| 1701995634 | 2023-12-08 | 1702167880 | 007a5c3d-1355-4352-af1d-440f2d803f90 | 02a0ca2ff7412557b1a617d5855de2a1 | 3 | 47 | 47 | | Click | Last Click | 51397 | treasuredatajp | email | td_newsletter20210330 | | | DOWNLOAD | 3 | 2 | 1 | 1 | 0 | 1 | 20000 | 0 | 0 | 0.5 | 10000 | +| 1702167880 | 2023-12-10 | 1702167880 | 007a5c3d-1355-4352-af1d-440f2d803f90 | 02a0ca2ff7412557b1a617d5855de2a1 | 4 | 0 | | | Conversion | Conversion | | | | | | | DOWNLOAD | 3 | 2 | 1 | 0 | 20000 | 0 | 0 | 0 | 0 | 0 | 0 | + +\* The user identifier set as `user_id` is used. Here it is set as td_client_id. 
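The attribution columns in the example above can be reproduced with a small sketch. It assumes a session window of 240 hours (the `allowable_time_to_cv: 24*10` value used in the sample settings), an inclusive window boundary, and an equal split among the clicks inside the window. The `attribute` function and its field names are hypothetical and are not part of this WF.

```python
def attribute(clicks, revenue, allowable_hours=240.0):
    """Split one conversion's credit across its clicks.

    clicks: list of (campaign, hours_to_cv) tuples, ordered oldest to newest.
    Returns one dict per click with person/revenue credit for each model.
    Assumption: the window boundary is inclusive (hours_to_cv <= allowable_hours).
    """
    # Number of clicks that fall inside the session window.
    n_session = sum(1 for _, hours in clicks if hours <= allowable_hours)
    rows = []
    for i, (campaign, hours_to_cv) in enumerate(clicks):
        # First click model: all credit to the oldest click.
        first = 1.0 if i == 0 else 0.0
        # Last click model: all credit to the newest click.
        last = 1.0 if i == len(clicks) - 1 else 0.0
        # Session model: equal credit to every click inside the window.
        session = 1.0 / n_session if hours_to_cv <= allowable_hours else 0.0
        rows.append({
            "campaign": campaign,
            "person_first": first,
            "revenue_first": first * revenue,
            "person_last": last,
            "revenue_last": last * revenue,
            "person_session": session,
            "revenue_session": session * revenue,
        })
    return rows


# The three clicks of the example journey, with their time_hour_to_cv values.
example_clicks = [
    ("td_14947", 433),
    ("td_plazma15", 96),
    ("td_newsletter20210330", 47),
]
for row in attribute(example_clicks, revenue=20000):
    print(row)
```

With these inputs, the first click falls outside the 240-hour window, so the session model splits the credit between the two later clicks, as in the table.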
+ +![example of mta conversion journeys](docs/images/ex_mta_conversion_journey.png) + + + + +### daily_mta_conversion_journeys + +This is the main output table on which the indicator calculation is based. + +#### table example +| time | date | activation_step_id | is_internal_campaign_click | type | utm_source | utm_medium | utm_campaign | utm_content | utm_connector | cv_name | cnt_activations | cnt_clicks | cnt_clicks_related_conversion | acquired_person_last_click_model | acquired_person_first_click_model | acquired_person_session_model | acquired_revenue_last_click_model | acquired_revenue_first_click_model | acquired_revenue_session_model | size_journey | cnt_cv_id | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| 1701788400 | 2023-12-06 | 51397 | internal | Click | treasuredatajp | email | td_newsletter20200327 | | | DOWNLOAD | 366 | 1636 | 740 | 289 | 199 | 345.1166667 | 5780000 | 3980000 | 6902333.333 | 2161 | 740 | +| 1701788400 | 2023-12-06 | 51418 | internal | Click | diamondrm | email | td_plazma15 | | | DOWNLOAD | 0 | 1597 | 723 | 150 | 404 | 336.95 | 3000000 | 8080000 | 6739000 | 2076 | 723 | + +From the result, for example, if the Last Click Model is applied, the metrics for each campaign in a given time period can be easily calculated as follows. + +| No. | metrics | how to calculation | example value | +| --- | --- | --- | --- | +| (1) | #activations | SUM(cnt_activations) | 100,000 | +| (2) | #clicks | SUM(cnt_clicks) | 1,000 | +| (3) | #conversions | SUM(acquired_person_last_click_model) | 100 | +| (4) | acquired_revenue | SUM(acquired_revenue_last_click_model) | ¥1,000,000 | + +In addition, if costs per campaign are defined, ROI and other performance metrics can be calculated in conjunction with these metrics. + +| No. 
| metrics | how to calculation | example value | +| --- | --- | --- | --- | +| (5) | %click_ratio | (2) / (1) | 1% | +| (6) | %cv_ratio (from #activations) | (3) / (1) | 0.1% | +| (7) | %cv_ratio (from #clicks) | (3) / (2) | 10% | +| (8) | cost | User Defined | ¥200,000 | +| (9) | return | (4) - (8) | ¥800,000 | +| (10) | ROI | (9) / (8) | 4x | +| (11) | Return on Campaign Spend | (4) / (8) | 5x | + +## Setup for Execution + +### user_settings.yaml + +This file must be edited for execution. The following sample is an example to illustrate the process. + +```yaml +timezone: JST +td: + user_id: td_client_id + + ps: + - 489726 + + activations_tables: + 489726: + scan_journey_tables: true + + clicks_tables: + 489726: + - + is_audience_table: true + table: behavior_behv_orders + url_col: td_url + - + is_audience_table: false + db: treasurebikes + table: behv_website + url_col: td_url + time_col: time + filter: td_url IS NOT NULL + use_distinct: false + + conversions_tables: + 489726: + - + is_audience_table: true + table: behavior_behv_website + filter: td_app IN ('Android', 'iOS') + cv_name: DOWNLOAD + val_col: 1 + acquired_revenue_per_person: 5000 + use_distinct: false + - + is_audience_table: true + table: behavior_behv_orders + filter: checkout_event = 'true' + cv_name: PURCHASE + val_col: total_order + acquired_revenue_per_person: 1 + use_distinct: false + - + is_audience_table: false + db: treasurebikes + table: behv_website + filter: td_subscription = 'true' + cv_name: SUBSCRIBE + time_col: time + val_col: 1 + acquired_revenue_per_person: 20000 + use_distinct: true + + master_campaigns_tables: + 489726: + - + db: taka + table: master_campaigns_489726 + + mta: + session_model: + allowable_time_to_cv: 24*10 + + utm_names: + utm_id: activation_id + + api_endpoint: api.treasuredata.com +``` + +#### timezone + +(optional) +Specify a timezone here if you wish to base the timezone other than the one defined in the WF schedule. 
This timezone mainly affects the time range for the daily summary. + +#### user_id + +(required) +This is a common user identifier for all activations, clicks, and conversions tables. Essentially, this is a user identifier that exists in the master table. + +If this user_id is an e-mail address, it may be a `NULL` value in some records. In that case, it will not be measured correctly, and the user_id should be specified for which all records have a value, such as member_id. + +#### ps + +(required) + +```yml + ps: + - 489726 +``` + +Specify the parent segment ID (ps_id). Multiple parent segments can be specified, so this is specified as an array. Effectiveness measurement is performed for each parent segment. + +#### activations_tables + +(optional) + +```yml + activations_tables: + 489726: + scan_journey_tables: true +``` + +If activation_log feature is enabled, you do not need to set anything. If not, scan `journey_table` to get activation history of each profile. In that case, specify `scan_journey_tables: true` for each ps_id. + +Since activations tables are derived almost automatically based on `ps_id`, there is no other setting item. + +#### clicks_tables + +(required) + +```yml + clicks_tables: + 489726: + - + is_audience_table: true + table: behavior_behv_orders + url_col: td_url + - + is_audience_table: false + db: treasurebikes + table: behv_website + url_col: td_url + time_col: time + filter: td_url IS NOT NULL + use_distinct: false +``` + +Specify the access log of the incoming site when the campaign link is clicked. +Assuming that the utm parameter is set for the campaign link, each campaign link must be one-to-one with each activation of the journey. + +##### is_audience_table + +`is_audience_table` specifies whether the specified table is in the Parent Segment (i.e. in the `cdp_audience_${ps_id}` database). + +- If `true`, it must be a `behavior_*` table in `cdp_audience_${ps_id}`. 
+- If `false`, you can specify any table, but it must contain `user_id`. + +Required options depend on the value of `is_audience_table`. + +###### is_audience_table: true + +| option_name | required? | default | description | +| --- | --- | --- | --- | +| table | Y | | Specify the table name. | +| url_col | Y | | Specify a url column in a table. | +| filter | | (no filter) | If necessary, the WHERE clause can be used to narrow down the search.Fill in the conditions after WHERE. | +| use_distinct | | false | If there are duplicates in a record, deduplication by DISTINCT can be performed, but is basically not specified because of heavy processing. | + +###### is_audience_table: false + +| option_name | required? | default | description | +| --- | --- | --- | --- | +| db | Y | | Specify the database name. | +| table | Y | | Specify the table name. | +| url_col | Y | | Specify a url column in a table. | +| time_col | Y | | Specify the time column in the table. (If it is a `time` column, which is often the case, specify it explicitly) | +| filter | | (no filter) | If necessary, the WHERE clause can be used to narrow down the search.Fill in the conditions after WHERE. | +| use_distinct | | FALSE | If there are duplicates in a record, deduplication by DISTINCT can be performed, but is basically not specified because of heavy processing. 
| + +#### conversions_tables + +```yml + conversions_tables: + 489726: + - + is_audience_table: true + table: behavior_behv_website + cv_name: DOWNLOAD + filter: td_app IN ('Android', 'iOS') + val_col: 1 + acquired_revenue_per_person: 5000 + use_distinct: false + - + is_audience_table: true + table: behavior_behv_orders + cv_name: PURCHASE + filter: checkout_event = 'true' + val_col: total_order + acquired_revenue_per_person: 1 + use_distinct: false + - + is_audience_table: false + db: treasurebikes + table: behv_website + cv_name: SUBSCRIBE + filter: td_subscription = 'true' + time_col: time + val_col: 1 + acquired_revenue_per_person: 20000 + use_distinct: true +``` + +Specify a conversion table in which actions that can be regarded as conversions, such as "purchase" and "new registration", are recorded. The `filter` option extracts only the conversion actions in the conversion table. + +##### is_audience_table + +`is_audience_table` specifies whether the specified table is in the Parent Segment (i.e. in the `cdp_audience_${ps_id}` database). + +- If `true`, it must be a `behavior_*` table in `cdp_audience_${ps_id}`. +- If `false`, you can specify any table, but it must contain `user_id`. + +Required options depend on the value of `is_audience_table`. + +###### is_audience_table: true + +| option_name | required? | default | description | +| --- | --- | --- | --- | +| table | Y | | Specify the table name. | +| cv_name | Y | | Specify a conversion name. A conversion name should be the same as the value of `custom_event_type` defined [here](https://developers.facebook.com/docs/meta-pixel/reference/) for clarity . If you define a new one, specify it in the same format. | +| val_col | Y | | Specify the column containing the value obtained by the conversion. For conversions aimed at purchase, the column should contain the amount of the purchase, and for conversions aimed at acquisition, the column should be set to `1`. 
| +| acquired_revenue_per_person | Y | | The acquired revenue per conversion is to be calculated as `val_col * acquired_revenue_per_person`. For conversions aiming at purchase, the acquired revenue is already in `val_col`, so `1` is used, and for conversions aiming at acquisition, this option specifies the value of "future revenue assumed from the acquisition of one person".| +| filter | | (no filter) | The WHERE clause can be used to narrow down the search. Fill in the conditions after WHERE. | +| use_distinct | | false | If there are duplicates in a record, deduplication by DISTINCT can be performed, but is basically not specified because of heavy processing. | + +###### is_audience_table: false + +| option_name | required? | default | description | +| --- | --- | --- | --- | +| db | Y | | Specify the database name. | +| table | Y | | Specify the table name. | +| cv_name | Y | | Specify a conversion name. A conversion name should be the same as the value of `custom_event_type` defined [here](https://developers.facebook.com/docs/meta-pixel/reference/) for clarity . If you define a new one, specify it in the same format. | +| val_col | Y | | Specify the column containing the value obtained by the conversion. For conversions aimed at purchasing, the column should contain the amount of the purchase, and for conversions aimed at acquiring, the column should be set to `1`. | +| acquired_revenue_per_person | Y | | The acquired revenue per conversion is to be calculated as `val_col * acquired_revenue_per_person`. For conversions aiming at purchase, the acquired revenue is already in `val_col`, so `1` is used, and for conversions aiming at acquisition, this option specifies the value of "future revenue assumed from the acquisition of one person".| +| time_col | Y | | Specify the time column in the table. (If it is a `time` column, which is often the case, specify it explicitly) | +| filter | | (no filter) | The WHERE clause can be used to narrow down the search. 
Fill in the conditions after WHERE. | +| use_distinct | | false | If there are duplicates in a record, deduplication by DISTINCT can be performed, but is basically not specified because of heavy processing. | + +#### master_campaigns_tables +(optional) + +```yml + master_campaigns_tables: + 489726: + - + db: taka + table: master_campaigns_489726 +``` + +Specify the source of the master_campaigns table that you upload. With this table, the utm parameters in the clicks table can be corrected or supplemented after the fact. + +Set up this table when you want to retrofit utm parameters, for example in the following cases: + +- Complement misconfigurations or omissions in the utm parameter of existing campaign links +- Assign `cv_name` to campaigns outside of TD so that they can be measured for effectiveness as well +- Organize utm parameters for past campaigns so that they can be measured for effectiveness + +Build the master_campaigns table from the `existing_campaigns` table output by this WF, editing or adding the utm parameter values of the campaigns you want to change. + +However, the following 3 parameter values are used to match records in the original clicks table, so records in the original clicks table that do not have these 3 values set cannot be linked: + +- utm_source +- utm_medium +- utm_campaign + +In other words, the purpose of the master_campaigns table is to set values other than these 3 (`cv_name`, `activation_step_id`, etc.) after the fact. A value set here takes precedence over the corresponding value in the original clicks table; a column left empty keeps the original value, as the example below shows. + +##### Required columns for table + +The required table columns for the master_campaigns table are as follows. It must have all these columns even if no values are set. (Other columns may also exist.) + +| column_name | value required? 
| description | +| --- | --- | --- | +| utm_source | Y | The same value as the record in the original clicks table. | +| utm_medium | Y | The same value as the record in the original clicks table. | +| utm_campaign | Y | The same value as the record in the original clicks table. | +| cv_name | Y | If set, this value takes precedence when measuring effectiveness. Campaigns without a value are excluded from effectiveness measurement. | +| activation_step_id | | If set, this value takes precedence when measuring effectiveness. Specify it if the campaign link is tied to an activation. | +| utm_content | | If set, this value takes precedence when measuring effectiveness. | +| utm_connector | | If set, this value takes precedence when measuring effectiveness. | +| utm_term | | If set, this value takes precedence when measuring effectiveness. | + +##### Example + +###### Records in the original clicks table + +| utm_source | utm_medium | utm_campaign | cv_name | activation_step_id | utm_content | utm_connector | +| --- | --- | --- | --- | --- | --- | --- | +| google | cpc | company_summer_sale_2019 | | | A | td | +| sfmc | email | mnt_helmet_abandoned_cart | | | A | td | +| facebook | social-post | td_plazma2022summerinv_link1 | DOWNLOAD | | A | | + +###### Records in the master_campaigns table + +| utm_source | utm_medium | utm_campaign | cv_name | activation_step_id | utm_content | utm_connector | +| --- | --- | --- | --- | --- | --- | --- | +| google | cpc | company_summer_sale_2019 | PURCHASE | 1234 | | | +| sfmc | email | mnt_helmet_abandoned_cart | DOWNLOAD | 5678 | B | marketo | +| facebook | social-post | td_plazma2022summerinv_link1 | SUBSCRIBE | 9101 | C | mailchimp | + +###### Records in the clicks table after merge + +| utm_source | utm_medium | utm_campaign | cv_name | activation_step_id | utm_content | utm_connector | +| --- | --- | --- | --- | --- | --- | --- | +| google | cpc | company_summer_sale_2019 | PURCHASE | 1234 | 
A | td | +| sfmc | email | mnt_helmet_abandoned_cart | DOWNLOAD | 5678 | B | marketo | +| facebook | social-post | td_plazma2022summerinv_link1 | SUBSCRIBE | 9101 | C | mailchimp | + +#### mta + +(required) + +```yml + mta: + session_model: + allowable_time_to_cv: 24*10 +``` + +Set parameters for each model in Multi Touch Attribution. Currently, the following models are available; only `session_model` takes parameters. + +- Last Touch Model + - Credits only the campaign clicked immediately before the conversion. +- First Touch Model + - Credits only the campaign clicked at the very beginning of the conversion journey. +- Session Model + - Credits equally all campaigns clicked within the N hours before the conversion. + +##### session_model: allowable_time_to_cv + +Specify N for "within the N hours before the conversion." The value is in hours. The above example specifies 240 hours (10 days). + +#### utm_names + +(optional) +Specify the parameter names to be extracted from the url column of the clicks table. By default, each parameter is extracted under the same name as the option name, as shown below. + +```yml + utm_names: + utm_id: utm_id + utm_source: utm_source + utm_medium: utm_medium + utm_campaign: utm_campaign + utm_content: utm_content + utm_cv: utm_cv + utm_term: utm_term + utm_connector: utm_connector +``` + +If a parameter is logged under a name different from the option name (for example, `utm_id` is already used by another tool such as Google Analytics, so the value is logged as `activation_id` instead), override the default as follows. + +```yml + utm_names: + utm_id: activation_id +``` + +#### api_endpoint + +(optional) +If `scan_journey_tables: true` is specified, the tables are accessed through Python. In this case, you need to specify the API endpoint. 
The default is `api.treasuredata.com`, so specify a value here only if you need a different endpoint. + +- [Endpoint List](https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints) (Refer to the value of the "API" item) + + +### gsheet_settings.yaml + +```yml +gsheet: + result_connection: ***** + sheet_folder: ***** + spreadsheet_title: cdp_campaign_management +``` + +If you want to export the main output tables to Google Sheets, set up this file. + +## Execution + +### Upload + +First, upload the workflow. + +```sh +$ td wf push cdp_campaign_management +``` + +### Set secrets + +Second, register `td.apikey` as a secret for the project you just pushed. + +```sh +$ td wf secrets --project cdp_campaign_management --set td.apikey=1234/abcdefg... +``` + +### Execute + +- For the first run, execute `main_initial_ingest.dig`. +- After that, set up a daily schedule for `main_incremental_ingest.dig`. \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/check_settings.dig b/scenarios/cdp_campaign_management/check_settings.dig new file mode 100644 index 00000000..362bc076 --- /dev/null +++ b/scenarios/cdp_campaign_management/check_settings.dig @@ -0,0 +1,10 @@ ++check_settings: + py>: py_scripts.check_settings.run + user_id: ${td.user_id} + clicks_tables: ${td.clicks_tables} + conversions_tables: ${td.conversions_tables} + mta_settings: ${td.mta} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} diff --git a/scenarios/cdp_campaign_management/common/gsheet_settings.yaml b/scenarios/cdp_campaign_management/common/gsheet_settings.yaml new file mode 100644 index 00000000..9b22c408 --- /dev/null +++ b/scenarios/cdp_campaign_management/common/gsheet_settings.yaml @@ -0,0 +1,4 @@ +gsheet: + result_connection: gsheet_for_roi + sheet_folder: 1fLWGown_zelyAkSPKxwxxqnSgi6riAqp + spreadsheet_title: cdp_campaign_management \ No newline at end of file diff --git 
a/scenarios/cdp_campaign_management/common/system_settings.yaml b/scenarios/cdp_campaign_management/common/system_settings.yaml new file mode 100644 index 00000000..4c2bc4e3 --- /dev/null +++ b/scenarios/cdp_campaign_management/common/system_settings.yaml @@ -0,0 +1,77 @@ +parallel: true +td: + base_db_name: cdp_campaigns + tables: + activation_log: activation_log + existing_campaigns: existing_campaigns + + master_campaigns: master_campaigns + tmp_master_campaigns: tmp_master_campaigns + + journeys: journeys + tmp_journeys: tmp_journeys + + master_activations: master_activations + tmp_master_activations: tmp_master_activations + + daily_activations_info: daily_activations_info + tmp_daily_activations_info: tmp_daily_activations_info + + activations: activations + tmp_activations: tmp_activations + + clicks: clicks + tmp_clicks: tmp_clicks + + conversions: conversions + tmp_conversions: tmp_conversions + + conversion_journeys: conversion_journeys + tmp_conversion_journeys: tmp_conversion_journeys + + mta_conversion_journeys: mta_conversion_journeys + tmp_mta_conversion_journeys: tmp_mta_conversion_journeys + + daily_activations: daily_activations + tmp_daily_activations: tmp_daily_activations + + daily_clicks: daily_clicks + tmp_daily_clicks: tmp_daily_clicks + + daily_conversions: daily_conversions + tmp_daily_conversions: tmp_daily_conversions + + daily_conversion_journeys: daily_conversion_journeys + tmp_daily_conversion_journeys: tmp_daily_conversion_journeys + + daily_mta_conversion_journeys: daily_mta_conversion_journeys + tmp_daily_mta_conversion_journeys: tmp_daily_mta_conversion_journeys + + query_store: query_store + + monitoring: + db: + cdp_monitoring: cdp_monitoring + basic_monitoring: basic_monitoring + wf_monitoring: wf_monitoring + tables: + entities: entities + activations: activations + activations_history: activations_history + jobs: jobs + journey_activation: journey_activation + journey_summary: journey_summary + connections_details: 
connections_details + connections: connections + + utm_names: + utm_id: utm_id + utm_source: utm_source + utm_medium: utm_medium + utm_campaign: utm_campaign + utm_content: utm_content + utm_cv: utm_cv + utm_term: utm_term + utm_connector: utm_connector + + api_endpoint: api.treasuredata.com diff --git a/scenarios/cdp_campaign_management/common/user_settings.yaml b/scenarios/cdp_campaign_management/common/user_settings.yaml new file mode 100644 index 00000000..4b778da9 --- /dev/null +++ b/scenarios/cdp_campaign_management/common/user_settings.yaml @@ -0,0 +1,17 @@ +# timezone: +td: + user_id: #required + + ps: #required + + activations_tables: #required + + clicks_tables: #required + + conversions_tables: #required + + master_campaigns_tables: #optional + + mta: #required + session_model: + allowable_time_to_cv: 24*10 diff --git a/scenarios/cdp_campaign_management/common/user_settings_sample.yaml b/scenarios/cdp_campaign_management/common/user_settings_sample.yaml new file mode 100644 index 00000000..d962df6a --- /dev/null +++ b/scenarios/cdp_campaign_management/common/user_settings_sample.yaml @@ -0,0 +1,70 @@ +timezone: JST +td: + user_id: td_client_id # One of the user identifiers defined in the Parent Segment + ps: + - 507568 + activations_tables: + 507568: + scan_journey_tables: true + clicks_tables: + 507568: + - + is_audience_table: false + db: treasurebikes + table: behv_website + url_col: td_url + time_col: time + filter: td_url IS NOT NULL + use_distinct: false + - + is_audience_table: true + # db: + table: behavior_behv_orders + url_col: td_url + # time_col: timestamp + # filter: + # use_distinct: false + conversions_tables: + 507568: + - + is_audience_table: false + db: treasurebikes + table: behv_website + filter: td_subscription = 'true' + cv_name: SUBSCRIBE + time_col: time + val_col: 1 + acquired_revenue_per_person: 20000 + use_distinct: true + - + is_audience_table: true + table: behavior_behv_website + filter: td_app IN ('Android', 'iOS') + 
cv_name: DOWNLOAD + # time_col: timestamp + val_col: 1 + acquired_revenue_per_person: 20000 + use_distinct: false + - + is_audience_table: true + table: behavior_behv_orders + filter: checkout_event = 'true' + cv_name: PURCHASE + # time_col: timestamp + val_col: total_order + acquired_revenue_per_person: 1 + # use_distinct: false + + master_campaigns_tables: + # 507568: + # - + # db: taka + # table: master_campaigns_507568 + + mta: + session_model: + allowable_time_to_cv: 24*10 + + utm_names: + utm_cv: utm_term + diff --git a/scenarios/cdp_campaign_management/docs/images/ex_conversion_journey.png b/scenarios/cdp_campaign_management/docs/images/ex_conversion_journey.png new file mode 100644 index 00000000..f8429af8 Binary files /dev/null and b/scenarios/cdp_campaign_management/docs/images/ex_conversion_journey.png differ diff --git a/scenarios/cdp_campaign_management/docs/images/ex_mta_conversion_journey.png b/scenarios/cdp_campaign_management/docs/images/ex_mta_conversion_journey.png new file mode 100644 index 00000000..5a29996b Binary files /dev/null and b/scenarios/cdp_campaign_management/docs/images/ex_mta_conversion_journey.png differ diff --git a/scenarios/cdp_campaign_management/docs/images/slide_top.png b/scenarios/cdp_campaign_management/docs/images/slide_top.png new file mode 100644 index 00000000..f28247df Binary files /dev/null and b/scenarios/cdp_campaign_management/docs/images/slide_top.png differ diff --git a/scenarios/cdp_campaign_management/incremental_ingest.dig b/scenarios/cdp_campaign_management/incremental_ingest.dig new file mode 100644 index 00000000..8c3a21a6 --- /dev/null +++ b/scenarios/cdp_campaign_management/incremental_ingest.dig @@ -0,0 +1,253 @@ +_export: + time_from: ${session_unixtime-60*60*24} + time_to: ${session_unixtime} + ++ingest_journeys: + +ingest: + td>: queries/ingest_journeys.sql + create_table: ${td.tables.tmp_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_journeys} + 
insert_into: ${td.database}.${td.tables.journeys} + + ++ingest_master_activations: + +ingest_to_tmp_table: + td>: queries/ingest_master_activations.sql + create_table: ${td.tables.tmp_master_activations} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_master_activations} + insert_into: ${td.database}.${td.tables.master_activations} + + ++ingest_daily_activations_info: + +ingest_to_tmp_table: + td>: queries/ingest_daily_activations_info.sql + create_table: ${td.tables.tmp_daily_activations_info} + + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_activations_info} + insert_into: ${td.database}.${td.tables.daily_activations_info} + ++ingest_clicks: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_clicks} + create_tables: + - ${td.tables.tmp_clicks} + + +prepare_master_campaings: + +ingest_master_campaigns: + if>: ${typeof td.master_campaigns_tables === 'undefined' || td.master_campaigns_tables == null} + _do: + +create_table: + td>: queries/create_master_campaigns.sql + dest_db: ${td.database} + dest_table: ${td.tables.master_campaigns} + _else_do: + +create_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_master_campaigns} + create_tables: + - ${td.tables.tmp_master_campaigns} + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.master_campaigns_tables[ps_id]} + _do: + td>: + query: SELECT * FROM ${tbl_info.db}.${tbl_info.table} + insert_into: ${td.database}.${td.tables.tmp_master_campaigns} + + +de_duplication: + td>: queries/de_duplicate_master_campaings.sql + create_table: ${td.database}.${td.tables.master_campaigns} + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.clicks_tables[ps_id]} + _do: + _export: + url_column: ${tbl_info.url_col} + input_table: ${tbl_info.table} + campaign_db: ${td.database} + filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter} + master_campaigns_table: 
${td.tables.master_campaigns} + + time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}" + input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}" + user_column: "${tbl_info.is_audience_table ? 't2.' + td.user_id : 't1.' + td.user_id}" + user_column_inner: "${tbl_info.is_audience_table ? 'cdp_customer_id' : td.user_id}" + join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}" + distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}" + + td>: queries/ingest_clicks.sql + insert_into: ${td.database}.${td.tables.tmp_clicks} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_clicks} + insert_into: ${td.database}.${td.tables.clicks} + + ++ingest_daily_clicks: + +ingest_to_tmp_table: + td>: queries/ingest_daily_clicks.sql + create_table: ${td.database}.${td.tables.tmp_daily_clicks} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_clicks} + insert_into: ${td.database}.${td.tables.daily_clicks} + ++ingest_activations: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_activations} + create_tables: + - ${td.tables.tmp_activations} + + +ingest_to_tmp_table: + if>: ${typeof td.activations_tables === 'undefined' || td.activations_tables == null || td.activations_tables[ps_id] == null || !td.activations_tables[ps_id].scan_journey_tables} + _do: + _export: + cdp_audience_db: cdp_audience_${ps_id} + + td>: queries/ingest_activations.sql + create_table: ${td.database}.${td.tables.tmp_activations} + + _else_do: + call>: sub_ingest_activations.dig + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_activations} + insert_into: ${td.database}.${td.tables.activations} + + ++ingest_daily_activations: + +ingest_to_tmp_table: + td>: queries/ingest_daily_activations.sql + 
create_table: ${td.database}.${td.tables.tmp_daily_activations} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_activations} + insert_into: ${td.database}.${td.tables.daily_activations} + + ++ingest_conversions: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_conversions} + create_tables: + - ${td.tables.tmp_conversions} + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.conversions_tables[ps_id]} + _do: + _export: + input_table: ${tbl_info.table} + filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter} + cv_name: ${tbl_info.cv_name} + val_col: ${tbl_info.val_col} + acquired_revenue_per_person: ${tbl_info.acquired_revenue_per_person} + + time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}" + user_column: "${tbl_info.is_audience_table ? 't2.' + td.user_id : 't1.' + td.user_id}" + inner_user_column: "${tbl_info.is_audience_table ? 'cdp_customer_id' : td.user_id}" + input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}" + join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}" + distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? 
'' : 'DISTINCT'}" + + td>: queries/ingest_conversions.sql + insert_into: ${td.database}.${td.tables.tmp_conversions} + + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_conversions} + insert_into: ${td.database}.${td.tables.conversions} + + ++ingest_daily_conversions: + +ingest_to_tmp_table: + td>: queries/ingest_daily_conversions.sql + create_table: ${td.database}.${td.tables.tmp_daily_conversions} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_conversions} + insert_into: ${td.database}.${td.tables.daily_conversions} + + ++ingest_conversion_journeys: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_conversion_journeys} + create_tables: + - ${td.tables.tmp_conversion_journeys} + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.conversions_tables[ps_id]} + _do: + _export: + cv_name: ${tbl_info.cv_name} + user_id: ${td.user_id} + input_db: ${td.database} + input_table_activations: ${td.tables.activations} + input_table_clicks: ${td.tables.clicks} + input_table_conversions: ${td.tables.conversions} + + td>: queries/ingest_conversion_journeys.sql + insert_into: ${td.database}.${td.tables.tmp_conversion_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_conversion_journeys} + insert_into: ${td.database}.${td.tables.conversion_journeys} + + ++ingest_mta_conversion_journeys: + +ingest_to_tmp_table: + td>: queries/ingest_mta_conversion_journeys.sql + create_table: ${td.tables.tmp_mta_conversion_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_mta_conversion_journeys} + insert_into: ${td.database}.${td.tables.mta_conversion_journeys} + + ++ingest_daily_mta_conversion_journeys: + +ingest_to_tmp_table: + td>: queries/ingest_daily_mta_conversion_journeys.sql + create_table: ${td.tables.tmp_daily_mta_conversion_journeys} + + 
+write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_mta_conversion_journeys} + insert_into: ${td.database}.${td.tables.daily_mta_conversion_journeys} + + ++ingest_existing_campaigns: + td>: queries/ingest_existing_campaigns.sql + create_table: ${td.tables.existing_campaigns} + + diff --git a/scenarios/cdp_campaign_management/initial_ingest.dig b/scenarios/cdp_campaign_management/initial_ingest.dig new file mode 100644 index 00000000..719e1f67 --- /dev/null +++ b/scenarios/cdp_campaign_management/initial_ingest.dig @@ -0,0 +1,388 @@ +_export: + time_from: 0 + time_to: ${session_unixtime} + ++ingest_journeys: + +ingest: + td>: queries/ingest_journeys.sql + create_table: ${td.tables.tmp_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_journeys} + create_table: ${td.database}.${td.tables.journeys} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.journeys} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_journeys.sql + dest_db: ${td.database} + dest_table: ${td.tables.journeys} + + ++ingest_master_activations: + +ingest: + td>: queries/ingest_master_activations.sql + create_table: ${td.tables.tmp_master_activations} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_master_activations} + create_table: ${td.database}.${td.tables.master_activations} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.master_activations} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_master_activations.sql + dest_db: ${td.database} + dest_table: ${td.tables.master_activations} + ++ingest_daily_activations_info: + +ingest_to_tmp_table: + td>: queries/ingest_daily_activations_info.sql + create_table: 
${td.tables.tmp_daily_activations_info} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_activations_info} + create_table: ${td.database}.${td.tables.daily_activations_info} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.daily_activations_info} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_daily_activations_info.sql + dest_db: ${td.database} + dest_table: ${td.tables.daily_activations_info} + ++ingest_clicks: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_clicks} + create_tables: + - ${td.tables.tmp_clicks} + + +prepare_master_campaings: + +ingest_master_campaigns: + if>: ${typeof td.master_campaigns_tables === 'undefined' || td.master_campaigns_tables == null} + _do: + +create_table: + td>: queries/create_master_campaigns.sql + dest_db: ${td.database} + dest_table: ${td.tables.master_campaigns} + _else_do: + +create_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_master_campaigns} + create_tables: + - ${td.tables.tmp_master_campaigns} + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.master_campaigns_tables[ps_id]} + _do: + td>: + query: SELECT * FROM ${tbl_info.db}.${tbl_info.table} + insert_into: ${td.database}.${td.tables.tmp_master_campaigns} + + +de_duplication: + td>: queries/de_duplicate_master_campaings.sql + create_table: ${td.database}.${td.tables.master_campaigns} + + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.clicks_tables[ps_id]} + _do: + _export: + url_column: ${tbl_info.url_col} + input_table: ${tbl_info.table} + campaign_db: ${td.database} + filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter} + master_campaigns_table: ${td.tables.master_campaigns} + + time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}" + input_db: "${tbl_info.is_audience_table ? 
'cdp_audience_' + ps_id : tbl_info.db}" + user_column: "${tbl_info.is_audience_table ? 't2.' + td.user_id : 't1.' + td.user_id}" + user_column_inner: "${tbl_info.is_audience_table ? 'cdp_customer_id' : td.user_id}" + join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}" + distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}" + + td>: queries/ingest_clicks.sql + insert_into: ${td.database}.${td.tables.tmp_clicks} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_clicks} + create_table: ${td.database}.${td.tables.clicks} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.clicks} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_clicks.sql + dest_db: ${td.database} + dest_table: ${td.tables.clicks} + ++ingest_daily_clicks: + +ingest_to_tmp_table: + td>: queries/ingest_daily_clicks.sql + create_table: ${td.database}.${td.tables.tmp_daily_clicks} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_clicks} + create_table: ${td.database}.${td.tables.daily_clicks} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.daily_clicks} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_daily_clicks.sql + dest_db: ${td.database} + dest_table: ${td.tables.daily_clicks} + ++ingest_activations: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_activations} + create_tables: + - ${td.tables.tmp_activations} + + +ingest_to_tmp_table: + if>: ${typeof td.activations_tables === 'undefined' || td.activations_tables == null || td.activations_tables[ps_id] == null || !td.activations_tables[ps_id].scan_journey_tables} + _do: + _export: + 
cdp_audience_db: cdp_audience_${ps_id} + + td>: queries/ingest_activations.sql + create_table: ${td.database}.${td.tables.tmp_activations} + + _else_do: + call>: sub_ingest_activations.dig + + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_activations} + create_table: ${td.database}.${td.tables.activations} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.activations} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_activations.sql + dest_db: ${td.database} + dest_table: ${td.tables.activations} + ++ingest_daily_activations: + +ingest_to_tmp_table: + td>: queries/ingest_daily_activations.sql + create_table: ${td.database}.${td.tables.tmp_daily_activations} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_activations} + create_table: ${td.database}.${td.tables.daily_activations} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.daily_activations} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_daily_activations.sql + dest_db: ${td.database} + dest_table: ${td.tables.daily_activations} + + ++ingest_conversions: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_conversions} + create_tables: + - ${td.tables.tmp_conversions} + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.conversions_tables[ps_id]} + _do: + _export: + input_table: ${tbl_info.table} + filter: ${(typeof tbl_info.filter === 'undefined') || tbl_info.filter} + cv_name: ${tbl_info.cv_name} + val_col: ${tbl_info.val_col} + acquired_revenue_per_person: ${tbl_info.acquired_revenue_per_person} + + time_column: "${tbl_info.is_audience_table ? 'timestamp' : tbl_info.time_col}" + user_column: "${tbl_info.is_audience_table ? 't2.' + td.user_id : 't1.' 
+ td.user_id}" + inner_user_column: "${tbl_info.is_audience_table ? 'cdp_customer_id' : td.user_id}" + input_db: "${tbl_info.is_audience_table ? 'cdp_audience_' + ps_id : tbl_info.db}" + join_part: "${tbl_info.is_audience_table ? 'JOIN cdp_audience_' + ps_id + '.customers t2 ON t1.cdp_customer_id = t2.cdp_customer_id' : ''}" + distinct: "${typeof tbl_info.use_distinct === 'undefined' || !tbl_info.use_distinct ? '' : 'DISTINCT'}" + + td>: queries/ingest_conversions.sql + insert_into: ${td.database}.${td.tables.tmp_conversions} + + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_conversions} + create_table: ${td.database}.${td.tables.conversions} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.conversions} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_conversions.sql + dest_db: ${td.database} + dest_table: ${td.tables.conversions} + ++ingest_daily_conversions: + +ingest_to_tmp_table: + td>: queries/ingest_daily_conversions.sql + create_table: ${td.database}.${td.tables.tmp_daily_conversions} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_conversions} + create_table: ${td.database}.${td.tables.daily_conversions} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.daily_conversions} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_daily_conversions.sql + dest_db: ${td.database} + dest_table: ${td.tables.daily_conversions} + ++ingest_conversion_journeys: + +prepare_table: + td_ddl>: + drop_tables: + - ${td.tables.tmp_conversion_journeys} + create_tables: + - ${td.tables.tmp_conversion_journeys} + + +ingest_to_tmp_table: + _parallel: ${parallel} + for_each>: + tbl_info: ${td.conversions_tables[ps_id]} + _do: + _export: + cv_name: ${tbl_info.cv_name} 
+ user_id: ${td.user_id} + input_db: ${td.database} + input_table_activations: ${td.tables.activations} + input_table_clicks: ${td.tables.clicks} + input_table_conversions: ${td.tables.conversions} + + td>: queries/ingest_conversion_journeys.sql + insert_into: ${td.database}.${td.tables.tmp_conversion_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_conversion_journeys} + create_table: ${td.database}.${td.tables.conversion_journeys} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.conversion_journeys} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_conversion_journeys.sql + dest_db: ${td.database} + dest_table: ${td.tables.conversion_journeys} + ++ingest_mta_conversion_journeys: + +ingest_to_tmp_table: + td>: queries/ingest_mta_conversion_journeys.sql + create_table: ${td.tables.tmp_mta_conversion_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_mta_conversion_journeys} + create_table: ${td.database}.${td.tables.mta_conversion_journeys} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM ${td.database}.${td.tables.mta_conversion_journeys} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_mta_conversion_journeys.sql + dest_db: ${td.database} + dest_table: ${td.tables.mta_conversion_journeys} + ++ingest_daily_mta_conversion_journeys: + +ingest_to_tmp_table: + td>: queries/ingest_daily_mta_conversion_journeys.sql + create_table: ${td.tables.tmp_daily_mta_conversion_journeys} + + +write_tmp_to_dest_table: + td>: + query: SELECT * FROM ${td.database}.${td.tables.tmp_daily_mta_conversion_journeys} + create_table: ${td.database}.${td.tables.daily_mta_conversion_journeys} + + +check_record_num: + td>: + query: SELECT COUNT(*) AS cnt FROM 
${td.database}.${td.tables.daily_mta_conversion_journeys} + store_last_results: true + + +create_table_if_0_record: + if>: ${td.last_results.cnt == 0} + _do: + td>: queries/create_daily_mta_conversion_journeys.sql + dest_db: ${td.database} + dest_table: ${td.tables.daily_mta_conversion_journeys} + ++ingest_existing_campaigns: + td>: queries/ingest_existing_campaigns.sql + create_table: ${td.tables.existing_campaigns} diff --git a/scenarios/cdp_campaign_management/main_incremental_ingest.dig b/scenarios/cdp_campaign_management/main_incremental_ingest.dig new file mode 100644 index 00000000..54c75077 --- /dev/null +++ b/scenarios/cdp_campaign_management/main_incremental_ingest.dig @@ -0,0 +1,33 @@ +timezone: Asia/Tokyo +schedule: + daily>: 06:00:00 + +_export: + !include : common/system_settings.yaml + !include : common/user_settings_sample.yaml + !include : common/gsheet_settings.yaml + +# +check_settings: +# call>: check_settings.dig + ++repeat_each_ps: + + for_each>: + ps_id: ${td.ps} + _do: + _export: + td: + database: ${td.base_db_name}_${ps_id} + + +prepare_dbs: + td_ddl>: + create_databases: + - ${td.database} + + +incremental_ingest: + call>: incremental_ingest.dig + + +write_to_gsheet: + if>: ${typeof gsheet === 'undefined' || gsheet == null || typeof gsheet.result_connection === 'undefined'} + _else_do: + call>: write_to_gsheet.dig diff --git a/scenarios/cdp_campaign_management/main_initial_ingest.dig b/scenarios/cdp_campaign_management/main_initial_ingest.dig new file mode 100644 index 00000000..229a872c --- /dev/null +++ b/scenarios/cdp_campaign_management/main_initial_ingest.dig @@ -0,0 +1,28 @@ +_export: + !include : common/system_settings.yaml + !include : common/user_settings_sample.yaml + !include : common/gsheet_settings.yaml + +# +check_settings: +# call>: check_settings.dig + ++repeat_each_ps: + for_each>: + ps_id: ${td.ps} + _do: + _export: + td: + database: ${td.base_db_name}_${ps_id} + + +prepare_dbs: + td_ddl>: + create_databases: + - 
${td.database} + + +initial_ingest: + call>: initial_ingest.dig + + +write_to_gsheet: + if>: ${typeof gsheet === 'undefined' || gsheet == null || typeof gsheet.result_connection === 'undefined'} + _else_do: + call>: write_to_gsheet.dig diff --git a/scenarios/cdp_campaign_management/py_scripts/check_settings.py b/scenarios/cdp_campaign_management/py_scripts/check_settings.py new file mode 100644 index 00000000..bd9ec69a --- /dev/null +++ b/scenarios/cdp_campaign_management/py_scripts/check_settings.py @@ -0,0 +1,71 @@ +import os +import sys +import json + +def run( + user_id, + clicks_tables, + conversions_tables, + mta_settings): + + error_counter = 0 + if user_id == "" or user_id == None: + print("user_id is not set.") + error_counter += 1 + else: + print(f"ⓘ {user_id} is used as the conversion journey identifier.") + + + click_tables = json.loads(clicks_tables) + for ps_id in click_tables: + idx = 0 + for table_setting in click_tables[ps_id]: + idx += 1 + for key in ('table','url_col','is_audience_table'): + if key not in table_setting.keys(): + print(f"⚠ error in `clicks_tables`: `{key}` is not set in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + + if 'filter' not in table_setting.keys(): + print(f"ⓘ warning: `filter` is not set in the {idx}th click table of ps_id={ps_id}.") + + if 'is_audience_table' in table_setting.keys(): + is_audience_table = table_setting['is_audience_table'] + if type(is_audience_table) != bool: + print(f"⚠ error in `clicks_tables`: `is_audience_table` is not a boolean value in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + if not is_audience_table: + if 'db' not in table_setting: + print(f"⚠ error in `clicks_tables`: `db` is not set in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + if 'time_col' not in table_setting: + print(f"⚠ error in `clicks_tables`: `time_col` is not set in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + + conversions_tables = json.loads(conversions_tables) + 
for ps_id in conversions_tables: + idx = 0 + for table_setting in conversions_tables[ps_id]: + idx += 1 + for key in ('table','val_col','cv_name','acquired_revenue_per_person','is_audience_table'): + if key not in table_setting.keys(): + print(f"⚠ error in `conversions_tables`: `{key}` is not set in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + + if 'filter' not in table_setting.keys(): + print(f"ⓘ warning: `filter` is not set in the {idx}th conversion table of ps_id={ps_id}.") + + if 'is_audience_table' in table_setting.keys(): + is_audience_table = table_setting['is_audience_table'] + if type(is_audience_table) != bool: + print(f"⚠ error in `conversions_tables`: `is_audience_table` is not a boolean value in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + if not is_audience_table: + # Non-audience tables must name their own database and time column. + for key in ('db','time_col'): + if key not in table_setting: + print(f"⚠ error in `conversions_tables`: `{key}` is not set in the {idx}th table of ps_id={ps_id}.") + error_counter += 1 + + if error_counter > 0: + print(f"⚠ ⚠ ⚠ There are {error_counter} configuration errors. 
⚠ ⚠ ⚠ ") + sys.exit(1) diff --git a/scenarios/cdp_campaign_management/py_scripts/ingest_activations_queries.py b/scenarios/cdp_campaign_management/py_scripts/ingest_activations_queries.py new file mode 100644 index 00000000..be8e7ea9 --- /dev/null +++ b/scenarios/cdp_campaign_management/py_scripts/ingest_activations_queries.py @@ -0,0 +1,148 @@ +import os +import sys +import pandas as pd +import math +import pytd +from dateutil import parser + +pd.set_option('display.max_columns', None) +headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} +MAX_UNIONS = 40 + +def run( + session_unixtime, + time_from, + time_to, + user_id, + input_db, + input_table_master_activations, + input_table_daily_activations_info, + cdp_audience_db, + input_table_customers, + input_table_clicks, + dest_db, + dest_table, + query_store_table, + api_endpoint='api.treasuredata.com'): + + client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db) + + tables_obj = client.list_tables(cdp_audience_db) + input_tables = [] + for tbl in tables_obj: + input_tables.append(tbl.name) + + res = client.query( + f'SELECT journey_id, syndication_id, activation_step_id, activation_name, stage_no, step_id FROM {input_db}.{input_table_master_activations} WHERE syndication_id is not NULL' + ) + df_1 = pd.DataFrame(**res) + print(f"Searched {len(df_1)} activations. Create {math.ceil(len(df_1)/MAX_UNIONS)} queries in the process.") + + qry_union_all = '' + l = [] + columns_q = ['time','db_name','table_name','query'] + + for idx, row in df_1.iterrows(): + + journey_table = 'journey_%s' % (row['journey_id']) + syndication_id = row['syndication_id'] + activation_step_id = row['activation_step_id'] + activation_name = row['activation_name'] + stage_no = row['stage_no'] + step_id = row['step_id'] + column_id = 'intime_stage_%s_%s' % (stage_no, step_id) + if stage_no < 0: # If the value is missing, -1 is included. 
+ continue + + if journey_table not in input_tables: + print(f'{cdp_audience_db}.{journey_table} does not exist, so skip this task.') + continue + + inner_qry = ' '.join([ + f"SELECT \"{column_id}\" AS session_time, t2.{user_id}, t1.cdp_customer_id, '{syndication_id}' AS syndication_id, '{activation_step_id}' AS activation_step_id, '{activation_name}' AS activation_name", + f"FROM {cdp_audience_db}.{journey_table} t1", + f"LEFT OUTER JOIN {cdp_audience_db}.{input_table_customers} t2", + f"ON t1.cdp_customer_id = t2.cdp_customer_id", + f"WHERE \"{column_id}\" IS NOT NULL", + f"AND TD_TIME_RANGE(\"{column_id}\",{time_from},{time_to}) " + ]) + + journey_reentry_history_table = f"{journey_table}_reentry_history" + # Union the re-entry history only when that table actually exists. + if journey_reentry_history_table in input_tables: + inner_qry += ' '.join([ + f" UNION ALL", + f"SELECT \"{column_id}\" AS session_time, t2.{user_id}, t1.cdp_customer_id, '{syndication_id}' AS syndication_id, '{activation_step_id}' AS activation_step_id, '{activation_name}' AS activation_name", + f"FROM {cdp_audience_db}.{journey_reentry_history_table} t1", + f"LEFT OUTER JOIN {cdp_audience_db}.{input_table_customers} t2", + f"ON t1.cdp_customer_id = t2.cdp_customer_id", + f"WHERE \"{column_id}\" IS NOT NULL", + f"AND TD_TIME_RANGE(\"{column_id}\",{time_from},{time_to}) " + ]) + + journey_jump_history_table = f"{journey_table}_jump_history" + if journey_jump_history_table in input_tables: + inner_qry += ' '.join([ + f" UNION ALL", + f"SELECT \"{column_id}\" AS session_time, t2.{user_id}, t1.cdp_customer_id, '{syndication_id}' AS syndication_id, '{activation_step_id}' AS activation_step_id, '{activation_name}' AS activation_name", + f"FROM {cdp_audience_db}.{journey_jump_history_table} t1", + f"LEFT OUTER JOIN {cdp_audience_db}.{input_table_customers} t2", + f"ON t1.cdp_customer_id = t2.cdp_customer_id", + f"WHERE \"{column_id}\" IS NOT NULL", + f"AND 
TD_TIME_RANGE(\"{column_id}\",{time_from},{time_to}) " + ]) + + qry = ' '.join([ + f"SELECT", + 
f"s2.time_finished AS time", + f",s1.{user_id}", + f",s1.activation_step_id", + f",s1.syndication_id", + f",'journeyActivationStep' AS activation_type", + f",s1.activation_name", + f",cv_name", + f",utm_campaign", + f",utm_medium", + f",utm_source", + f",utm_content", + f",utm_connector", + f",utm_term", + f"FROM({inner_qry}) s1", + f"JOIN {input_db}.{input_table_daily_activations_info} s2", + f"ON s1.session_time <= s2.time", + f"AND s2.time < s1.session_time + 60*60*24*1", + f"AND s1.syndication_id = s2.syndication_id", + f"LEFT OUTER JOIN (", + f"SELECT", + f"activation_step_id", + f",MAX_BY(cv_name,time) AS cv_name", + f",MAX_BY(utm_campaign,time) AS utm_campaign", + f",MAX_BY(utm_medium,time) AS utm_medium", + f",MAX_BY(utm_source,time) AS utm_source", + f",MAX_BY(utm_content,time) AS utm_content", + f",MAX_BY(utm_connector,time) AS utm_connector", + f",MAX_BY(utm_term,time) AS utm_term", + f"FROM {input_db}.{input_table_clicks}", + f"GROUP BY 1", + f") s3", + f"ON s1.activation_step_id = s3.activation_step_id", + ]) + + if qry_union_all == '': + qry_union_all = qry + else: + qry_union_all = ' '.join([qry_union_all, ' UNION ALL ', qry]) + + if (idx+1)%MAX_UNIONS == 0: + print(qry_union_all) + l.append([int(session_unixtime), dest_db, dest_table, qry_union_all]) + qry_union_all = '' + + if qry_union_all != '': + print(qry_union_all) + l.append([int(session_unixtime), dest_db, dest_table, qry_union_all]) + + df_query_store = pd.DataFrame(data=l, columns=columns_q) + print(df_query_store) + if len(df_query_store)>0: + # Don't use writer='insert_into' because it changes '' to "". 
+ client.load_table_from_dataframe(df_query_store, f"{dest_db}.{query_store_table}", writer='bulk_import', if_exists='overwrite', fmt='msgpack') diff --git a/scenarios/cdp_campaign_management/queries/create_activations.sql b/scenarios/cdp_campaign_management/queries/create_activations.sql new file mode 100644 index 00000000..6b633bdc --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_activations.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,${td.user_id} varchar + ,activation_step_id varchar + ,syndication_id varchar + ,activation_type varchar + ,activation_name varchar + ,cv_name varchar + ,utm_campaign varchar + ,utm_medium varchar + ,utm_source varchar + ,utm_content varchar + ,utm_connector varchar + ,utm_term varchar +) diff --git a/scenarios/cdp_campaign_management/queries/create_cdp_journey_tables.sql b/scenarios/cdp_campaign_management/queries/create_cdp_journey_tables.sql new file mode 100644 index 00000000..09c60fa3 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_cdp_journey_tables.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS ${cdp_audience_db}.${journey_jump_history_table}( + LIKE ${cdp_audience_db}.${journey_table} +); +CREATE TABLE IF NOT EXISTS ${cdp_audience_db}.${journey_reentry_history_table}( + LIKE ${cdp_audience_db}.${journey_table} +); \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_clicks.sql b/scenarios/cdp_campaign_management/queries/create_clicks.sql new file mode 100644 index 00000000..5b407be9 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_clicks.sql @@ -0,0 +1,17 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,time_s varchar + ,db_name varchar + ,table_name varchar + ,${td.user_id} varchar + ,activation_step_id varchar + ,cv_name varchar + ,utm_campaign varchar + 
,utm_medium varchar + ,utm_source varchar + ,utm_content varchar + ,utm_connector varchar + ,utm_term varchar + ,url varchar +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_conversion_journeys.sql b/scenarios/cdp_campaign_management/queries/create_conversion_journeys.sql new file mode 100644 index 00000000..97e04dde --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_conversion_journeys.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,time_s varchar + ,type varchar + ,${user_id} varchar + ,activation_step_id varchar + ,cv_name varchar + ,cv_flg int + ,val double + ,revenue double + ,time_hour_from_activation double +) diff --git a/scenarios/cdp_campaign_management/queries/create_conversions.sql b/scenarios/cdp_campaign_management/queries/create_conversions.sql new file mode 100644 index 00000000..fc3908a5 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_conversions.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,time_s varchar + ,${user_id} varchar + ,val double + ,revenue double + ,cv_name varchar +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_daily_activations.sql b/scenarios/cdp_campaign_management/queries/create_daily_activations.sql new file mode 100644 index 00000000..57745465 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_daily_activations.sql @@ -0,0 +1,13 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,date varchar + ,activation_step_id varchar + ,cv_name varchar + ,utm_source varchar + ,utm_medium varchar + ,utm_campaign varchar + ,utm_connector varchar + ,utm_content varchar + ,cnt bigint +) \ No newline at end of file diff --git 
a/scenarios/cdp_campaign_management/queries/create_daily_activations_info.sql b/scenarios/cdp_campaign_management/queries/create_daily_activations_info.sql new file mode 100644 index 00000000..5c8aebc1 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_daily_activations_info.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,time_finished bigint + ,journey_id varchar + ,syndication_id varchar + ,name varchar + ,workflow_id varchar + ,workflow_session_id varchar + ,workflow_attempt_id varchar + ,created_at varchar + ,finished_at varchar + ,status varchar +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_daily_clicks.sql b/scenarios/cdp_campaign_management/queries/create_daily_clicks.sql new file mode 100644 index 00000000..a46a509b --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_daily_clicks.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,date varchar + ,activation_step_id varchar + ,cv_name varchar + ,utm_source varchar + ,utm_medium varchar + ,utm_campaign varchar + ,utm_connector varchar + ,utm_content varchar + ,utm_term varchar + ,cnt bigint +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_daily_conversions.sql b/scenarios/cdp_campaign_management/queries/create_daily_conversions.sql new file mode 100644 index 00000000..ac6eb74f --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_daily_conversions.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,date varchar + ,cv_name varchar + ,cnt bigint + ,val double + ,revenue double +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_daily_mta_conversion_journeys.sql 
b/scenarios/cdp_campaign_management/queries/create_daily_mta_conversion_journeys.sql new file mode 100644 index 00000000..ab74e2c0 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_daily_mta_conversion_journeys.sql @@ -0,0 +1,28 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,date varchar + ,activation_step_id varchar + ,is_internal_campaign_click varchar + ,type varchar + ,utm_source varchar + ,utm_medium varchar + ,utm_campaign varchar + ,utm_content varchar + ,utm_connector varchar + ,cv_name varchar + ,cnt_activations bigint + ,cnt_clicks bigint + ,cnt_clicks_related_conversion varchar + ,cnt_conversions bigint + ,cv_revenue double + ,acquired_person_last_click_model double + ,acquired_person_first_click_model double + ,acquired_person_session_model double + ,acquired_revenue_last_click_model double + ,acquired_revenue_first_click_model double + ,acquired_revenue_session_model double + ,size_journey bigint + ,cnt_cv_id varchar +) + diff --git a/scenarios/cdp_campaign_management/queries/create_journeys.sql b/scenarios/cdp_campaign_management/queries/create_journeys.sql new file mode 100644 index 00000000..1a66a6bb --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_journeys.sql @@ -0,0 +1,14 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table}( + time bigint + ,audience_id varchar + ,id varchar + ,name varchar + ,state varchar + ,created_at varchar + ,updated_at varchar + ,launched_at varchar + ,allow_reentry varchar + ,paused varchar + ,num_stages bigint +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_master_activations.sql b/scenarios/cdp_campaign_management/queries/create_master_activations.sql new file mode 100644 index 00000000..917c1ed2 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_master_activations.sql @@ -0,0 +1,23 @@ 
+DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table}( + time bigint + ,journey_id varchar + ,activation_step_id varchar + ,syndication_id varchar + ,activation_name varchar + ,schedule_type varchar + ,schedule_option varchar + ,timezone varchar + ,connection_id varchar + ,connection_type varchar + ,connection_name varchar + ,all_columns varchar + ,step_id varchar + ,stage_no bigint + ,stage_name varchar + ,state varchar + ,created_at varchar + ,updated_at varchar + ,journey_name varchar +) + diff --git a/scenarios/cdp_campaign_management/queries/create_master_campaigns.sql b/scenarios/cdp_campaign_management/queries/create_master_campaigns.sql new file mode 100644 index 00000000..feee2e53 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_master_campaigns.sql @@ -0,0 +1,12 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,activation_step_id varchar + ,cv_name varchar + ,utm_campaign varchar + ,utm_medium varchar + ,utm_source varchar + ,utm_content varchar + ,utm_connector varchar + ,utm_term varchar +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/create_mta_conversion_journeys.sql b/scenarios/cdp_campaign_management/queries/create_mta_conversion_journeys.sql new file mode 100644 index 00000000..4b4c894a --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_mta_conversion_journeys.sql @@ -0,0 +1,32 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,cv_time bigint + ,cv_id varchar + ,position bigint + ,time_hour_to_cv double + ,time_hour_to_next double + ,time_hour_from_activation double + ,type varchar + ,click_type varchar + ,activation_step_id varchar + ,utm_source varchar + ,utm_medium varchar + ,utm_campaign varchar + ,utm_content varchar + ,utm_connector varchar + ,cv_name varchar + ,size_journey 
bigint + ,size_cv_session bigint + ,is_within_cv_session bigint + ,revenue double + ,attr_last_click_model double + ,attr_first_click_model double + ,attr_session_model double + ,acquired_person_last_click_model bigint + ,acquired_revenue_last_click_model double + ,acquired_person_first_click_model bigint + ,acquired_revenue_first_click_model double + ,acquired_person_session_model double + ,acquired_revenue_session_model double +) diff --git a/scenarios/cdp_campaign_management/queries/create_query_store.sql b/scenarios/cdp_campaign_management/queries/create_query_store.sql new file mode 100644 index 00000000..b08c703c --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/create_query_store.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS ${dest_db}.${dest_table}; +CREATE TABLE IF NOT EXISTS ${dest_db}.${dest_table} ( + time bigint + ,db_name varchar + ,table_name varchar + ,query varchar +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/de_duplicate_master_campaings.sql b/scenarios/cdp_campaign_management/queries/de_duplicate_master_campaings.sql new file mode 100644 index 00000000..e831c48f --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/de_duplicate_master_campaings.sql @@ -0,0 +1,11 @@ +SELECT + activation_step_id + ,utm_campaign + ,utm_medium + ,utm_source + ,cv_name + ,MAX(utm_content) AS utm_content + ,MAX(utm_connector) AS utm_connector + ,MAX(utm_term) AS utm_term +FROM ${td.database}.${td.tables.tmp_master_campaigns} +GROUP BY 1,2,3,4,5 \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/generate_queries.sql b/scenarios/cdp_campaign_management/queries/generate_queries.sql new file mode 100644 index 00000000..d2060d36 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/generate_queries.sql @@ -0,0 +1,3 @@ +SELECT * +FROM ${input_db}.${input_table} +WHERE table_name = '${target_table}' diff --git a/scenarios/cdp_campaign_management/queries/ingest_activations.sql 
b/scenarios/cdp_campaign_management/queries/ingest_activations.sql new file mode 100644 index 00000000..4171284d --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_activations.sql @@ -0,0 +1,75 @@ +WITH tbl_base_activations AS +( + SELECT + t1.time + ,${td.user_id} + ,cdp_customer_id + ,t1.syndication_id + ,COALESCE(type,'segment') AS activation_type + ,segment_id + ,activation_step_id + ,activation_name + ,segment_name + ,connector_type + ,journey_id + FROM + ( + SELECT + time + ,identifier AS ${td.user_id} + ,audience_id + ,NULL AS cdp_customer_id + ,CAST(activation_id AS VARCHAR) AS syndication_id + ,CAST(segment_id AS VARCHAR) AS segment_id + ,segment_name + ,activation_name + ,integration_type AS connector_type + FROM ${cdp_audience_db}.${td.tables.activation_log} + WHERE identifier_type = '${td.user_id}' + AND CAST(audience_id AS VARCHAR) = '${ps_id}' + AND TD_TIME_RANGE(time,${time_from},${time_to}) + ) t1 + JOIN + ( + SELECT + CAST(id AS VARCHAR) AS activation_step_id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.syndicationId') AS syndication_id + ,type + ,journey_id + FROM ${td.monitoring.db.cdp_monitoring}.${td.monitoring.tables.journey_activation} + ) t2 + ON t1.syndication_id = t2.syndication_id +) + +SELECT + time + ,${td.user_id} + ,s1.activation_step_id + ,syndication_id + ,activation_type + ,activation_name + -- ,segment_id + -- ,segment_name + ,journey_id + ,cv_name + ,utm_source + ,utm_medium + ,utm_campaign + ,utm_content + ,utm_term + ,COALESCE(utm_connector,connector_type) AS utm_connector +FROM tbl_base_activations s1 +LEFT OUTER JOIN ( + SELECT + activation_step_id + ,MAX_BY(cv_name,time) AS cv_name + ,MAX_BY(utm_campaign,time) AS utm_campaign + ,MAX_BY(utm_medium,time) AS utm_medium + ,MAX_BY(utm_source,time) AS utm_source + ,MAX_BY(utm_content,time) AS utm_content + ,MAX_BY(utm_connector,time) AS utm_connector + ,MAX_BY(utm_term,time) AS utm_term + FROM ${td.database}.${td.tables.clicks} + GROUP BY 1 +) s2 +ON 
s1.activation_step_id = s2.activation_step_id diff --git a/scenarios/cdp_campaign_management/queries/ingest_clicks.sql b/scenarios/cdp_campaign_management/queries/ingest_clicks.sql new file mode 100644 index 00000000..fcf4b88e --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_clicks.sql @@ -0,0 +1,55 @@ +SELECT ${distinct} + t1.time + ,db_name + ,table_name + ,${user_column} + ,COALESCE(CAST(t3.activation_step_id AS VARCHAR), CAST(t1.activation_step_id AS VARCHAR)) AS activation_step_id + ,COALESCE(t3.cv_name, t1.cv_name) AS cv_name + ,COALESCE(t3.utm_campaign, t1.utm_campaign) AS utm_campaign + ,COALESCE(t3.utm_medium, t1.utm_medium) AS utm_medium + ,COALESCE(t3.utm_source, t1.utm_source) AS utm_source + ,COALESCE(t3.utm_content, t1.utm_content) AS utm_content + ,COALESCE(t3.utm_term, t1.utm_term) AS utm_term + ,COALESCE(t3.utm_connector, t1.utm_connector) AS utm_connector +FROM +( + SELECT + ${time_column} AS time + ,'${input_db}' AS db_name + ,'${input_table}' AS table_name + ,${user_column_inner} + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_id}') as activation_step_id + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_campaign}') as utm_campaign + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_medium}') as utm_medium + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_source}') as utm_source + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_content}') as utm_content + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_term}') as utm_term + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_connector}') as utm_connector + ,url_extract_parameter(${url_column}, '${td.utm_names.utm_cv}') as cv_name + FROM ${input_db}.${input_table} + WHERE url_extract_parameter(${url_column}, '${td.utm_names.utm_campaign}') IS NOT NULL + AND url_extract_parameter(${url_column}, '${td.utm_names.utm_medium}') IS NOT NULL + AND url_extract_parameter(${url_column}, '${td.utm_names.utm_source}') IS NOT 
NULL + AND TD_TIME_RANGE(${time_column}, ${time_from}, ${time_to}) + AND ${filter} +) t1 +${join_part} +LEFT OUTER JOIN ( + SELECT + activation_step_id + ,utm_source + ,utm_medium + ,utm_campaign + ,cv_name + ,MAX_BY(utm_connector, time) AS utm_connector + ,MAX_BY(utm_content,time) AS utm_content + ,MAX_BY(utm_term, time) AS utm_term + FROM ${campaign_db}.${master_campaigns_table} + WHERE ${td.utm_names.utm_source} IS NOT NULL AND ${td.utm_names.utm_medium} IS NOT NULL AND ${td.utm_names.utm_campaign} IS NOT NULL + GROUP BY 1,2,3,4,5 +) t3 +ON ( + t1.utm_campaign = t3.utm_campaign + AND t1.utm_medium = t3.utm_medium + AND t1.utm_source = t3.utm_source +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_conversion_journeys.sql b/scenarios/cdp_campaign_management/queries/ingest_conversion_journeys.sql new file mode 100644 index 00000000..90032e1c --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_conversion_journeys.sql @@ -0,0 +1,61 @@ +SELECT + * + ,ROUND((time-LAG(time)OVER(PARTITION BY ${user_id}, activation_step_id ORDER BY time))/3600.0,1) AS time_hour_from_activation +FROM ( + SELECT + time + ,'Activation' AS type + ,${user_id} + ,CAST(activation_step_id AS VARCHAR) AS activation_step_id + ,cv_name + ,utm_campaign + ,utm_medium + ,utm_source + ,utm_content + ,utm_connector + ,0 AS cv_flg + ,0 AS val + ,0 AS revenue + FROM ${input_db}.${input_table_activations} + WHERE cv_name = '${cv_name}' + AND TD_TIME_RANGE(time, ${time_from}, ${time_to}) + + UNION ALL + SELECT + time + ,'Click' AS type + ,${user_id} + ,CAST(activation_step_id AS VARCHAR) AS activation_step_id + ,cv_name + ,utm_campaign + ,utm_medium + ,utm_source + ,utm_content + ,utm_connector + ,0 AS cv_flg + ,0 AS val + ,0 AS revenue + FROM ${input_db}.${input_table_clicks} + WHERE cv_name = '${cv_name}' + AND TD_TIME_RANGE(time, ${time_from}, ${time_to}) + + UNION ALL + SELECT + time + ,'Conversion' AS type + ,${user_id} + ,NULL AS 
activation_step_id + ,cv_name + ,NULL AS utm_campaign + ,NULL AS utm_medium + ,NULL AS utm_source + ,NULL AS utm_content + ,NULL AS utm_connector + ,1 AS cv_flg + ,val + ,revenue + FROM ${input_db}.${input_table_conversions} + WHERE cv_name = '${cv_name}' + AND TD_TIME_RANGE(time, ${time_from}, ${time_to}) +) +-- ORDER BY ${user_id}, time diff --git a/scenarios/cdp_campaign_management/queries/ingest_conversions.sql b/scenarios/cdp_campaign_management/queries/ingest_conversions.sql new file mode 100644 index 00000000..e94eb9cf --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_conversions.sql @@ -0,0 +1,20 @@ +SELECT ${distinct} + t1.time + ,${user_column} + ,val + ,revenue + ,cv_name + ,'${input_db}' AS db_name + ,'${input_table}' AS table_name +FROM ( + SELECT + ${time_column} AS time + ,${inner_user_column} + ,CAST(${val_col} AS DOUBLE) AS val + ,CAST(${acquired_revenue_per_person} AS DOUBLE) * CAST(${val_col} AS DOUBLE) AS revenue + ,'${cv_name}' AS cv_name + FROM ${input_db}.${input_table} + WHERE ${filter} + AND TD_TIME_RANGE(${time_column}, ${time_from}, ${time_to}) +) t1 +${join_part} diff --git a/scenarios/cdp_campaign_management/queries/ingest_daily_activations.sql b/scenarios/cdp_campaign_management/queries/ingest_daily_activations.sql new file mode 100644 index 00000000..d0517b8c --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_activations.sql @@ -0,0 +1,15 @@ +SELECT + TD_DATE_TRUNC('day', time, '${timezone}') AS time + ,TD_TIME_STRING(time, 'd!', '${timezone}') AS date + ,activation_step_id + ,cv_name + ,utm_source + ,utm_medium + ,utm_campaign + ,MAX_BY(utm_content,time) AS utm_content + ,MAX_BY(utm_connector,time) AS utm_connector + ,MAX_BY(utm_term,time) AS utm_term + ,COUNT(1) AS cnt +FROM ${td.database}.${td.tables.activations} +WHERE TD_TIME_RANGE(time, ${time_from}, ${time_to}) +GROUP BY 1,2,3,4,5,6,7 diff --git a/scenarios/cdp_campaign_management/queries/ingest_daily_activations_info.sql 
b/scenarios/cdp_campaign_management/queries/ingest_daily_activations_info.sql new file mode 100644 index 00000000..391b5dd2 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_activations_info.sql @@ -0,0 +1,41 @@ +WITH tbl_jobs AS +( + SELECT + REGEXP_EXTRACT(query, 'attempt_id: ([\d]+)',1) AS workflow_attempt_id + ,REGEXP_EXTRACT(query, 'project_name: cdp_journey_([\d]+)',1) AS journey_id + , * + FROM ${td.monitoring.db.basic_monitoring}.${td.monitoring.tables.jobs} + WHERE REGEXP_LIKE(query, 'task_name: \+syndication_[\d]*\+syndicate\^sub(?!\+)') + AND TD_TIME_RANGE(time, ${time_from}, ${time_to}) +) +,tbl_act_hst AS +( + SELECT + TD_TIME_PARSE(JSON_EXTRACT_SCALAR(exe, '$.createdAt'), '${timezone}') AS time + ,TD_TIME_PARSE(JSON_EXTRACT_SCALAR(exe, '$.finishedAt'), '${timezone}') AS time_finished + ,id AS syndication_id + ,name AS activation_name + ,JSON_EXTRACT_SCALAR(exe, '$.workflowId') AS workflow_id + ,JSON_EXTRACT_SCALAR(exe, '$.workflowSessionId') AS workflow_session_id + ,JSON_EXTRACT_SCALAR(exe, '$.workflowAttemptId') AS workflow_attempt_id + ,JSON_EXTRACT_SCALAR(exe, '$.createdAt') AS created_at + ,JSON_EXTRACT_SCALAR(exe, '$.finishedAt') AS finished_at + ,JSON_EXTRACT_SCALAR(exe, '$.status') AS status + FROM + ${td.monitoring.db.cdp_monitoring}.${td.monitoring.tables.activations} + CROSS JOIN UNNEST(CAST(JSON_PARSE(executions) AS ARRAY(JSON))) AS t(exe) + WHERE ps_id = '${ps_id}' + AND TD_TIME_RANGE(TD_TIME_PARSE(JSON_EXTRACT_SCALAR(exe, '$.createdAt'), '${timezone}'), ${time_from}, ${time_to}) +) + +SELECT act_hst.* + ,jobs.journey_id + ,CAST(CAST(jobs.num_records AS DOUBLE) AS BIGINT) AS num_records + ,IF( + STARTS_WITH(jobs.result,'{') + ,JSON_EXTRACT_SCALAR(JSON_PARSE(jobs.result), '$.type') + ,SPLIT_PART(jobs.result, '://', 1) + ) AS connector_type +FROM tbl_act_hst act_hst +LEFT OUTER JOIN tbl_jobs jobs +ON act_hst.workflow_attempt_id = jobs.workflow_attempt_id \ No newline at end of file diff --git 
a/scenarios/cdp_campaign_management/queries/ingest_daily_clicks.sql b/scenarios/cdp_campaign_management/queries/ingest_daily_clicks.sql new file mode 100644 index 00000000..a2f3d4bf --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_clicks.sql @@ -0,0 +1,15 @@ +SELECT + TD_DATE_TRUNC('day', time, '${timezone}') AS time + ,TD_TIME_STRING(time, 'd!', '${timezone}') AS date + ,activation_step_id + ,cv_name + ,utm_source + ,utm_medium + ,utm_campaign + ,MAX_BY(utm_connector,time) AS utm_connector + ,MAX_BY(utm_content,time) AS utm_content + ,MAX_BY(utm_term,time) AS utm_term + ,COUNT(1) AS cnt +FROM ${td.database}.${td.tables.clicks} +WHERE TD_TIME_RANGE(time, ${time_from}, ${time_to}) +GROUP BY 1,2,3,4,5,6,7 \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_daily_conversions.sql b/scenarios/cdp_campaign_management/queries/ingest_daily_conversions.sql new file mode 100644 index 00000000..98e2da1b --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_conversions.sql @@ -0,0 +1,10 @@ +SELECT + TD_DATE_TRUNC('day', time, '${timezone}') AS time + ,TD_TIME_STRING(time, 'd!', '${timezone}') AS date + ,cv_name + ,COUNT(1) AS cnt + ,SUM(val) AS val + ,SUM(revenue) AS revenue +FROM ${td.database}.${td.tables.conversions} +WHERE TD_TIME_RANGE(time, ${time_from}, ${time_to}) +GROUP BY 1,2,3 \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_daily_mta.sql b/scenarios/cdp_campaign_management/queries/ingest_daily_mta.sql new file mode 100644 index 00000000..06590407 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_mta.sql @@ -0,0 +1,67 @@ +SELECT * +FROM +( + SELECT + COALESCE(act.time,clk.time,mta.time) AS time + ,COALESCE(act.date,clk.date,mta.date) AS date + ,COALESCE(act.activation_step_id,clk.activation_step_id,mta.activation_step_id) AS activation_step_id + ,CASE + WHEN 
COALESCE(act.activation_step_id,clk.activation_step_id,mta.activation_step_id) IN (SELECT activation_step_id FROM ${td.tables.master_activations}) THEN 'internal' + WHEN mta.type='Conversion' THEN 'internal' + ELSE 'external' + END AS is_internal_campaign_click + ,mta.type AS type + ,COALESCE(act.utm_source,clk.utm_source,mta.utm_source) AS utm_source + ,COALESCE(act.utm_medium,clk.utm_medium,mta.utm_medium) AS utm_medium + ,COALESCE(act.utm_campaign,clk.utm_campaign,mta.utm_campaign) AS utm_campaign + ,COALESCE(act.utm_content,clk.utm_content,mta.utm_content) AS utm_content + ,COALESCE(act.utm_connector,clk.utm_connector,mta.utm_connector) AS utm_connector + ,COALESCE(act.cv_name,clk.cv_name,mta.cv_name) AS cv_name + ,COALESCE(act.cnt,0) AS cnt_activations + ,COALESCE(clk.cnt,0) AS cnt_clicks + ,COALESCE(mta.click_cnt,0) AS cnt_clicks_related_conversion + ,mta.conversion_cnt AS cnt_conversions + ,cv_revenue + ,COALESCE(acquired_person_last_click_model,0) AS acquired_person_last_click_model + ,COALESCE(acquired_person_first_click_model,0) AS acquired_person_first_click_model + ,COALESCE(acquired_person_session_model,0) AS acquired_person_session_model + ,COALESCE(acquired_revenue_last_click_model,0) AS acquired_revenue_last_click_model + ,COALESCE(acquired_revenue_first_click_model,0) AS acquired_revenue_first_click_model + ,COALESCE(acquired_revenue_session_model,0) AS acquired_revenue_session_model + ,COALESCE(size_journey,0) AS size_journey + ,COALESCE(cnt_cv_id,0) AS cnt_cv_id + FROM ${td.tables.daily_activations} act + FULL OUTER JOIN ${td.tables.daily_clicks} clk + ON act.time = clk.time AND act.activation_step_id = clk.activation_step_id + FULL OUTER JOIN ( + SELECT + TD_DATE_TRUNC('day',time, '${timezone}') AS time + ,TD_TIME_STRING(time, 'd!', '${timezone}') AS date + ,activation_step_id + ,type + ,utm_source + ,utm_medium + ,utm_campaign + ,utm_content + ,utm_connector + ,cv_name + ,COUNT_IF(type='Conversion') AS conversion_cnt + ,COUNT_IF(type='Click') 
AS click_cnt + ,SUM(revenue) AS cv_revenue + ,SUM(acquired_person_last_click_model) AS acquired_person_last_click_model + ,SUM(acquired_person_first_click_model) AS acquired_person_first_click_model + ,SUM(acquired_person_session_model) AS acquired_person_session_model + ,SUM(acquired_revenue_last_click_model) AS acquired_revenue_last_click_model + ,SUM(acquired_revenue_first_click_model) AS acquired_revenue_first_click_model + ,SUM(acquired_revenue_session_model) AS acquired_revenue_session_model + ,SUM(size_journey) AS size_journey + ,COUNT(DISTINCT cv_id) AS cnt_cv_id + FROM ${td.tables.mta_conversion_journeys} + GROUP BY 1,2,3,4,5,6,7,8,9,10 + ORDER BY date, activation_step_id + ) mta + ON clk.time = mta.time AND clk.activation_step_id = mta.activation_step_id +) +WHERE TD_TIME_RANGE(time, ${time_from}, ${time_to}) +-- AND type <> 'Conversion' +-- ORDER BY cv_name, time DESC, activation_step_id \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_daily_mta_conversion_journeys.sql b/scenarios/cdp_campaign_management/queries/ingest_daily_mta_conversion_journeys.sql new file mode 100644 index 00000000..7708f914 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_daily_mta_conversion_journeys.sql @@ -0,0 +1,66 @@ +SELECT * +FROM +( + SELECT + COALESCE(act.time,clk.time,mta.time) AS time + ,COALESCE(act.date,clk.date,mta.date) AS date + ,COALESCE(act.activation_step_id,clk.activation_step_id,mta.activation_step_id) AS activation_step_id + ,CASE + WHEN COALESCE(act.activation_step_id,clk.activation_step_id,mta.activation_step_id) IN (SELECT activation_step_id FROM ${td.tables.master_activations}) THEN 'internal' + WHEN mta.type='Conversion' THEN 'internal' + ELSE 'external' + END AS is_internal_campaign_click + ,mta.type AS type + ,COALESCE(act.utm_source,clk.utm_source,mta.utm_source) AS utm_source + ,COALESCE(act.utm_medium,clk.utm_medium,mta.utm_medium) AS utm_medium + 
,COALESCE(act.utm_campaign,clk.utm_campaign,mta.utm_campaign) AS utm_campaign + ,COALESCE(act.utm_content,clk.utm_content,mta.utm_content) AS utm_content + ,COALESCE(act.utm_connector,clk.utm_connector,mta.utm_connector) AS utm_connector + ,COALESCE(act.cv_name,clk.cv_name,mta.cv_name) AS cv_name + ,COALESCE(act.cnt,0) AS cnt_activations + ,COALESCE(clk.cnt,0) AS cnt_clicks + ,COALESCE(mta.click_cnt,0) AS cnt_clicks_related_conversion + ,mta.conversion_cnt AS cnt_conversions + ,cv_revenue + ,COALESCE(acquired_person_last_click_model,0) AS acquired_person_last_click_model + ,COALESCE(acquired_person_first_click_model,0) AS acquired_person_first_click_model + ,COALESCE(acquired_person_session_model,0) AS acquired_person_session_model + ,COALESCE(acquired_revenue_last_click_model,0) AS acquired_revenue_last_click_model + ,COALESCE(acquired_revenue_first_click_model,0) AS acquired_revenue_first_click_model + ,COALESCE(acquired_revenue_session_model,0) AS acquired_revenue_session_model + ,COALESCE(size_journey,0) AS size_journey + ,COALESCE(cnt_cv_id,0) AS cnt_cv_id + FROM ${td.tables.daily_activations} act + FULL OUTER JOIN ${td.tables.daily_clicks} clk + ON act.time = clk.time AND act.activation_step_id = clk.activation_step_id + FULL OUTER JOIN ( + SELECT + TD_DATE_TRUNC('day',time, '${timezone}') AS time + ,TD_TIME_STRING(time, 'd!', '${timezone}') AS date + ,activation_step_id + ,type + ,utm_source + ,utm_medium + ,utm_campaign + ,utm_content + ,utm_connector + ,cv_name + ,COUNT_IF(type='Conversion') AS conversion_cnt + ,COUNT_IF(type='Click') AS click_cnt + ,SUM(revenue) AS cv_revenue + ,SUM(acquired_person_last_click_model) AS acquired_person_last_click_model + ,SUM(acquired_person_first_click_model) AS acquired_person_first_click_model + ,SUM(acquired_person_session_model) AS acquired_person_session_model + ,SUM(acquired_revenue_last_click_model) AS acquired_revenue_last_click_model + ,SUM(acquired_revenue_first_click_model) AS acquired_revenue_first_click_model 
+ ,SUM(acquired_revenue_session_model) AS acquired_revenue_session_model + ,SUM(size_journey) AS size_journey + ,COUNT(DISTINCT cv_id) AS cnt_cv_id + FROM ${td.tables.mta_conversion_journeys} + GROUP BY 1,2,3,4,5,6,7,8,9,10 + ORDER BY date, activation_step_id + ) mta + ON clk.time = mta.time AND clk.activation_step_id = mta.activation_step_id +) +WHERE TD_TIME_RANGE(time, ${time_from}, ${time_to}) +-- ORDER BY cv_name, time DESC, activation_step_id \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_existing_campaigns.sql b/scenarios/cdp_campaign_management/queries/ingest_existing_campaigns.sql new file mode 100644 index 00000000..51fa033f --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_existing_campaigns.sql @@ -0,0 +1,22 @@ +SELECT + IF( + COALESCE(activation_step_id IN ( + SELECT DISTINCT + activation_step_id + FROM daily_activations + ),false) + ,1,0) AS exists_in_daily_activations + ,utm_source + ,utm_medium + ,utm_campaign + ,cv_name + ,activation_step_id + ,MAX_BY(utm_content,time) AS utm_content + ,MAX_BY(utm_connector,time) AS utm_connector + ,MAX_BY(utm_term,time) AS utm_term + ,MIN(date) AS date_first_appeared + ,MAX(date) AS date_last_appeared + ,COUNT(1) AS cnt + ,MIN(time) AS time +FROM ${td.database}.${td.tables.daily_clicks} +GROUP BY 1,2,3,4,5,6 \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_journeys.sql b/scenarios/cdp_campaign_management/queries/ingest_journeys.sql new file mode 100644 index 00000000..db6542c3 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_journeys.sql @@ -0,0 +1,20 @@ +SELECT + TD_TIME_PARSE(JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.createdAt'),'${timezone}') AS time + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.audienceId') AS audience_id + ,id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.name') AS name + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.state') AS state + 
,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.createdAt') AS created_at + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.updatedAt') AS updated_at + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.launchedAt') AS launched_at + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.allowReentry') AS allow_reentry + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.paused') AS paused + ,JSON_ARRAY_LENGTH(JSON_EXTRACT(JSON_PARSE(attributes), '$.journeyStages')) AS num_stages +FROM ${td.monitoring.db.cdp_monitoring}.${td.monitoring.tables.entities} +WHERE type = 'journey' +AND JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.audienceId') = '${ps_id}' +AND TD_TIME_RANGE( + TD_TIME_PARSE(JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.createdAt'),'${timezone}') + ,${time_from} + ,${time_to} +) \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/queries/ingest_master_activations.sql b/scenarios/cdp_campaign_management/queries/ingest_master_activations.sql new file mode 100644 index 00000000..5ecda930 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_master_activations.sql @@ -0,0 +1,82 @@ +WITH tbl_act AS ( + SELECT + journey_id + ,id AS activation_step_id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.syndicationId') AS syndication_id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.name') AS activation_name + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.scheduleType') AS schedule_type + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.scheduleOption') AS schedule_option + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.timezone') AS timezone + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.connectionId') AS connection_id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.activationParams.allColumns') AS all_columns + FROM ${td.monitoring.db.cdp_monitoring}.${td.monitoring.tables.journey_activation} +) +, tbl_step_ids AS( + SELECT + time + 
,REPLACE(step_id, '-', '_') AS step_id + ,JSON_EXTRACT_SCALAR(jsn, '$.journeyActivationStepId') AS activation_step_id + ,JSON_EXTRACT_SCALAR(jsn, '$.type') AS type + ,stage_name + ,stage_no + ,ps_id + ,state + ,created_at + ,updated_at + ,journey_name + FROM + ( + SELECT + elm + ,idx-1 AS stage_no + ,TD_TIME_PARSE( + JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.createdAt') + ,'${timezone}' + ) AS time + ,JSON_EXTRACT_SCALAR(elm, '$.name') AS stage_name + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.audienceId') AS ps_id + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.state') AS state + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.createdAt') AS created_at + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.updatedAt') AS updated_at + ,JSON_EXTRACT_SCALAR(JSON_PARSE(attributes), '$.name') AS journey_name + FROM ${td.monitoring.db.cdp_monitoring}.${td.monitoring.tables.journey_summary} + CROSS JOIN UNNEST(CAST(JSON_EXTRACT(JSON_PARSE(attributes), '$.journeyStages') AS ARRAY(JSON))) WITH ORDINALITY AS t(elm, idx) + ) + CROSS JOIN UNNEST( + MAP_KEYS(CAST(JSON_EXTRACT(elm, '$.steps') AS MAP(VARCHAR, JSON))) + , MAP_VALUES(CAST(JSON_EXTRACT(elm, '$.steps') AS MAP(VARCHAR, JSON))) + ) AS t(step_id, jsn) + WHERE JSON_EXTRACT_SCALAR(jsn, '$.type') = 'Activation' +) +,tbl_conn AS ( + SELECT + id AS connection_id + ,IF( + STARTS_WITH(conn.url,'{') + ,JSON_EXTRACT_SCALAR(JSON_PARSE(conn.url), '$.type') + ,SPLIT_PART(conn.url, '://', 1) + ) AS connector_type + FROM ${td.monitoring.db.basic_monitoring}.${td.monitoring.tables.connections_details} conn_d + JOIN ${td.monitoring.db.basic_monitoring}.${td.monitoring.tables.connections} conn + ON conn_d.name = conn.name +) + + +SELECT + step_ids.time + ,act.* + ,step_ids.step_id + ,step_ids.stage_name + ,step_ids.stage_no + ,step_ids.state + ,step_ids.created_at + ,step_ids.updated_at + ,step_ids.journey_name + ,conn.connector_type + FROM tbl_act act +LEFT OUTER JOIN tbl_step_ids step_ids +ON act.activation_step_id = 
step_ids.activation_step_id +AND step_ids.ps_id = '${ps_id}' +LEFT OUTER JOIN tbl_conn conn +ON CAST(act.connection_id AS VARCHAR) = CAST(conn.connection_id AS VARCHAR) +WHERE TD_TIME_RANGE(step_ids.time, ${time_from}, ${time_to}) diff --git a/scenarios/cdp_campaign_management/queries/ingest_mta_conversion_journeys.sql b/scenarios/cdp_campaign_management/queries/ingest_mta_conversion_journeys.sql new file mode 100644 index 00000000..ea1bdc10 --- /dev/null +++ b/scenarios/cdp_campaign_management/queries/ingest_mta_conversion_journeys.sql @@ -0,0 +1,123 @@ +WITH tbl_cv_history AS +( + SELECT time, cv_name, ${td.user_id}, ROW_NUMBER()OVER(ORDER BY time) AS cv_id + FROM ${td.tables.conversion_journeys} + WHERE cv_flg = 1 + AND TD_TIME_RANGE(time, ${time_from}, ${time_to}) +), tbl_join_with_cv_history AS +( + SELECT * + FROM + ( + SELECT + cv_id + ,ROW_NUMBER()OVER(PARTITION BY raw_data.cv_name, raw_data.${td.user_id}, raw_data.time, activation_step_id, type ORDER BY cv_id) AS cv_order + ,cv_history.time AS cv_time + ,(cv_history.time - raw_data.time)/3600 AS time_hour_to_cv + , raw_data.* + FROM ${td.tables.conversion_journeys} raw_data + JOIN tbl_cv_history cv_history + ON raw_data.${td.user_id} = cv_history.${td.user_id} + AND raw_data.cv_name = cv_history.cv_name + WHERE raw_data.time <= cv_history.time + AND raw_data.${td.user_id} <= cv_history.${td.user_id} + AND type <> 'Activation' + ) + WHERE cv_order = 1 +) +,tbl_mta_base AS +( + SELECT * + ,MAX(position)OVER(PARTITION BY cv_id)-1 AS size_journey + ,SUM(is_within_cv_session)OVER(PARTITION BY cv_id RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS size_cv_session + ,GREATEST(MAX(position)OVER(PARTITION BY cv_id)-3,0) AS size_middle_click + FROM + ( + SELECT + time + ,cv_time + ,cv_name + ,cv_id + ,ROW_NUMBER()OVER(PARTITION BY cv_id ORDER BY time, type) AS position --Ensure that 'Conversion' comes after 'Click' when time is the same. 
+ ,time_hour_to_cv + ,CASE + WHEN type = 'Conversion' THEN 0 + WHEN time_hour_to_cv <= ${td.mta.session_model.allowable_time_to_cv} THEN 1 + ELSE 0 + END AS is_within_cv_session + ,(LEAD(time)OVER(PARTITION BY cv_id ORDER BY time, type) - time)/3600 AS time_hour_to_next --Use the same tie-break as position so the next row is deterministic when time is the same. + ,time_hour_from_activation + ,CASE + WHEN type = 'Conversion' THEN type + WHEN LEAD(type)OVER(PARTITION BY cv_id ORDER BY time, type) = 'Conversion' THEN 'Last Click' --Ensure that 'Conversion' comes after 'Click' when time is the same. + WHEN LAG(type)OVER(PARTITION BY cv_id ORDER BY time, type) IS NULL THEN 'First Click' --Ensure that 'Conversion' comes after 'Click' when time is the same. + ELSE 'Middle Click' + END AS click_type + ,type + ,${td.user_id} + ,activation_step_id + ,utm_source + ,utm_medium + ,utm_campaign + ,utm_content + ,utm_connector + ,cv_flg + ,val + ,revenue + ,FIRST_VALUE(revenue)OVER(PARTITION BY cv_id ORDER BY time DESC, type DESC) AS base_revenue --Ensure that 'Conversion' comes before 'Click' when time is the same. 
+ FROM tbl_join_with_cv_history + ) +) + +SELECT + time + ,cv_time + ,${td.user_id} + ,TD_MD5( CONCAT(cv_name, CAST(cv_id AS VARCHAR),CAST(cv_time AS VARCHAR),${td.user_id}) ) AS cv_id + ,position + ,time_hour_to_cv + ,time_hour_to_next + ,time_hour_from_activation + ,type + ,click_type + ,activation_step_id + ,utm_source + ,utm_medium + ,utm_campaign + ,utm_content + ,utm_connector + ,cv_name + ,size_journey + ,size_cv_session + ,size_middle_click + ,is_within_cv_session + ,revenue + -- Last Click Model + ,IF(click_type='Last Click',1,0) AS acquired_person_last_click_model + ,IF(click_type='Last Click',1,0) * base_revenue AS acquired_revenue_last_click_model + + -- First Click Model + ,CASE + WHEN click_type='First Click' THEN 1 + WHEN click_type='Last Click' AND size_journey = 1 THEN 1 + ELSE 0 + END AS acquired_person_first_click_model + ,CASE + WHEN click_type='First Click' THEN 1 + WHEN click_type='Last Click' AND size_journey = 1 THEN 1 + ELSE 0 + END * base_revenue AS acquired_revenue_first_click_model + + -- Session Model + ,CASE size_cv_session + WHEN 0 THEN 0 + ELSE 1 * is_within_cv_session * 1.0/size_cv_session + END AS acquired_person_session_model + ,CASE size_cv_session + WHEN 0 THEN 0 + ELSE 1 * is_within_cv_session * 1.0/size_cv_session * base_revenue + END AS acquired_revenue_session_model + +FROM tbl_mta_base +WHERE size_journey > 0 +-- ORDER BY ${td.user_id}, cv_id, position \ No newline at end of file diff --git a/scenarios/cdp_campaign_management/sub_ingest_activations.dig b/scenarios/cdp_campaign_management/sub_ingest_activations.dig new file mode 100644 index 00000000..90fd5fc7 --- /dev/null +++ b/scenarios/cdp_campaign_management/sub_ingest_activations.dig @@ -0,0 +1,36 @@ ++create_query_store: + td>: queries/create_query_store.sql + dest_db: ${td.database} + dest_table: ${td.tables.query_store} + ++ingest_activations_query: + py>: py_scripts.ingest_activations_queries.run + session_unixtime: ${session_unixtime} + time_from: ${time_from} + 
time_to: ${time_to} + user_id: ${td.user_id} + input_db: ${td.database} + input_table_master_activations: ${td.tables.master_activations} + input_table_daily_activations_info: ${td.tables.daily_activations_info} + cdp_audience_db: cdp_audience_${ps_id} + input_table_customers: customers + input_table_clicks: ${td.tables.clicks} + dest_db: ${td.database} + dest_table: ${td.tables.tmp_activations} + query_store_table: ${td.tables.query_store} + api_endpoint: ${td.api_endpoint} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + ++load_queries_and_run: + _parallel: ${parallel} + td_for_each>: queries/generate_queries.sql + input_db: ${td.database} + input_table: ${td.tables.query_store} + target_table: ${td.tables.tmp_activations} + _do: + td>: + query: ${td.each.query} + insert_into: ${td.each.db_name}.${td.each.table_name} diff --git a/scenarios/cdp_campaign_management/write_to_gsheet.dig b/scenarios/cdp_campaign_management/write_to_gsheet.dig new file mode 100644 index 00000000..2a6e1f76 --- /dev/null +++ b/scenarios/cdp_campaign_management/write_to_gsheet.dig @@ -0,0 +1,85 @@ ++parallel: + _parallel: true + +export_journeys: + td>: + query: SELECT id,name,state,created_at,updated_at,launched_at,allow_reentry,paused,num_stages FROM ${td.tables.journeys} ORDER BY id + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: ${td.tables.journeys} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED + + +export_master_activations: + td>: + query: SELECT * FROM ${td.tables.master_activations} ORDER BY journey_id,activation_step_id + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: 
${td.tables.master_activations} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED + + +export_daily_conversions: + td>: + query: SELECT * FROM ${td.tables.daily_conversions} ORDER BY time DESC + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: ${td.tables.daily_conversions} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED + + +export_existing_campaigns: + td>: + query: SELECT * FROM ${td.tables.existing_campaigns} ORDER BY time, cv_name, utm_campaign, utm_source, utm_medium + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: ${td.tables.existing_campaigns} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED + + +export_mta: + td>: + query: SELECT TD_TIME_STRING(time,'d!','${timezone}') AS date,* FROM ${td.tables.mta_conversion_journeys} ORDER BY cv_name, ${td.user_id}, cv_id, position LIMIT 1000 + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: ${td.tables.mta_conversion_journeys} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED + + +export_daily_mta: + td>: + query: SELECT * FROM ${td.tables.daily_mta_conversion_journeys} ORDER BY cv_name, time DESC, activation_step_id + result_connection: ${gsheet.result_connection} + result_settings: + spreadsheet_folder: ${gsheet.sheet_folder} + spreadsheet_title: ${gsheet.spreadsheet_title}_${ps_id} + sheet_title: 
${td.tables.daily_mta_conversion_journeys} + mode: truncate + rows_threshold: 1000000 + set_nil_for_double_nan: true + range: A1 + value_input_option: USER_ENTERED
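
Review note: the three attribution models computed in `queries/ingest_mta_conversion_journeys.sql` can be easier to follow outside SQL. The sketch below is a minimal Python illustration of that arithmetic, not code from this change. The `Touch` class, the `attribute` function, and the campaign names are hypothetical, and revenue is summed per campaign here, whereas the query emits one row per click. Hours are plain floats for simplicity.

```python
from dataclasses import dataclass


@dataclass
class Touch:
    """One click before a conversion (hypothetical structure for illustration)."""
    campaign: str
    hours_to_cv: float  # hours between this click and the conversion


def attribute(touches, base_revenue, allowable_hours):
    """Split base_revenue across clicks (ordered oldest first) under each model."""
    result = {"last_click": {}, "first_click": {}, "session": {}}
    if not touches:
        # Mirrors `WHERE size_journey > 0`: a conversion with no clicks gives no credit.
        return result

    def credit(model, campaign, amount):
        result[model][campaign] = result[model].get(campaign, 0.0) + amount

    # Last Click Model: the click immediately before the conversion takes everything.
    credit("last_click", touches[-1].campaign, base_revenue)

    # First Click Model: the earliest click takes everything. With a single click,
    # that click is both first and last, matching the `size_journey = 1` branch.
    credit("first_click", touches[0].campaign, base_revenue)

    # Session Model: clicks within the allowable window share the revenue equally.
    # If no click is inside the window, nothing is credited (size_cv_session = 0).
    in_session = [t for t in touches if t.hours_to_cv <= allowable_hours]
    for t in in_session:
        credit("session", t.campaign, base_revenue / len(in_session))
    return result


journey = [
    Touch("spring_sale", 30.0),
    Touch("newsletter", 5.0),
    Touch("retargeting", 1.0),
]
shares = attribute(journey, base_revenue=120.0, allowable_hours=24)
print(shares)
```

With a 24-hour window, the first click in this toy journey falls outside the session, so the session model splits the revenue between the two later clicks while the first-click model still assigns all of it to the earliest one.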