From 34df5fb14dd72bfc6d3b00d7b1b02cba9d6da00d Mon Sep 17 00:00:00 2001 From: Reuven Gonzales Date: Tue, 19 Nov 2024 15:04:47 +0700 Subject: [PATCH] Prototyping: Duckdb based metrics calculation (#2469) * Reorder metrics queries * wip * more wip * test * Fixes * fix * tester * test * test * test * test * More fixes * test * test * test * test * more testing * fixes * add connectorx * big rewrite, try using trino as part of the export mechanism * add bokeh * test * Update dockerfile * Clean up some in progress work --- .dockerignore | 4 +- docker/images/dagster-dask/Dockerfile | 10 +- ops/tf-modules/warehouse-cluster/main.tf | 4 +- poetry.lock | 144 +++- pyproject.toml | 2 + warehouse/metrics_mesh/fixtures/README.md | 2 + .../metrics_mesh/models/metrics_factories.py | 32 +- warehouse/metrics_tools/compute/cluster.py | 20 + warehouse/metrics_tools/compute/flight.py | 706 ++++++++++++++++++ warehouse/metrics_tools/compute/run_get.py | 18 + warehouse/metrics_tools/compute/test_setup.sh | 8 + warehouse/metrics_tools/compute/worker.py | 166 +++- warehouse/metrics_tools/transformer/tables.py | 46 +- warehouse/metrics_tools/utils/logging.py | 14 +- 14 files changed, 1109 insertions(+), 67 deletions(-) create mode 100644 warehouse/metrics_mesh/fixtures/README.md create mode 100644 warehouse/metrics_tools/compute/cluster.py create mode 100644 warehouse/metrics_tools/compute/flight.py create mode 100644 warehouse/metrics_tools/compute/run_get.py create mode 100644 warehouse/metrics_tools/compute/test_setup.sh diff --git a/.dockerignore b/.dockerignore index 9cbecdaee..d6a4feb0a 100644 --- a/.dockerignore +++ b/.dockerignore @@ -77,4 +77,6 @@ Dockerfile # Github directory .github/scripts -.git \ No newline at end of file +.git +logs +.mypy_cache diff --git a/docker/images/dagster-dask/Dockerfile b/docker/images/dagster-dask/Dockerfile index 79cb3f141..23c1877e2 100644 --- a/docker/images/dagster-dask/Dockerfile +++ b/docker/images/dagster-dask/Dockerfile @@ -4,17 +4,19 @@ FROM 
ghcr.io/opensource-observer/oso-public-vars:latest AS public_vars FROM ubuntu:jammy ARG GCLOUD_VERSION=478.0.0 +ARG PYTHON_VERSION=3.12 +ARG PYTHON_PACKAGE=python3.12 ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && \ apt-get install -y software-properties-common && \ add-apt-repository ppa:deadsnakes/ppa && \ apt-get update && \ - apt-get install -y python3.12 && \ + apt-get install -y ${PYTHON_PACKAGE} && \ apt-get install -y curl git && \ curl -o get-pip.py https://bootstrap.pypa.io/get-pip.py && \ - python3.12 get-pip.py && \ - pip3.12 install poetry && \ + python${PYTHON_VERSION} get-pip.py && \ + pip${PYTHON_VERSION} install poetry && \ curl -o gcloud.tar.gz https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-${GCLOUD_VERSION}-linux-x86_64.tar.gz && \ tar xvf gcloud.tar.gz && \ bash ./google-cloud-sdk/install.sh && \ @@ -48,6 +50,6 @@ ENV DAGSTER_DBT_TARGET_BASE_DIR=/dbt_targets COPY --from=public_vars ./public/vars.env /usr/src/app/vars.env RUN mkdir -p ${DAGSTER_DBT_TARGET_BASE_DIR} && \ - python3.12 -m oso_dagster.compile --additional-vars /usr/src/app/vars.env + python${PYTHON_VERSION} -m oso_dagster.compile --additional-vars /usr/src/app/vars.env ENTRYPOINT [] \ No newline at end of file diff --git a/ops/tf-modules/warehouse-cluster/main.tf b/ops/tf-modules/warehouse-cluster/main.tf index b977917bc..83411ff8c 100644 --- a/ops/tf-modules/warehouse-cluster/main.tf +++ b/ops/tf-modules/warehouse-cluster/main.tf @@ -135,12 +135,12 @@ locals { # SQLMesh Workers { name = "${var.cluster_name}-sqlmesh-worker-node-pool" - machine_type = "n1-highmem-32" + machine_type = "n1-highmem-16" node_locations = join(",", var.cluster_zones) min_count = 0 max_count = 10 local_ssd_count = 0 - local_ssd_ephemeral_storage_count = 2 + local_ssd_ephemeral_storage_count = 1 spot = false disk_size_gb = 100 disk_type = "pd-standard" diff --git a/poetry.lock b/poetry.lock index 11de110aa..d252ab649 100644 --- a/poetry.lock +++ b/poetry.lock @@ -407,6 
+407,28 @@ d = ["aiohttp (>=3.10)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] +[[package]] +name = "bokeh" +version = "3.6.1" +description = "Interactive plots and applications in the browser from Python" +optional = false +python-versions = ">=3.10" +files = [ + {file = "bokeh-3.6.1-py3-none-any.whl", hash = "sha256:6a97271bd4cc5b32c5bc7aa9c1c0dbe0beb0a8da2a22193e57c73f0c88d2075a"}, + {file = "bokeh-3.6.1.tar.gz", hash = "sha256:04d3fb5fac871423f38e4535838164cd90c3d32e707bcb74c8bf991ed28878fc"}, +] + +[package.dependencies] +contourpy = ">=1.2" +Jinja2 = ">=2.9" +numpy = ">=1.16" +packaging = ">=16.8" +pandas = ">=1.2" +pillow = ">=7.1.0" +PyYAML = ">=3.10" +tornado = ">=6.2" +xyzservices = ">=2021.09.1" + [[package]] name = "boltons" version = "24.0.0" @@ -829,6 +851,115 @@ traitlets = ">=4" [package.extras] test = ["pytest"] +[[package]] +name = "connectorx" +version = "0.4.0" +description = "" +optional = false +python-versions = ">=3.10" +files = [ + {file = "connectorx-0.4.0-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:447bd065feb69b63e51a8a056b1de3ccb98a48108b52987dba1b67555f01a8d8"}, + {file = "connectorx-0.4.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:342d233ca0008ec7cfef2ab91da566f15a1326ac2f51c87c6dd9f77dc9a6549e"}, + {file = "connectorx-0.4.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:72103766090f81ed7f8aeba9183d7eb2f24b65de9678d74a61c16f85f89e8b81"}, + {file = "connectorx-0.4.0-cp310-none-win_amd64.whl", hash = "sha256:63a5a4961ed43e5f33eebaa9ee0ea0bb4e3f7d0f710d48e2b51ede3bd1e2af44"}, + {file = "connectorx-0.4.0-cp311-cp311-macosx_10_12_x86_64.whl", hash = "sha256:fb3653c4ff1de8ada871c2ab883ed04dc2b7bb84704fcf08e6a23cf67deb931f"}, + {file = "connectorx-0.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:224da692923f3e2f6ae5677dfd0f00ac0374e3989832ee4de029112b09da2281"}, + {file = "connectorx-0.4.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = 
"sha256:d9b6d1832c071201cacb810b06c7311cb605f99dead0ee47935d38086f1c9d4b"}, + {file = "connectorx-0.4.0-cp311-none-win_amd64.whl", hash = "sha256:31b2c02f56360c6c4034d23c107772df98c8ab6102f71242bcb9a8a0fec06512"}, + {file = "connectorx-0.4.0-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:5c6bb6cc468ff77b18b71ee44b5ea7e3c72de3a08a4740040fed3361a6086659"}, + {file = "connectorx-0.4.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:42d9ae283541aca860f7082186ecb0525ff933b714e676dfe0e517e7943a9799"}, + {file = "connectorx-0.4.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:c3ebe1e95e4c82d69804d0165be4d4f80ff4bb1beac04d6946aa73ed04819440"}, + {file = "connectorx-0.4.0-cp312-none-win_amd64.whl", hash = "sha256:d4c156df650860002f61155e2eff23d9c822c2e9f14e16d04e21dd1033b09cdc"}, + {file = "connectorx-0.4.0-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:b74266abb86c7d570579004d909d8b4ce5d18dab29e07c63015ea75ed0178adb"}, + {file = "connectorx-0.4.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:dd80488823a174d0c73e4575b10f0c0eb86bc82cbd72e28fd2b611733308699d"}, + {file = "connectorx-0.4.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:33a64fdd0b3efd32f54cce664f1022d448db5878e919e7a2f186f2af19eaa572"}, + {file = "connectorx-0.4.0-cp313-none-win_amd64.whl", hash = "sha256:635727407b0a14ff30cd02519e631766b2df55c94da2d57c7599e5a9a4ce2f4f"}, +] + +[[package]] +name = "contourpy" +version = "1.3.0" +description = "Python library for calculating contours of 2D quadrilateral grids" +optional = false +python-versions = ">=3.9" +files = [ + {file = "contourpy-1.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:880ea32e5c774634f9fcd46504bf9f080a41ad855f4fef54f5380f5133d343c7"}, + {file = "contourpy-1.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:76c905ef940a4474a6289c71d53122a4f77766eef23c03cd57016ce19d0f7b42"}, + {file = "contourpy-1.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = 
"sha256:92f8557cbb07415a4d6fa191f20fd9d2d9eb9c0b61d1b2f52a8926e43c6e9af7"}, + {file = "contourpy-1.3.0-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:36f965570cff02b874773c49bfe85562b47030805d7d8360748f3eca570f4cab"}, + {file = "contourpy-1.3.0-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:cacd81e2d4b6f89c9f8a5b69b86490152ff39afc58a95af002a398273e5ce589"}, + {file = "contourpy-1.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:69375194457ad0fad3a839b9e29aa0b0ed53bb54db1bfb6c3ae43d111c31ce41"}, + {file = "contourpy-1.3.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:7a52040312b1a858b5e31ef28c2e865376a386c60c0e248370bbea2d3f3b760d"}, + {file = "contourpy-1.3.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:3faeb2998e4fcb256542e8a926d08da08977f7f5e62cf733f3c211c2a5586223"}, + {file = "contourpy-1.3.0-cp310-cp310-win32.whl", hash = "sha256:36e0cff201bcb17a0a8ecc7f454fe078437fa6bda730e695a92f2d9932bd507f"}, + {file = "contourpy-1.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:87ddffef1dbe5e669b5c2440b643d3fdd8622a348fe1983fad7a0f0ccb1cd67b"}, + {file = "contourpy-1.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0fa4c02abe6c446ba70d96ece336e621efa4aecae43eaa9b030ae5fb92b309ad"}, + {file = "contourpy-1.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:834e0cfe17ba12f79963861e0f908556b2cedd52e1f75e6578801febcc6a9f49"}, + {file = "contourpy-1.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dbc4c3217eee163fa3984fd1567632b48d6dfd29216da3ded3d7b844a8014a66"}, + {file = "contourpy-1.3.0-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4865cd1d419e0c7a7bf6de1777b185eebdc51470800a9f42b9e9decf17762081"}, + {file = "contourpy-1.3.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:303c252947ab4b14c08afeb52375b26781ccd6a5ccd81abcdfc1fafd14cf93c1"}, + {file = 
"contourpy-1.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:637f674226be46f6ba372fd29d9523dd977a291f66ab2a74fbeb5530bb3f445d"}, + {file = "contourpy-1.3.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:76a896b2f195b57db25d6b44e7e03f221d32fe318d03ede41f8b4d9ba1bff53c"}, + {file = "contourpy-1.3.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:e1fd23e9d01591bab45546c089ae89d926917a66dceb3abcf01f6105d927e2cb"}, + {file = "contourpy-1.3.0-cp311-cp311-win32.whl", hash = "sha256:d402880b84df3bec6eab53cd0cf802cae6a2ef9537e70cf75e91618a3801c20c"}, + {file = "contourpy-1.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:6cb6cc968059db9c62cb35fbf70248f40994dfcd7aa10444bbf8b3faeb7c2d67"}, + {file = "contourpy-1.3.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:570ef7cf892f0afbe5b2ee410c507ce12e15a5fa91017a0009f79f7d93a1268f"}, + {file = "contourpy-1.3.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:da84c537cb8b97d153e9fb208c221c45605f73147bd4cadd23bdae915042aad6"}, + {file = "contourpy-1.3.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0be4d8425bfa755e0fd76ee1e019636ccc7c29f77a7c86b4328a9eb6a26d0639"}, + {file = "contourpy-1.3.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9c0da700bf58f6e0b65312d0a5e695179a71d0163957fa381bb3c1f72972537c"}, + {file = "contourpy-1.3.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:eb8b141bb00fa977d9122636b16aa67d37fd40a3d8b52dd837e536d64b9a4d06"}, + {file = "contourpy-1.3.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3634b5385c6716c258d0419c46d05c8aa7dc8cb70326c9a4fb66b69ad2b52e09"}, + {file = "contourpy-1.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0dce35502151b6bd35027ac39ba6e5a44be13a68f55735c3612c568cac3805fd"}, + {file = "contourpy-1.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = 
"sha256:aea348f053c645100612b333adc5983d87be69acdc6d77d3169c090d3b01dc35"}, + {file = "contourpy-1.3.0-cp312-cp312-win32.whl", hash = "sha256:90f73a5116ad1ba7174341ef3ea5c3150ddf20b024b98fb0c3b29034752c8aeb"}, + {file = "contourpy-1.3.0-cp312-cp312-win_amd64.whl", hash = "sha256:b11b39aea6be6764f84360fce6c82211a9db32a7c7de8fa6dd5397cf1d079c3b"}, + {file = "contourpy-1.3.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:3e1c7fa44aaae40a2247e2e8e0627f4bea3dd257014764aa644f319a5f8600e3"}, + {file = "contourpy-1.3.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:364174c2a76057feef647c802652f00953b575723062560498dc7930fc9b1cb7"}, + {file = "contourpy-1.3.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:32b238b3b3b649e09ce9aaf51f0c261d38644bdfa35cbaf7b263457850957a84"}, + {file = "contourpy-1.3.0-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d51fca85f9f7ad0b65b4b9fe800406d0d77017d7270d31ec3fb1cc07358fdea0"}, + {file = "contourpy-1.3.0-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:732896af21716b29ab3e988d4ce14bc5133733b85956316fb0c56355f398099b"}, + {file = "contourpy-1.3.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d73f659398a0904e125280836ae6f88ba9b178b2fed6884f3b1f95b989d2c8da"}, + {file = "contourpy-1.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:c6c7c2408b7048082932cf4e641fa3b8ca848259212f51c8c59c45aa7ac18f14"}, + {file = "contourpy-1.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f317576606de89da6b7e0861cf6061f6146ead3528acabff9236458a6ba467f8"}, + {file = "contourpy-1.3.0-cp313-cp313-win32.whl", hash = "sha256:31cd3a85dbdf1fc002280c65caa7e2b5f65e4a973fcdf70dd2fdcb9868069294"}, + {file = "contourpy-1.3.0-cp313-cp313-win_amd64.whl", hash = "sha256:4553c421929ec95fb07b3aaca0fae668b2eb5a5203d1217ca7c34c063c53d087"}, + {file = "contourpy-1.3.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = 
"sha256:345af746d7766821d05d72cb8f3845dfd08dd137101a2cb9b24de277d716def8"}, + {file = "contourpy-1.3.0-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:3bb3808858a9dc68f6f03d319acd5f1b8a337e6cdda197f02f4b8ff67ad2057b"}, + {file = "contourpy-1.3.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:420d39daa61aab1221567b42eecb01112908b2cab7f1b4106a52caaec8d36973"}, + {file = "contourpy-1.3.0-cp313-cp313t-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4d63ee447261e963af02642ffcb864e5a2ee4cbfd78080657a9880b8b1868e18"}, + {file = "contourpy-1.3.0-cp313-cp313t-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:167d6c890815e1dac9536dca00828b445d5d0df4d6a8c6adb4a7ec3166812fa8"}, + {file = "contourpy-1.3.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:710a26b3dc80c0e4febf04555de66f5fd17e9cf7170a7b08000601a10570bda6"}, + {file = "contourpy-1.3.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:75ee7cb1a14c617f34a51d11fa7524173e56551646828353c4af859c56b766e2"}, + {file = "contourpy-1.3.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:33c92cdae89ec5135d036e7218e69b0bb2851206077251f04a6c4e0e21f03927"}, + {file = "contourpy-1.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:a11077e395f67ffc2c44ec2418cfebed032cd6da3022a94fc227b6faf8e2acb8"}, + {file = "contourpy-1.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e8134301d7e204c88ed7ab50028ba06c683000040ede1d617298611f9dc6240c"}, + {file = "contourpy-1.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e12968fdfd5bb45ffdf6192a590bd8ddd3ba9e58360b29683c6bb71a7b41edca"}, + {file = "contourpy-1.3.0-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:fd2a0fc506eccaaa7595b7e1418951f213cf8255be2600f1ea1b61e46a60c55f"}, + {file = "contourpy-1.3.0-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = 
"sha256:4cfb5c62ce023dfc410d6059c936dcf96442ba40814aefbfa575425a3a7f19dc"}, + {file = "contourpy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68a32389b06b82c2fdd68276148d7b9275b5f5cf13e5417e4252f6d1a34f72a2"}, + {file = "contourpy-1.3.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:94e848a6b83da10898cbf1311a815f770acc9b6a3f2d646f330d57eb4e87592e"}, + {file = "contourpy-1.3.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:d78ab28a03c854a873787a0a42254a0ccb3cb133c672f645c9f9c8f3ae9d0800"}, + {file = "contourpy-1.3.0-cp39-cp39-win32.whl", hash = "sha256:81cb5ed4952aae6014bc9d0421dec7c5835c9c8c31cdf51910b708f548cf58e5"}, + {file = "contourpy-1.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:14e262f67bd7e6eb6880bc564dcda30b15e351a594657e55b7eec94b6ef72843"}, + {file = "contourpy-1.3.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:fe41b41505a5a33aeaed2a613dccaeaa74e0e3ead6dd6fd3a118fb471644fd6c"}, + {file = "contourpy-1.3.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eca7e17a65f72a5133bdbec9ecf22401c62bcf4821361ef7811faee695799779"}, + {file = "contourpy-1.3.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:1ec4dc6bf570f5b22ed0d7efba0dfa9c5b9e0431aeea7581aa217542d9e809a4"}, + {file = "contourpy-1.3.0-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:00ccd0dbaad6d804ab259820fa7cb0b8036bda0686ef844d24125d8287178ce0"}, + {file = "contourpy-1.3.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8ca947601224119117f7c19c9cdf6b3ab54c5726ef1d906aa4a69dfb6dd58102"}, + {file = "contourpy-1.3.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:c6ec93afeb848a0845a18989da3beca3eec2c0f852322efe21af1931147d12cb"}, + {file = "contourpy-1.3.0.tar.gz", hash = "sha256:7ffa0db17717a8ffb127efd0c95a4362d996b892c2904db72428d5b52e1938a4"}, +] + +[package.dependencies] +numpy = ">=1.23" + +[package.extras] +bokeh = ["bokeh", "selenium"] +docs = ["furo", "sphinx 
(>=7.2)", "sphinx-copybutton"] +mypy = ["contourpy[bokeh,docs]", "docutils-stubs", "mypy (==1.11.1)", "types-Pillow"] +test = ["Pillow", "contourpy[test-no-images]", "matplotlib"] +test-no-images = ["pytest", "pytest-cov", "pytest-rerunfailures", "pytest-xdist", "wurlitzer"] + [[package]] name = "croniter" version = "5.0.1" @@ -7141,6 +7272,17 @@ files = [ [package.dependencies] h11 = ">=0.9.0,<1" +[[package]] +name = "xyzservices" +version = "2024.9.0" +description = "Source of XYZ tiles providers" +optional = false +python-versions = ">=3.8" +files = [ + {file = "xyzservices-2024.9.0-py3-none-any.whl", hash = "sha256:776ae82b78d6e5ca63dd6a94abb054df8130887a4a308473b54a6bd364de8644"}, + {file = "xyzservices-2024.9.0.tar.gz", hash = "sha256:68fb8353c9dbba4f1ff6c0f2e5e4e596bb9e1db7f94f4f7dfbcb26e25aa66fde"}, +] + [[package]] name = "yarl" version = "1.17.0" @@ -7382,4 +7524,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.0" python-versions = "^3.12,<3.13" -content-hash = "15254fa69d7140b4b4a656da1fd65de839b65827d2ce965d3d585f08f3fff625" +content-hash = "7d1a62623fd1b281c059d34ccd159608538429f6546d10b583875658eb9e96cb" diff --git a/pyproject.toml b/pyproject.toml index 4ec39807c..5224b68ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -64,6 +64,8 @@ google-auth = "^2.34.0" pillow = "^10.4.0" dagster-k8s = "^0.24.6" pyiceberg = { extras = ["hive"], version = "^0.7.1" } +connectorx = "^0.4.0" +bokeh = "^3.6.1" [tool.poetry.scripts] diff --git a/warehouse/metrics_mesh/fixtures/README.md b/warehouse/metrics_mesh/fixtures/README.md new file mode 100644 index 000000000..c510c6b0a --- /dev/null +++ b/warehouse/metrics_mesh/fixtures/README.md @@ -0,0 +1,2 @@ +This folder contains both the data generator and the csv most recently generated +from that data generator. This is so we can easily update tests as we need. 
diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py index 16ef7f201..386215ea3 100644 --- a/warehouse/metrics_mesh/models/metrics_factories.py +++ b/warehouse/metrics_mesh/models/metrics_factories.py @@ -21,6 +21,22 @@ ref="stars.sql", time_aggregations=["daily", "weekly", "monthly"], ), + "active_addresses": MetricQueryDef( + ref="active_addresses.sql", + time_aggregations=["daily", "weekly", "monthly"], + ), + "commits": MetricQueryDef( + ref="commits.sql", + time_aggregations=["daily", "weekly", "monthly"], + ), + "forks": MetricQueryDef( + ref="forks.sql", + time_aggregations=["daily", "weekly", "monthly"], + ), + "gas_fees": MetricQueryDef( + ref="gas_fees.sql", + time_aggregations=["daily", "weekly", "monthly"], + ), # This defines something with a rolling option that allows you to look back # to some arbitrary window. So you specify the window and specify the unit. # The unit and the window are used to pass in variables to the query. 
So it's @@ -60,22 +76,6 @@ cron="@daily", ), ), - "active_addresses": MetricQueryDef( - ref="active_addresses.sql", - time_aggregations=["daily", "weekly", "monthly"], - ), - "commits": MetricQueryDef( - ref="commits.sql", - time_aggregations=["daily", "weekly", "monthly"], - ), - "forks": MetricQueryDef( - ref="forks.sql", - time_aggregations=["daily", "weekly", "monthly"], - ), - "gas_fees": MetricQueryDef( - ref="gas_fees.sql", - time_aggregations=["daily", "weekly", "monthly"], - ), "change_in_30_developer_activity": MetricQueryDef( vars={ "comparison_interval": 30, diff --git a/warehouse/metrics_tools/compute/cluster.py b/warehouse/metrics_tools/compute/cluster.py new file mode 100644 index 000000000..bdb29b661 --- /dev/null +++ b/warehouse/metrics_tools/compute/cluster.py @@ -0,0 +1,20 @@ +"""Sets up a dask cluster +""" + +import typing as t +from dask_kubernetes.operator import KubeCluster + + +def start_duckdb_cluster( + namespace: str, + gcs_key_id: str, + gcs_secret: str, + duckdb_path: str, + cluster_spec: t.Optional[dict] = None, +): + options: t.Dict[str, t.Any] = {"namespace": namespace} + if cluster_spec: + options["custom_cluster_spec"] = cluster_spec + cluster = KubeCluster(**options) + cluster.adapt(minimum=6, maximum=6) + return cluster diff --git a/warehouse/metrics_tools/compute/flight.py b/warehouse/metrics_tools/compute/flight.py new file mode 100644 index 000000000..d7fbfcaa6 --- /dev/null +++ b/warehouse/metrics_tools/compute/flight.py @@ -0,0 +1,706 @@ +"""A python arrow service that is used as a proxy to the cluster of compute for the +metrics tools. This allows us to change the underlying compute infrastructure +while maintaining the same interface to the sqlmesh runner. 
+""" + +import concurrent.futures +import logging +import sys +import typing as t +import json +import click +import time +import concurrent +from metrics_tools.compute.cluster import start_duckdb_cluster +from metrics_tools.compute.worker import MetricsWorkerPlugin +from metrics_tools.definition import PeerMetricDependencyRef +from metrics_tools.runner import FakeEngineAdapter, MetricsRunner +from metrics_tools.transformer.tables import MapTableTransform +from metrics_tools.transformer.transformer import SQLTransformer +import pyarrow as pa +import pyarrow.flight as fl +import asyncio +import pandas as pd +import threading +import trino +import queue +from sqlglot import exp +from sqlmesh.core.dialect import parse_one +from trino.dbapi import Connection, Cursor +import abc +import uuid +from pydantic import BaseModel +from datetime import datetime +from dask.distributed import Client, get_worker, Future, as_completed, print as dprint +from dask_kubernetes.operator import KubeCluster, make_cluster_spec +from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode +from dataclasses import dataclass + + +logger = logging.getLogger(__name__) + + +type_mapping = { + "INTEGER": "int64", + "BIGINT": "int64", + "SMALLINT": "int32", + "NUMERIC": "float64", + "REAL": "float32", + "DOUBLE PRECISION": "float64", + "VARCHAR": "object", + "TEXT": "object", + "BOOLEAN": "bool", + "DATE": "datetime64[ns]", + "TIMESTAMP": "datetime64[ns]", + # Add more mappings as needed +} + +arrow_type_mapping = { + "INTEGER": pa.int32(), + "BIGINT": pa.int64(), + "SMALLINT": pa.int16(), + "NUMERIC": pa.float64(), + "REAL": pa.float32(), + "DOUBLE PRECISION": pa.float64(), + "VARCHAR": pa.string(), + "TEXT": pa.string(), + "BOOLEAN": pa.bool_(), + "DATE": pa.date32(), + "TIMESTAMP": pa.timestamp("us"), +} + + +class QueryInput(BaseModel): + query_str: str + start: datetime + end: datetime + dialect: str + batch_size: int + columns: t.List[t.Tuple[str, str]] + ref: 
PeerMetricDependencyRef + locals: t.Dict[str, t.Any] + dependent_tables_map: t.Dict[str, str] + + def to_ticket(self) -> fl.Ticket: + return fl.Ticket(self.model_dump_json()) + + def to_column_names(self) -> pd.Series: + return pd.Series(list(map(lambda a: a[0], self.columns))) + + def to_arrow_schema(self) -> pa.Schema: + schema_input = [ + (col_name, arrow_type_mapping[col_type]) + for col_name, col_type in self.columns + ] + print(schema_input) + return pa.schema(schema_input) + + # def coerce_datetimes(self, df: pd.DataFrame) -> pd.DataFrame: + # for col_name, col_type in self.columns: + # if col_type == + + +class Engine(abc.ABC): + def run_query(self, query: str) -> Cursor: + raise NotImplementedError("run_query not implemented") + + +class TrinoEngine(Engine): + @classmethod + def create(cls, host: str, port: int, user: str, catalog: str): + conn = trino.dbapi.connect( + host=host, + port=port, + user=user, + catalog=catalog, + ) + return cls(conn) + + def __init__(self, conn: Connection): + self._conn = conn + + def run_query(self, query: str) -> Cursor: + cursor = self._conn.cursor() + logger.info(f"EXECUTING: {query}") + return cursor.execute(query) + + +def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + +def run_coroutine_in_thread(coro): + loop = asyncio.new_event_loop() + thread = threading.Thread(target=start_loop, args=(loop,)) + thread.start() + + +def execute_duckdb_load( + id: int, gcs_path: str, queries: t.List[str], dependencies: t.Dict[str, str] +): + dprint("Starting duckdb load") + worker = get_worker() + plugin = t.cast(MetricsWorkerPlugin, worker.plugins["metrics"]) + for ref, actual in dependencies.items(): + dprint(f"Loading cache for {ref}:{actual}") + plugin.get_for_cache(ref, actual) + conn = plugin.connection + results: t.List[pd.DataFrame] = [] + for query in queries: + result = conn.execute(query).df() + results.append(result) + + return DuckdbLoadedItem( + id=id, + df=pd.concat(results, ignore_index=True, 
sort=False), + ) + + +@dataclass(kw_only=True) +class DuckdbLoadedItem: + id: int + df: pd.DataFrame + + +@dataclass(kw_only=True) +class ResultQueueItem: + id: int + record_batch: pa.RecordBatch + + +class MetricsCalculatorFlightServer(fl.FlightServerBase): + def __init__( + self, + cluster: KubeCluster, + engine: TrinoEngine, + gcs_bucket: str, + location: str = "grpc://0.0.0.0:8815", + exported_map: t.Optional[t.Dict[str, str]] = None, + downloaders: int = 64, + queue_size: int = 100, + ): + super().__init__(location) + self.data = pa.Table.from_pydict({"col1": [1, 2, 3]}) + self.loop_loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread( + target=start_loop, + args=(self.loop_loop,), + ) + self.loop_thread.start() + self.engine = engine + self.cluster = cluster + self.exported_map: t.Dict[str, str] = exported_map or {} + self.gcs_bucket = gcs_bucket + self.queue_size = queue_size + self.downloader_count = downloaders + + def run_initialization( + self, hive_uri: str, gcs_key_id: str, gcs_secret: str, duckdb_path: str + ): + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + client = Client(self.cluster) + self.client = client + client.register_plugin( + MetricsWorkerPlugin( + self.gcs_bucket, + hive_uri, + gcs_key_id, + gcs_secret, + duckdb_path, + ), + name="metrics", + ) + + def finalizer(self): + self.client.close() + + def _ticket_to_query_input(self, ticket: fl.Ticket) -> QueryInput: + return QueryInput(**json.loads(ticket.ticket)) + + def table_rewrite(self, query: str, rewrite_map: t.Dict[str, str]): + transformer = SQLTransformer( + transforms=[ + MapTableTransform(rewrite_map), + ] + ) + return transformer.transform(query) + + def export_table_for_cache(self, table: str): + # Using the actual name + # Export with trino + if table in self.exported_map: + logger.debug(f"CACHE HIT FOR {table}") + return self.exported_map[table] + + columns: t.List[t.Tuple[str, str]] = [] + + col_result = self.engine.run_query(f"SHOW COLUMNS FROM 
{table}").fetchall() + for row in col_result: + column_name = row[0] + column_type = row[1] + columns.append((column_name, column_type)) + + table_exp = exp.to_table(table) + logger.info(f"RETREIVED COLUMNS: {columns}") + export_table_name = f"export_{table_exp.this.this}_{uuid.uuid4().hex}" + + base_create_query = f""" + CREATE table "source"."export"."{export_table_name}" ( + placeholder VARCHAR, + ) WITH ( + format = 'PARQUET', + external_location = 'gs://{self.gcs_bucket}/trino-export/{export_table_name}/' + ) + """ + create_query = parse_one(base_create_query) + create_query.this.set( + "expressions", + [ + exp.ColumnDef( + this=exp.to_identifier(column_name), + kind=parse_one(column_type, into=exp.DataType), + ) + for column_name, column_type in columns + ], + ) + + self.engine.run_query(create_query.sql(dialect="trino")) + + base_insert_query = f""" + INSERT INTO "source"."export"."{export_table_name}" (placeholder) + SELECT placeholder + FROM {table_exp} + """ + + column_identifiers = [ + exp.to_identifier(column_name) for column_name, _ in columns + ] + + insert_query = parse_one(base_insert_query) + insert_query.this.set( + "expressions", + column_identifiers, + ) + select = t.cast(exp.Select, insert_query.expression) + select.set("expressions", column_identifiers) + + self.engine.run_query(insert_query.sql(dialect="trino")) + + self.exported_map[table] = export_table_name + return self.exported_map[table] + + # def shutdown(self): + # pass + + def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket): + input = self._ticket_to_query_input(ticket) + + exported_dependent_tables_map: t.Dict[str, str] = {} + + # Parse the query + for ref_name, actual_name in input.dependent_tables_map.items(): + # Any deps, use trino to export to gcs + exported_table_name = self.export_table_for_cache(actual_name) + exported_dependent_tables_map[ref_name] = exported_table_name + + # rewrite the query for the temporary caches made by trino + # ex = 
self.table_rewrite(input.query_str, exported_dependent_tables_map) + # if len(ex) != 1: + # raise Exception("unexpected number of expressions") + + rewritten_query = parse_one(input.query_str).sql(dialect="duckdb") + # columns = input.to_column_names() + + # def gen(): + # futures: t.List[concurrent.futures.Future[pd.DataFrame]] = [] + # for rendered_query in runner.render_rolling_queries(input.start, input.end): + # future = asyncio.run_coroutine_threadsafe( + # async_gen_batch(self.engine, rendered_query, columns), + # self.loop_loop, + # ) + # futures.append(future) + # for res in concurrent.futures.as_completed(futures): + # yield pa.RecordBatch.from_pandas(res.result()) + + def gen_with_dask( + rewritten_query: str, + input: QueryInput, + exported_dependent_tables_map: t.Dict[str, str], + download_queue: queue.Queue[Future], + ): + client = self.client + futures: t.List[Future] = [] + current_batch: t.List[str] = [] + task_ids: t.List[int] = [] + + runner = MetricsRunner.from_engine_adapter( + FakeEngineAdapter("duckdb"), + rewritten_query, + input.ref, + input.locals, + ) + + task_id = 0 + for rendered_query in runner.render_rolling_queries(input.start, input.end): + current_batch.append(rendered_query) + if len(current_batch) >= input.batch_size: + future = client.submit( + execute_duckdb_load, + task_id, + current_batch[:], + exported_dependent_tables_map, + ) + futures.append(future) + current_batch = [] + task_ids.append(task_id) + task_id += 1 + if len(current_batch) > 0: + future = client.submit( + execute_duckdb_load, + task_id, + current_batch[:], + exported_dependent_tables_map, + ) + futures.append(future) + task_ids.append(task_id) + task_id += 1 + + completed_batches = 0 + total_batches = len(futures) + for future in as_completed(futures): + completed_batches += 1 + logger.info(f"progress received [{completed_batches}/{total_batches}]") + future = t.cast(Future, future) + if future.cancelled: + if future.done(): + logger.info("future actually 
done???") + else: + logger.error("future cancelled. skipping for now?") + print(future) + print(future.result() is not None) + continue + download_queue.put(future) + return task_ids + + def downloader( + kill_event: threading.Event, + download_queue: queue.Queue[Future], + res_queue: queue.Queue[ResultQueueItem], + ): + logger.debug("waiting for download") + while True: + try: + future = download_queue.get(timeout=0.1) + try: + item = t.cast(DuckdbLoadedItem, future.result()) + record_batch = pa.RecordBatch.from_pandas(item.df) + res_queue.put( + ResultQueueItem( + id=item.id, + record_batch=record_batch, + ) + ) + logger.debug("download completed") + finally: + download_queue.task_done() + except queue.Empty: + if kill_event.is_set(): + logger.debug("shutting down downloader") + return + if kill_event.is_set() and not download_queue.empty(): + logger.debug("shutting down downloader prematurely") + return + + def gen_record_batches(size: int): + download_queue: queue.Queue[Future] = queue.Queue(maxsize=size) + res_queue: queue.Queue[ResultQueueItem] = queue.Queue(maxsize=size) + kill_event = threading.Event() + result_queue_timeout = 5.0 + max_result_timeout = 300 + + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.downloader_count + 5 + ) as executor: + dask_thread = executor.submit( + gen_with_dask, + rewritten_query, + input, + exported_dependent_tables_map, + download_queue, + ) + downloaders = [] + for i in range(self.downloader_count): + downloaders.append( + executor.submit( + downloader, kill_event, download_queue, res_queue + ) + ) + + wait_retries = 0 + + completed_task_ids: t.Set[int] = set() + task_ids: t.Optional[t.Set[int]] = None + + while task_ids != completed_task_ids: + try: + result = res_queue.get(timeout=result_queue_timeout) + wait_retries = 0 + logger.debug("sending batch to client") + + completed_task_ids.add(result.id) + + yield result.record_batch + except queue.Empty: + wait_retries += 1 + if task_ids is None: + # If the 
dask thread is done we know if we can check for completion + if dask_thread.done(): + task_ids = set(dask_thread.result()) + else: + # If we have waited longer than max_result_timeout (300s, i.e. 5 mins) let's stop waiting + current_wait_time = wait_retries * result_queue_timeout + if current_wait_time > max_result_timeout: + logger.debug( + "record batches might be completed. with some kind of error" + ) + break + kill_event.set() + logger.debug("waiting for the downloaders to shutdown") + executor.shutdown(cancel_futures=True) + + logger.debug( + f"Distributing query for {input.start} to {input.end}: {rewritten_query}" + ) + try: + return fl.GeneratorStream( + input.to_arrow_schema(), + gen_record_batches(size=self.queue_size), + ) + except Exception as e: + print("caught error") + logger.error("Caught error generating stream", exc_info=e) + raise e + + +def run_get( + start: str, + end: str, + batch_size: int = 1, +): + run_start = time.time() + client = fl.connect("grpc://0.0.0.0:8815") + input = QueryInput( + query_str=""" + SELECT bucket_day, to_artifact_id, from_artifact_id, event_source, event_type, SUM(amount) as amount + FROM metrics.events_daily_to_artifact + where bucket_day >= strptime(@start_ds, '%Y-%m-%d') and bucket_day <= strptime(@end_ds, '%Y-%m-%d') + group by + bucket_day, + to_artifact_id, + from_artifact_id, + event_source, + event_type + """, + start=datetime.strptime(start, "%Y-%m-%d"), + end=datetime.strptime(end, "%Y-%m-%d"), + dialect="duckdb", + columns=[ + ("bucket_day", "TIMESTAMP"), + ("to_artifact_id", "VARCHAR"), + ("from_artifact_id", "VARCHAR"), + ("event_source", "VARCHAR"), + ("event_type", "VARCHAR"), + ("amount", "NUMERIC"), + ], + ref=PeerMetricDependencyRef( + name="", entity_type="artifact", window=30, unit="day" + ), + locals={}, + dependent_tables_map={ + "metrics.events_daily_to_artifact": "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958" + }, + batch_size=batch_size, + ) + reader = client.do_get(input.to_ticket()) + r = 
reader.to_reader() + count = 0 + for batch in r: + count += 1 + print(f"[{count}] ROWS={batch.num_rows}") + run_end = time.time() + print(f"DURATION={run_end - run_start}s") + + +@click.command() +@click.option("--host", envvar="SQLMESH_TRINO_HOST", required=True) +@click.option("--port", default=8080, type=click.INT) +@click.option("--catalog", default="metrics") +@click.option("--user", default="sqlmesh") +@click.option("--gcs-bucket", envvar="METRICS_FLIGHT_SERVER_GCS_BUCKET", required=True) +@click.option("--gcs-key-id", envvar="METRICS_FLIGHT_SERVER_GCS_KEY_ID", required=True) +@click.option("--gcs-secret", envvar="METRICS_FLIGHT_SERVER_GCS_SECRET", required=True) +@click.option( + "--worker-duckdb-path", + envvar="METRICS_FLIGHT_SERVER_WORKER_DUCKDB_PATH", + required=True, +) +@click.option("--hive-uri", envvar="METRICS_FLIGHT_SERVER_HIVE_URI", required=True) +@click.option("--image-tag", required=True) +@click.option("--threads", type=click.INT, default=16) +@click.option("--worker-memory-limit", default="90000Mi") +@click.option("--worker-memory-request", default="75000Mi") +@click.option("--scheduler-memory-limit", default="90000Mi") +@click.option("--scheduler-memory-request", default="75000Mi") +@click.option("--cluster-only/--no-cluster-only", default=False) +@click.option("--cluster-name", default="sqlmesh-flight") +@click.option("--cluster-namespace", default="sqlmesh-manual") +def main( + host: str, + port: int, + catalog: str, + user: str, + gcs_bucket: str, + gcs_key_id: str, + gcs_secret: str, + worker_duckdb_path: str, + hive_uri: str, + image_tag: str, + threads: int, + scheduler_memory_limit: str, + scheduler_memory_request: str, + worker_memory_limit: str, + worker_memory_request: str, + cluster_only: bool, + cluster_name: str, + cluster_namespace: str, +): + cluster_spec = make_new_cluster( + f"ghcr.io/opensource-observer/dagster-dask:{image_tag}", + cluster_name, + cluster_namespace, + threads=threads, + 
scheduler_memory_limit=scheduler_memory_limit, + scheduler_memory_request=scheduler_memory_request, + worker_memory_limit=worker_memory_limit, + worker_memory_request=worker_memory_request, + ) + + if cluster_only: + # Start the cluster + cluster = start_duckdb_cluster( + cluster_namespace, + gcs_key_id, + gcs_secret, + worker_duckdb_path, + cluster_spec=cluster_spec, + ) + try: + while True: + time.sleep(1.0) + finally: + cluster.close() + else: + cluster = KubeCluster( + name=cluster_name, + namespace=cluster_namespace, + create_mode=CreateMode.CONNECT_ONLY, + shutdown_on_close=False, + ) + server = MetricsCalculatorFlightServer( + cluster, + TrinoEngine.create( + host, + port, + user, + catalog, + ), + gcs_bucket, + exported_map={ + "sqlmesh__metrics.metrics__events_daily_to_artifact__2357434958": "export_metrics__events_daily_to_artifact__2357434958_5def5e890a984cf99f7364ce3c2bb958", + }, + ) + server.run_initialization(hive_uri, gcs_key_id, gcs_secret, worker_duckdb_path) + with server as s: + s.serve() + + +def make_new_cluster( + image: str, + cluster_id: str, + service_account_name: str, + threads: int, + scheduler_memory_request: str, + scheduler_memory_limit: str, + worker_memory_request: str, + worker_memory_limit: str, +): + spec = make_cluster_spec( + name=f"{cluster_id}", + resources={ + "requests": {"memory": scheduler_memory_request}, + "limits": {"memory": scheduler_memory_limit}, + }, + image=image, + ) + spec["spec"]["scheduler"]["spec"]["tolerations"] = [ + { + "key": "pool_type", + "effect": "NoSchedule", + "operator": "Equal", + "value": "sqlmesh-worker", + } + ] + spec["spec"]["scheduler"]["spec"]["nodeSelector"] = {"pool_type": "sqlmesh-worker"} + + spec["spec"]["worker"]["spec"]["tolerations"] = [ + { + "key": "pool_type", + "effect": "NoSchedule", + "operator": "Equal", + "value": "sqlmesh-worker", + } + ] + spec["spec"]["worker"]["spec"]["nodeSelector"] = {"pool_type": "sqlmesh-worker"} + + # Give the workers a different resource 
allocation + for container in spec["spec"]["worker"]["spec"]["containers"]: + container["resources"] = { + "limits": { + "memory": worker_memory_limit, + }, + "requests": { + "memory": worker_memory_request, + }, + } + volume_mounts = container.get("volumeMounts", []) + volume_mounts.append( + { + "mountPath": "/scratch", + "name": "scratch", + } + ) + if container["name"] == "worker": + args: t.List[str] = container["args"] + args.append("--nthreads") + args.append(f"{threads}") + args.append("--nworkers") + args.append("1") + args.append("--memory-limit") + args.append("0") + container["volumeMounts"] = volume_mounts + volumes = spec["spec"]["worker"]["spec"].get("volumes", []) + volumes.append( + { + "name": "scratch", + "emptyDir": {}, + } + ) + spec["spec"]["worker"]["spec"]["volumes"] = volumes + spec["spec"]["worker"]["spec"]["serviceAccountName"] = service_account_name + + return spec + + +if __name__ == "__main__": + main() diff --git a/warehouse/metrics_tools/compute/run_get.py b/warehouse/metrics_tools/compute/run_get.py new file mode 100644 index 000000000..4c3f09525 --- /dev/null +++ b/warehouse/metrics_tools/compute/run_get.py @@ -0,0 +1,18 @@ +# Testing script +from metrics_tools.compute import flight +import click +from datetime import datetime + + +@click.command() +@click.option("--batch-size", type=click.INT, default=1) +@click.option("--start", default="2024-01-01") +@click.option("--end") +def main(batch_size: int, start, end): + if not end: + end = datetime.now().strftime("%Y-%m-%d") + flight.run_get(batch_size=batch_size, start=start, end=end) + + +if __name__ == "__main__": + main() diff --git a/warehouse/metrics_tools/compute/test_setup.sh b/warehouse/metrics_tools/compute/test_setup.sh new file mode 100644 index 000000000..ccf1c3279 --- /dev/null +++ b/warehouse/metrics_tools/compute/test_setup.sh @@ -0,0 +1,8 @@ +apt-get install -y tmux vim git + +cd /usr/src +mv app old +git clone https://github.com/opensource-observer/oso.git +mv oso 
app +cd app +git checkout ravenac95/test-duckdb-cluster \ No newline at end of file diff --git a/warehouse/metrics_tools/compute/worker.py b/warehouse/metrics_tools/compute/worker.py index 9cd2959a1..e88e1006f 100644 --- a/warehouse/metrics_tools/compute/worker.py +++ b/warehouse/metrics_tools/compute/worker.py @@ -1,12 +1,26 @@ # The worker initialization import abc -from graphql import ExecutionContext +import os +from metrics_tools.utils.logging import add_metrics_tools_to_existing_logger import pandas as pd import typing as t import duckdb -from datetime import datetime +import uuid from sqlglot import exp from dask.distributed import WorkerPlugin, Worker +import logging +import sys +from threading import Lock +from google.cloud import storage +from contextlib import contextmanager + + +from pyiceberg.catalog import load_catalog +from pyiceberg.table import Table as IcebergTable + +logger = logging.getLogger(__name__) + +mutex = Lock() class DuckDBWorkerInterface(abc.ABC): @@ -15,30 +29,51 @@ def fetchdf(self, query: str) -> pd.DataFrame: class MetricsWorkerPlugin(WorkerPlugin): - def __init__(self, duckdb_path: str): + def __init__( + self, + gcs_bucket: str, + hive_uri: str, + gcs_key_id: str, + gcs_secret: str, + duckdb_path: str, + ): + self._gcs_bucket = gcs_bucket + self._hive_uri = hive_uri + self._gcs_key_id = gcs_key_id + self._gcs_secret = gcs_secret self._duckdb_path = duckdb_path self._conn = None + self._cache_status: t.Dict[str, bool] = {} + self._catalog = None + self._mode = "duckdb" + self._uuid = uuid.uuid4().hex def setup(self, worker: Worker): + add_metrics_tools_to_existing_logger("distributed") + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + self._conn = duckdb.connect(self._duckdb_path) # Connect to iceberg if this is a remote worker - self._conn.sql( - """ + worker.log_event("info", "what") + sql = f""" INSTALL iceberg; LOAD iceberg; CREATE SECRET secret1 ( TYPE GCS, - PROVIDER CREDENTIAL_CHAIN - ) - CREATE SCHEMA IF NOT 
EXISTS sources; - CREATE TABLE IF NOT EXISTS sources.cache_status ( - table_name VARCHAR PRIMARY KEY, - version VARCHAR, - is_locked BOOLEAN + KEY_ID '{self._gcs_key_id}', + SECRET '{self._gcs_secret}' ); """ + self._conn.sql(sql) + self._catalog = load_catalog( + "metrics", + **{ + "uri": self._hive_uri, + "gcs.project-id": "opensource-observer", + "gcs.access": "read_only", + }, ) def teardown(self, worker: Worker): @@ -48,24 +83,105 @@ def teardown(self, worker: Worker): @property def connection(self): assert self._conn is not None - return self._conn + return self._conn.cursor() - def wait_for_cache(self, table: str): + def get_for_cache( + self, + table_ref_name: str, + table_actual_name: str, + ): """Checks if a table is cached in the local duckdb""" + logger.info( + f"[{self._uuid}] got a cache request for {table_ref_name}:{table_actual_name}" + ) + if self._cache_status.get(table_ref_name): + return + with mutex: + if self._cache_status.get(table_ref_name): + return + destination_table = exp.to_table(table_ref_name) + + # if self._mode == "duckdb": + # self.load_using_duckdb( + # table_ref_name, table_actual_name, destination_table, table + # ) + # else: + # self.load_using_pyiceberg( + # table_ref_name, table_actual_name, destination_table, table + # ) + self.load_using_gcs_parquet( + table_ref_name, table_actual_name, destination_table + ) + + self._cache_status[table_ref_name] = True + + def load_using_duckdb( + self, + table_ref_name: str, + table_actual_name: str, + destination_table: exp.Table, + ): + source_table = exp.to_table(table_actual_name) + assert self._catalog is not None + table = self._catalog.load_table((source_table.db, source_table.this.this)) + + self.connection.execute(f"CREATE SCHEMA IF NOT EXISTS {destination_table.db}") + caching_sql = f""" + CREATE TABLE IF NOT EXISTS {destination_table.db}.{destination_table.this.this} AS + SELECT * FROM iceberg_scan('{table.metadata_location}') + """ + logger.info(f"CACHING TABLE 
{table_ref_name} WITH SQL: {caching_sql}") + self.connection.sql(caching_sql) + logger.info(f"CACHING TABLE {table_ref_name} COMPLETED") + + def load_using_pyiceberg( + self, + table_ref_name: str, + table_actual_name: str, + destination_table: exp.Table, + table: IcebergTable, + ): + source_table = exp.to_table(table_actual_name) + assert self._catalog is not None + table = self._catalog.load_table((source_table.db, source_table.this.this)) + batch_reader = table.scan().to_arrow_batch_reader() # noqa: F841 + self.connection.execute(f"CREATE SCHEMA IF NOT EXISTS {destination_table.db}") + logger.info(f"CACHING TABLE {table_ref_name} WITH ICEBERG") + self.connection.sql( + f""" + CREATE TABLE IF NOT EXISTS {destination_table.db}.{destination_table.this.this} AS + SELECT * FROM batch_reader + """ + ) + logger.info(f"CACHING TABLE {table_ref_name} COMPLETED") + + def load_using_gcs_parquet( + self, + table_ref_name: str, + table_actual_name: str, + destination_table: exp.Table, + ): + self.connection.execute(f"CREATE SCHEMA IF NOT EXISTS {destination_table.db}") + logger.info(f"CACHING TABLE {table_ref_name} WITH PARQUET") self.connection.sql( f""" - SELECT * FROM {table} + CREATE TABLE IF NOT EXISTS {destination_table.db}.{destination_table.this.this} AS + SELECT * FROM read_parquet('gs://{self._gcs_bucket}/trino-export/{table_actual_name}/*') """ ) + logger.info(f"CACHING TABLE {table_ref_name} COMPLETED") + @contextmanager + def gcs_client(self): + client = storage.Client() + try: + yield client + finally: + client.close() + + @property + def bucket(self): + return self._gcs_bucket -def batch_metrics_query( - query: exp.Expression, - context: ExecutionContext, - start: datetime, - end: datetime, - execution_time: datetime, - **kwargs: t.Any, -): - """Yield batches of dataframes""" - pass + def bucket_path(self, *joins: str): + return os.path.join(f"gs://{self.bucket}", *joins) diff --git a/warehouse/metrics_tools/transformer/tables.py 
b/warehouse/metrics_tools/transformer/tables.py index 50ce8c9c2..ee2ca5fe4 100644 --- a/warehouse/metrics_tools/transformer/tables.py +++ b/warehouse/metrics_tools/transformer/tables.py @@ -8,31 +8,51 @@ from .base import Transform -class ExecutionContextTableTransform(Transform): - def __init__( - self, - context: ExecutionContext, - ): - self._context = context +class TableTransform(Transform): + def transform_table_name(self, table: exp.Table) -> exp.Table | None: + raise NotImplementedError("transform table not implemeented") def __call__(self, query: t.List[exp.Expression]) -> t.List[exp.Expression]: - context = self._context - def transform_tables(node: exp.Expression): if not isinstance(node, exp.Table): return node - table_name = f"{node.db}.{node.this.this}" - try: - actual_table_name = context.table(table_name) - except KeyError: + actual_table = self.transform_table_name(node) + if not actual_table: return node table_kwargs = {} if node.alias: table_kwargs["alias"] = node.alias - return exp.to_table(actual_table_name, **table_kwargs) + return exp.to_table(actual_table.this.this, **table_kwargs) transformed_expressions = [] for expression in query: transformed = expression.transform(transform_tables) transformed_expressions.append(transformed) return transformed_expressions + + +class MapTableTransform(TableTransform): + def __init__(self, map: t.Dict[str, str]): + self._map = map + + def transform_table_name(self, table: exp.Table) -> exp.Table | None: + table_name = f"{table.db}.{table.this.this}" + actual_name = self._map.get(table_name, None) + if actual_name: + return exp.to_table(actual_name) + return None + + +class ExecutionContextTableTransform(TableTransform): + def __init__( + self, + context: ExecutionContext, + ): + self._context = context + + def transform_table_name(self, table: exp.Table) -> exp.Table | None: + table_name = f"{table.db}.{table.this.this}" + try: + return exp.to_table(self._context.table(table_name)) + except KeyError: + 
return None diff --git a/warehouse/metrics_tools/utils/logging.py b/warehouse/metrics_tools/utils/logging.py index 43d00d186..a0edf41e2 100644 --- a/warehouse/metrics_tools/utils/logging.py +++ b/warehouse/metrics_tools/utils/logging.py @@ -11,12 +11,16 @@ def add_metrics_tools_to_sqlmesh_logging(): global connected_to_sqlmesh_logs + app_name = os.path.basename(__main__.__file__) + if app_name == "sqlmesh" and not connected_to_sqlmesh_logs: + add_metrics_tools_to_existing_logger(app_name) + connected_to_sqlmesh_logs = True + + +def add_metrics_tools_to_existing_logger(logger_name: str): class MetricsToolsFilter(logging.Filter): def filter(self, record): return record.name == "metrics_tools" - app_name = os.path.basename(__main__.__file__) - if app_name == "sqlmesh" and not connected_to_sqlmesh_logs: - app_logger = logging.getLogger(app_name) - app_logger.addFilter(MetricsToolsFilter()) - connected_to_sqlmesh_logs = True + app_logger = logging.getLogger(logger_name) + app_logger.addFilter(MetricsToolsFilter())