copy over existing data pipeline from chai
Sep 27, 2024
# directories

# other files
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.

# data files
# chai-oss
tea's CHAI package ranker
# data pipeline

inspiration [here](

this is an attempt at an open-source data pipeline, from which we can build our app that
ranks open-source projects. all of its pieces are managed by `docker-compose`, which
runs 3 services:

1. db: postgres to store package-specific data
1. alembic: for running migrations
1. pipeline: which fetches and writes data

first, run `mkdir -p data/{crates,pkgx,homebrew,npm,pypi,rubys}` to set up the data
directories where the fetchers will store their data.
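
if brace expansion isn't available in your shell, the same layout can be created from python. this is a minimal sketch, not part of the repo: the function name `setup_data_dirs` and the `PACKAGE_MANAGERS` list are assumptions that simply mirror the `mkdir` command above.

```python
from pathlib import Path

# directory names copied from the mkdir command above
PACKAGE_MANAGERS = ["crates", "pkgx", "homebrew", "npm", "pypi", "rubys"]


def setup_data_dirs(root: Path = Path("data")) -> list[Path]:
    """Create one subdirectory per package manager, like `mkdir -p`."""
    created = []
    for name in PACKAGE_MANAGERS:
        path = root / name
        # parents/exist_ok make this safe to run repeatedly
        path.mkdir(parents=True, exist_ok=True)
        created.append(path)
    return created


if __name__ == "__main__":
    for path in setup_data_dirs():
        print(path)
```

running it twice is harmless, since existing directories are left untouched.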

then, running `docker compose up` will set up the db and run the pipeline. a successful
run will look something like this:

db-1 | 2024-09-23 18:33:31.199 UTC [1] LOG: listening on IPv4 address "", port 5432
db-1 | 2024-09-23 18:33:31.199 UTC [1] LOG: listening on IPv6 address "::", port 5432
db-1 | 2024-09-23 18:33:31.202 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
db-1 | 2024-09-23 18:33:31.230 UTC [30] LOG: database system was shut down at 2024-09-23 18:04:05 UTC
db-1 | 2024-09-23 18:33:31.242 UTC [1] LOG: database system is ready to accept connections
alembic-1 | db:5432 - accepting connections
alembic-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
alembic-1 | INFO [alembic.runtime.migration] Will assume transactional DDL.
alembic-1 | db currently at 0db06140525f (head)
alembic-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
alembic-1 | INFO [alembic.runtime.migration] Will assume transactional DDL.
alembic-1 | migrations run
alembic-1 exited with code 0
alembic-1 | postgresql://postgres:s3cr3t@db:5432/chai
alembic-1 | s3cr3t
pipeline-1 | 0.01: [crates_orchestrator]: [DEBUG]: logging is working
pipeline-1 | 0.01: [main_pipeline]: [DEBUG]: logging is working
pipeline-1 | 0.01: [DB]: [DEBUG]: logging is working
pipeline-1 | 0.03: [DB]: [DEBUG]: created engine
pipeline-1 | 0.03: [DB]: [DEBUG]: created session
pipeline-1 | 0.03: [DB]: [DEBUG]: connected to postgresql://postgres:s3cr3t@db:5432/chai
pipeline-1 | 0.03: [crates_orchestrator]: fetching crates packages
pipeline-1 | 0.03: [crates_fetcher]: [DEBUG]: logging is working
pipeline-1 | 0.03: [crates_fetcher]: [DEBUG]: adding package manager crates

> [!TIP]
> to force the images to rebuild and the containers to be recreated, run
> `docker-compose up --force-recreate --build`
## Hard Reset

if you ever need to do a hard reset, here are the steps:

1. `rm -rf db/data`: removes all the data that was loaded into the db
1. `rm -rf .venv`: if you created a virtual environment for local dev, this removes it
1. `rm -rf data`: removes all the data the fetchers have downloaded
1. `docker system prune -a -f --volumes`: removes **everything** docker-related

> step 4 deletes all your docker data, not just this project's, so be careful
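
steps 1 to 3 can also be scripted with a dry-run mode, so you can see what would be deleted first. this is a rough sketch under assumptions: `hard_reset` and `RESET_TARGETS` are hypothetical names, not part of the repo, and step 4 is deliberately left as a manual command.

```python
import shutil
from pathlib import Path

# the three directories removed in steps 1-3 above
RESET_TARGETS = ["db/data", ".venv", "data"]


def hard_reset(project_root: Path, dry_run: bool = True) -> list[Path]:
    """Remove the reset targets that exist; with dry_run, only report them."""
    affected = []
    for rel in RESET_TARGETS:
        target = project_root / rel
        if target.exists():
            if not dry_run:
                shutil.rmtree(target)
            affected.append(target)
    return affected


if __name__ == "__main__":
    # dry run by default: prints what would be removed, deletes nothing
    for path in hard_reset(Path.cwd()):
        print(f"would remove {path}")
```

pass `dry_run=False` only once the printed list looks right.
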
## Alembic Alternatives
- sqlx command line tool to manage migrations, alongside models for sqlx in rust
- vapor's migrations are written in swift

## FAQs / common issues

1. the database url is `postgresql://postgres:s3cr3t@localhost:5435/chai`, and is used
as `CHAI_DATABASE_URL` in the environment.
1. the command `./` is used to run migrations, and you might need to
`chmod +x alembic/` so that it can be executed
1. the command `./` is used to run the pipeline, and you might need to
`chmod +x src/` so that it can be executed
1. migrations sometimes don't apply before the service starts, so you might need to
manually apply them:

cd alembic
alembic upgrade head
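
when a manual migration fails to connect, it helps to confirm where `CHAI_DATABASE_URL` actually points: from the host the url above uses `localhost:5435`, while the logs earlier show the containers talking to `db:5432`. the following is a small sketch using only the standard library; `describe_database_url` and `DEFAULT_URL` are assumed helper names, not something the repo provides.

```python
import os
from urllib.parse import urlsplit

# fallback copied from the FAQ entry above
DEFAULT_URL = "postgresql://postgres:s3cr3t@localhost:5435/chai"


def describe_database_url(url: str) -> dict:
    """Split a postgres url into the parts worth checking (password omitted)."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    url = os.getenv("CHAI_DATABASE_URL", DEFAULT_URL)
    for key, value in describe_database_url(url).items():
        print(f"{key}: {value}")
```

if the host prints as `db` while you are running alembic outside of docker, the variable was probably copied from the compose environment rather than exported for local use.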

## tasks

these are tasks that can be run using if you have pkgx, just run `dev` to
inject into your environment. if you don't...go get it.

### reset

rm -rf db/data data .venv

### setup

mkdir -p data/{crates,pkgx,homebrew,npm,pypi,rubys}

### local-dev

uv venv
cd src
uv pip install -r requirements.txt

### chai-start

Requires: setup
Inputs: FORCE
Env: FORCE=not-force

if [ "$FORCE" = "force" ]; then
  docker-compose up --force-recreate --build -d
else
  docker-compose up -d
fi
export CHAI_DATABASE_URL="postgresql://postgres:s3cr3t@localhost:5435/chai"

### chai-stop

docker-compose down

### db-reset

Requires: chai-stop

rm -rf db/data

### db-logs

docker-compose logs db

### db-generate-migration

Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai

cd alembic
alembic revision --autogenerate -m "$MIGRATION_NAME"

### db-upgrade

Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai

cd alembic
alembic upgrade head

### db-downgrade

Inputs: STEP
Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai

cd alembic
alembic downgrade -$STEP

### db

psql "postgresql://postgres:s3cr3t@localhost:5435/chai"

### db-list-packages

psql "postgresql://postgres:s3cr3t@localhost:5435/chai" -c "SELECT count(id) FROM packages;"

### db-list-history

psql "postgresql://postgres:s3cr3t@localhost:5435/chai" -c "SELECT * FROM load_history;"
# this .pkgx.yaml file is only for alembic

dependencies: 16 1 2
FROM pkgxdev/pkgx:latest
# WORKDIR /app

# # install alembic
# COPY .pkgx.yaml .
# RUN dev

RUN pkgx install^1^2^16
[alembic]
script_location = .
file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d-%%(slug)s

prepend_sys_path = ..
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.

sqlalchemy.url = ${env:CHAI_DATABASE_URL}

[post_write_hooks]
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# TODO: this doesn't work rn
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
import os
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config, pool
from src.pipeline.models import Base

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# interpret the config file for Python logging.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# metadata for all models
target_metadata = Base.metadata

# get database url
database_url = os.getenv("CHAI_DATABASE_URL")
if database_url:
    config.set_main_option("sqlalchemy.url", database_url)


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
# wait for db to be ready
until pg_isready -h db -p 5432 -U postgres; do
  echo "waiting for database..."
  sleep 2
done

# migrate
echo "db currently at $(pkgx +alembic alembic current)"
pkgx +alembic alembic upgrade head
echo "migrations run"
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}

def upgrade() -> None:
    ${upgrades if upgrades else "pass"}

def downgrade() -> None:
    ${downgrades if downgrades else "pass"}

