Prototyping: Duckdb based metrics calculation (#2469)
* Reorder metrics queries

* wip

* more wip

* test

* Fixes

* fix

* tester

* test

* test

* test

* test

* More fixes

* test

* test

* test

* test

* more testing

* fixes

* add connectorx

* big rewrite, try using trino as part of the export mechanism

* add bokeh

* test

* Update dockerfile

* Clean up some in progress work
ravenac95 authored Nov 19, 2024
1 parent c6e20ac commit 34df5fb
Showing 14 changed files with 1,109 additions and 67 deletions.
4 changes: 3 additions & 1 deletion .dockerignore
@@ -77,4 +77,6 @@ Dockerfile
 
 # Github directory
 .github/scripts
-.git
+.git
+logs
+.mypy_cache
10 changes: 6 additions & 4 deletions docker/images/dagster-dask/Dockerfile
@@ -4,17 +4,19 @@ FROM ghcr.io/opensource-observer/oso-public-vars:latest AS public_vars
 FROM ubuntu:jammy
 
 ARG GCLOUD_VERSION=478.0.0
+ARG PYTHON_VERSION=3.12
+ARG PYTHON_PACKAGE=python3.12
 
 ENV DEBIAN_FRONTEND=noninteractive
 RUN apt-get update && \
     apt-get install -y software-properties-common && \
     add-apt-repository ppa:deadsnakes/ppa && \
     apt-get update && \
-    apt-get install -y python3.12 && \
+    apt-get install -y ${PYTHON_PACKAGE} && \
     apt-get install -y curl git && \
     curl -o get-pip.py https://bootstrap.pypa.io/get-pip.py && \
-    python3.12 get-pip.py && \
-    pip3.12 install poetry && \
+    python${PYTHON_VERSION} get-pip.py && \
+    pip${PYTHON_VERSION} install poetry && \
     curl -o gcloud.tar.gz https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-${GCLOUD_VERSION}-linux-x86_64.tar.gz && \
     tar xvf gcloud.tar.gz && \
     bash ./google-cloud-sdk/install.sh && \
@@ -48,6 +50,6 @@ ENV DAGSTER_DBT_TARGET_BASE_DIR=/dbt_targets
 
 COPY --from=public_vars ./public/vars.env /usr/src/app/vars.env
 RUN mkdir -p ${DAGSTER_DBT_TARGET_BASE_DIR} && \
-    python3.12 -m oso_dagster.compile --additional-vars /usr/src/app/vars.env
+    python${PYTHON_VERSION} -m oso_dagster.compile --additional-vars /usr/src/app/vars.env
 
 ENTRYPOINT []
4 changes: 2 additions & 2 deletions ops/tf-modules/warehouse-cluster/main.tf
@@ -135,12 +135,12 @@ locals {
     # SQLMesh Workers
     {
       name                              = "${var.cluster_name}-sqlmesh-worker-node-pool"
-      machine_type                      = "n1-highmem-32"
+      machine_type                      = "n1-highmem-16"
       node_locations                    = join(",", var.cluster_zones)
       min_count                         = 0
       max_count                         = 10
       local_ssd_count                   = 0
-      local_ssd_ephemeral_storage_count = 2
+      local_ssd_ephemeral_storage_count = 1
       spot                              = false
       disk_size_gb                      = 100
       disk_type                         = "pd-standard"
144 changes: 143 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -64,6 +64,8 @@ google-auth = "^2.34.0"
 pillow = "^10.4.0"
 dagster-k8s = "^0.24.6"
 pyiceberg = { extras = ["hive"], version = "^0.7.1" }
+connectorx = "^0.4.0"
+bokeh = "^3.6.1"
 
 
 [tool.poetry.scripts]
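Two new runtime dependencies back this prototype: connectorx, a Rust-based loader that reads query results straight into dataframes, and bokeh, which Dask's distributed dashboard needs. Below is a rough sketch of the export path the commit message hints at ("try using trino as part of the export mechanism"), assuming connectorx's Trino support; the connection string and query are placeholders, not values from this commit:

```python
import connectorx as cx

# Placeholder connection string and query: connectorx reads the result set
# directly into a pandas DataFrame instead of iterating rows through a
# Python DB-API driver.
df = cx.read_sql(
    "trino://user@trino.example.internal:8080/catalog",
    "SELECT bucket_day, event_type, SUM(amount) AS amount "
    "FROM events_daily GROUP BY 1, 2",
)
print(df.dtypes)
```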
2 changes: 2 additions & 0 deletions warehouse/metrics_mesh/fixtures/README.md
@@ -0,0 +1,2 @@
+This folder contains both the data generator and the CSV most recently generated
+from that data generator, so we can easily update tests as needed.
32 changes: 16 additions & 16 deletions warehouse/metrics_mesh/models/metrics_factories.py
@@ -21,6 +21,22 @@
         ref="stars.sql",
         time_aggregations=["daily", "weekly", "monthly"],
     ),
+    "active_addresses": MetricQueryDef(
+        ref="active_addresses.sql",
+        time_aggregations=["daily", "weekly", "monthly"],
+    ),
+    "commits": MetricQueryDef(
+        ref="commits.sql",
+        time_aggregations=["daily", "weekly", "monthly"],
+    ),
+    "forks": MetricQueryDef(
+        ref="forks.sql",
+        time_aggregations=["daily", "weekly", "monthly"],
+    ),
+    "gas_fees": MetricQueryDef(
+        ref="gas_fees.sql",
+        time_aggregations=["daily", "weekly", "monthly"],
+    ),
     # This defines something with a rolling option that allows you to look back
     # to some arbitrary window. So you specify the window and specify the unit.
     # The unit and the window are used to pass in variables to the query. So it's
@@ -60,22 +76,6 @@
             cron="@daily",
         ),
     ),
-    "active_addresses": MetricQueryDef(
-        ref="active_addresses.sql",
-        time_aggregations=["daily", "weekly", "monthly"],
-    ),
-    "commits": MetricQueryDef(
-        ref="commits.sql",
-        time_aggregations=["daily", "weekly", "monthly"],
-    ),
-    "forks": MetricQueryDef(
-        ref="forks.sql",
-        time_aggregations=["daily", "weekly", "monthly"],
-    ),
-    "gas_fees": MetricQueryDef(
-        ref="gas_fees.sql",
-        time_aggregations=["daily", "weekly", "monthly"],
-    ),
     "change_in_30_developer_activity": MetricQueryDef(
         vars={
             "comparison_interval": 30,
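The truncated comment in the hunk above describes the rolling-window option: you pick a window size and a unit, and both are passed into the referenced query as variables. Here is a self-contained sketch of such a definition, using hypothetical stand-in classes; the real MetricQueryDef and RollingConfig live in the factory module and likely differ:

```python
# Hypothetical stand-ins so this sketch runs on its own; the real classes live
# in the metrics factory module and are not shown in this diff.
import typing as t
from dataclasses import dataclass


@dataclass
class RollingConfig:
    windows: t.List[int]  # look-back window sizes
    unit: str             # unit for each window, e.g. "day"
    cron: str             # how often the rolling computation runs


@dataclass
class MetricQueryDef:
    ref: str
    rolling: t.Optional[RollingConfig] = None
    time_aggregations: t.Optional[t.List[str]] = None


# Each (window, unit) pair is handed to the referenced SQL as variables,
# per the comment in the hunk above.
developer_active_days = MetricQueryDef(
    ref="active_days.sql",
    rolling=RollingConfig(windows=[30, 90], unit="day", cron="@daily"),
)
```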
20 changes: 20 additions & 0 deletions warehouse/metrics_tools/compute/cluster.py
@@ -0,0 +1,20 @@
+"""Sets up a dask cluster
+"""
+
+import typing as t
+from dask_kubernetes.operator import KubeCluster
+
+
+def start_duckdb_cluster(
+    namespace: str,
+    gcs_key_id: str,
+    gcs_secret: str,
+    duckdb_path: str,
+    cluster_spec: t.Optional[dict] = None,
+):
+    options: t.Dict[str, t.Any] = {"namespace": namespace}
+    if cluster_spec:
+        options["custom_cluster_spec"] = cluster_spec
+    cluster = KubeCluster(**options)
+    cluster.adapt(minimum=6, maximum=6)
+    return cluster
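A hedged usage sketch for the new helper follows. The import path mirrors the file location and every argument value is a placeholder; note that gcs_key_id, gcs_secret, and duckdb_path are accepted but not yet used by the function, and adapt(minimum=6, maximum=6) effectively pins the cluster at six workers:

```python
# Usage sketch only: argument values are placeholders, not from this commit.
from dask.distributed import Client

from metrics_tools.compute.cluster import start_duckdb_cluster

cluster = start_duckdb_cluster(
    namespace="dask-workers",                       # Kubernetes namespace for worker pods
    gcs_key_id="<hmac-key-id>",                     # accepted but unused in this prototype
    gcs_secret="<hmac-secret>",                     # accepted but unused in this prototype
    duckdb_path="gs://some-bucket/metrics.duckdb",  # accepted but unused
)
client = Client(cluster)  # point a Dask client at the new cluster
try:
    pass  # submit work here
finally:
    client.close()
    cluster.close()
```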
