Skip to content

Commit

Permalink
remove Clone impl fro mut iterators, sealed iterators to prevent leak…
Browse files Browse the repository at this point in the history
…ing write locks
  • Loading branch information
rdfriese committed Feb 26, 2024
1 parent 2293879 commit ed82808
Show file tree
Hide file tree
Showing 42 changed files with 752 additions and 125 deletions.
2 changes: 1 addition & 1 deletion src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ pub(crate) mod private {
use std::sync::Arc;
#[doc(hidden)]
#[enum_dispatch(LamellarReadArray<T>,LamellarWriteArray<T>)]
pub trait LamellarArrayPrivate<T: Dist> {
pub trait LamellarArrayPrivate<T: Dist>: Clone {
// // fn my_pe(&self) -> usize;
fn inner_array(&self) -> &UnsafeArray<T>;
fn local_as_ptr(&self) -> *const T;
Expand Down
22 changes: 21 additions & 1 deletion src/array/atomic/iteration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::array::iterator::distributed_iterator::{
};
use crate::array::iterator::local_iterator::{IndexedLocalIterator, LocalIterator};
use crate::array::iterator::one_sided_iterator::OneSidedIter;
use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators};
use crate::array::iterator::{private::*, LamellarArrayIterators, LamellarArrayMutIterators};
use crate::array::*;
use crate::memregion::Dist;

Expand All @@ -17,6 +17,16 @@ pub struct AtomicDistIter<T: Dist> {
end_i: usize,
}

impl<T: Dist> IterClone for AtomicDistIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
AtomicDistIter {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
}
}
}

impl<T: Dist> std::fmt::Debug for AtomicDistIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -48,6 +58,16 @@ pub struct AtomicLocalIter<T: Dist> {
end_i: usize,
}

impl<T: Dist> IterClone for AtomicLocalIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
AtomicLocalIter {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
}
}
}

impl<T: Dist> std::fmt::Debug for AtomicLocalIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
50 changes: 23 additions & 27 deletions src/array/generic_atomic/iteration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::array::iterator::distributed_iterator::{
};
use crate::array::iterator::local_iterator::{LocalIterator, LocalIteratorLauncher};
use crate::array::iterator::one_sided_iterator::OneSidedIter;
use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule};
use crate::array::iterator::{
private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule,
};
use crate::array::*;
use crate::memregion::Dist;
// use parking_lot::{
Expand All @@ -20,6 +22,16 @@ pub struct GenericAtomicDistIter<T: Dist> {
end_i: usize,
}

impl<T: Dist> IterClone for GenericAtomicDistIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GenericAtomicDistIter {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
}
}
}

impl<T: Dist> std::fmt::Debug for GenericAtomicDistIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -40,6 +52,16 @@ pub struct GenericAtomicLocalIter<T: Dist> {
end_i: usize,
}

impl<T: Dist> IterClone for GenericAtomicLocalIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GenericAtomicLocalIter {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
}
}
}

impl<T: Dist> std::fmt::Debug for GenericAtomicLocalIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -52,32 +74,6 @@ impl<T: Dist> std::fmt::Debug for GenericAtomicLocalIter<T> {
}
}

// impl<T: Dist> GenericAtomicDistIter<T> {
// pub(crate) fn new(data: GenericAtomicArray<T>, cur_i: usize, cnt: usize) -> Self {
// // println!("new dist iter {:?} {:? } {:?}",cur_i, cnt, cur_i+cnt);
// GenericAtomicDistIter {
// data,
// cur_i,
// end_i: cur_i + cnt,
// }
// }
// }
// impl<T: Dist + 'static> GenericAtomicDistIter<T> {
// pub fn for_each<F>(&self, op: F) -> DistIterForEachHandle
// where
// F: Fn(GenericAtomicElement<T>) + SyncSend + Clone + 'static,
// {
// self.data.clone().for_each(self, op)
// }
// pub fn for_each_async<F, Fut>(&self, op: F) -> DistIterForEachHandle
// where
// F: Fn(GenericAtomicElement<T>) -> Fut + SyncSend + Clone + 'static,
// Fut: Future<Output = ()> + Send + 'static,
// {
// self.data.clone().for_each_async(self, op)
// }
// }

impl<T: Dist> DistributedIterator for GenericAtomicDistIter<T> {
type Item = GenericAtomicElement<T>;
type Array = GenericAtomicArray<T>;
Expand Down
54 changes: 51 additions & 3 deletions src/array/global_lock_atomic/iteration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ use crate::array::iterator::local_iterator::{
IndexedLocalIterator, LocalIterator, LocalIteratorLauncher,
};
use crate::array::iterator::one_sided_iterator::OneSidedIter;
use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule};
use crate::array::iterator::{
private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule,
};
use crate::array::private::LamellarArrayPrivate;
use crate::array::*;
use crate::darc::global_rw_darc::GlobalRwDarcReadGuard;
Expand All @@ -22,6 +24,18 @@ pub struct GlobalLockDistIter<T: Dist> {
_marker: PhantomData<&'static T>,
}

impl<T: Dist> IterClone for GlobalLockDistIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GlobalLockDistIter {
data: self.data.clone(),
lock: self.lock.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<T: Dist> std::fmt::Debug for GlobalLockDistIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -44,6 +58,18 @@ pub struct GlobalLockLocalIter<T: Dist> {
_marker: PhantomData<&'static T>,
}

impl<T: Dist> IterClone for GlobalLockLocalIter<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GlobalLockLocalIter {
data: self.data.clone(),
lock: self.lock.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<T: Dist> std::fmt::Debug for GlobalLockLocalIter<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -151,7 +177,6 @@ impl<T: Dist + 'static> IndexedLocalIterator for GlobalLockLocalIter<T> {
}
}

#[derive(Clone)]
pub struct GlobalLockDistIterMut<T: Dist> {
data: GlobalLockArray<T>,
lock: Arc<GlobalRwDarcCollectiveWriteGuard<()>>,
Expand All @@ -160,6 +185,18 @@ pub struct GlobalLockDistIterMut<T: Dist> {
_marker: PhantomData<&'static T>,
}

impl<T: Dist> IterClone for GlobalLockDistIterMut<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GlobalLockDistIterMut {
data: self.data.clone(),
lock: self.lock.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<T: Dist> std::fmt::Debug for GlobalLockDistIterMut<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand All @@ -172,7 +209,6 @@ impl<T: Dist> std::fmt::Debug for GlobalLockDistIterMut<T> {
}
}

#[derive(Clone)]
pub struct GlobalLockLocalIterMut<T: Dist> {
data: GlobalLockArray<T>,
lock: Arc<GlobalRwDarcWriteGuard<()>>,
Expand All @@ -181,6 +217,18 @@ pub struct GlobalLockLocalIterMut<T: Dist> {
_marker: PhantomData<&'static T>,
}

impl<T: Dist> IterClone for GlobalLockLocalIterMut<T> {
fn iter_clone(&self, _: Sealed) -> Self {
GlobalLockLocalIterMut {
data: self.data.clone(),
lock: self.lock.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<T: Dist> std::fmt::Debug for GlobalLockLocalIterMut<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
29 changes: 25 additions & 4 deletions src/array/iterator/distributed_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use take::*;
pub(crate) use consumer::*;

use crate::array::iterator::one_sided_iterator::OneSidedIterator;
use crate::array::iterator::{IterRequest, Schedule};
use crate::array::iterator::{private::*, IterRequest, Schedule};
use crate::array::{
operations::ArrayOps, AtomicArray, Distribution, GenericAtomicArray, LamellarArray,
LamellarArrayPut, NativeAtomicArray, TeamFrom, UnsafeArray,
Expand Down Expand Up @@ -292,7 +292,7 @@ pub trait DistIteratorLauncher {
/// The functions in this trait are available on all distributed iterators.
/// Additonaly functionality can be found in the [IndexedDistributedIterator] trait:
/// these methods are only available for distributed iterators where the number of elements is known in advance (e.g. after invoking `filter` these methods would be unavailable)
pub trait DistributedIterator: SyncSend + Clone + 'static {
pub trait DistributedIterator: SyncSend + IterClone + 'static {
/// The type of item this distributed iterator produces
type Item: Send;

Expand Down Expand Up @@ -809,7 +809,7 @@ pub trait DistributedIterator: SyncSend + Clone + 'static {
}

/// An interface for dealing with distributed iterators which are indexable, meaning it returns an iterator of known length
pub trait IndexedDistributedIterator: DistributedIterator + SyncSend + Clone + 'static {
pub trait IndexedDistributedIterator: DistributedIterator + SyncSend + IterClone + 'static {
/// yields the global array index along with each element
///
/// # Examples
Expand Down Expand Up @@ -938,6 +938,17 @@ pub struct DistIter<'a, T: Dist + 'static, A: LamellarArray<T>> {
_marker: PhantomData<&'a T>,
}

impl<'a, T: Dist, A: LamellarArray<T>> IterClone for DistIter<'a, T, A> {
fn iter_clone(&self, _: Sealed) -> Self {
DistIter {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<'a, T: Dist, A: LamellarArray<T>> std::fmt::Debug for DistIter<'a, T, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -1032,14 +1043,24 @@ impl<
/// let dist_iter = array.dist_iter_mut().for_each(move |e| *e = my_pe );
/// world.block_on(dist_iter);
///```
#[derive(Clone)]
pub struct DistIterMut<'a, T: Dist, A: LamellarArray<T>> {
data: A,
cur_i: usize,
end_i: usize,
_marker: PhantomData<&'a T>,
}

impl<'a, T: Dist, A: LamellarArray<T>> IterClone for DistIterMut<'a, T, A> {
fn iter_clone(&self, _: Sealed) -> Self {
DistIterMut {
data: self.data.clone(),
cur_i: self.cur_i,
end_i: self.end_i,
_marker: PhantomData,
}
}
}

impl<'a, T: Dist, A: LamellarArray<T>> std::fmt::Debug for DistIterMut<'a, T, A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down
10 changes: 3 additions & 7 deletions src/array/iterator/distributed_iterator/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
if self.cur_i < self.end_i {
// let size = std::cmp::min(self.chunk_size, self.end_i-self.cur_i);
let start_i = self.cur_i * self.chunk_size;
let iter = self.iter.clone().init(start_i, self.chunk_size);
let iter = self.iter.iter_clone(Sealed).init(start_i, self.chunk_size);
// println!("new Chunk {:?} {:?} {:?} {:?}",self.cur_i, self.end_i, start_i,start_i+self.chunk_size);
let chunk = Chunk { iter: iter };
self.cur_i += 1;
Expand All @@ -73,7 +73,7 @@ where
// Some(g_index)
// }
// fn subarray_index(&self, index: usize) -> Option<usize> {
// let g_index = self.iter.subarray_index(index * self.chunk_size)? / self.chunk_size;
// let g_index = self.iter.subarray_index(index * self.chunk_size)? / self.chunk_size;
// // println!("enumerate index: {:?} global_index {:?}", index,g_index);
// Some(g_index)
// }
Expand All @@ -85,11 +85,7 @@ where
}
}

impl<I> IndexedDistributedIterator for Chunks<I>
where
I: IndexedDistributedIterator,
{}

impl<I> IndexedDistributedIterator for Chunks<I> where I: IndexedDistributedIterator {}

#[derive(Clone)]
pub struct Chunk<I> {
Expand Down
Loading

0 comments on commit ed82808

Please sign in to comment.