Merge pull request #504 from splitgraph/dyn-store
Add support for dynamic object store usage via clade
gruuya authored Feb 28, 2024
2 parents 5bdabc2 + b22c40b commit 39f18f8
Showing 16 changed files with 267 additions and 137 deletions.
14 changes: 11 additions & 3 deletions .github/workflows/ci.yml
@@ -12,20 +12,28 @@ jobs:
RUSTFLAGS: "-C debuginfo=1"

steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4

# Setup pre-commit
- name: Install pre-commit
run: |
sudo apt-get update
sudo apt-get install -y pre-commit
- name: Configure pre-commit cache
- uses: actions/cache@v3
+ uses: actions/cache@v4
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ runner.os }}-pre-commit-${{ hashFiles('**/.pre-commit-config.yaml') }}
- name: Install protoc
- run: sudo apt install -y protobuf-compiler
+ run: |
+ mkdir -p $HOME/d/protoc
+ cd $HOME/d/protoc
+ export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
+ curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
+ unzip $PROTO_ZIP
+ echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
+ export PATH=$PATH:$HOME/d/protoc/bin
+ protoc --version
# Use https://github.com/marketplace/actions/rust-cache

64 changes: 24 additions & 40 deletions .github/workflows/nightly.yml
@@ -16,15 +16,15 @@ jobs:

strategy:
matrix:
- build: [linux, macos, win-msvc]
+ build: [linux-x86_64, osx-x86_64, win64]
include:
- - build: linux
+ - build: linux-x86_64
os: ubuntu-20.04 # We can update to 22.04 once we're comfortable dropping libssl 1.x support
target: x86_64-unknown-linux-gnu
- - build: macos
+ - build: osx-x86_64
os: macos-latest
target: x86_64-apple-darwin
- - build: win-msvc
+ - build: win64
os: windows-latest
target: x86_64-pc-windows-msvc

@@ -33,40 +33,24 @@
# Taken from https://github.com/apache/arrow-datafusion/blob/master/.github/workflows/rust.yml
shell: bash
run: |
if [ "${{ matrix.build }}" = "win-msvc" ]; then
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-win64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
export PATH=$PATH:$HOME/d/protoc/bin
protoc.exe --version
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-${{ matrix.build }}.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
export PATH=$PATH:$HOME/d/protoc/bin
if [ "${{ matrix.build }}" = "win64" ]; then
protoc.exe --version
vcpkg integrate install
vcpkg.exe install openssl:x64-windows-static-md
elif [ "${{ matrix.build }}" = "linux" ]; then
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
export PATH=$PATH:$HOME/d/protoc/bin
protoc --version
else
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-osx-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
export PATH=$PATH:$HOME/d/protoc/bin
else
protoc --version
fi
- name: Checkout the repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- run: |
rustup toolchain install nightly --profile minimal
rustup default nightly
@@ -114,23 +98,23 @@ jobs:
fi
- name: Login to DockerHub (Linux only)
- if: matrix.build == 'linux'
- uses: docker/login-action@v2
+ if: matrix.build == 'linux-x86_64'
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}

- name: Test building and invoking the Docker image (Linux only)
- if: matrix.build == 'linux'
+ if: matrix.build == 'linux-x86_64'
run: |
DOCKER_BUILDKIT=1 docker build . -t splitgraph/seafowl:test
docker run --rm splitgraph/seafowl:test --version
- name: Determine Docker tags (Linux only)
- if: matrix.build == 'linux'
+ if: matrix.build == 'linux-x86_64'
id: meta
# https://github.com/docker/metadata-action
- uses: docker/metadata-action@v4
+ uses: docker/metadata-action@v5
with:
images: |
splitgraph/seafowl
@@ -143,16 +127,16 @@
type=semver,pattern={{major}}.{{minor}}
- name: Build and push Docker image (Linux only)
- if: matrix.build == 'linux'
- uses: docker/build-push-action@v4
+ if: matrix.build == 'linux-x86_64'
+ uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

- name: Upload binaries as artifacts
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: ${{ env.ARTIFACT }}
path: ${{ env.SOURCE }}
@@ -173,7 +157,7 @@
# Checkout required to access the release-notes.py script
- name: Checkout the repository
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: Generate release notes
run: |
./.github/workflows/release-notes.py --tag ${{ env.RELEASE_VERSION }} --output notes-${{ env.RELEASE_VERSION }}.md
14 changes: 13 additions & 1 deletion clade/proto/schema.proto
@@ -9,7 +9,18 @@ message SchemaObject {

message TableObject {
string name = 1;
- string location = 2;
+ // Path within the provided storage location, if any
+ string path = 2;
+ // URL of the root storage location
+ optional string location = 3;
}

+ // A single root storage location, hosting many individual tables
+ message StorageLocation {
+ // URL of the root storage location
+ string location = 1;
+ // Connection options for the object store client
+ map<string, string> options = 2;
+ }

message ListSchemaRequest {
@@ -18,6 +29,7 @@ message ListSchemaRequest {

message ListSchemaResponse {
repeated SchemaObject schemas = 1;
+ repeated StorageLocation stores = 2;
}

service SchemaStoreService {
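As an illustration of the new catalog shape, here is roughly how a response using these messages could be assembled on the Rust side. This is a minimal sketch assuming the prost-generated clade::schema types and that SchemaObject carries just a name plus its tables; the bucket URL and the aws_endpoint option are made-up example values:

use std::collections::HashMap;

use clade::schema::{ListSchemaResponse, SchemaObject, StorageLocation, TableObject};

fn example_response() -> ListSchemaResponse {
    ListSchemaResponse {
        schemas: vec![SchemaObject {
            name: "public".to_string(),
            tables: vec![TableObject {
                name: "my_table".to_string(),
                // Path of the table within the root storage location
                path: "path/to/my_table".to_string(),
                // Matches the `location` of a StorageLocation below; a table
                // with `location: None` falls back to the default object store
                location: Some("s3://my-bucket".to_string()),
            }],
        }],
        stores: vec![StorageLocation {
            location: "s3://my-bucket".to_string(),
            options: HashMap::from([(
                "aws_endpoint".to_string(),
                "http://localhost:9000".to_string(),
            )]),
        }],
    }
}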
6 changes: 3 additions & 3 deletions docker-compose.yml
@@ -27,9 +27,9 @@ services:
entrypoint: >
/bin/sh -c " /usr/bin/mc config host add test-minio http://minio:9000 minioadmin minioadmin;
/usr/bin/mc rm -r --force test-minio/seafowl-test-bucket; /usr/bin/mc mb
- test-minio/seafowl-test-bucket; /usr/bin/mc cp test-data/table_with_ns_column.parquet
- test-minio/seafowl-test-bucket/table_with_ns_column.parquet; /usr/bin/mc anonymous set public
- test-minio/seafowl-test-bucket/table_with_ns_column.parquet;
+ test-minio/seafowl-test-bucket; /usr/bin/mc cp -r test-data test-minio/seafowl-test-bucket;
+ /usr/bin/mc anonymous set public
+ test-minio/seafowl-test-bucket/test-data/table_with_ns_column.parquet;
/usr/bin/mc mb test-minio/seafowl-test-bucket-public; /usr/bin/mc anonymous set public
test-minio/seafowl-test-bucket-public; exit 0; "
95 changes: 71 additions & 24 deletions src/catalog/metastore.rs
@@ -12,14 +12,20 @@ use crate::wasm_udf::data_types::{
CreateFunctionDataType, CreateFunctionDetails, CreateFunctionLanguage,
CreateFunctionVolatility,
};
- use clade::schema::SchemaObject;
+ use clade::schema::{SchemaObject, TableObject};
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;
- use deltalake::DeltaTable;
+ use deltalake::logstore::default_logstore;
+ use deltalake::storage::{ObjectStoreFactory, ObjectStoreRef, StorageOptions};
+ use deltalake::{DeltaResult, DeltaTable, DeltaTableError};
+ use object_store::path::Path;
+ use object_store::prefix::PrefixStore;
+ use object_store::{parse_url_opts, ObjectStore};
use parking_lot::RwLock;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
+ use url::Url;

// This is the main entrypoint to all individual catalogs for various objects types.
// The intention is to make it extensible and de-coupled from the underlying metastore
@@ -31,13 +37,13 @@ pub struct Metastore {
pub tables: Arc<dyn TableStore>,
pub functions: Arc<dyn FunctionStore>,
staging_schema: Arc<MemorySchemaProvider>,
- object_store: Arc<InternalObjectStore>,
+ default_store: Arc<InternalObjectStore>,
}

impl Metastore {
pub fn new_from_repository(
repository: Arc<dyn Repository>,
- object_store: Arc<InternalObjectStore>,
+ default_store: Arc<InternalObjectStore>,
) -> Self {
let repository_store = Arc::new(RepositoryStore { repository });

@@ -48,13 +54,13 @@ impl Metastore {
tables: repository_store.clone(),
functions: repository_store,
staging_schema,
- object_store,
+ default_store,
}
}

pub fn new_from_external(
external_store: Arc<ExternalStore>,
- object_store: Arc<InternalObjectStore>,
+ default_store: Arc<InternalObjectStore>,
) -> Self {
let staging_schema = Arc::new(MemorySchemaProvider::new());
Self {
@@ -63,7 +69,7 @@
tables: external_store.clone(),
functions: external_store,
staging_schema,
- object_store,
+ default_store,
}
}

@@ -73,16 +79,27 @@
) -> CatalogResult<SeafowlDatabase> {
let catalog_schemas = self.schemas.list(catalog_name).await?;

- // NB we can't distinguish between a database without tables and a database
- // that doesn't exist at all due to our query.
+ // Collect all provided object store locations into corresponding clients
+ // TODO: cache this using location (+ options?) as key, potentially with
+ // a TTL, and re-use when building table log stores below.
+ let stores = catalog_schemas
+ .stores
+ .into_iter()
+ .map(|store| {
+ let url = Url::parse(&store.location)?;
+ Ok((
+ store.location.clone(),
+ parse_url_opts(&url, store.options)?.0.into(),
+ ))
+ })
+ .collect::<CatalogResult<_>>()?;

// Turn the list of all collections, tables and their columns into a nested map.

- let schemas: HashMap<Arc<str>, Arc<SeafowlSchema>> = catalog_schemas
+ let schemas = catalog_schemas
+ .schemas
.into_iter()
- .map(|schema| self.build_schema(schema))
- .collect();
+ .map(|schema| self.build_schema(schema, &stores))
+ .collect::<CatalogResult<HashMap<_, _>>>()?;

let name: Arc<str> = Arc::from(catalog_name);

@@ -94,38 +111,58 @@
})
}

- fn build_schema(&self, schema: SchemaObject) -> (Arc<str>, Arc<SeafowlSchema>) {
+ fn build_schema(
+ &self,
+ schema: SchemaObject,
+ stores: &HashMap<String, Arc<dyn ObjectStore>>,
+ ) -> CatalogResult<(Arc<str>, Arc<SeafowlSchema>)> {
let schema_name = schema.name;

let tables = schema
.tables
.into_iter()
- .map(|table| self.build_table(table.name, &table.location))
- .collect::<HashMap<_, _>>();
+ .map(|table| self.build_table(table, stores))
+ .collect::<CatalogResult<HashMap<_, _>>>()?;

- (
+ Ok((
Arc::from(schema_name.clone()),
Arc::new(SeafowlSchema {
name: Arc::from(schema_name),
tables: RwLock::new(tables),
}),
- )
+ ))
}

fn build_table(
&self,
- table_name: String,
- table_uuid: &str,
- ) -> (Arc<str>, Arc<dyn TableProvider>) {
+ table: TableObject,
+ stores: &HashMap<String, Arc<dyn ObjectStore>>,
+ ) -> CatalogResult<(Arc<str>, Arc<dyn TableProvider>)> {
// Build a delta table but don't load it yet; we'll do that only for tables that are
// actually referenced in a statement, via the async `table` method of the schema provider.
// TODO: this means that any `information_schema.columns` query will serially load all
// delta tables present in the database. The real fix for this is to make DF use `TableSource`
// for the information schema, and then implement `TableSource` for `DeltaTable` in delta-rs.
- let table_log_store = self.object_store.get_log_store(table_uuid);
-
- let table = DeltaTable::new(table_log_store, Default::default());
- (Arc::from(table_name), Arc::new(table) as _)
+ let table_log_store = match table.location {
+ // Use the provided customized location
+ Some(location) => {
+ let store = stores.get(&location).ok_or(CatalogError::Generic {
+ reason: format!("Object store for location {location} not found"),
+ })?;
+ let prefixed_store: PrefixStore<Arc<dyn ObjectStore>> =
+ PrefixStore::new(store.clone(), &*table.path);
+
+ let url = Url::parse(&format!("{location}/{}", table.path))?;
+
+ default_logstore(Arc::from(prefixed_store), &url, &Default::default())
+ }
+ // Use the configured, default, object store
+ None => self.default_store.get_log_store(&table.path),
+ };
+
+ let delta_table = DeltaTable::new(table_log_store, Default::default());
+ Ok((Arc::from(table.name), Arc::new(delta_table) as _))
}

pub async fn build_functions(
@@ -178,3 +215,13 @@ impl Metastore {
})
}
}

+ impl ObjectStoreFactory for Metastore {
+ fn parse_url_opts(
+ &self,
+ url: &Url,
+ _options: &StorageOptions,
+ ) -> DeltaResult<(ObjectStoreRef, Path)> {
+ Err(DeltaTableError::InvalidTableLocation(url.clone().into()))
+ }
+ }
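The resolution path added in build_table boils down to three steps: build (or look up) an object store client for the table's root location, scope that client to the table's path, and wrap the result in a Delta log store. Below is a condensed, self-contained sketch of the same mechanism, assuming the object_store/deltalake versions used by this commit; the location, path, and aws_endpoint values are illustrative only:

use std::sync::Arc;

use deltalake::logstore::default_logstore;
use deltalake::DeltaTable;
use object_store::prefix::PrefixStore;
use object_store::{parse_url_opts, ObjectStore};
use url::Url;

fn load_table_sketch() -> Result<DeltaTable, Box<dyn std::error::Error>> {
    let location = "s3://my-bucket"; // root storage location (one client per root)
    let path = "path/to/my_table"; // table path within that root

    // Build the object store client from the root URL plus its options
    let url = Url::parse(location)?;
    let (store, _) =
        parse_url_opts(&url, vec![("aws_endpoint", "http://localhost:9000")])?;
    let store: Arc<dyn ObjectStore> = Arc::from(store);

    // Scope the client to the table's path within the root location
    let prefixed = PrefixStore::new(store, path);

    // Root a Delta log store at the full table URL; the table itself is
    // only loaded once a statement actually references it
    let table_url = Url::parse(&format!("{location}/{path}"))?;
    let log_store =
        default_logstore(Arc::new(prefixed), &table_url, &Default::default());

    Ok(DeltaTable::new(log_store, Default::default()))
}

Note also that the ObjectStoreFactory impl above unconditionally returns an error, which reads as a deliberate guard: any table URL that was not resolved through the clade-supplied stores (or the default store) fails fast instead of falling back to delta-rs' own object store construction.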