From 54b09318db04f8a6ea6cd30bdabfb6585b242f08 Mon Sep 17 00:00:00 2001 From: Sam <110511194+sam-xyz@users.noreply.github.com> Date: Tue, 14 Jan 2025 08:36:21 +0800 Subject: [PATCH] add fix (#298) --- .../workflows/dbt_run_traces_fix_dexalot.yml | 50 ++ .../silver/silver_dexalot__fact_traces2.sql | 444 +++++++++++++++++- .../silver_dexalot__fact_traces2_fix.sql | 164 +++++++ 3 files changed, 652 insertions(+), 6 deletions(-) create mode 100644 .github/workflows/dbt_run_traces_fix_dexalot.yml create mode 100644 models/dexalot/silver/traces2_fix/silver_dexalot__fact_traces2_fix.sql diff --git a/.github/workflows/dbt_run_traces_fix_dexalot.yml b/.github/workflows/dbt_run_traces_fix_dexalot.yml new file mode 100644 index 00000000..1474cf6f --- /dev/null +++ b/.github/workflows/dbt_run_traces_fix_dexalot.yml @@ -0,0 +1,50 @@ +name: dbt_run_traces_fix +run-name: dbt_run_traces_fix + +on: + workflow_dispatch: + inputs: + use_xl_env: + description: "Use the 2xl environment" + type: boolean + schedule: + # every 15 minutes (see https://crontab.guru) + - cron: "*/15 * * * *" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: ${{ github.event_name == 'workflow_dispatch' && inputs.use_xl_env && 'workflow_prod_2xl' || 'workflow_prod_backfill' }} + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: run traces fix model + run: | + dbt run -m "avalanche_models,tag:traces_fix_dexalot" diff --git a/models/dexalot/silver/silver_dexalot__fact_traces2.sql b/models/dexalot/silver/silver_dexalot__fact_traces2.sql index 9c924e8d..9c35e2f8 100644 --- a/models/dexalot/silver/silver_dexalot__fact_traces2.sql +++ b/models/dexalot/silver/silver_dexalot__fact_traces2.sql @@ -7,10 +7,442 @@ full_refresh = false, tags = ['dexalot_non_realtime'] ) }} +{# {{ fsc_evm.gold_traces_v2( +full_reload_start_block = 25000000, +full_reload_blocks = 2000000, +schema_name = 'silver_dexalot', +uses_tx_status = TRUE +) }} +#} +WITH silver_traces AS ( + + SELECT + block_number, + tx_position, + trace_address, + parent_trace_address, + trace_address_array, + trace_json, + traces_id, + 'regular' AS source + FROM + {{ ref('silver_dexalot__traces') }} + WHERE + 1 = 1 + +{% if is_incremental() and not full_reload_mode %} +AND modified_timestamp > ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} +) {% elif is_incremental() and full_reload_mode %} +AND block_number BETWEEN ( + SELECT + MAX( + block_number + ) + FROM + {{ this }} +) +AND ( + SELECT + MAX( + block_number + ) + 2000000 + FROM + {{ this }} +) +{% else %} + AND block_number <= 25000000 +{% endif %} +), +sub_traces AS ( + SELECT + block_number, + tx_position, + parent_trace_address, + COUNT(*) AS sub_traces + FROM + silver_traces + GROUP BY + block_number, + tx_position, + parent_trace_address +), +trace_index_array AS ( + SELECT + block_number, + tx_position, + trace_address, + ARRAY_AGG(flat_value) AS number_array + FROM + ( + SELECT + block_number, + tx_position, + trace_address, + IFF( + VALUE :: STRING = 'ORIGIN', + -1, + VALUE :: INT + ) AS flat_value + FROM + silver_traces, + LATERAL FLATTEN ( + input => trace_address_array + ) + ) + GROUP BY + block_number, + tx_position, + trace_address +), +trace_index_sub_traces AS ( + SELECT + b.block_number, + b.tx_position, + b.trace_address, + IFNULL( + sub_traces, + 0 + ) AS sub_traces, + number_array, + ROW_NUMBER() over ( + PARTITION BY b.block_number, + b.tx_position + ORDER BY + number_array ASC + ) - 1 AS trace_index, + b.trace_json, + b.traces_id, + b.source + FROM + silver_traces b + LEFT JOIN sub_traces s + ON b.block_number = s.block_number + AND b.tx_position = s.tx_position + AND b.trace_address = s.parent_trace_address + JOIN trace_index_array n + ON b.block_number = n.block_number + AND b.tx_position = n.tx_position + AND b.trace_address = n.trace_address +), +errored_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + trace_json + FROM + trace_index_sub_traces + WHERE + trace_json :error :: STRING IS NOT NULL +), +error_logic AS ( + SELECT + b0.block_number, + b0.tx_position, + b0.trace_address, + b0.trace_json :error :: STRING AS error, + b1.trace_json :error :: STRING AS any_error, + b2.trace_json :error :: STRING AS origin_error + FROM + trace_index_sub_traces b0 + LEFT JOIN errored_traces b1 + ON b0.block_number = b1.block_number + AND b0.tx_position = b1.tx_position + AND b0.trace_address RLIKE CONCAT( + '^', + b1.trace_address, + '(_[0-9]+)*$' + ) + LEFT JOIN errored_traces b2 + ON b0.block_number = b2.block_number + AND b0.tx_position = b2.tx_position + AND b2.trace_address = 'ORIGIN' +), +aggregated_errors AS ( + SELECT + block_number, + tx_position, + trace_address, + error, + IFF(MAX(any_error) IS NULL + AND error IS NULL + AND origin_error IS NULL, TRUE, FALSE) AS trace_succeeded + FROM + error_logic + GROUP BY + block_number, + tx_position, + trace_address, + error, + origin_error), + json_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + sub_traces, + number_array, + trace_index, + trace_succeeded, + trace_json :error :: STRING AS error_reason, + trace_json :revertReason :: STRING AS revert_reason, + trace_json :from :: STRING AS from_address, + trace_json :to :: STRING AS to_address, + IFNULL( + trace_json :value :: STRING, + '0x0' + ) AS value_hex, + IFNULL( + utils.udf_hex_to_int( + trace_json :value :: STRING + ), + '0' + ) AS value_precise_raw, + utils.udf_decimal_adjust( + value_precise_raw, + 18 + ) AS value_precise, + value_precise :: FLOAT AS VALUE, + utils.udf_hex_to_int( + trace_json :gas :: STRING + ) :: INT AS gas, + utils.udf_hex_to_int( + trace_json :gasUsed :: STRING + ) :: INT AS gas_used, + trace_json :input :: STRING AS input, + trace_json :output :: STRING AS output, + trace_json :type :: STRING AS TYPE, + traces_id + FROM + trace_index_sub_traces + JOIN aggregated_errors USING ( + block_number, + tx_position, + trace_address + ) + ), + incremental_traces AS ( + SELECT + f.block_number, + t.tx_hash, + t.block_timestamp, + t.origin_function_signature, + t.from_address AS origin_from_address, + t.to_address AS origin_to_address, + f.tx_position, + f.trace_index, + f.from_address AS from_address, + f.to_address AS to_address, + f.value_hex, + f.value_precise_raw, + f.value_precise, + f.value, + f.gas, + f.gas_used, + f.input, + f.output, + f.type, + f.sub_traces, + f.error_reason, + f.revert_reason, + f.traces_id, + f.trace_succeeded, + f.trace_address, + t.tx_status AS tx_succeeded + FROM + json_traces f + LEFT OUTER JOIN {{ ref('silver_dexalot__transactions') }} + t + ON f.tx_position = t.position + AND f.block_number = t.block_number + +{% if is_incremental() and not full_reload_mode %} +AND t.modified_timestamp >= ( + SELECT + DATEADD('hour', -24, MAX(modified_timestamp)) + FROM + {{ this }}) + {% endif %} +) + +{% if is_incremental() %}, +overflow_blocks AS ( + SELECT + DISTINCT block_number + FROM + silver_traces + WHERE + source = 'overflow' +), +heal_missing_data AS ( + SELECT + t.block_number, + txs.tx_hash, + txs.block_timestamp, + txs.origin_function_signature, + txs.from_address AS origin_from_address, + txs.to_address AS origin_to_address, + t.tx_position, + t.trace_index, + t.from_address, + t.to_address, + t.value_hex, + t.value_precise_raw, + t.value_precise, + t.value, + t.gas, + t.gas_used, + t.input, + t.output, + t.type, + t.sub_traces, + t.error_reason, + t.revert_reason, + t.fact_traces_id AS traces_id, + t.trace_succeeded, + t.trace_address, + txs.tx_status AS tx_succeeded + FROM + {{ this }} + t + JOIN {{ ref('silver_dexalot__transactions') }} + txs + ON t.tx_position = txs.position + AND t.block_number = txs.block_number + WHERE + t.tx_hash IS NULL + OR t.block_timestamp IS NULL + OR t.tx_succeeded IS NULL +) +{% endif %}, +all_traces AS ( + SELECT + block_number, + tx_hash, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_position, + trace_index, + from_address, + to_address, + value_hex, + value_precise_raw, + value_precise, + VALUE, + gas, + gas_used, + input, + output, + TYPE, + sub_traces, + error_reason, + revert_reason, + trace_succeeded, + trace_address, + tx_succeeded + FROM + incremental_traces -{{ fsc_evm.gold_traces_v2( - full_reload_start_block = 25000000, - full_reload_blocks = 2000000, - schema_name = 'silver_dexalot', - uses_tx_status = true -) }} \ No newline at end of file +{% if is_incremental() %} +UNION ALL +SELECT + block_number, + tx_hash, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_position, + trace_index, + from_address, + to_address, + value_hex, + value_precise_raw, + value_precise, + VALUE, + gas, + gas_used, + input, + output, + TYPE, + sub_traces, + error_reason, + revert_reason, + trace_succeeded, + trace_address, + tx_succeeded +FROM + heal_missing_data +UNION ALL +SELECT + block_number, + tx_hash, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_position, + trace_index, + from_address, + to_address, + value_hex, + value_precise_raw, + value_precise, + VALUE, + gas, + gas_used, + input, + output, + TYPE, + sub_traces, + error_reason, + revert_reason, + trace_succeeded, + trace_address, + tx_succeeded +FROM + {{ this }} + JOIN overflow_blocks USING (block_number) +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + tx_position, + trace_index, + from_address, + to_address, + input, + output, + TYPE, + trace_address, + sub_traces, + VALUE, + value_precise_raw, + value_precise, + value_hex, + gas, + gas_used, + origin_from_address, + origin_to_address, + origin_function_signature, + trace_succeeded, + error_reason, + revert_reason, + tx_succeeded, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'trace_index'] + ) }} AS fact_traces_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + all_traces qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index +ORDER BY + modified_timestamp DESC, block_timestamp DESC nulls last)) = 1 diff --git a/models/dexalot/silver/traces2_fix/silver_dexalot__fact_traces2_fix.sql b/models/dexalot/silver/traces2_fix/silver_dexalot__fact_traces2_fix.sql new file mode 100644 index 00000000..3e3b3d3e --- /dev/null +++ b/models/dexalot/silver/traces2_fix/silver_dexalot__fact_traces2_fix.sql @@ -0,0 +1,164 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'delete+insert', + unique_key = ["block_number", "tx_position", "trace_address"], + tags = ['traces_fix_dexalot'] +) }} + +{% set batch_query %} + +SELECT + MAX(next_batch_id) AS next_batch_id +FROM + ( + SELECT + 1 AS next_batch_id + +{% if is_incremental() %} +UNION ALL +SELECT + COALESCE(MAX(batch_id), 0) + 1 AS next_batch_id +FROM + {{ this }} +{% endif %}) {% endset %} +{% if execute %} + {% set result = run_query(batch_query) %} + {{ log( + "Debug - Batch Query result: " ~ result, + info = True + ) }} + + {% set batch_id = result.columns [0] [0] %} + {% if batch_id > 29 %} + {{ exceptions.raise_compiler_error("Processing complete - reached max batch_id of 29") }} + {% endif %} + + {% set block_size = 1000000 %} + {% set block_start = 1 + ( + batch_id - 1 + ) * block_size %} + {% set block_end = batch_id * block_size %} + {{ log( + "Processing batch_id: " ~ batch_id ~ ", blocks: " ~ block_start ~ " to " ~ block_end, + info = True + ) }} +{% endif %} + +WITH silver_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + parent_trace_address, + trace_json + FROM + {{ ref('silver_dexalot__traces') }} + WHERE + block_number BETWEEN {{ block_start }} + AND {{ block_end }} +), +errored_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + trace_json + FROM + silver_traces + WHERE + trace_json :error :: STRING IS NOT NULL +), +error_logic AS ( + SELECT + b0.block_number, + b0.tx_position, + b0.trace_address, + b0.trace_json :error :: STRING AS error, + b1.trace_json :error :: STRING AS any_error, + b2.trace_json :error :: STRING AS origin_error + FROM + silver_traces b0 + LEFT JOIN errored_traces b1 + ON b0.block_number = b1.block_number + AND b0.tx_position = b1.tx_position + AND b0.trace_address RLIKE CONCAT( + '^', + b1.trace_address, + '(_[0-9]+)*$' + ) + LEFT JOIN errored_traces b2 + ON b0.block_number = b2.block_number + AND b0.tx_position = b2.tx_position + AND b2.trace_address = 'ORIGIN' +), +aggregated_errors AS ( + SELECT + block_number, + tx_position, + trace_address, + error, + IFF(MAX(any_error) IS NULL + AND error IS NULL + AND origin_error IS NULL, TRUE, FALSE) AS trace_succeeded + FROM + error_logic + GROUP BY + block_number, + tx_position, + trace_address, + error, + origin_error), + prod AS ( + SELECT + block_number, + tx_position, + tx_hash, + trace_address, + trace_succeeded AS prod_trace_succeeded + FROM + {{ ref('silver_dexalot__fact_traces2') }} + WHERE + block_number BETWEEN {{ block_start }} + AND {{ block_end }} + ), + final_errors AS ( + SELECT + block_number, + tx_position, + trace_address, + error, + trace_succeeded, + prod_trace_succeeded + FROM + aggregated_errors + INNER JOIN prod USING ( + block_number, + tx_position, + trace_address + ) + WHERE + prod_trace_succeeded != trace_succeeded + UNION ALL + SELECT + NULL AS block_number, + NULL AS tx_position, + NULL AS trace_address, + NULL AS error, + NULL AS trace_succeeded, + NULL AS prod_trace_succeeded + ), + batch AS ( + SELECT + CAST({{ batch_id }} AS NUMBER(10, 0)) AS batch_id + ) + SELECT + batch_id, + block_number, + tx_position, + trace_address, + error, + trace_succeeded, + prod_trace_succeeded + FROM + batch + CROSS JOIN final_errors