diff --git a/src/lib.rs b/src/lib.rs index dbdf5e9db..df2b3cebe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -318,6 +318,21 @@ impl Ipfs { Ok(self.repo.remove_block(cid).await?) } + /// Pins a given Cid + pub async fn pin_block(&self, cid: &Cid) -> Result<(), Error> { + Ok(self.repo.pin_block(cid).await?) + } + + /// Unpins a given Cid + pub async fn unpin_block(&self, cid: &Cid) -> Result<(), Error> { + Ok(self.repo.unpin_block(cid).await?) + } + + /// Checks whether a given block is pinned + pub async fn is_pinned(&self, cid: &Cid) -> Result { + Ok(self.repo.is_pinned(cid).await?) + } + /// Puts an ipld dag node into the ipfs repo. pub async fn put_dag(&self, ipld: Ipld) -> Result { Ok(self.dag.put(ipld, Codec::DagCBOR).await?) @@ -696,4 +711,22 @@ mod tests { ipfs.exit_daemon().await; } + + #[async_std::test] + async fn test_pin_and_unpin() { + let options = IpfsOptions::::default(); + + let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap(); + task::spawn(fut); + + let data = ipld!([-1, -2, -3]); + let cid = ipfs.put_dag(data.clone()).await.unwrap(); + + ipfs.pin_block(&cid).await.unwrap(); + assert!(ipfs.is_pinned(&cid).await.unwrap()); + ipfs.unpin_block(&cid).await.unwrap(); + assert!(!ipfs.is_pinned(&cid).await.unwrap()); + + ipfs.exit_daemon().await; + } } diff --git a/src/repo/mem.rs b/src/repo/mem.rs index 3758527eb..6d4077cca 100644 --- a/src/repo/mem.rs +++ b/src/repo/mem.rs @@ -59,6 +59,7 @@ impl BlockStore for MemBlockStore { #[derive(Clone, Debug)] pub struct MemDataStore { ipns: Arc, Vec>>>, + pin: Arc, Vec>>>, } #[async_trait] @@ -66,6 +67,7 @@ impl DataStore for MemDataStore { fn new(_path: PathBuf) -> Self { MemDataStore { ipns: Arc::new(Mutex::new(HashMap::new())), + pin: Arc::new(Mutex::new(HashMap::new())), } } @@ -80,6 +82,7 @@ impl DataStore for MemDataStore { async fn contains(&self, col: Column, key: &[u8]) -> Result { let map = match col { Column::Ipns => &self.ipns, + Column::Pin => &self.pin, }; let contains = map.lock().await.contains_key(key); Ok(contains) @@ -88,6 +91,7 @@ impl DataStore for MemDataStore { async fn get(&self, col: Column, key: &[u8]) -> Result>, Error> { let map = match col { Column::Ipns => &self.ipns, + Column::Pin => &self.pin, }; let value = map.lock().await.get(key).map(|value| value.to_owned()); Ok(value) @@ -96,6 +100,7 @@ impl DataStore for MemDataStore { async fn put(&self, col: Column, key: &[u8], value: &[u8]) -> Result<(), Error> { let map = match col { Column::Ipns => &self.ipns, + Column::Pin => &self.pin, }; map.lock().await.insert(key.to_owned(), value.to_owned()); Ok(()) @@ -104,6 +109,7 @@ impl DataStore for MemDataStore { async fn remove(&self, col: Column, key: &[u8]) -> Result<(), Error> { let map = match col { Column::Ipns => &self.ipns, + Column::Pin => &self.pin, }; map.lock().await.remove(key); Ok(()) diff --git a/src/repo/mod.rs b/src/repo/mod.rs index c08ce964e..9441f7cf0 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -69,6 +69,7 @@ pub trait DataStore: Debug + Clone + Send + Sync + Unpin + 'static { #[derive(Clone, Copy, Debug)] pub enum Column { Ipns, + Pin, } #[derive(Debug)] @@ -168,6 +169,9 @@ impl Repo { /// Remove block from the block store. pub async fn remove_block(&self, cid: &Cid) -> Result<(), Error> { + if self.is_pinned(cid).await? { + return Err(anyhow::anyhow!("block to remove is pinned")); + } // sending only fails if the background task has exited self.events .clone() @@ -208,6 +212,47 @@ impl Repo { pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error> { self.data_store.remove(Column::Ipns, ipns.as_bytes()).await } + + pub async fn pin_block(&self, cid: &Cid) -> Result<(), Error> { + let pin_value = self.data_store.get(Column::Pin, &cid.to_bytes()).await?; + + match pin_value { + Some(pin_count) => { + if pin_count[0] == std::u8::MAX { + return Err(anyhow::anyhow!("Block cannot be pinned more times")); + } + self.data_store + .put(Column::Pin, &cid.to_bytes(), &[pin_count[0] + 1]) + .await + } + None => { + self.data_store + .put(Column::Pin, &cid.to_bytes(), &[1]) + .await + } + } + } + + pub async fn is_pinned(&self, cid: &Cid) -> Result { + self.data_store.contains(Column::Pin, &cid.to_bytes()).await + } + + pub async fn unpin_block(&self, cid: &Cid) -> Result<(), Error> { + let pin_value = self.data_store.get(Column::Pin, &cid.to_bytes()).await?; + + match pin_value { + Some(pin_count) if pin_count[0] == 1 => { + self.data_store.remove(Column::Pin, &cid.to_bytes()).await + } + Some(pin_count) => { + self.data_store + .put(Column::Pin, &cid.to_bytes(), &[pin_count[0] - 1]) + .await + } + // This is a no-op + None => Ok(()), + } + } } #[async_trait]