Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Adds basic support for pin and unpin operations #117

Merged
merged 15 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,21 @@ impl<Types: IpfsTypes> Ipfs<Types> {
Ok(self.repo.remove_block(cid).await?)
}

/// Pins a given Cid
pub async fn pin_block(&self, cid: &Cid) -> Result<(), Error> {
Ok(self.repo.pin_block(cid).await?)
}

/// Unpins a given Cid
pub async fn unpin_block(&self, cid: &Cid) -> Result<(), Error> {
Ok(self.repo.unpin_block(cid).await?)
}

/// Checks whether a given block is pinned
pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
Ok(self.repo.is_pinned(cid).await?)
}

/// Puts an ipld dag node into the ipfs repo.
pub async fn put_dag(&self, ipld: Ipld) -> Result<Cid, Error> {
Ok(self.dag.put(ipld, Codec::DagCBOR).await?)
Expand Down Expand Up @@ -696,4 +711,22 @@ mod tests {

ipfs.exit_daemon().await;
}

#[async_std::test]
async fn test_pin_and_unpin() {
let options = IpfsOptions::<TestTypes>::default();

let (ipfs, fut) = UninitializedIpfs::new(options).await.start().await.unwrap();
task::spawn(fut);

let data = ipld!([-1, -2, -3]);
let cid = ipfs.put_dag(data.clone()).await.unwrap();

ipfs.pin_block(&cid).await.unwrap();
assert!(ipfs.is_pinned(&cid).await.unwrap());
ipfs.unpin_block(&cid).await.unwrap();
assert!(!ipfs.is_pinned(&cid).await.unwrap());

ipfs.exit_daemon().await;
}
}
6 changes: 6 additions & 0 deletions src/repo/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ impl BlockStore for MemBlockStore {
#[derive(Clone, Debug)]
pub struct MemDataStore {
ipns: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
pin: Arc<Mutex<HashMap<Vec<u8>, Vec<u8>>>>,
}

#[async_trait]
impl DataStore for MemDataStore {
fn new(_path: PathBuf) -> Self {
MemDataStore {
ipns: Arc::new(Mutex::new(HashMap::new())),
pin: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand All @@ -80,6 +82,7 @@ impl DataStore for MemDataStore {
async fn contains(&self, col: Column, key: &[u8]) -> Result<bool, Error> {
let map = match col {
Column::Ipns => &self.ipns,
Column::Pin => &self.pin,
};
let contains = map.lock().await.contains_key(key);
Ok(contains)
Expand All @@ -88,6 +91,7 @@ impl DataStore for MemDataStore {
async fn get(&self, col: Column, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let map = match col {
Column::Ipns => &self.ipns,
Column::Pin => &self.pin,
};
let value = map.lock().await.get(key).map(|value| value.to_owned());
Ok(value)
Expand All @@ -96,6 +100,7 @@ impl DataStore for MemDataStore {
async fn put(&self, col: Column, key: &[u8], value: &[u8]) -> Result<(), Error> {
let map = match col {
Column::Ipns => &self.ipns,
Column::Pin => &self.pin,
};
map.lock().await.insert(key.to_owned(), value.to_owned());
Ok(())
Expand All @@ -104,6 +109,7 @@ impl DataStore for MemDataStore {
async fn remove(&self, col: Column, key: &[u8]) -> Result<(), Error> {
let map = match col {
Column::Ipns => &self.ipns,
Column::Pin => &self.pin,
};
map.lock().await.remove(key);
Ok(())
Expand Down
45 changes: 45 additions & 0 deletions src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub trait DataStore: Debug + Clone + Send + Sync + Unpin + 'static {
#[derive(Clone, Copy, Debug)]
pub enum Column {
Ipns,
Pin,
}

#[derive(Debug)]
Expand Down Expand Up @@ -168,6 +169,9 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {

/// Remove block from the block store.
pub async fn remove_block(&self, cid: &Cid) -> Result<(), Error> {
if self.is_pinned(cid).await? {
return Err(anyhow::anyhow!("block to remove is pinned"));
}
// sending only fails if the background task has exited
self.events
.clone()
Expand Down Expand Up @@ -208,6 +212,47 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error> {
self.data_store.remove(Column::Ipns, ipns.as_bytes()).await
}

pub async fn pin_block(&self, cid: &Cid) -> Result<(), Error> {
let pin_value = self.data_store.get(Column::Pin, &cid.to_bytes()).await?;

match pin_value {
Some(pin_count) => {
if pin_count[0] == std::u8::MAX {
return Err(anyhow::anyhow!("Block cannot be pinned more times"));
}
self.data_store
.put(Column::Pin, &cid.to_bytes(), &[pin_count[0] + 1])
.await
}
None => {
self.data_store
.put(Column::Pin, &cid.to_bytes(), &[1])
.await
}
}
}

pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
self.data_store.contains(Column::Pin, &cid.to_bytes()).await
}

pub async fn unpin_block(&self, cid: &Cid) -> Result<(), Error> {
let pin_value = self.data_store.get(Column::Pin, &cid.to_bytes()).await?;

match pin_value {
Some(pin_count) if pin_count[0] == 1 => {
self.data_store.remove(Column::Pin, &cid.to_bytes()).await
}
Some(pin_count) => {
self.data_store
.put(Column::Pin, &cid.to_bytes(), &[pin_count[0] - 1])
.await
}
// This is a no-op
None => Ok(()),
}
}
}

#[async_trait]
Expand Down