Skip to content

Commit

Permalink
Merge pull request #531 from splitgraph/inline-metastore
Browse files Browse the repository at this point in the history
Add support for inline-ing metastore over Flight
  • Loading branch information
gruuya authored Jun 19, 2024
2 parents 20887ea + c1e3926 commit 48262cb
Show file tree
Hide file tree
Showing 20 changed files with 433 additions and 319 deletions.
5 changes: 5 additions & 0 deletions clade/proto/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ service SchemaStoreService {
// List the available schemas
rpc ListSchemas(ListSchemaRequest) returns (ListSchemaResponse);
}

message InlineMetastoreCommandStatementQuery {
string query = 1;
ListSchemaResponse schemas = 2;
}
19 changes: 18 additions & 1 deletion clade/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
pub mod schema {
use arrow_flight::sql::{Any, ProstMessageExt};
use prost::Message;

tonic::include_proto!("clade.schema");
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("clade_descriptor");

// Same reasoning as below, but for `get_flight_info_fallback` instead
impl ProstMessageExt for InlineMetastoreCommandStatementQuery {
fn type_url() -> &'static str {
"InlineMetastoreCommandStatementQuery"
}

fn as_any(&self) -> Any {
Any {
type_url: InlineMetastoreCommandStatementQuery::type_url().to_string(),
value: self.encode_to_vec().into(),
}
}
}
}

pub mod sync {
Expand All @@ -24,7 +41,7 @@ pub mod sync {

fn as_any(&self) -> Any {
Any {
type_url: "DataSyncCommand".to_string(),
type_url: DataSyncCommand::type_url().to_string(),
value: self.encode_to_vec().into(),
}
}
Expand Down
147 changes: 4 additions & 143 deletions src/catalog/external.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use crate::catalog::{
not_impl, CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
};
use crate::repository::interface::{
AllDatabaseFunctionsResult, CollectionRecord, DatabaseRecord,
DroppedTableDeletionStatus, DroppedTablesResult, TableId, TableRecord,
TableVersionId, TableVersionsResult,
};
use crate::wasm_udf::data_types::CreateFunctionDetails;
use arrow_schema::Schema;
use crate::repository::interface::AllDatabaseFunctionsResult;
use clade::schema::schema_store_service_client::SchemaStoreServiceClient;
use clade::schema::{ListSchemaRequest, ListSchemaResponse};
use tonic::transport::{channel::Channel, Endpoint, Error};
use tonic::Request;
use uuid::Uuid;

// An external store, facilitated via a remote clade server implementation
#[derive(Clone)]
Expand All @@ -37,26 +30,10 @@ impl ExternalStore {
}

#[tonic::async_trait]
impl CatalogStore for ExternalStore {
async fn create(&self, _name: &str) -> CatalogResult<()> {
not_impl()
}

async fn get(&self, _name: &str) -> CatalogResult<DatabaseRecord> {
not_impl()
}

async fn delete(&self, _name: &str) -> CatalogResult<()> {
not_impl()
}
}
impl CatalogStore for ExternalStore {}

#[tonic::async_trait]
impl SchemaStore for ExternalStore {
async fn create(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
not_impl()
}

async fn list(&self, catalog_name: &str) -> CatalogResult<ListSchemaResponse> {
let req = Request::new(ListSchemaRequest {
catalog_name: catalog_name.to_string(),
Expand All @@ -65,133 +42,17 @@ impl SchemaStore for ExternalStore {
let response = self.client().list_schemas(req).await?;
Ok(response.into_inner())
}

async fn get(
&self,
_catalog_name: &str,
_schema_name: &str,
) -> CatalogResult<CollectionRecord> {
not_impl()
}

async fn delete(&self, _catalog_name: &str, _schema_name: &str) -> CatalogResult<()> {
not_impl()
}
}

#[tonic::async_trait]
impl TableStore for ExternalStore {
async fn create(
&self,
_catalog_name: &str,
_schema_name: &str,
_table_name: &str,
_schema: &Schema,
_uuid: Uuid,
) -> CatalogResult<(TableId, TableVersionId)> {
not_impl()
}

async fn get(
&self,
_catalog_name: &str,
_schema_name: &str,
_table_name: &str,
) -> CatalogResult<TableRecord> {
not_impl()
}

async fn create_new_version(
&self,
_uuid: Uuid,
_version: i64,
) -> CatalogResult<TableVersionId> {
not_impl()
}

async fn delete_old_versions(
&self,
_catalog_name: &str,
_schema_name: &str,
_table_name: &str,
) -> CatalogResult<u64> {
not_impl()
}

async fn get_all_versions(
&self,
_catalog_name: &str,
_table_names: Option<Vec<String>>,
) -> CatalogResult<Vec<TableVersionsResult>> {
not_impl()
}

async fn update(
&self,
_old_catalog_name: &str,
_old_schema_name: &str,
_old_table_name: &str,
_new_catalog_name: &str,
_new_schema_name: &str,
_new_table_name: &str,
) -> CatalogResult<()> {
not_impl()
}

async fn delete(
&self,
_catalog_name: &str,
_schema_name: &str,
_table_name: &str,
) -> CatalogResult<()> {
not_impl()
}

async fn get_dropped_tables(
&self,
_catalog_name: Option<String>,
) -> CatalogResult<Vec<DroppedTablesResult>> {
not_impl()
}

async fn update_dropped_table(
&self,
_uuid: Uuid,
_deletion_status: DroppedTableDeletionStatus,
) -> CatalogResult<()> {
not_impl()
}

async fn delete_dropped_table(&self, _uuid: Uuid) -> CatalogResult<()> {
not_impl()
}
}
impl TableStore for ExternalStore {}

#[tonic::async_trait]
impl FunctionStore for ExternalStore {
async fn create(
&self,
_catalog_name: &str,
_function_name: &str,
_or_replace: bool,
_details: &CreateFunctionDetails,
) -> CatalogResult<()> {
not_impl()
}

async fn list(
&self,
_catalog_name: &str,
) -> CatalogResult<Vec<AllDatabaseFunctionsResult>> {
Ok(vec![])
}

async fn delete(
&self,
_catalog_name: &str,
_if_exists: bool,
_func_names: &[String],
) -> CatalogResult<()> {
not_impl()
}
}
33 changes: 33 additions & 0 deletions src/catalog/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::catalog::{
CatalogResult, CatalogStore, FunctionStore, SchemaStore, TableStore,
};
use crate::repository::interface::AllDatabaseFunctionsResult;
use clade::schema::ListSchemaResponse;

#[derive(Clone)]
pub struct MemoryStore {
pub schemas: ListSchemaResponse,
}

#[tonic::async_trait]
impl CatalogStore for MemoryStore {}

#[tonic::async_trait]
impl SchemaStore for MemoryStore {
async fn list(&self, _catalog_name: &str) -> CatalogResult<ListSchemaResponse> {
Ok(self.schemas.clone())
}
}

#[tonic::async_trait]
impl TableStore for MemoryStore {}

#[tonic::async_trait]
impl FunctionStore for MemoryStore {
async fn list(
&self,
_catalog_name: &str,
) -> CatalogResult<Vec<AllDatabaseFunctionsResult>> {
Ok(vec![])
}
}
16 changes: 16 additions & 0 deletions src/catalog/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use dashmap::DashMap;
use datafusion::catalog::schema::MemorySchemaProvider;
use datafusion::datasource::TableProvider;

use crate::catalog::memory::MemoryStore;
use deltalake::DeltaTable;
use futures::{stream, StreamExt, TryStreamExt};
use std::collections::HashMap;
Expand Down Expand Up @@ -71,6 +72,21 @@ impl Metastore {
}
}

pub fn new_from_memory(
memory_store: Arc<MemoryStore>,
object_stores: Arc<ObjectStoreFactory>,
) -> Self {
let staging_schema = Arc::new(MemorySchemaProvider::new());
Self {
catalogs: memory_store.clone(),
schemas: memory_store.clone(),
tables: memory_store.clone(),
functions: memory_store,
staging_schema,
object_stores,
}
}

pub async fn build_catalog(
&self,
catalog_name: &str,
Expand Down
Loading

0 comments on commit 48262cb

Please sign in to comment.