
add SQL adapter #779

Draft · wants to merge 37 commits into main
Conversation

skarakuzu (Contributor)

Preliminary start of the SQL adapter. To be continued ...

Checklist

  • Add a Changelog entry
  • Add the ticket number which this PR closes to the comment section

@@ -44,6 +44,9 @@ tiled = "tiled.commandline.main:main"

# This is the union of all optional dependencies.
all = [
"adbc_driver_manager",
Member:
This section is used when tiled is installed like pip install "tiled[all]". These three should also be added to the server section below (the one marked # server only), so that they are included when tiled is installed like pip install "tiled[server]".
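For illustration, the suggested pyproject.toml layout would look roughly like this (only adbc_driver_manager is visible in this hunk; the other added dependencies are elided):

# This is the union of all optional dependencies.
all = [
    "adbc_driver_manager",
    # ... the other dependencies added in this diff ...
]
server = [  # server only
    "adbc_driver_manager",
    # ... likewise ...
]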

@danielballan (Member)

Lifecycle:

  1. The client declares that it wants to create a new tabular dataset, via a request POST /api/v1/metadata/my_table.
  2. In the "catalog" SQL database, the server adds a row to the nodes table with any metadata about this table. This is how the new table is connected to any overall dataset, like a Bluesky scan and its Scan ID.
  3. Also in the "catalog" SQL database, the server adds one row each to the data_sources table and the assets table. Together, they describe how to locate where the new data will be saved. The Asset part is very locked down: it has room for the URI of the tabular SQL database (postgresql://...) and some boilerplate. The DataSource has a freeform area called parameters, which can hold any JSON. We can use this for dataset-specific details, like the name of the SQL table (table_name), derived from the Arrow schema in this case, and a means of selecting the rows of interest for this new dataset (dataset_id). (See the sketch after this list.)
  4. When data is written or read, a SQLAdapter object is instantiated inside the server. It is passed information extracted from this DataSource and Asset, so it knows the table_name and the dataset_id.
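A rough sketch, as Python dicts, of what those records could carry. The exact internal layout is an assumption here; table_name, dataset_id, the parameters area, and the mimetype are taken from this thread:

# Hypothetical shapes of the catalog records described above.
asset = {
    "data_uri": "postgresql://...",  # URI of the tabular SQL database
}
data_source = {
    "mimetype": "application/x-tiled-sql-table",
    "parameters": {  # freeform JSON area
        "table_name": "table_blahblahblah",  # derived from the Arrow schema
        "dataset_id": 12345,  # selects this dataset's rows within the table
    },
}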

@danielballan (Member)

Test script:

import pandas
from tiled.client import from_uri
from tiled.structures.core import StructureFamily
from tiled.structures.data_source import Asset, DataSource, Management
from tiled.structures.table import TableStructure

client = from_uri("http://localhost:8000", api_key="secret")

df = pandas.DataFrame({"a": [1, 2, 3], "b": [1., 2., 3.]})
structure = TableStructure.from_pandas(df)

x = client.new(
    structure_family=StructureFamily.table,
    data_sources=[
        DataSource(
            management=Management.writable,
            mimetype="application/x-tiled-sql-table",
            structure_family=StructureFamily.table,
            structure=structure,
            assets=[],
        ),
    ],
    metadata={},
    specs=[],
    key="x",
)
x.write(df)
x.append_partition(df, 0)

# This does not work yet
# x.read()  # calls /table/partition/x?partition=0 adapter.read_partition()

@danielballan (Member) commented Jan 16, 2025

For this PR

  • Add a dataset_id column and filter by it.
  • Create the table eagerly, if ADBC APIs allow it. Seems not to be possible.
  • In Adapter, remove write. Write would mean "overwrite" or "replace", and we are not sure we want to expose this. (We can add it later if we want it.)
  • In the client, replace write_appendable_dataframe with create_appendable_dataframe. This will run the self.new(...) call, which runs init_storage on the server side, but it will not take any data. Data will be appended in later calls.
  • In Adapter, I removed append and used append_partition. (For now it's stuck at partition=0, but this constraint will be temporary.) Tests need to be updated.
  • Execute CREATE INDEX IF NOT EXISTS ... on the dataset_id column.
  • Pandas indexes should round-trip. (Dan)
  • Protect against SQL injection. In init_storage, table_name should match some restrictive regex pattern. Maybe lowercase letters, numbers, and underscores? (A sketch follows this list.)
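A minimal sketch of that last item, assuming the pattern floated above (lowercase letters, numbers, and underscores); the helper name is hypothetical:

import re

# Assumed pattern: a lowercase letter, then lowercase letters, digits, or underscores.
TABLE_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]*$")

def validate_table_name(table_name):
    # Hypothetical guard for init_storage, before table_name reaches any SQL.
    if TABLE_NAME_PATTERN.match(table_name) is None:
        raise ValueError(f"Invalid table name: {table_name!r}")
    return table_name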

Intended usage now looks like...

The following prompts the server to:

  1. Generate a table_name from the schema hash (sketched below). (The table might or might not already exist, containing rows from other dataset_ids.)
  2. Generate a new unique dataset_id for this dataset.
  3. Store the table_name, dataset_id, and any metadata passed here in the catalog database.
# This uploads no data.
x = client.create_appendable_table(schema, key="x")
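A sketch of step 1, deriving a stable table name from the Arrow schema; the hashing scheme here is an assumption, not the implementation:

import hashlib
import pyarrow

# Hypothetical: hash the serialized Arrow schema so that datasets with the
# same schema land in the same table.
schema = pyarrow.schema([("a", pyarrow.int64()), ("b", pyarrow.float64())])
digest = hashlib.md5(schema.serialize().to_pybytes()).hexdigest()
table_name = f"table_{digest}"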

The following prompts the server to:

  1. Create the table {table_name} if it does not yet exist.
  2. Ingest the rows into that table, with an additional dataset_id column. (A rough SQL sketch follows the code below.)
# Now data can be added, potentially in parallel.
x.append_partition(df, 0)
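For illustration, the server-side effect of that call might be roughly the following SQL; the table name and data columns are taken from the examples in this thread, not from the implementation:

CREATE TABLE IF NOT EXISTS table_blahblahblah (
    dataset_id INTEGER,
    a BIGINT,
    b DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS table_blahblahblah_dataset_id
    ON table_blahblahblah (dataset_id);
INSERT INTO table_blahblahblah (dataset_id, a, b)
    VALUES (12345, 1, 1.0), (12345, 2, 2.0), (12345, 3, 3.0);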

In a separate process, this would also work. We can access an existing table and keep appending.

x = client["x"]
x.append_partition(df, 0)

In follow-up PRs...

  • Support a PG database with credentials.
  • Connection pooling.
  • Supporting more than one partition. SQL will scale fine to a large table, but current Tiled does not let the client request less than a full partition. We either need to change that and let users request row ranges (which seems complicated, especially with Parquet, so it might be something to wait on) or mark up the data in the SQL table as belonging to reasonably-sized partitions. Similar to how arrays are chunked by the client, table rows should be partitioned.

Maybe in the future partitions are added like this? Not sure whether PostgreSQL native "table partitioning" fits our use case.

# table_blahblahblah
dataset_id  partition_id  ...
12345       1
12345       1
12345       2
12345       3
12345       3
12345       3
24323
def read_partition(self, partition):
    query = f"SELECT * FROM {self.table_name} WHERE dataset_id={self.dataset_id} AND partition={partition}"
    ...
