Skip to content

Commit

Permalink
fix: add Connection structure
Browse files Browse the repository at this point in the history
  • Loading branch information
edytapawlak committed Mar 27, 2024
1 parent ead78e8 commit 9cce0f4
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 34 deletions.
2 changes: 1 addition & 1 deletion oca/src/data_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Namespace {
}

#[clonable]
pub trait DataStorage: Clone {
pub trait DataStorage: Clone + Send {
fn get(&self, namespace: Namespace, key: &str) -> Result<Option<Vec<u8>>, String>;
fn get_all(&self, namespace: Namespace) -> Result<HashMap<String, Vec<u8>>, String>;
fn insert(&mut self, namespace: Namespace, key: &str, value: &[u8]) -> Result<(), String>;
Expand Down
8 changes: 3 additions & 5 deletions oca/src/facade/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use oca_bundle::Encode;
use oca_dag::build_core_db_model;

use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::Arc;

#[derive(thiserror::Error, Debug, serde::Serialize)]
#[serde(untagged)]
Expand Down Expand Up @@ -181,11 +179,11 @@ impl Facade {
)
.map_err(|errs| vec![Error::ValidationError(errs)])?;

let oca_bundle_cache_repo = OCABundleCacheRepo::new(Arc::clone(&self.connection));
let oca_bundle_cache_repo = OCABundleCacheRepo::new(self.connection());
let oca_bundle_cache_record = OCABundleCacheRecord::new(&oca_build.oca_bundle);
oca_bundle_cache_repo.insert(oca_bundle_cache_record);

let capture_base_cache_repo = CaptureBaseCacheRepo::new(Arc::clone(&self.connection));
let capture_base_cache_repo = CaptureBaseCacheRepo::new(self.connection());
let capture_base_cache_record =
CaptureBaseCacheRecord::new(&oca_build.oca_bundle.capture_base);
capture_base_cache_repo.insert(capture_base_cache_record);
Expand All @@ -200,7 +198,7 @@ impl Facade {
})
.collect::<Vec<_>>();
if !meta_overlays.is_empty() {
let oca_bundle_fts_repo = OCABundleFTSRepo::new(Arc::clone(&self.connection));
let oca_bundle_fts_repo = OCABundleFTSRepo::new(self.connection());
for meta_overlay in meta_overlays {
let oca_bundle_fts_record = OCABundleFTSRecord::new(
oca_build.oca_bundle.said.clone().unwrap().to_string(),
Expand Down
10 changes: 5 additions & 5 deletions oca/src/facade/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use said::SelfAddressingIdentifier;
use serde::Serialize;
#[cfg(feature = "local-references")]
use std::collections::HashMap;
use std::{str::FromStr, sync::Arc};
use std::{borrow::Borrow, rc::Rc};
use std::str::FromStr;
use std::borrow::Borrow;

#[derive(Debug, Serialize)]
#[serde(untagged)]
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Facade {
limit: usize,
page: usize,
) -> SearchResult {
let oca_bundle_fts_repo = OCABundleFTSRepo::new(Arc::clone(&self.connection));
let oca_bundle_fts_repo = OCABundleFTSRepo::new(self.connection());
let search_result = oca_bundle_fts_repo.search(language, query, limit, page);
let records = search_result
.records
Expand Down Expand Up @@ -138,7 +138,7 @@ impl Facade {
let mut total: usize = 0;
let mut errors = vec![];

let oca_bundle_cache_repo = OCABundleCacheRepo::new(Arc::clone(&self.connection));
let oca_bundle_cache_repo = OCABundleCacheRepo::new(self.connection());
let all_oca_bundle_records = oca_bundle_cache_repo.fetch_all(limit, page);
for all_oca_bundle_record in all_oca_bundle_records {
if total == 0 {
Expand Down Expand Up @@ -175,7 +175,7 @@ impl Facade {
let mut errors = vec![];

let capture_base_cache_repo =
crate::repositories::CaptureBaseCacheRepo::new(Arc::clone(&self.connection));
crate::repositories::CaptureBaseCacheRepo::new(self.connection());
let all_capture_base_records = capture_base_cache_repo.fetch_all(limit, page);
for all_capture_base_record in all_capture_base_records {
if total == 0 {
Expand Down
32 changes: 27 additions & 5 deletions oca/src/facade/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
use rusqlite::Params;

use crate::data_storage::DataStorage;
use crate::repositories::SQLiteConfig;
use std::borrow::Borrow;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

pub mod build;
mod explore;
mod fetch;

#[derive(Clone)]
pub struct Connection {
pub connection: Arc<Mutex<rusqlite::Connection>>
}

impl Connection {
pub fn new(path: &str) -> Self {
let conn = rusqlite::Connection::open(path).unwrap();
Self {connection: Arc::new(Mutex::new(conn))}
}

pub fn execute<P>(&self, sql: &str, params: P) where P: Params {
let connection = self.connection.lock().unwrap();
connection.execute(sql, params).unwrap();

}
}

pub struct Facade {
db: Box<dyn DataStorage>,
db_cache: Box<dyn DataStorage>,
connection: Arc<rusqlite::Connection>,
connection: Connection,
}

impl Facade {
Expand All @@ -34,14 +53,17 @@ impl Facade {
None => ":memory:".to_string(),
};

let conn = rusqlite::Connection::open(cache_path).unwrap();
Self {
db,
db_cache,
connection: Arc::new(conn),
connection: Connection::new(&cache_path),
}
}

pub(crate) fn connection(&self) -> Connection {
self.connection.clone()
}

pub fn storage(&self) -> &dyn DataStorage {
self.db_cache.borrow()
}
Expand Down
16 changes: 10 additions & 6 deletions oca/src/repositories/capture_base_cache_repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use oca_bundle::state::oca::capture_base::CaptureBase;
use std::{rc::Rc, sync::Arc};

use crate::facade::Connection;

#[derive(Debug)]
pub struct CaptureBaseCacheRecord {
Expand All @@ -23,20 +24,20 @@ pub struct AllCaptureBaseRecord {
}

pub struct CaptureBaseCacheRepo {
connection: Arc<rusqlite::Connection>,
connection: Connection,
}

impl CaptureBaseCacheRepo {
pub fn new(connection: Arc<rusqlite::Connection>) -> Self {
pub fn new(connection: Connection) -> Self {
let create_table_query = r#"
CREATE TABLE IF NOT EXISTS capture_base_cache(
said TEXT PRIMARY KEY,
capture_base TEXT
)"#;
connection.execute(create_table_query, ()).unwrap();
connection.execute(create_table_query, ());

Self {
connection: Arc::clone(&connection),
connection,
}
}

Expand Down Expand Up @@ -67,7 +68,10 @@ impl CaptureBaseCacheRepo {
) AS results
ON true
GROUP BY said";
let mut statement = self.connection.prepare(query).unwrap();

let connection = self.connection.connection.lock().unwrap();
let mut statement = connection.prepare(query).unwrap();

let models = statement
.query_map([limit, offset], |row| {
let cache_record =
Expand Down
15 changes: 9 additions & 6 deletions oca/src/repositories/oca_bundle_cache_repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use oca_bundle::{state::oca::OCABundle, Encode};
use std::{rc::Rc, sync::Arc};

use crate::facade::Connection;

#[derive(Debug)]
pub struct OCABundleCacheRecord {
Expand All @@ -23,20 +24,20 @@ pub struct AllOCABundleRecord {
}

pub struct OCABundleCacheRepo {
connection: Arc<rusqlite::Connection>,
connection: Connection,
}

impl OCABundleCacheRepo {
pub fn new(connection: Arc<rusqlite::Connection>) -> Self {
pub fn new(connection: Connection) -> Self {
let create_table_query = r#"
CREATE TABLE IF NOT EXISTS oca_bundle_cache(
said TEXT PRIMARY KEY,
oca_bundle TEXT
)"#;
connection.execute(create_table_query, ()).unwrap();
connection.execute(create_table_query, ());

Self {
connection: Arc::clone(&connection),
connection,
}
}

Expand Down Expand Up @@ -67,7 +68,9 @@ impl OCABundleCacheRepo {
) AS results
ON true
GROUP BY said";
let mut statement = self.connection.prepare(query).unwrap();

let connection = self.connection.connection.lock().unwrap();
let mut statement = connection.prepare(query).unwrap();
let models = statement
.query_map([limit, offset], |row| {
let cache_record =
Expand Down
16 changes: 10 additions & 6 deletions oca/src/repositories/oca_bundle_fts_repo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{rc::Rc, str::FromStr, sync::Arc};
use std::str::FromStr;

use said::SelfAddressingIdentifier;

use crate::facade::Connection;

#[derive(Debug)]
pub struct OCABundleFTSRecord {
pub name: String,
Expand All @@ -27,11 +29,11 @@ impl OCABundleFTSRecord {
}

pub struct OCABundleFTSRepo {
connection: Arc<rusqlite::Connection>,
connection: Connection,
}

impl OCABundleFTSRepo {
pub fn new(connection: Arc<rusqlite::Connection>) -> Self {
pub fn new(connection: Connection) -> Self {
let create_table_query = r#"
CREATE VIRTUAL TABLE IF NOT EXISTS oca_bundle_fts
USING FTS5(
Expand All @@ -41,10 +43,10 @@ impl OCABundleFTSRepo {
oca_bundle_said UNINDEXED,
tokenize="trigram"
)"#;
connection.execute(create_table_query, ()).unwrap();
connection.execute(create_table_query, ());

Self {
connection: Arc::clone(&connection),
connection,
}
}

Expand Down Expand Up @@ -164,7 +166,9 @@ meta_overlay:{}
}
}

let mut statement = self.connection.prepare(sql_query).unwrap();

let connection = self.connection.connection.lock().unwrap();
let mut statement = connection.prepare(sql_query).unwrap();

let rows = statement
.query_map(
Expand Down

0 comments on commit 9cce0f4

Please sign in to comment.