From be1c690b947176f1d8628b0be07aa65fd420bc08 Mon Sep 17 00:00:00 2001 From: Sanchit Ram Arvind Date: Fri, 27 Sep 2024 14:57:16 -0500 Subject: [PATCH] copy over existing data pipeline from chai --- .dockerignore | 8 + .gitignore | 4 + README.md | 194 ++++++++++++- alembic/.pkgx.yaml | 6 + alembic/Dockerfile | 8 + alembic/alembic.ini | 53 ++++ alembic/env.py | 70 +++++ alembic/run_migrations.sh | 12 + alembic/script.py.mako | 26 ++ .../20240918_1200-initial_migration.py | 74 +++++ .../20240923_0821-add_load_history.py | 33 +++ .../versions/20240925_0808-add_users_urls.py | 269 ++++++++++++++++++ db/init-scripts/init-script.sql | 7 + docker-compose.yml | 47 +++ pkgx.yaml | 11 + pyproject.toml | 30 ++ src/.pkgx.yaml | 6 + src/Dockerfile | 12 + src/pipeline/crates.py | 61 ++++ src/pipeline/main.py | 44 +++ src/pipeline/models/__init__.py | 142 +++++++++ src/pipeline/pkgx.py | 58 ++++ src/pipeline/utils/fetcher.py | 108 +++++++ src/pipeline/utils/logger.py | 45 +++ src/pipeline/utils/pg.py | 166 +++++++++++ src/pipeline/utils/transformer.py | 165 +++++++++++ src/requirements.txt | 32 +++ src/run_pipeline.sh | 13 + 28 files changed, 1702 insertions(+), 2 deletions(-) create mode 100644 .dockerignore create mode 100644 alembic/.pkgx.yaml create mode 100644 alembic/Dockerfile create mode 100644 alembic/alembic.ini create mode 100644 alembic/env.py create mode 100755 alembic/run_migrations.sh create mode 100644 alembic/script.py.mako create mode 100644 alembic/versions/20240918_1200-initial_migration.py create mode 100644 alembic/versions/20240923_0821-add_load_history.py create mode 100644 alembic/versions/20240925_0808-add_users_urls.py create mode 100644 db/init-scripts/init-script.sql create mode 100644 docker-compose.yml create mode 100644 pkgx.yaml create mode 100644 pyproject.toml create mode 100644 src/.pkgx.yaml create mode 100644 src/Dockerfile create mode 100644 src/pipeline/crates.py create mode 100644 src/pipeline/main.py create mode 100644 src/pipeline/models/__init__.py create mode 100644 src/pipeline/pkgx.py create mode 100644 src/pipeline/utils/fetcher.py create mode 100644 src/pipeline/utils/logger.py create mode 100644 src/pipeline/utils/pg.py create mode 100644 src/pipeline/utils/transformer.py create mode 100644 src/requirements.txt create mode 100755 src/run_pipeline.sh diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..0384b02 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +# directories +data/ +db/ +.venv/ + +# other files +.gitignore +docker-compose.yml \ No newline at end of file diff --git a/.gitignore b/.gitignore index 82f9275..779076d 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,7 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# data files +data +db/data \ No newline at end of file diff --git a/README.md b/README.md index b0fc48b..99c70b1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,192 @@ -# chai-oss -tea's CHAI package ranker +# data pipeline + +inspiration [here](https://github.com/vbelz/Data-pipeline-twitter) + +this is an attempt at an open-source data pipeline, from which we can build our app that +ranks open-source projects. all pieces for this are managed by `docker-compose`. there +are 3 services to it: + +1. db: postgres to store package specific data +1. alembic: for running migrations +1. 
pipeline: which fetches and writes data
+
+first run `mkdir -p data/{crates,pkgx,homebrew,npm,pypi,rubys}` to set up the data
+directory where the fetchers will store the data.
+
+then, running `docker compose up` will set up the db and run the pipeline. a successful
+run will look something like this:
+
+```
+db-1 | 2024-09-23 18:33:31.199 UTC [1] LOG: listening on IPv4 address "0.0.0.0", port 5432
+db-1 | 2024-09-23 18:33:31.199 UTC [1] LOG: listening on IPv6 address "::", port 5432
+db-1 | 2024-09-23 18:33:31.202 UTC [1] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5432"
+db-1 | 2024-09-23 18:33:31.230 UTC [30] LOG: database system was shut down at 2024-09-23 18:04:05 UTC
+db-1 | 2024-09-23 18:33:31.242 UTC [1] LOG: database system is ready to accept connections
+alembic-1 | db:5432 - accepting connections
+alembic-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
+alembic-1 | INFO [alembic.runtime.migration] Will assume transactional DDL.
+alembic-1 | db currently at 0db06140525f (head)
+alembic-1 | INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
+alembic-1 | INFO [alembic.runtime.migration] Will assume transactional DDL.
+alembic-1 | migrations run
+alembic-1 exited with code 0
+alembic-1 | postgresql://postgres:s3cr3t@db:5432/chai
+alembic-1 | s3cr3t
+pipeline-1 | 0.01: [crates_orchestrator]: [DEBUG]: logging is working
+pipeline-1 | 0.01: [main_pipeline]: [DEBUG]: logging is working
+pipeline-1 | 0.01: [DB]: [DEBUG]: logging is working
+pipeline-1 | 0.03: [DB]: [DEBUG]: created engine
+pipeline-1 | 0.03: [DB]: [DEBUG]: created session
+pipeline-1 | 0.03: [DB]: [DEBUG]: connected to postgresql://postgres:s3cr3t@db:5432/chai
+pipeline-1 | 0.03: [crates_orchestrator]: fetching crates packages
+pipeline-1 | 0.03: [crates_fetcher]: [DEBUG]: logging is working
+pipeline-1 | 0.03: [crates_fetcher]: [DEBUG]: adding package manager crates
+```
+
+> [!TIP]
+>
+> to force a full rebuild, run `docker-compose up --force-recreate --build`
+
+## Hard Reset
+
+if you ever need to do a hard reset, here are the steps:
+
+1. `rm -rf db/data`: removes all the data that was loaded into the db
+1. `rm -rf .venv`: if you created a virtual environment for local dev, this removes it
+1. `rm -rf data`: removes all the data the fetchers have downloaded
+1. `docker system prune -a -f --volumes`: removes **everything** docker-related
+
+> [!WARNING]
+>
+> step 4 deletes all your docker stuff...be careful
+
+## FAQs / common issues
+
+1. the database url is `postgresql://postgres:s3cr3t@localhost:5435/chai`, and is used
+   as `CHAI_DATABASE_URL` in the environment.
+1. the command `./run_migrations.sh` is used to run migrations, and you might need to
+   `chmod +x alembic/run_migrations.sh` so that it can be executed
+1. the command `./run_pipeline.sh` is used to run the pipeline, and you might need to
+   `chmod +x src/run_pipeline.sh` so that it can be executed
+1. migrations sometimes don't apply before the service starts, so you might need to
+   manually apply them:
+
+   ```sh
+   cd alembic
+   alembic upgrade head
+   ```
+
+## tasks
+
+these are tasks that can be run using xcfile.dev. if you have pkgx, just run `dev` to
+inject into your environment. if you don't...go get it.
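+
+for example, with the `xc` task runner from xcfile.dev on your PATH, each task below can
+be invoked by name (a sketch, assuming `xc` picks up this `## tasks` section):
+
+```sh
+xc setup       # create the data/ directories
+xc chai-start  # bring up the db, apply migrations, and start the pipeline
+```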
+ +### reset + +```sh +rm -rf db/data data .venv +``` + +### setup + +```sh +mkdir -p data/{crates,pkgx,homebrew,npm,pypi,rubys} +``` + +### local-dev + +```sh +uv venv +cd src +uv pip install -r requirements.txt +``` + +### chai-start + +Requires: setup +Inputs: FORCE +Env: FORCE=not-force + +```sh +if [ "$FORCE" = "force" ]; then + docker-compose up --force-recreate --build -d +else + docker-compose up -d +fi +export CHAI_DATABASE_URL="postgresql://postgres:s3cr3t@localhost:5435/chai" +``` + +### chai-stop + +```sh +docker-compose down +``` + +### db-reset + +Requires: chai-stop + +```sh +rm -rf db/data +``` + +### db-logs + +```sh +docker-compose logs db +``` + +### db-generate-migration + +Inputs: MIGRATION_NAME +Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai + +```sh +cd alembic +alembic revision --autogenerate -m "$MIGRATION_NAME" +``` + +### db-upgrade + +Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai + +```sh +cd alembic +alembic upgrade head +``` + +### db-downgrade + +Inputs: STEP +Env: CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@localhost:5435/chai + +```sh +cd alembic +alembic downgrade -$STEP +``` + +### db + +```sh +psql "postgresql://postgres:s3cr3t@localhost:5435/chai" +``` + +### db-list-packages + +```sh +psql "postgresql://postgres:s3cr3t@localhost:5435/chai" -c "SELECT count(id) FROM packages;" +``` + +### db-list-history + +```sh +psql "postgresql://postgres:s3cr3t@localhost:5435/chai" -c "SELECT * FROM load_history;" +``` diff --git a/alembic/.pkgx.yaml b/alembic/.pkgx.yaml new file mode 100644 index 0000000..025cfe3 --- /dev/null +++ b/alembic/.pkgx.yaml @@ -0,0 +1,6 @@ +# this .pkgx.yaml file is only for alembic + +dependencies: + postgresql.org: 16 + alembic.sqlalchemy.org: 1 + psycopg.org/psycopg2: 2 diff --git a/alembic/Dockerfile b/alembic/Dockerfile new file mode 100644 index 0000000..e1389ad --- /dev/null +++ b/alembic/Dockerfile @@ -0,0 +1,8 @@ +FROM pkgxdev/pkgx:latest +# WORKDIR /app + +# # install alembic +# COPY .pkgx.yaml . +# RUN dev + +RUN pkgx install alembic.sqlalchemy.org^1 psycopg.org/psycopg2^2 postgresql.org^16 \ No newline at end of file diff --git a/alembic/alembic.ini b/alembic/alembic.ini new file mode 100644 index 0000000..8342a05 --- /dev/null +++ b/alembic/alembic.ini @@ -0,0 +1,53 @@ +[alembic] +script_location = . +file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d-%%(slug)s + +prepend_sys_path = .. +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. 
+ +# URL +sqlalchemy.url = ${env:CHAI_DATABASE_URL} + + +[post_write_hooks] +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# TODO: this doesn't work rn +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..be70d2f --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,70 @@ +import os +from logging.config import fileConfig + +from alembic import context +from sqlalchemy import engine_from_config, pool +from src.pipeline.models import Base + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# interpret the config file for Python logging. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# metadata for all models +target_metadata = Base.metadata + +# get database url +database_url = os.getenv("CHAI_DATABASE_URL") +if database_url: + config.set_main_option("sqlalchemy.url", database_url) + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/run_migrations.sh b/alembic/run_migrations.sh new file mode 100755 index 0000000..6e042f1 --- /dev/null +++ b/alembic/run_migrations.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# wait for db to be ready +until pg_isready -h db -p 5432 -U postgres; do + echo "waiting for database..." 
+ sleep 2 +done + +# migrate +echo "db currently at $(pkgx +alembic +psycopg.org/psycopg2 alembic current)" +pkgx +alembic +psycopg.org/psycopg2 alembic upgrade head +echo "migrations run" \ No newline at end of file diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..fbc4b07 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,26 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/20240918_1200-initial_migration.py b/alembic/versions/20240918_1200-initial_migration.py new file mode 100644 index 0000000..e82f3fc --- /dev/null +++ b/alembic/versions/20240918_1200-initial_migration.py @@ -0,0 +1,74 @@ +"""initial migration + +Revision ID: 14ea939f4bf7 +Revises: +Create Date: 2024-09-18 12:00:22.201699 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "14ea939f4bf7" +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "projects", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("derived_id", sa.String(), nullable=False, unique=True), + sa.Column("name", sa.String(), nullable=False), + sa.Column("package_manager", sa.String(), nullable=False), + sa.Column("repo", sa.String(), nullable=True), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_projects")), + ) + op.create_table( + "versions", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("project_id", sa.UUID(), nullable=False), + sa.Column("version", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["project_id"], + ["projects.id"], + name=op.f("fk_versions_project_id_projects"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_versions")), + ) + op.create_table( + "depends_on", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("version_id", sa.UUID(), nullable=False), + sa.Column("dependency_id", sa.UUID(), nullable=False), + sa.Column("dependency_type", sa.String(), nullable=True), + sa.Column("semver_range", sa.String(), nullable=True), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["version_id"], + ["versions.id"], + name=op.f("fk_depends_on_version_id_versions"), + ), + sa.ForeignKeyConstraint( + ["dependency_id"], + ["projects.id"], + name=op.f("fk_depends_on_dependency_id_projects"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_depends_on")), + ) + + +def downgrade() -> None: + op.drop_table("depends_on") + op.drop_table("versions") + op.drop_table("projects") diff --git a/alembic/versions/20240923_0821-add_load_history.py b/alembic/versions/20240923_0821-add_load_history.py new file mode 100644 index 0000000..2c4d97d --- /dev/null +++ b/alembic/versions/20240923_0821-add_load_history.py @@ -0,0 +1,33 @@ +"""add_load_history + +Revision ID: 0db06140525f +Revises: 14ea939f4bf7 +Create Date: 2024-09-23 08:21:52.454272 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "0db06140525f" +down_revision: Union[str, None] = "14ea939f4bf7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "load_history", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("package_manager", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_load_history")), + ) + + +def downgrade() -> None: + op.drop_table("load_history") diff --git a/alembic/versions/20240925_0808-add_users_urls.py b/alembic/versions/20240925_0808-add_users_urls.py new file mode 100644 index 0000000..adb06e2 --- /dev/null +++ b/alembic/versions/20240925_0808-add_users_urls.py @@ -0,0 +1,269 @@ +"""add_users_urls + +Revision ID: a97f16d0656a +Revises: 0db06140525f +Create Date: 2024-09-25 08:08:30.574254 + +""" + +# TODO: this one renames the projects table to packages +# it's destructive, so we should definitely redo the migrations +# so that it's clean from here on out + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = "a97f16d0656a" +down_revision: Union[str, None] = "0db06140525f" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "package_managers", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("name", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_package_managers")), + sa.UniqueConstraint("name", name=op.f("uq_package_managers_name")), + ) + op.create_table( + "url_types", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("name", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_url_types")), + sa.UniqueConstraint("name", name=op.f("uq_url_types_name")), + ) + op.create_table( + "urls", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("url", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_urls")), + sa.UniqueConstraint("url", name=op.f("uq_urls_url")), + ) + op.create_table( + "user_types", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("name", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_user_types")), + sa.UniqueConstraint("name", name=op.f("uq_user_types_name")), + ) + op.create_table( + "users", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("username", sa.String(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("pk_users")), + sa.UniqueConstraint("username", name=op.f("uq_users_username")), + ) + op.create_table( + "packages", + sa.Column("id", sa.UUID(), 
nullable=False), + sa.Column("derived_id", sa.String(), nullable=False), + sa.Column("name", sa.String(), nullable=False), + sa.Column("package_manager_id", sa.UUID(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["package_manager_id"], + ["package_managers.id"], + name=op.f("fk_packages_package_manager_id_package_managers"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_packages")), + sa.UniqueConstraint("derived_id", name=op.f("uq_packages_derived_id")), + ) + op.create_table( + "user_urls", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("user_id", sa.UUID(), nullable=False), + sa.Column("url_id", sa.UUID(), nullable=False), + sa.Column("user_type_id", sa.UUID(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["url_id"], ["urls.id"], name=op.f("fk_user_urls_url_id_urls") + ), + sa.ForeignKeyConstraint( + ["user_id"], ["users.id"], name=op.f("fk_user_urls_user_id_users") + ), + sa.ForeignKeyConstraint( + ["user_type_id"], + ["user_types.id"], + name=op.f("fk_user_urls_user_type_id_user_types"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_user_urls")), + ) + op.create_table( + "package_urls", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("package_id", sa.UUID(), nullable=False), + sa.Column("url_id", sa.UUID(), nullable=False), + sa.Column("url_type_id", sa.UUID(), nullable=False), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["package_id"], + ["packages.id"], + name=op.f("fk_package_urls_package_id_packages"), + ), + sa.ForeignKeyConstraint( + ["url_id"], ["urls.id"], name=op.f("fk_package_urls_url_id_urls") + ), + sa.ForeignKeyConstraint( + ["url_type_id"], + ["url_types.id"], + name=op.f("fk_package_urls_url_type_id_url_types"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_package_urls")), + ) + op.create_table( + "dependencies", + sa.Column("id", sa.UUID(), nullable=False), + sa.Column("version_id", sa.UUID(), nullable=False), + sa.Column("dependency_id", sa.UUID(), nullable=False), + sa.Column("dependency_type", sa.String(), nullable=True), + sa.Column("semver_range", sa.String(), nullable=True), + sa.Column("record_created_at", sa.DateTime(), nullable=False), + sa.Column("record_updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint( + ["dependency_id"], + ["packages.id"], + name=op.f("fk_dependencies_dependency_id_packages"), + ), + sa.ForeignKeyConstraint( + ["version_id"], + ["versions.id"], + name=op.f("fk_dependencies_version_id_versions"), + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_dependencies")), + ) + op.add_column( + "load_history", sa.Column("package_manager_id", sa.UUID(), nullable=False) + ) + op.create_foreign_key( + op.f("fk_load_history_package_manager_id_package_managers"), + "load_history", + "package_managers", + ["package_manager_id"], + ["id"], + ) + op.drop_column("load_history", "package_manager") + op.add_column("versions", sa.Column("package_id", sa.UUID(), nullable=False)) + op.drop_constraint( + "fk_versions_project_id_projects", "versions", type_="foreignkey" + ) + op.create_foreign_key( + op.f("fk_versions_package_id_packages"), + "versions", + "packages", + ["package_id"], + ["id"], + ) + op.drop_column("versions", 
"project_id") + # > DANGER ZONE + # drop the old tables + op.drop_table("depends_on") + op.drop_table("projects") + # < END DANGER ZONE + + +def downgrade() -> None: + op.add_column( + "versions", + sa.Column("project_id", sa.UUID(), autoincrement=False, nullable=False), + ) + op.drop_constraint( + op.f("fk_versions_package_id_packages"), "versions", type_="foreignkey" + ) + op.create_foreign_key( + "fk_versions_project_id_projects", + "versions", + "projects", + ["project_id"], + ["id"], + ) + op.drop_column("versions", "package_id") + op.add_column( + "load_history", + sa.Column("package_manager", sa.VARCHAR(), autoincrement=False, nullable=False), + ) + op.drop_constraint( + op.f("fk_load_history_package_manager_id_package_managers"), + "load_history", + type_="foreignkey", + ) + op.drop_column("load_history", "package_manager_id") + op.create_table( + "projects", + sa.Column("id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("derived_id", sa.VARCHAR(), autoincrement=False, nullable=False), + sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False), + sa.Column("package_manager", sa.VARCHAR(), autoincrement=False, nullable=False), + sa.Column("repo", sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column( + "record_created_at", + postgresql.TIMESTAMP(), + autoincrement=False, + nullable=False, + ), + sa.Column( + "record_updated_at", + postgresql.TIMESTAMP(), + autoincrement=False, + nullable=False, + ), + sa.PrimaryKeyConstraint("id", name="pk_projects"), + sa.UniqueConstraint("derived_id", name="uq_projects_derived_id"), + postgresql_ignore_search_path=False, + ) + op.create_table( + "depends_on", + sa.Column("id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("version_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("dependency_id", sa.UUID(), autoincrement=False, nullable=False), + sa.Column("dependency_type", sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column("semver_range", sa.VARCHAR(), autoincrement=False, nullable=True), + sa.Column( + "record_created_at", + postgresql.TIMESTAMP(), + autoincrement=False, + nullable=False, + ), + sa.Column( + "record_updated_at", + postgresql.TIMESTAMP(), + autoincrement=False, + nullable=False, + ), + sa.ForeignKeyConstraint( + ["dependency_id"], + ["projects.id"], + name="fk_depends_on_dependency_id_projects", + ), + sa.ForeignKeyConstraint( + ["version_id"], ["versions.id"], name="fk_depends_on_version_id_versions" + ), + sa.PrimaryKeyConstraint("id", name="pk_depends_on"), + ) + op.drop_table("dependencies") + op.drop_table("package_urls") + op.drop_table("user_urls") + op.drop_table("packages") + op.drop_table("users") + op.drop_table("user_types") + op.drop_table("urls") + op.drop_table("url_types") + op.drop_table("package_managers") diff --git a/db/init-scripts/init-script.sql b/db/init-scripts/init-script.sql new file mode 100644 index 0000000..5879bd3 --- /dev/null +++ b/db/init-scripts/init-script.sql @@ -0,0 +1,7 @@ +CREATE DATABASE chai; + +\c chai + +CREATE EXTENSION IF NOT EXISTS "pgcrypto"; +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +-- CREATE EXTENSION IF NOT EXISTS "pg_bigm"; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..9040799 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,47 @@ +services: + db: + image: postgres + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=s3cr3t + ports: + - "5435:5432" + volumes: + - ./db/data:/var/lib/postgresql/data + - 
./db/init-scripts/init-script.sql:/docker-entrypoint-initdb.d/create-database.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres -d chai"] + interval: 5s + timeout: 5s + retries: 5 + + alembic: + build: alembic + environment: + - CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@db:5432/chai + - POSTGRES_PASSWORD=s3cr3t + volumes: + - .:/app + depends_on: + db: + condition: service_healthy + working_dir: /app/alembic + entrypoint: ["./run_migrations.sh"] + + pipeline: + build: src + # TODO: PKG_MANAGER should be configurable + # TODO: do I need PYTHONPATH? + environment: + - CHAI_DATABASE_URL=postgresql://postgres:s3cr3t@db:5432/chai + - PKG_MANAGER=crates + - PYTHONPATH=/app + volumes: + - .:/app + working_dir: /app + depends_on: + db: + condition: service_healthy + alembic: + condition: service_completed_successfully + entrypoint: ["./src/run_pipeline.sh"] \ No newline at end of file diff --git a/pkgx.yaml b/pkgx.yaml new file mode 100644 index 0000000..2ccb8c5 --- /dev/null +++ b/pkgx.yaml @@ -0,0 +1,11 @@ +# this is the pkgx config across all the services covered by docker-compose +dependencies: + python.org: ~3.11 + xcfile.dev: 0 + cli.github.com: 2 + astral.sh/ruff: 0 + astral.sh/uv: 0 + postgresql.org: 16 + docker.com/compose: 2 + alembic.sqlalchemy.org: 1 + psycopg.org/psycopg2: 2 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..bdea2ec --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,30 @@ +[project] +name = "pipeline" +version = "0.1.0" +dependencies = [ + "alembic~=1.12", + "pyyaml~=6.0", + "psycopg2~=2.9", + "requests~=2.31", + "ruff~=0.6", + "sqlalchemy~=2.0", +] +requires-python = "~=3.11" +authors = [{ name = "Sanchit Ram Arvind", email = "sanchit@pkgx.dev" }] +maintainers = [{ name = "Sanchit Ram Arvind", email = "sanchit@pkgx.dev" }] +description = "the open-source package data pipeline" +readme = "README.md" +keywords = ["tea", "package managerment", "data pipeline"] + +[project.urls] +Homepage = "https://tea.xyz" +Repository = "https://github.com/teaxyz/chai.git" + +[project.scripts] +# only useful for local development +chai-pipeline-fetch = "pipeline.main:main" + +[tool.pytest.ini_options] +addopts = ["--import-mode=importlib"] +pythonpath = ["src", "."] +testpaths = ["tests"] diff --git a/src/.pkgx.yaml b/src/.pkgx.yaml new file mode 100644 index 0000000..0124ee4 --- /dev/null +++ b/src/.pkgx.yaml @@ -0,0 +1,6 @@ +# this is the pkgx config for the pipeline + +dependencies: + python.org: ~3.11 + astral.sh/uv: 0 + postgresql.org: 16 diff --git a/src/Dockerfile b/src/Dockerfile new file mode 100644 index 0000000..f17610c --- /dev/null +++ b/src/Dockerfile @@ -0,0 +1,12 @@ +FROM pkgxdev/pkgx +# WORKDIR /app + +# gets pkgx setup +# COPY .pkgx.yaml . +# RUN dev + +# just install everything + +RUN pkgx install python.org^3.11 astral.sh/uv^0 postgresql.org^16 +COPY requirements.txt . 
+RUN CC=clang pkgx +clang@18 uv pip install -r requirements.txt --system
diff --git a/src/pipeline/crates.py b/src/pipeline/crates.py
new file mode 100644
index 0000000..a4321bf
--- /dev/null
+++ b/src/pipeline/crates.py
@@ -0,0 +1,61 @@
+from src.pipeline.models import PackageManager
+from src.pipeline.utils.fetcher import TarballFetcher
+from src.pipeline.utils.logger import Logger
+from src.pipeline.utils.pg import DB
+from src.pipeline.utils.transformer import CratesTransformer
+
+FILE_LOCATION = "https://static.crates.io/db-dump.tar.gz"
+
+logger = Logger("crates_orchestrator", mode=Logger.VERBOSE)
+
+
+def get_crates_packages(db: DB) -> None:
+    # get crates's package manager id, and add it if it doesn't exist
+    package_manager_id = db.get_package_manager_id("crates")
+    if package_manager_id is None:
+        package_manager_id = db.insert_package_manager("crates")
+    package_manager = PackageManager(id=package_manager_id, name="crates")
+
+    # use the fetcher to get the files from crates itself
+    logger.log("need to unravel a ~3GB tarball, this takes ~42 seconds")
+    fetcher = TarballFetcher("crates", FILE_LOCATION)
+    files = fetcher.fetch()
+    fetcher.write(files)
+
+    # use the transformer to figure out what we'd need for our ranker
+    transformer = CratesTransformer()
+    logger.log("transforming crates packages")
+
+    # load the projects, versions, and dependencies into our db
+    logger.log("loading crates packages into db, currently this might take a while")
+
+    # TODO: handle auto-generated ids
+    # auto-generated ids are something this orchestrator does not know about
+    # I don't wanna get them, I'd rather just collect the ids when I load packages
+    # and then use those ids to insert the versions
+    # ughhhh, this is not the best way to do this, but it works for now
+
+    # packages
+    db.insert_packages(transformer.packages(), package_manager)
+
+    # update the transformer's map with all the db_ids
+    logger.log("getting loaded pkg_ids to correctly load versions (takes 5 seconds)")
+    loaded_packages = db.select_packages_by_package_manager(package_manager)
+    transformer.update_crates_db_ids(loaded_packages)
+
+    # versions
+    db.insert_versions(transformer.versions())
+
+    # update the transformer's map with all the db_ids
+    logger.log("getting loaded ver_ids to correctly load deps (takes 50 seconds)")
+    loaded_versions = db.select_versions_by_package_manager(package_manager)
+    transformer.update_crates_versions_db_ids(loaded_versions)
+
+    # dependencies
+    logger.log("loading crates dependencies into db...this will take the longest")
+    db.insert_dependencies(transformer.dependencies())
+
+    # insert load history
+    db.insert_load_history(package_manager_id)
+    logger.log("✅ crates")
+    logger.log("in a new terminal, run README.md/db-list-history to verify")
diff --git a/src/pipeline/main.py b/src/pipeline/main.py
new file mode 100644
index 0000000..ad74554
--- /dev/null
+++ b/src/pipeline/main.py
@@ -0,0 +1,44 @@
+import sys
+import traceback
+from os import getenv
+
+from src.pipeline.crates import get_crates_packages
+from src.pipeline.pkgx import get_pkgx_packages
+from src.pipeline.utils.logger import Logger
+from src.pipeline.utils.pg import DB
+
+logger = Logger("main_pipeline")
+
+
+def main():
+    if len(sys.argv) != 2:
+        print("usage: python main.py <package_manager>")
+        sys.exit(1)
+
+    if getenv("CHAI_DATABASE_URL") is None:
+        logger.error("CHAI_DATABASE_URL is not set")
+        sys.exit(1)
+
+    # initialize the db and hand it off everywhere
+    db = DB()
+    package_manager = sys.argv[1]
+
+    try:
+        if package_manager == "pkgx":
+ get_pkgx_packages(db) + elif package_manager == "crates": + get_crates_packages(db) + else: + print("invalid package manager") + sys.exit(1) + except Exception as e: + # collect all the exception information + exc_type, exc_value, exc_traceback = sys.exc_info() + # TODO: move this to logger class + logger.error(f"{exc_type.__name__}: {exc_value}") + logger.error(f"traceback: {''.join(traceback.format_tb(exc_traceback))}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/pipeline/models/__init__.py b/src/pipeline/models/__init__.py new file mode 100644 index 0000000..1a520a6 --- /dev/null +++ b/src/pipeline/models/__init__.py @@ -0,0 +1,142 @@ +# __init__.py +from sqlalchemy import Column, DateTime, ForeignKey, Integer, MetaData, String, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.ext.declarative import declarative_base + +naming_convention = { + "ix": "ix_%(column_0_label)s", + "uq": "uq_%(table_name)s_%(column_0_name)s", + "ck": "ck_%(table_name)s_%(constraint_name)s", + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + "pk": "pk_%(table_name)s", +} +metadata = MetaData(naming_convention=naming_convention) +Base = declarative_base(metadata=metadata) + + +# record_created_at, record_updated_at are always set to the current timestamp +# and should always be part of every single table +class Package(Base): + __tablename__ = "packages" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + derived_id = Column(String, nullable=False, unique=True) # package_manager/name + name = Column(String, nullable=False) + package_manager_id = Column( + UUID(as_uuid=True), ForeignKey("package_managers.id"), nullable=False + ) + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) + + def to_dict(self): + return { + "derived_id": self.derived_id, + "name": self.name, + "package_manager_id": self.package_manager_id, + } + + +class PackageManager(Base): + __tablename__ = "package_managers" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + name = Column(String, nullable=False, unique=True) + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) + + +# this is a collection of all the different type of URLs +class URLs(Base): + __tablename__ = "urls" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + url = Column(String, nullable=False, unique=True) + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) + + +class URLType(Base): + __tablename__ = "url_types" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + name = Column(String, nullable=False, unique=True) # repo, homepage, etc + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) + + +class Version(Base): + __tablename__ = "versions" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + package_id = Column(UUID(as_uuid=True), ForeignKey("packages.id"), nullable=False) + version = Column(String, nullable=False) + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, 
default=func.now())
+
+    def to_dict(self):
+        return {"package_id": self.package_id, "version": self.version}
+
+
+class DependsOn(Base):
+    __tablename__ = "dependencies"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4())
+    version_id = Column(UUID(as_uuid=True), ForeignKey("versions.id"), nullable=False)
+    dependency_id = Column(
+        UUID(as_uuid=True), ForeignKey("packages.id"), nullable=False
+    )
+    # ideally, these are non-nullable but diff package managers are picky about this
+    dependency_type = Column(String, nullable=True)
+    semver_range = Column(String, nullable=True)
+    record_created_at = Column(DateTime, nullable=False, default=func.now())
+    record_updated_at = Column(DateTime, nullable=False, default=func.now())
+
+    def to_dict(self):
+        return {
+            "version_id": self.version_id,
+            "dependency_id": self.dependency_id,
+            "dependency_type": self.dependency_type,
+            "semver_range": self.semver_range,
+        }
+
+
+class LoadHistory(Base):
+    __tablename__ = "load_history"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4())
+    package_manager_id = Column(
+        UUID(as_uuid=True), ForeignKey("package_managers.id"), nullable=False
+    )
+    record_created_at = Column(DateTime, nullable=False, default=func.now())
+    record_updated_at = Column(DateTime, nullable=False, default=func.now())
+
+
+class PackageURL(Base):
+    __tablename__ = "package_urls"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4())
+    package_id = Column(UUID(as_uuid=True), ForeignKey("packages.id"), nullable=False)
+    url_id = Column(UUID(as_uuid=True), ForeignKey("urls.id"), nullable=False)
+    url_type_id = Column(UUID(as_uuid=True), ForeignKey("url_types.id"), nullable=False)
+    record_created_at = Column(DateTime, nullable=False, default=func.now())
+    record_updated_at = Column(DateTime, nullable=False, default=func.now())
+
+
+class Users(Base):
+    __tablename__ = "users"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4())
+    username = Column(String, nullable=False, unique=True)
+    record_created_at = Column(DateTime, nullable=False, default=func.now())
+    record_updated_at = Column(DateTime, nullable=False, default=func.now())
+
+
+class UserTypes(Base):
+    __tablename__ = "user_types"
+    id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4())
+    name = Column(String, nullable=False, unique=True)  # github, gitlab, crates, etc.
+ record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) + + +class UserURLs(Base): + __tablename__ = "user_urls" + id = Column(UUID(as_uuid=True), primary_key=True, default=func.uuid_generate_v4()) + user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) + url_id = Column(UUID(as_uuid=True), ForeignKey("urls.id"), nullable=False) + user_type_id = Column( + UUID(as_uuid=True), ForeignKey("user_types.id"), nullable=False + ) + record_created_at = Column(DateTime, nullable=False, default=func.now()) + record_updated_at = Column(DateTime, nullable=False, default=func.now()) diff --git a/src/pipeline/pkgx.py b/src/pipeline/pkgx.py new file mode 100644 index 0000000..1775560 --- /dev/null +++ b/src/pipeline/pkgx.py @@ -0,0 +1,58 @@ +import json +import os + +from requests import RequestException, get +from requests.exceptions import HTTPError +from src.pipeline.utils.logger import Logger + +# env vars +PANTRY_URL = "https://api.github.com/repos/pkgxdev/pantry" +OUTPUT_DIR = "data/pkgx" + +# setup +headers = { + "Authorization": f"Bearer {os.getenv('GITHUB_TOKEN')}", +} + + +def get_contents(path: str) -> list[dict]: + url = f"{PANTRY_URL}/contents/{path}" + response = get(url, headers=headers) + response.raise_for_status() + return response.json() + + +def get_pkgx_packages(logger: Logger) -> None: + try: + packages = {} + projects = get_contents("projects") + + for project in projects: + logger.debug(f"project: {project}") + if project["type"] == "dir": + project_contents = get_contents(project["path"]) + for item in project_contents: + if item["name"] == "package.yml": + response = get(item["download_url"], headers=headers) + packages[project["name"]] = response.text + + # Ensure output directory exists + os.makedirs(OUTPUT_DIR, exist_ok=True) + + # Write packages to JSON file + output_file = os.path.join(OUTPUT_DIR, "pkgx_packages.json") + with open(output_file, "w") as f: + json.dump(packages, f, indent=2) + + except HTTPError as e: + if e.response.status_code == 404: + logger.error("404, probs bad url") + elif e.response.status_code == 401: + logger.error("401, probs bad token") + raise e + except RequestException as e: + logger.error(f"RequestException: {e}") + raise e + except Exception as e: + logger.error(f"Exception: {e}") + raise e diff --git a/src/pipeline/utils/fetcher.py b/src/pipeline/utils/fetcher.py new file mode 100644 index 0000000..da25064 --- /dev/null +++ b/src/pipeline/utils/fetcher.py @@ -0,0 +1,108 @@ +import os +import tarfile +from dataclasses import dataclass +from datetime import datetime +from io import BytesIO +from typing import Any + +from requests import get +from src.pipeline.utils.logger import Logger + + +@dataclass +class Data: + file_path: str + file_name: str + content: Any # json or bytes + + +class Fetcher: + def __init__(self, name: str, source: str): + self.name = name + self.source = source + self.output = f"data/{name}" + self.logger = Logger(f"{name}_fetcher") + + def write(self, files: list[Data]): + """generic write function for some collection of files""" + + # prep the file location + now = datetime.now().strftime("%Y-%m-%d") + root_path = f"{self.output}/{now}" + + # write + # it can be anything - json, tarball, etc. 
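+        # each Data item carries a relative file_path, a file_name, and raw bytes content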
+ for item in files: + file_path = item.file_path + file_name = item.file_name + file_content = item.content + full_path = os.path.join(root_path, file_path) + + # make sure the path exists + os.makedirs(full_path, exist_ok=True) + + with open(os.path.join(full_path, file_name), "wb") as f: + self.logger.debug(f"writing {full_path}") + f.write(file_content) + + # update the latest symlink + self.update_symlink(now) + + def update_symlink(self, latest_path: str): + latest_symlink = f"{self.output}/latest" + if os.path.islink(latest_symlink): + self.logger.debug(f"removing existing symlink {latest_symlink}") + os.remove(latest_symlink) + + self.logger.debug(f"creating symlink {latest_symlink} -> {latest_path}") + os.symlink(latest_path, latest_symlink) + + def fetch(self): + response = get(self.source) + try: + response.raise_for_status() + except Exception as e: + self.logger.error(f"error fetching {self.source}: {e}") + raise e + + return response.content + + +class TarballFetcher(Fetcher): + def __init__(self, name: str, source: str): + super().__init__(name, source) + + def fetch(self) -> list[Data]: + content = super().fetch() + + bytes_io_object = BytesIO(content) + bytes_io_object.seek(0) + + files = [] + with tarfile.open(fileobj=bytes_io_object, mode="r:gz") as tar: + for member in tar.getmembers(): + if member.isfile(): + bytes_io_file = BytesIO(tar.extractfile(member).read()) + destination_key = member.name + file_name = destination_key.split("/")[-1] + file_path = "/".join(destination_key.split("/")[:-1]) + self.logger.debug(f"file_path/file_name: {file_path}/{file_name}") + files.append(Data(file_path, file_name, bytes_io_file.read())) + + return files + + +class JSONFetcher(Fetcher): + def __init__(self, name: str, source: str): + super().__init__(name, source) + + def fetch(self): + pass + + +class YAMLFetcher(Fetcher): + def __init__(self, name: str, source: str): + super().__init__(name, source) + + def fetch(self): + pass diff --git a/src/pipeline/utils/logger.py b/src/pipeline/utils/logger.py new file mode 100644 index 0000000..84d6f1a --- /dev/null +++ b/src/pipeline/utils/logger.py @@ -0,0 +1,45 @@ +import time + +# use inspect to print the line of code as well? 
+# caller = inspect.currentframe().f_back +# filename = caller.f_code.co_filename, lineno = caller.f_lineno + + +def as_minutes(seconds: float) -> float: + return seconds / 60 + + +class Logger: + SILENT = 0 + NORMAL = 1 + VERBOSE = 2 + + def __init__(self, name: str, mode=VERBOSE, start=time.time()) -> None: + self.name = name + self.start = start + self.mode = int(mode) + self.debug("logging is working") + + def print(self, msg: str): + print(f"{self.time_diff():.2f}: [{self.name}]: {msg}") + + def error(self, message): + self.print(f"[ERROR]: {message}") + + def log(self, message): + if self.mode >= Logger.NORMAL: + self.print(f"{message}") + + def debug(self, message): + if self.mode >= Logger.VERBOSE: + self.print(f"[DEBUG]: {message}") + + def warn(self, message): + if self.mode >= Logger.NORMAL: + self.print(f"[WARN]: {message}") + + def is_verbose(self): + return self.mode >= Logger.VERBOSE + + def time_diff(self): + return time.time() - self.start diff --git a/src/pipeline/utils/pg.py b/src/pipeline/utils/pg.py new file mode 100644 index 0000000..d156dc0 --- /dev/null +++ b/src/pipeline/utils/pg.py @@ -0,0 +1,166 @@ +import os +from typing import Iterable, List, Type + +from sqlalchemy import UUID, create_engine +from sqlalchemy.dialects import postgresql +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm.decl_api import DeclarativeMeta +from sqlalchemy.orm.session import Session +from src.pipeline.models import DependsOn, LoadHistory, Package, PackageManager, Version +from src.pipeline.utils.logger import Logger + +CHAI_DATABASE_URL = os.getenv("CHAI_DATABASE_URL") + + +class DB: + def __init__(self): + self.logger = Logger("DB") + self.engine = create_engine(CHAI_DATABASE_URL) + self.session = sessionmaker(self.engine) + self.logger.debug("connected") + + # TODO: move to transformer_v2 + def _batch( + self, + items: Iterable[DeclarativeMeta], + model: Type[DeclarativeMeta], + batch_size: int = 10000, + ) -> None: + """just handles batching logic for any type of model we wanna insert""" + self.logger.debug("starting a batch insert") + with self.session() as session: + batch = [] + for item in items: + batch.append(item) + if len(batch) == batch_size: + self.logger.debug(f"inserting {len(batch)} {model.__name__}") + self._insert_batch(batch, session, model) + batch = [] # reset + + if batch: + self.logger.debug(f"finally, inserting {len(batch)} {model.__name__}") + self._insert_batch(batch, session, model) + + session.commit() # commit here + + def _insert_batch( + self, + batch: List[DeclarativeMeta], + session: Session, + model: Type[DeclarativeMeta], + ) -> None: + """ + inserts a batch of items, any model, into the database + however, this mandates `on conflict do nothing` + """ + # we use statements here, not ORMs because of the on_conflict_do_nothing + # https://github.com/sqlalchemy/sqlalchemy/issues/5374 + stmt = ( + insert(model) + .values([item.to_dict() for item in batch]) + .on_conflict_do_nothing() + ) + session.execute(stmt) + + def insert_packages( + self, package_generator: Iterable[str], package_manager: PackageManager + ) -> List[UUID]: + def package_object_generator(): + for name in package_generator: + yield Package( + derived_id=f"{package_manager.name}/{name}", + name=name, + package_manager_id=package_manager.id, + ) + + return self._batch(package_object_generator(), Package, 10000) + + def insert_versions(self, version_generator: Iterable[dict[str, str]]): + def version_object_generator(): + for 
item in version_generator:
+                package_id = item["package_id"]
+                version = item["version"]
+                yield Version(
+                    package_id=package_id,
+                    version=version,
+                )
+
+        self._batch(version_object_generator(), Version, 10000)
+
+    def insert_dependencies(self, dependency_generator: Iterable[dict[str, str]]):
+        def dependency_object_generator():
+            for item in dependency_generator:
+                version_id = item["version_id"]
+                dependency_id = item["dependency_id"]
+                semver_range = item["semver_range"]
+                yield DependsOn(
+                    version_id=version_id,
+                    dependency_id=dependency_id,
+                    semver_range=semver_range,
+                )
+
+        self._batch(dependency_object_generator(), DependsOn, 10000)
+
+    def insert_load_history(self, package_manager_id: str):
+        with self.session() as session:
+            session.add(LoadHistory(package_manager_id=package_manager_id))
+            session.commit()
+
+    def get_package_manager_id(self, package_manager: str):
+        with self.session() as session:
+            result = (
+                session.query(PackageManager).filter_by(name=package_manager).first()
+            )
+
+            if result:
+                self.logger.debug(f"id: {result.id}")
+                return result.id
+            return None
+
+    def insert_package_manager(self, package_manager: str) -> UUID | None:
+        with self.session() as session:
+            exists = self.get_package_manager_id(package_manager) is not None
+            if not exists:
+                new_package_manager = PackageManager(name=package_manager)
+                session.add(new_package_manager)
+                session.commit()
+                self.logger.debug(f"{package_manager}: {new_package_manager.id}")
+                return new_package_manager.id
+
+    def select_packages_by_package_manager(
+        self, package_manager: PackageManager
+    ) -> List[Package]:
+        with self.session() as session:
+            return (
+                session.query(Package)
+                .filter_by(package_manager_id=package_manager.id)
+                .all()
+            )
+
+    def select_versions_by_package_manager(
+        self, package_manager: PackageManager
+    ) -> List[Version]:
+        with self.session() as session:
+            return (
+                session.query(Version)
+                .join(Package)
+                .filter(Package.package_manager_id == package_manager.id)
+                .all()
+            )
+
+    def print_statement(self, stmt):
+        dialect = postgresql.dialect()
+        compiled_stmt = stmt.compile(
+            dialect=dialect, compile_kwargs={"literal_binds": True}
+        )
+        self.logger.log(str(compiled_stmt))
+
+
+if __name__ == "__main__":
+    db = DB()
+    # random tests
+    package_manager = PackageManager(
+        name="crates", id=db.get_package_manager_id("crates")
+    )
+    print(db.select_versions_by_package_manager(package_manager))
diff --git a/src/pipeline/utils/transformer.py b/src/pipeline/utils/transformer.py
new file mode 100644
index 0000000..8892e1c
--- /dev/null
+++ b/src/pipeline/utils/transformer.py
@@ -0,0 +1,165 @@
+import csv
+import os
+from dataclasses import dataclass, field
+from typing import Dict, Generator, List
+
+from sqlalchemy import UUID
+from src.pipeline.models import Package, Version
+from src.pipeline.utils.logger import Logger
+
+# TODO: do we need to do that?
+# we do for now, but we can figure it out later +csv.field_size_limit(10000000) + + +# this is a file buffer +# it WILL NOT write a file anywhere +class Transformer: + def __init__(self, name: str): + self.name = name + self.input = f"data/{name}/latest" + self.logger = Logger(f"{name}_transformer") + + # knows what files to open + def transform(self) -> str: + # opens + pass + + def projects(self, data: str): + pass + + def versions(self, data: str): + pass + + def dependencies(self, data: str): + pass + + +# load this in once: build this dictionary +# {"1.0.0": "semverator", "1.0.1": "semverator"} +# {"semverator": {"versions": ["1.0.0", "1.0.1"], "uuid": uuid}} => another option +# {"semverator": uuid} +@dataclass +class Crate: + crate_id: int + name: str + versions: List[int] = field(default_factory=list) + db_id: UUID | None = None + + +@dataclass +class CrateVersion: + version_id: int + version: str + db_id: UUID | None = None + + +class CratesTransformer(Transformer): + def __init__(self): + super().__init__("crates") + self.files = { + "projects": "crates.csv", + "versions": "versions.csv", + "dependencies": "dependencies.csv", + } + # TODO: we gotta redo this too, it works, but it's unnecessarily bulky + self.crates: Dict[int, Crate] = {} + self.name_map: Dict[str, int] = {} + self.crate_versions: Dict[int, CrateVersion] = {} + self.num_map: Dict[str, int] = {} + + # TODO: can I move this to the transformer class? + def finder(self, file_name: str) -> str: + input_dir = os.path.realpath(self.input) + + for root, _, files in os.walk(input_dir): + if file_name in files: + return os.path.join(root, file_name) + else: + self.logger.error(f"{file_name} not found in {input_dir}") + raise FileNotFoundError(f"Missing {file_name} file") + + def packages(self) -> Generator[str, None, None]: + projects_path = self.finder(self.files["projects"]) + + with open(projects_path) as f: + reader = csv.DictReader(f) + for row in reader: + # TODO: note that the fact that the below happens within this method + # us also problematic, as we cannot run purely insert_versions, without + # running this + + # track it in our map + crate_id = row["id"] + name = row["name"] + self.crates[crate_id] = Crate(crate_id=crate_id, name=name) + self.name_map[name] = crate_id + + # yield name for loading into postgres + yield name + + def versions(self) -> Generator[Dict[str, int], None, None]: + versions_path = self.finder(self.files["versions"]) + + with open(versions_path) as f: + reader = csv.DictReader(f) + for row in reader: + # track it on our crates map + crate_id = row["crate_id"] + if crate_id not in self.crates: + raise ValueError(f"Crate {crate_id} not found in crates") + self.crates[crate_id].versions.append(row["num"]) + + # track it on our versions map + version_id = row["id"] + version_num = row["num"] + self.crate_versions[version_id] = CrateVersion( + version_id=version_id, version=version_num + ) + self.num_map[version_num] = version_id + + # get the information we need + package_id = self.get_crate_db_id(crate_id) + + yield { + "package_id": package_id, + "version": version_num, + } + + def dependencies(self): + dependencies_path = self.finder(self.files["dependencies"]) + + with open(dependencies_path) as f: + reader = csv.DictReader(f) + for row in reader: + start_id = row["version_id"] + end_id = row["dependency_id"] + semver = row["req"] + + version_id = self.get_crate_version_db_id(start_id) + dependency_id = self.get_crate_db_id(end_id) + yield { + "version_id": version_id, + "dependency_id": 
dependency_id, + "semver_range": semver, + } + + def get_crate_db_id(self, crate_id: int) -> UUID: + return self.crates[crate_id].db_id + + def get_crate_version_db_id(self, version_id: int) -> UUID: + return self.crate_versions[version_id].db_id + + def update_crates_db_ids(self, packages: List[Package]): + for pkg in packages: + try: + crate_id = self.name_map[pkg.name] + self.crates[crate_id].db_id = pkg.id + except KeyError: + self.logger.warn(f"pkg {pkg.name} not found in name map") + + def update_crates_versions_db_ids(self, versions: List[Version]): + for version in versions: + # version is a number and a package_id + crate_version_id = self.num_map[version.version] + self.crate_versions[crate_version_id].db_id = version.id diff --git a/src/requirements.txt b/src/requirements.txt new file mode 100644 index 0000000..1b4d304 --- /dev/null +++ b/src/requirements.txt @@ -0,0 +1,32 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile pyproject.toml -o src/requirements.txt +alembic==1.13.2 + # via pipeline (pyproject.toml) +certifi==2024.8.30 + # via requests +charset-normalizer==3.3.2 + # via requests +idna==3.8 + # via requests +mako==1.3.5 + # via alembic +markupsafe==2.1.5 + # via mako +psycopg2==2.9.9 + # via pipeline (pyproject.toml) +pyyaml==6.0.2 + # via pipeline (pyproject.toml) +requests==2.32.3 + # via pipeline (pyproject.toml) +ruff==0.6.5 + # via pipeline (pyproject.toml) +sqlalchemy==2.0.34 + # via + # pipeline (pyproject.toml) + # alembic +typing-extensions==4.12.2 + # via + # alembic + # sqlalchemy +urllib3==2.2.2 + # via requests diff --git a/src/run_pipeline.sh b/src/run_pipeline.sh new file mode 100755 index 0000000..914970a --- /dev/null +++ b/src/run_pipeline.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# wait for db to be ready +until pg_isready -h db -p 5432 -U postgres; do + echo "waiting for database..." + sleep 2 +done + +# make directory structure +# working_dir is /app +mkdir -p data/{crates,pkgx,homebrew,npm,pypi,rubys} + +pkgx +python^3.11 +postgresql.org^16 python -u src/pipeline/main.py crates \ No newline at end of file