From ed82808d46a2bc825d841b490b51509231b51082 Mon Sep 17 00:00:00 2001 From: "Ryan D. Friese" Date: Mon, 26 Feb 2024 15:27:03 -0800 Subject: [PATCH] remove Clone impl fro mut iterators, sealed iterators to prevent leaking write locks --- src/array.rs | 2 +- src/array/atomic/iteration.rs | 22 +++++- src/array/generic_atomic/iteration.rs | 50 +++++++------- src/array/global_lock_atomic/iteration.rs | 54 ++++++++++++++- src/array/iterator/distributed_iterator.rs | 29 ++++++-- .../iterator/distributed_iterator/chunks.rs | 10 +-- .../distributed_iterator/consumer/collect.rs | 32 +++++++-- .../distributed_iterator/consumer/count.rs | 26 ++++++- .../distributed_iterator/consumer/for_each.rs | 68 +++++++++++++++++-- .../distributed_iterator/consumer/reduce.rs | 25 ++++++- .../distributed_iterator/consumer/sum.rs | 23 ++++++- .../distributed_iterator/enumerate.rs | 10 +++ .../iterator/distributed_iterator/filter.rs | 10 +++ .../distributed_iterator/filter_map.rs | 10 +++ .../iterator/distributed_iterator/map.rs | 10 +++ .../distributed_iterator/monotonic.rs | 10 +++ .../iterator/distributed_iterator/skip.rs | 10 +++ .../iterator/distributed_iterator/step_by.rs | 10 +++ .../iterator/distributed_iterator/take.rs | 10 +++ .../iterator/distributed_iterator/zip.rs | 20 ++++-- src/array/iterator/local_iterator.rs | 29 ++++++-- src/array/iterator/local_iterator/chunks.rs | 13 +++- .../local_iterator/consumer/collect.rs | 27 ++++++-- .../iterator/local_iterator/consumer/count.rs | 26 ++++++- .../local_iterator/consumer/for_each.rs | 39 +++++++++-- .../local_iterator/consumer/reduce.rs | 27 ++++++-- .../iterator/local_iterator/consumer/sum.rs | 23 ++++++- .../iterator/local_iterator/enumerate.rs | 10 +++ src/array/iterator/local_iterator/filter.rs | 10 +++ .../iterator/local_iterator/filter_map.rs | 10 +++ src/array/iterator/local_iterator/map.rs | 10 +++ .../iterator/local_iterator/monotonic.rs | 9 +++ src/array/iterator/local_iterator/skip.rs | 10 +++ src/array/iterator/local_iterator/step_by.rs | 10 +++ src/array/iterator/local_iterator/take.rs | 9 +++ src/array/iterator/local_iterator/zip.rs | 10 +++ src/array/iterator/mod.rs | 7 ++ src/array/local_lock_atomic/iteration.rs | 54 ++++++++++++++- src/array/local_lock_atomic/local_chunks.rs | 43 ++++++++---- src/array/native_atomic/iteration.rs | 24 ++++++- src/array/unsafe/iteration/distributed.rs | 19 ++++-- src/array/unsafe/iteration/local.rs | 17 +++-- 42 files changed, 752 insertions(+), 125 deletions(-) diff --git a/src/array.rs b/src/array.rs index b50861df..a25a58f3 100644 --- a/src/array.rs +++ b/src/array.rs @@ -786,7 +786,7 @@ pub(crate) mod private { use std::sync::Arc; #[doc(hidden)] #[enum_dispatch(LamellarReadArray,LamellarWriteArray)] - pub trait LamellarArrayPrivate { + pub trait LamellarArrayPrivate: Clone { // // fn my_pe(&self) -> usize; fn inner_array(&self) -> &UnsafeArray; fn local_as_ptr(&self) -> *const T; diff --git a/src/array/atomic/iteration.rs b/src/array/atomic/iteration.rs index f805f3ec..bdcc014f 100644 --- a/src/array/atomic/iteration.rs +++ b/src/array/atomic/iteration.rs @@ -5,7 +5,7 @@ use crate::array::iterator::distributed_iterator::{ }; use crate::array::iterator::local_iterator::{IndexedLocalIterator, LocalIterator}; use crate::array::iterator::one_sided_iterator::OneSidedIter; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators}; +use crate::array::iterator::{private::*, LamellarArrayIterators, LamellarArrayMutIterators}; use crate::array::*; use crate::memregion::Dist; @@ -17,6 +17,16 @@ pub struct AtomicDistIter { end_i: usize, } +impl IterClone for AtomicDistIter { + fn iter_clone(&self, _: Sealed) -> Self { + AtomicDistIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for AtomicDistIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -48,6 +58,16 @@ pub struct AtomicLocalIter { end_i: usize, } +impl IterClone for AtomicLocalIter { + fn iter_clone(&self, _: Sealed) -> Self { + AtomicLocalIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for AtomicLocalIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/generic_atomic/iteration.rs b/src/array/generic_atomic/iteration.rs index 6f5bfbe1..925f32fe 100644 --- a/src/array/generic_atomic/iteration.rs +++ b/src/array/generic_atomic/iteration.rs @@ -4,7 +4,9 @@ use crate::array::iterator::distributed_iterator::{ }; use crate::array::iterator::local_iterator::{LocalIterator, LocalIteratorLauncher}; use crate::array::iterator::one_sided_iterator::OneSidedIter; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule}; +use crate::array::iterator::{ + private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule, +}; use crate::array::*; use crate::memregion::Dist; // use parking_lot::{ @@ -20,6 +22,16 @@ pub struct GenericAtomicDistIter { end_i: usize, } +impl IterClone for GenericAtomicDistIter { + fn iter_clone(&self, _: Sealed) -> Self { + GenericAtomicDistIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for GenericAtomicDistIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -40,6 +52,16 @@ pub struct GenericAtomicLocalIter { end_i: usize, } +impl IterClone for GenericAtomicLocalIter { + fn iter_clone(&self, _: Sealed) -> Self { + GenericAtomicLocalIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for GenericAtomicLocalIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -52,32 +74,6 @@ impl std::fmt::Debug for GenericAtomicLocalIter { } } -// impl GenericAtomicDistIter { -// pub(crate) fn new(data: GenericAtomicArray, cur_i: usize, cnt: usize) -> Self { -// // println!("new dist iter {:?} {:? } {:?}",cur_i, cnt, cur_i+cnt); -// GenericAtomicDistIter { -// data, -// cur_i, -// end_i: cur_i + cnt, -// } -// } -// } -// impl GenericAtomicDistIter { -// pub fn for_each(&self, op: F) -> DistIterForEachHandle -// where -// F: Fn(GenericAtomicElement) + SyncSend + Clone + 'static, -// { -// self.data.clone().for_each(self, op) -// } -// pub fn for_each_async(&self, op: F) -> DistIterForEachHandle -// where -// F: Fn(GenericAtomicElement) -> Fut + SyncSend + Clone + 'static, -// Fut: Future + Send + 'static, -// { -// self.data.clone().for_each_async(self, op) -// } -// } - impl DistributedIterator for GenericAtomicDistIter { type Item = GenericAtomicElement; type Array = GenericAtomicArray; diff --git a/src/array/global_lock_atomic/iteration.rs b/src/array/global_lock_atomic/iteration.rs index 37a4c168..df71e494 100644 --- a/src/array/global_lock_atomic/iteration.rs +++ b/src/array/global_lock_atomic/iteration.rs @@ -6,7 +6,9 @@ use crate::array::iterator::local_iterator::{ IndexedLocalIterator, LocalIterator, LocalIteratorLauncher, }; use crate::array::iterator::one_sided_iterator::OneSidedIter; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule}; +use crate::array::iterator::{ + private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule, +}; use crate::array::private::LamellarArrayPrivate; use crate::array::*; use crate::darc::global_rw_darc::GlobalRwDarcReadGuard; @@ -22,6 +24,18 @@ pub struct GlobalLockDistIter { _marker: PhantomData<&'static T>, } +impl IterClone for GlobalLockDistIter { + fn iter_clone(&self, _: Sealed) -> Self { + GlobalLockDistIter { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl std::fmt::Debug for GlobalLockDistIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -44,6 +58,18 @@ pub struct GlobalLockLocalIter { _marker: PhantomData<&'static T>, } +impl IterClone for GlobalLockLocalIter { + fn iter_clone(&self, _: Sealed) -> Self { + GlobalLockLocalIter { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl std::fmt::Debug for GlobalLockLocalIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -151,7 +177,6 @@ impl IndexedLocalIterator for GlobalLockLocalIter { } } -#[derive(Clone)] pub struct GlobalLockDistIterMut { data: GlobalLockArray, lock: Arc>, @@ -160,6 +185,18 @@ pub struct GlobalLockDistIterMut { _marker: PhantomData<&'static T>, } +impl IterClone for GlobalLockDistIterMut { + fn iter_clone(&self, _: Sealed) -> Self { + GlobalLockDistIterMut { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl std::fmt::Debug for GlobalLockDistIterMut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -172,7 +209,6 @@ impl std::fmt::Debug for GlobalLockDistIterMut { } } -#[derive(Clone)] pub struct GlobalLockLocalIterMut { data: GlobalLockArray, lock: Arc>, @@ -181,6 +217,18 @@ pub struct GlobalLockLocalIterMut { _marker: PhantomData<&'static T>, } +impl IterClone for GlobalLockLocalIterMut { + fn iter_clone(&self, _: Sealed) -> Self { + GlobalLockLocalIterMut { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl std::fmt::Debug for GlobalLockLocalIterMut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/iterator/distributed_iterator.rs b/src/array/iterator/distributed_iterator.rs index 4149510e..56be3e22 100644 --- a/src/array/iterator/distributed_iterator.rs +++ b/src/array/iterator/distributed_iterator.rs @@ -37,7 +37,7 @@ use take::*; pub(crate) use consumer::*; use crate::array::iterator::one_sided_iterator::OneSidedIterator; -use crate::array::iterator::{IterRequest, Schedule}; +use crate::array::iterator::{private::*, IterRequest, Schedule}; use crate::array::{ operations::ArrayOps, AtomicArray, Distribution, GenericAtomicArray, LamellarArray, LamellarArrayPut, NativeAtomicArray, TeamFrom, UnsafeArray, @@ -292,7 +292,7 @@ pub trait DistIteratorLauncher { /// The functions in this trait are available on all distributed iterators. /// Additonaly functionality can be found in the [IndexedDistributedIterator] trait: /// these methods are only available for distributed iterators where the number of elements is known in advance (e.g. after invoking `filter` these methods would be unavailable) -pub trait DistributedIterator: SyncSend + Clone + 'static { +pub trait DistributedIterator: SyncSend + IterClone + 'static { /// The type of item this distributed iterator produces type Item: Send; @@ -809,7 +809,7 @@ pub trait DistributedIterator: SyncSend + Clone + 'static { } /// An interface for dealing with distributed iterators which are indexable, meaning it returns an iterator of known length -pub trait IndexedDistributedIterator: DistributedIterator + SyncSend + Clone + 'static { +pub trait IndexedDistributedIterator: DistributedIterator + SyncSend + IterClone + 'static { /// yields the global array index along with each element /// /// # Examples @@ -938,6 +938,17 @@ pub struct DistIter<'a, T: Dist + 'static, A: LamellarArray> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist, A: LamellarArray> IterClone for DistIter<'a, T, A> { + fn iter_clone(&self, _: Sealed) -> Self { + DistIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist, A: LamellarArray> std::fmt::Debug for DistIter<'a, T, A> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -1032,7 +1043,6 @@ impl< /// let dist_iter = array.dist_iter_mut().for_each(move |e| *e = my_pe ); /// world.block_on(dist_iter); ///``` -#[derive(Clone)] pub struct DistIterMut<'a, T: Dist, A: LamellarArray> { data: A, cur_i: usize, @@ -1040,6 +1050,17 @@ pub struct DistIterMut<'a, T: Dist, A: LamellarArray> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist, A: LamellarArray> IterClone for DistIterMut<'a, T, A> { + fn iter_clone(&self, _: Sealed) -> Self { + DistIterMut { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist, A: LamellarArray> std::fmt::Debug for DistIterMut<'a, T, A> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/iterator/distributed_iterator/chunks.rs b/src/array/iterator/distributed_iterator/chunks.rs index abc0b47c..c8d9dc9f 100644 --- a/src/array/iterator/distributed_iterator/chunks.rs +++ b/src/array/iterator/distributed_iterator/chunks.rs @@ -47,7 +47,7 @@ where if self.cur_i < self.end_i { // let size = std::cmp::min(self.chunk_size, self.end_i-self.cur_i); let start_i = self.cur_i * self.chunk_size; - let iter = self.iter.clone().init(start_i, self.chunk_size); + let iter = self.iter.iter_clone(Sealed).init(start_i, self.chunk_size); // println!("new Chunk {:?} {:?} {:?} {:?}",self.cur_i, self.end_i, start_i,start_i+self.chunk_size); let chunk = Chunk { iter: iter }; self.cur_i += 1; @@ -73,7 +73,7 @@ where // Some(g_index) // } // fn subarray_index(&self, index: usize) -> Option { - // let g_index = self.iter.subarray_index(index * self.chunk_size)? / self.chunk_size; + // let g_index = self.iter.subarray_index(index * self.chunk_size)? / self.chunk_size; // // println!("enumerate index: {:?} global_index {:?}", index,g_index); // Some(g_index) // } @@ -85,11 +85,7 @@ where } } -impl IndexedDistributedIterator for Chunks -where - I: IndexedDistributedIterator, -{} - +impl IndexedDistributedIterator for Chunks where I: IndexedDistributedIterator {} #[derive(Clone)] pub struct Chunk { diff --git a/src/array/iterator/distributed_iterator/consumer/collect.rs b/src/array/iterator/distributed_iterator/consumer/collect.rs index a9ec30b4..d16e1826 100644 --- a/src/array/iterator/distributed_iterator/consumer/collect.rs +++ b/src/array/iterator/distributed_iterator/consumer/collect.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::distributed_iterator::{DistributedIterator, Monotonic}; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::array::operations::ArrayOps; use crate::array::{Distribution, TeamFrom, TeamInto}; use crate::lamellar_request::LamellarRequest; @@ -21,6 +21,16 @@ pub struct Collect { pub(crate) _phantom: PhantomData, } +impl IterClone for Collect { + fn iter_clone(&self, _: Sealed) -> Self { + Collect { + iter: self.iter.iter_clone(Sealed), + distribution: self.distribution.clone(), + _phantom: self._phantom.clone(), + } + } +} + impl IterConsumer for Collect where I: DistributedIterator, @@ -42,7 +52,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(CollectAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -70,6 +80,16 @@ pub struct CollectAsync { pub(crate) _phantom: PhantomData<(A, B)>, } +impl IterClone for CollectAsync { + fn iter_clone(&self, _: Sealed) -> Self { + CollectAsync { + iter: self.iter.iter_clone(Sealed), + distribution: self.distribution.clone(), + _phantom: self._phantom.clone(), + } + } +} + impl IterConsumer for CollectAsync where I: DistributedIterator, @@ -92,7 +112,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(CollectAsyncAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -115,7 +135,7 @@ where impl Clone for CollectAsync where - I: DistributedIterator, + I: DistributedIterator + Clone, I::Item: Future + Send + 'static, B: Dist + ArrayOps, A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, @@ -190,7 +210,7 @@ where A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { async fn exec(&self) -> Vec { - let iter = self.schedule.init_iter(self.iter.clone()); + let iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); iter.collect::>() } } @@ -216,7 +236,7 @@ where A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { async fn exec(&self) -> Vec<(usize, B)> { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); let mut res = vec![]; while let Some((index, elem)) = iter.next() { res.push((index, elem.await)); diff --git a/src/array/iterator/distributed_iterator/consumer/count.rs b/src/array/iterator/distributed_iterator/consumer/count.rs index 2d58ceca..6b32ab6b 100644 --- a/src/array/iterator/distributed_iterator/consumer/count.rs +++ b/src/array/iterator/distributed_iterator/consumer/count.rs @@ -1,7 +1,7 @@ use crate::active_messaging::LamellarArcLocalAm; use crate::array::iterator::consumer::*; use crate::array::iterator::distributed_iterator::DistributedIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; use crate::scheduler::SchedulerQueue; @@ -19,6 +19,14 @@ pub struct Count { pub(crate) iter: I, } +impl IterClone for Count { + fn iter_clone(&self, _: Sealed) -> Self { + Count { + iter: self.iter.iter_clone(Sealed), + } + } +} + impl IterConsumer for Count where I: DistributedIterator, @@ -36,7 +44,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(CountAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -121,13 +129,25 @@ pub(crate) struct CountAm { pub(crate) schedule: IterSchedule, } +impl IterClone for CountAm +where + I: IterClone, +{ + fn iter_clone(&self, _: Sealed) -> Self { + CountAm { + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for CountAm where I: DistributedIterator + 'static, { async fn exec(&self) -> usize { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); let mut count: usize = 0; while let Some(_) = iter.next() { count += 1; diff --git a/src/array/iterator/distributed_iterator/consumer/for_each.rs b/src/array/iterator/distributed_iterator/consumer/for_each.rs index 4fc41c00..a45bbff9 100644 --- a/src/array/iterator/distributed_iterator/consumer/for_each.rs +++ b/src/array/iterator/distributed_iterator/consumer/for_each.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::distributed_iterator::DistributedIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -20,6 +20,19 @@ where pub(crate) op: F, } +impl IterClone for ForEach +where + I: DistributedIterator + 'static, + F: Fn(I::Item) + SyncSend + Clone + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEach { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for ForEach where I: DistributedIterator + 'static, @@ -39,7 +52,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ForEachAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, }) @@ -68,6 +81,20 @@ where // pub(crate) _phantom: PhantomData, } +impl IterClone for ForEachAsync +where + I: DistributedIterator + 'static, + F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, + Fut: Future + Send + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEachAsync { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for ForEachAsync where I: DistributedIterator + 'static, @@ -88,7 +115,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ForEachAsyncAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, // _phantom: self._phantom.clone(), @@ -111,7 +138,7 @@ where impl Clone for ForEachAsync where - I: DistributedIterator + 'static, + I: DistributedIterator + Clone + 'static, F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, Fut: Future + Send + 'static, { @@ -155,6 +182,20 @@ where pub(crate) schedule: IterSchedule, } +impl IterClone for ForEachAm +where + I: DistributedIterator + 'static, + F: Fn(I::Item) + SyncSend + Clone + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEachAm { + op: self.op.clone(), + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for ForEachAm where @@ -163,7 +204,7 @@ where { async fn exec(&self) { // println!("foreacham: {:?}", std::thread::current().id()); - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); while let Some(elem) = iter.next() { (&self.op)(elem); } @@ -183,6 +224,21 @@ where // pub(crate) _phantom: PhantomData } +impl IterClone for ForEachAsyncAm +where + I: DistributedIterator + 'static, + F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, + Fut: Future + Send + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEachAsyncAm { + op: self.op.clone(), + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for ForEachAsyncAm where @@ -191,7 +247,7 @@ where Fut: Future + Send + 'static, { async fn exec(&self) { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); while let Some(elem) = iter.next() { (&self.op)(elem).await; } diff --git a/src/array/iterator/distributed_iterator/consumer/reduce.rs b/src/array/iterator/distributed_iterator/consumer/reduce.rs index de7dede8..5cc8493b 100644 --- a/src/array/iterator/distributed_iterator/consumer/reduce.rs +++ b/src/array/iterator/distributed_iterator/consumer/reduce.rs @@ -2,7 +2,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::distributed_iterator::DistributedIterator; use crate::array::iterator::one_sided_iterator::OneSidedIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::array::{ArrayOps, Distribution, LamellarArray, UnsafeArray}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -18,6 +18,15 @@ pub struct Reduce { pub(crate) op: F, } +impl IterClone for Reduce { + fn iter_clone(&self, _: Sealed) -> Self { + Reduce { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for Reduce where I: DistributedIterator + 'static, @@ -38,7 +47,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ReduceAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, }) @@ -120,6 +129,16 @@ pub(crate) struct ReduceAm { pub(crate) schedule: IterSchedule, } +impl IterClone for ReduceAm { + fn iter_clone(&self, _: Sealed) -> Self { + ReduceAm { + op: self.op.clone(), + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for ReduceAm where @@ -128,7 +147,7 @@ where F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static, { async fn exec(&self) -> Option { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); match iter.next() { Some(mut accum) => { while let Some(elem) = iter.next() { diff --git a/src/array/iterator/distributed_iterator/consumer/sum.rs b/src/array/iterator/distributed_iterator/consumer/sum.rs index ac6f8bef..14c17113 100644 --- a/src/array/iterator/distributed_iterator/consumer/sum.rs +++ b/src/array/iterator/distributed_iterator/consumer/sum.rs @@ -2,7 +2,7 @@ use crate::active_messaging::LamellarArcLocalAm; use crate::array::iterator::consumer::*; use crate::array::iterator::distributed_iterator::DistributedIterator; use crate::array::iterator::one_sided_iterator::OneSidedIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::array::{ArrayOps, Distribution, LamellarArray, UnsafeArray}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -17,6 +17,14 @@ pub struct Sum { pub(crate) iter: I, } +impl IterClone for Sum { + fn iter_clone(&self, _: Sealed) -> Self { + Sum { + iter: self.iter.iter_clone(Sealed), + } + } +} + impl IterConsumer for Sum where I: DistributedIterator + 'static, @@ -35,7 +43,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(SumAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -100,6 +108,15 @@ pub(crate) struct SumAm { pub(crate) schedule: IterSchedule, } +impl IterClone for SumAm { + fn iter_clone(&self, _: Sealed) -> Self { + SumAm { + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for SumAm where @@ -107,7 +124,7 @@ where I::Item: Dist + ArrayOps + std::iter::Sum, { async fn exec(&self) -> I::Item { - let iter = self.schedule.init_iter(self.iter.clone()); + let iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); iter.sum::() } } diff --git a/src/array/iterator/distributed_iterator/enumerate.rs b/src/array/iterator/distributed_iterator/enumerate.rs index e4228631..b4ece0d6 100644 --- a/src/array/iterator/distributed_iterator/enumerate.rs +++ b/src/array/iterator/distributed_iterator/enumerate.rs @@ -5,6 +5,16 @@ pub struct Enumerate { iter: I, cur_index: usize, } + +impl IterClone for Enumerate { + fn iter_clone(&self, _: Sealed) -> Self { + Enumerate { + iter: self.iter.iter_clone(Sealed), + cur_index: self.cur_index, + } + } +} + impl Enumerate where I: IndexedDistributedIterator, diff --git a/src/array/iterator/distributed_iterator/filter.rs b/src/array/iterator/distributed_iterator/filter.rs index 6669ba60..417a6f7d 100644 --- a/src/array/iterator/distributed_iterator/filter.rs +++ b/src/array/iterator/distributed_iterator/filter.rs @@ -5,6 +5,16 @@ pub struct Filter { iter: I, f: F, } + +impl IterClone for Filter { + fn iter_clone(&self, _: Sealed) -> Self { + Filter { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl Filter where I: DistributedIterator, diff --git a/src/array/iterator/distributed_iterator/filter_map.rs b/src/array/iterator/distributed_iterator/filter_map.rs index 110fcc0b..9c168799 100644 --- a/src/array/iterator/distributed_iterator/filter_map.rs +++ b/src/array/iterator/distributed_iterator/filter_map.rs @@ -5,6 +5,16 @@ pub struct FilterMap { iter: I, f: F, } + +impl IterClone for FilterMap { + fn iter_clone(&self, _: Sealed) -> Self { + FilterMap { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl FilterMap where I: DistributedIterator, diff --git a/src/array/iterator/distributed_iterator/map.rs b/src/array/iterator/distributed_iterator/map.rs index be46d00a..c2293d89 100644 --- a/src/array/iterator/distributed_iterator/map.rs +++ b/src/array/iterator/distributed_iterator/map.rs @@ -5,6 +5,16 @@ pub struct Map { iter: I, f: F, } + +impl IterClone for Map { + fn iter_clone(&self, _: Sealed) -> Self { + Map { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl Map where I: DistributedIterator, diff --git a/src/array/iterator/distributed_iterator/monotonic.rs b/src/array/iterator/distributed_iterator/monotonic.rs index 243e3085..fad72be4 100644 --- a/src/array/iterator/distributed_iterator/monotonic.rs +++ b/src/array/iterator/distributed_iterator/monotonic.rs @@ -5,6 +5,16 @@ pub struct Monotonic { iter: I, cur_index: usize, } + +impl IterClone for Monotonic { + fn iter_clone(&self, _: Sealed) -> Self { + Monotonic { + iter: self.iter.iter_clone(Sealed), + cur_index: self.cur_index, + } + } +} + impl Monotonic where I: DistributedIterator, diff --git a/src/array/iterator/distributed_iterator/skip.rs b/src/array/iterator/distributed_iterator/skip.rs index 32257aaf..281e323e 100644 --- a/src/array/iterator/distributed_iterator/skip.rs +++ b/src/array/iterator/distributed_iterator/skip.rs @@ -8,6 +8,16 @@ pub struct Skip { skip_index: usize, } +impl IterClone for Skip { + fn iter_clone(&self, _: Sealed) -> Self { + Skip { + iter: self.iter.iter_clone(Sealed), + count: self.count, + skip_index: self.skip_index, + } + } +} + impl Skip where I: IndexedDistributedIterator, diff --git a/src/array/iterator/distributed_iterator/step_by.rs b/src/array/iterator/distributed_iterator/step_by.rs index 26c3fa92..1102b0e2 100644 --- a/src/array/iterator/distributed_iterator/step_by.rs +++ b/src/array/iterator/distributed_iterator/step_by.rs @@ -8,6 +8,16 @@ pub struct StepBy { add_one: usize, //if we dont align perfectly we will need to add 1 to our iteration index calculation } +impl IterClone for StepBy { + fn iter_clone(&self, _: Sealed) -> Self { + StepBy { + iter: self.iter.iter_clone(Sealed), + step_size: self.step_size, + add_one: self.add_one, + } + } +} + impl StepBy where I: IndexedDistributedIterator, diff --git a/src/array/iterator/distributed_iterator/take.rs b/src/array/iterator/distributed_iterator/take.rs index 3f924254..b4054a30 100644 --- a/src/array/iterator/distributed_iterator/take.rs +++ b/src/array/iterator/distributed_iterator/take.rs @@ -8,6 +8,16 @@ pub struct Take { cur_index: usize, } +impl IterClone for Take { + fn iter_clone(&self, _: Sealed) -> Self { + Take { + iter: self.iter.iter_clone(Sealed), + count: self.count, + cur_index: self.cur_index, + } + } +} + impl Take where I: IndexedDistributedIterator, diff --git a/src/array/iterator/distributed_iterator/zip.rs b/src/array/iterator/distributed_iterator/zip.rs index 3a4c6533..47f2c1aa 100644 --- a/src/array/iterator/distributed_iterator/zip.rs +++ b/src/array/iterator/distributed_iterator/zip.rs @@ -5,6 +5,16 @@ pub struct Zip { a: A, b: B, } + +impl IterClone for Zip { + fn iter_clone(&self, _: Sealed) -> Self { + Zip { + a: self.a.clone(), + b: self.b.clone(), + } + } +} + impl Zip where A: IndexedDistributedIterator, @@ -81,12 +91,12 @@ where in_elems } // fn global_index(&self, index: usize) -> Option { - // let g_index = self.a.global_index(index); + // let g_index = self.a.global_index(index); // // println!("enumerate index: {:?} global_index {:?}", index,g_index); // g_index // } // fn subarray_index(&self, index: usize) -> Option { - // let g_index = self.a.subarray_index(index); + // let g_index = self.a.subarray_index(index); // // println!("enumerate index: {:?} global_index {:?}", index,g_index); // g_index // } @@ -99,9 +109,9 @@ where } } -impl IndexedDistributedIterator for Zip +impl IndexedDistributedIterator for Zip where A: IndexedDistributedIterator, B: IndexedDistributedIterator, -{} - +{ +} diff --git a/src/array/iterator/local_iterator.rs b/src/array/iterator/local_iterator.rs index 7b177843..47762323 100644 --- a/src/array/iterator/local_iterator.rs +++ b/src/array/iterator/local_iterator.rs @@ -34,7 +34,7 @@ use zip::*; pub(crate) use consumer::*; -use crate::array::iterator::Schedule; +use crate::array::iterator::{private::*, Schedule}; use crate::array::{operations::ArrayOps, AtomicArray, Distribution, LamellarArray, TeamFrom}; use crate::memregion::Dist; use crate::LamellarTeamRT; @@ -202,7 +202,7 @@ pub trait LocalIteratorLauncher { /// The functions in this trait are available on all local iterators. /// Additonaly functionality can be found in the [IndexedLocalIterator] trait: /// these methods are only available for local iterators where the number of elements is known in advance (e.g. after invoking `filter` these methods would be unavailable) -pub trait LocalIterator: SyncSend + Clone + 'static { +pub trait LocalIterator: SyncSend + IterClone + 'static { /// The type of item this distributed iterator produces type Item: Send; @@ -727,7 +727,7 @@ pub trait LocalIterator: SyncSend + Clone + 'static { } /// An interface for dealing with local iterators which are indexable, meaning it returns an iterator of known length -pub trait IndexedLocalIterator: LocalIterator + SyncSend + Clone + 'static { +pub trait IndexedLocalIterator: LocalIterator + SyncSend + IterClone + 'static { /// yields the local (to the calling PE) index along with each element /// /// # Examples @@ -973,6 +973,17 @@ pub struct LocalIter<'a, T: Dist + 'static, A: LamellarArray> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist, A: LamellarArray> IterClone for LocalIter<'a, T, A> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist, A: LamellarArray> std::fmt::Debug for LocalIter<'a, T, A> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -1071,7 +1082,6 @@ impl< /// let local_iter = array.local_iter_mut().for_each(move|e| e.store(my_pe) ); /// world.block_on(local_iter); ///``` -#[derive(Clone)] pub struct LocalIterMut<'a, T: Dist, A: LamellarArray> { data: A, cur_i: usize, @@ -1079,6 +1089,17 @@ pub struct LocalIterMut<'a, T: Dist, A: LamellarArray> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist, A: LamellarArray> IterClone for LocalIterMut<'a, T, A> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalIterMut { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist, A: LamellarArray> std::fmt::Debug for LocalIterMut<'a, T, A> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/iterator/local_iterator/chunks.rs b/src/array/iterator/local_iterator/chunks.rs index 130f848e..0d5a07c3 100644 --- a/src/array/iterator/local_iterator/chunks.rs +++ b/src/array/iterator/local_iterator/chunks.rs @@ -8,6 +8,17 @@ pub struct Chunks { chunk_size: usize, } +impl IterClone for Chunks { + fn iter_clone(&self, _: Sealed) -> Self { + Chunks { + iter: self.iter.iter_clone(Sealed), + cur_i: self.cur_i, + end_i: self.end_i, + chunk_size: self.chunk_size, + } + } +} + impl Chunks where I: IndexedLocalIterator, @@ -43,7 +54,7 @@ where fn next(&mut self) -> Option { if self.cur_i < self.end_i { let start_i = self.cur_i * self.chunk_size; - let iter = self.iter.clone().init(start_i, self.chunk_size); + let iter = self.iter.iter_clone(Sealed).init(start_i, self.chunk_size); let chunk = Chunk { iter: iter }; self.cur_i += 1; Some(chunk) diff --git a/src/array/iterator/local_iterator/consumer/collect.rs b/src/array/iterator/local_iterator/consumer/collect.rs index 0aabcade..1484b69b 100644 --- a/src/array/iterator/local_iterator/consumer/collect.rs +++ b/src/array/iterator/local_iterator/consumer/collect.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::local_iterator::{LocalIterator, Monotonic}; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::array::operations::ArrayOps; use crate::array::{Distribution, TeamFrom, TeamInto}; use crate::lamellar_request::LamellarRequest; @@ -20,6 +20,16 @@ pub struct Collect { pub(crate) _phantom: PhantomData, } +impl IterClone for Collect { + fn iter_clone(&self, _: Sealed) -> Self { + Collect { + iter: self.iter.iter_clone(Sealed), + distribution: self.distribution.clone(), + _phantom: self._phantom.clone(), + } + } +} + impl IterConsumer for Collect where I: LocalIterator, @@ -42,7 +52,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(CollectAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -164,6 +174,15 @@ pub(crate) struct CollectAm { pub(crate) schedule: IterSchedule, } +impl IterClone for CollectAm { + fn iter_clone(&self, _: Sealed) -> Self { + CollectAm { + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for CollectAm where @@ -172,7 +191,7 @@ where A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { async fn exec(&self) -> Vec { - let iter = self.schedule.init_iter(self.iter.clone()); + let iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); iter.collect::>() } } @@ -193,7 +212,7 @@ where // I::Item: Sync, // { // async fn exec(&self) -> Vec<(usize,I::Item)> { -// let mut iter = self.schedule.monotonic_iter(self.iter.clone()); +// let mut iter = self.schedule.monotonic_iter(self.iter.iter_clone(Sealed)); // let mut res = vec![]; // while let Some(elem) = iter.next(){ // res.push(elem.await); diff --git a/src/array/iterator/local_iterator/consumer/count.rs b/src/array/iterator/local_iterator/consumer/count.rs index b86d0ad8..db6c25c3 100644 --- a/src/array/iterator/local_iterator/consumer/count.rs +++ b/src/array/iterator/local_iterator/consumer/count.rs @@ -1,7 +1,7 @@ use crate::active_messaging::LamellarArcLocalAm; -use crate::array::iterator::consumer::*; use crate::array::iterator::local_iterator::LocalIterator; use crate::array::iterator::IterRequest; +use crate::array::iterator::{consumer::*, private::*}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -14,6 +14,14 @@ pub struct Count { pub(crate) iter: I, } +impl IterClone for Count { + fn iter_clone(&self, _: Sealed) -> Self { + Count { + iter: self.iter.iter_clone(Sealed), + } + } +} + impl IterConsumer for Count where I: LocalIterator, @@ -31,7 +39,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(CountAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -79,13 +87,25 @@ pub(crate) struct CountAm { pub(crate) schedule: IterSchedule, } +impl IterClone for CountAm +where + I: IterClone, +{ + fn iter_clone(&self, _: Sealed) -> Self { + CountAm { + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for CountAm where I: LocalIterator + 'static, { async fn exec(&self) -> usize { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); let mut count: usize = 0; while let Some(_) = iter.next() { count += 1; diff --git a/src/array/iterator/local_iterator/consumer/for_each.rs b/src/array/iterator/local_iterator/consumer/for_each.rs index 23e4ecc8..479639c0 100644 --- a/src/array/iterator/local_iterator/consumer/for_each.rs +++ b/src/array/iterator/local_iterator/consumer/for_each.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::local_iterator::LocalIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -20,6 +20,19 @@ where pub(crate) op: F, } +impl IterClone for ForEach +where + I: LocalIterator + 'static, + F: Fn(I::Item) + SyncSend + Clone + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEach { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for ForEach where I: LocalIterator + 'static, @@ -42,7 +55,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ForEachAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, }) @@ -71,6 +84,20 @@ where // pub(crate) _phantom: PhantomData, } +impl IterClone for ForEachAsync +where + I: LocalIterator + 'static, + F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, + Fut: Future + Send + 'static, +{ + fn iter_clone(&self, _: Sealed) -> Self { + ForEachAsync { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for ForEachAsync where I: LocalIterator + 'static, @@ -91,7 +118,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ForEachAsyncAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, // _phantom: self._phantom.clone(), @@ -111,7 +138,7 @@ where impl Clone for ForEachAsync where - I: LocalIterator + 'static, + I: LocalIterator + Clone + 'static, F: Fn(I::Item) -> Fut + SyncSend + Clone + 'static, Fut: Future + Send + 'static, { @@ -163,7 +190,7 @@ where { async fn exec(&self) { // println!("foreacham: {:?}", std::thread::current().id()); - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); while let Some(elem) = iter.next() { (&self.op)(elem); } @@ -191,7 +218,7 @@ where Fut: Future + Send + 'static, { async fn exec(&self) { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); while let Some(elem) = iter.next() { (&self.op)(elem).await; } diff --git a/src/array/iterator/local_iterator/consumer/reduce.rs b/src/array/iterator/local_iterator/consumer/reduce.rs index f928a2c2..51d4d71b 100644 --- a/src/array/iterator/local_iterator/consumer/reduce.rs +++ b/src/array/iterator/local_iterator/consumer/reduce.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::local_iterator::LocalIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -15,6 +15,15 @@ pub struct Reduce { pub(crate) op: F, } +impl IterClone for Reduce { + fn iter_clone(&self, _: Sealed) -> Self { + Reduce { + iter: self.iter.iter_clone(Sealed), + op: self.op.clone(), + } + } +} + impl IterConsumer for Reduce where I: LocalIterator + 'static, @@ -35,7 +44,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(ReduceAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), op: self.op.clone(), schedule, }) @@ -124,6 +133,16 @@ pub(crate) struct ReduceAm { pub(crate) schedule: IterSchedule, } +impl IterClone for ReduceAm { + fn iter_clone(&self, _: Sealed) -> Self { + ReduceAm { + op: self.op.clone(), + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for ReduceAm where @@ -132,7 +151,7 @@ where F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static, { async fn exec(&self) -> Option { - let mut iter = self.schedule.init_iter(self.iter.clone()); + let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); match iter.next() { Some(mut accum) => { while let Some(elem) = iter.next() { @@ -162,7 +181,7 @@ where // Fut: Future + SyncSend + Clone + 'static, // { // async fn exec(&self) -> Option { -// let mut iter = self.schedule.init_iter(self.iter.clone()); +// let mut iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); // let mut accum = iter.next(); // while let Some(elem) = iter.next() { // accum = Some((self.op)(accum.unwrap(), elem).await); diff --git a/src/array/iterator/local_iterator/consumer/sum.rs b/src/array/iterator/local_iterator/consumer/sum.rs index 18ed88ec..076d99bf 100644 --- a/src/array/iterator/local_iterator/consumer/sum.rs +++ b/src/array/iterator/local_iterator/consumer/sum.rs @@ -1,7 +1,7 @@ use crate::active_messaging::{LamellarArcLocalAm, SyncSend}; use crate::array::iterator::consumer::*; use crate::array::iterator::local_iterator::LocalIterator; -use crate::array::iterator::IterRequest; +use crate::array::iterator::{private::*, IterRequest}; use crate::lamellar_request::LamellarRequest; use crate::lamellar_team::LamellarTeamRT; @@ -14,6 +14,14 @@ pub(crate) struct Sum { pub(crate) iter: I, } +impl IterClone for Sum { + fn iter_clone(&self, _: Sealed) -> Self { + Sum { + iter: self.iter.iter_clone(Sealed), + } + } +} + impl IterConsumer for Sum where I: LocalIterator + 'static, @@ -32,7 +40,7 @@ where } fn into_am(&self, schedule: IterSchedule) -> LamellarArcLocalAm { Arc::new(SumAm { - iter: self.clone(), + iter: self.iter_clone(Sealed), schedule, }) } @@ -80,6 +88,15 @@ pub(crate) struct SumAm { pub(crate) schedule: IterSchedule, } +impl IterClone for SumAm { + fn iter_clone(&self, _: Sealed) -> Self { + SumAm { + iter: self.iter.iter_clone(Sealed), + schedule: self.schedule.clone(), + } + } +} + #[lamellar_impl::rt_am_local] impl LamellarAm for SumAm where @@ -87,7 +104,7 @@ where I::Item: SyncSend + std::iter::Sum, { async fn exec(&self) -> Option { - let iter = self.schedule.init_iter(self.iter.clone()); + let iter = self.schedule.init_iter(self.iter.iter_clone(Sealed)); iter.sum::() } } diff --git a/src/array/iterator/local_iterator/enumerate.rs b/src/array/iterator/local_iterator/enumerate.rs index 4ea7325a..ef100b84 100644 --- a/src/array/iterator/local_iterator/enumerate.rs +++ b/src/array/iterator/local_iterator/enumerate.rs @@ -5,6 +5,16 @@ pub struct Enumerate { iter: I, cur_index: usize, } + +impl IterClone for Enumerate { + fn iter_clone(&self, _: Sealed) -> Self { + Enumerate { + iter: self.iter.iter_clone(Sealed), + cur_index: self.cur_index, + } + } +} + impl Enumerate where I: IndexedLocalIterator, diff --git a/src/array/iterator/local_iterator/filter.rs b/src/array/iterator/local_iterator/filter.rs index c74f7d06..b13ae8f2 100644 --- a/src/array/iterator/local_iterator/filter.rs +++ b/src/array/iterator/local_iterator/filter.rs @@ -5,6 +5,16 @@ pub struct Filter { iter: I, f: F, } + +impl IterClone for Filter { + fn iter_clone(&self, _: Sealed) -> Self { + Filter { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl Filter where I: LocalIterator, diff --git a/src/array/iterator/local_iterator/filter_map.rs b/src/array/iterator/local_iterator/filter_map.rs index 59216e39..d8e8fb5f 100644 --- a/src/array/iterator/local_iterator/filter_map.rs +++ b/src/array/iterator/local_iterator/filter_map.rs @@ -5,6 +5,16 @@ pub struct FilterMap { iter: I, f: F, } + +impl IterClone for FilterMap { + fn iter_clone(&self, _: Sealed) -> Self { + FilterMap { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl FilterMap where I: LocalIterator, diff --git a/src/array/iterator/local_iterator/map.rs b/src/array/iterator/local_iterator/map.rs index 2d094e57..4f7e4b21 100644 --- a/src/array/iterator/local_iterator/map.rs +++ b/src/array/iterator/local_iterator/map.rs @@ -5,6 +5,16 @@ pub struct Map { iter: I, f: F, } + +impl IterClone for Map { + fn iter_clone(&self, _: Sealed) -> Self { + Map { + iter: self.iter.iter_clone(Sealed), + f: self.f.clone(), + } + } +} + impl Map where I: LocalIterator, diff --git a/src/array/iterator/local_iterator/monotonic.rs b/src/array/iterator/local_iterator/monotonic.rs index 07235e58..5415bd94 100644 --- a/src/array/iterator/local_iterator/monotonic.rs +++ b/src/array/iterator/local_iterator/monotonic.rs @@ -5,6 +5,15 @@ pub struct Monotonic { iter: I, cur_index: usize, } + +impl IterClone for Monotonic { + fn iter_clone(&self, _: Sealed) -> Self { + Monotonic { + iter: self.iter.iter_clone(Sealed), + cur_index: self.cur_index, + } + } +} impl Monotonic where I: LocalIterator, diff --git a/src/array/iterator/local_iterator/skip.rs b/src/array/iterator/local_iterator/skip.rs index 3b18f6ac..f1f32094 100644 --- a/src/array/iterator/local_iterator/skip.rs +++ b/src/array/iterator/local_iterator/skip.rs @@ -8,6 +8,16 @@ pub struct Skip { skip_offset: usize, } +impl IterClone for Skip { + fn iter_clone(&self, _: Sealed) -> Self { + Skip { + iter: self.iter.iter_clone(Sealed), + skip_count: self.skip_count, + skip_offset: self.skip_offset, + } + } +} + impl Skip where I: IndexedLocalIterator, diff --git a/src/array/iterator/local_iterator/step_by.rs b/src/array/iterator/local_iterator/step_by.rs index f64ad0a7..6d080dd7 100644 --- a/src/array/iterator/local_iterator/step_by.rs +++ b/src/array/iterator/local_iterator/step_by.rs @@ -8,6 +8,16 @@ pub struct StepBy { add_one: usize, //if we dont align perfectly we will need to add 1 to our iteration index calculation } +impl IterClone for StepBy { + fn iter_clone(&self, _: Sealed) -> Self { + StepBy { + iter: self.iter.iter_clone(Sealed), + step_size: self.step_size, + add_one: self.add_one, + } + } +} + impl StepBy where I: IndexedLocalIterator, diff --git a/src/array/iterator/local_iterator/take.rs b/src/array/iterator/local_iterator/take.rs index 0f522214..97de9e9e 100644 --- a/src/array/iterator/local_iterator/take.rs +++ b/src/array/iterator/local_iterator/take.rs @@ -7,6 +7,15 @@ pub struct Take { take_count: usize, } +impl IterClone for Take { + fn iter_clone(&self, _: Sealed) -> Self { + Take { + iter: self.iter.iter_clone(Sealed), + take_count: self.take_count, + } + } +} + impl Take where I: IndexedLocalIterator, diff --git a/src/array/iterator/local_iterator/zip.rs b/src/array/iterator/local_iterator/zip.rs index 61947b81..61cad7f3 100644 --- a/src/array/iterator/local_iterator/zip.rs +++ b/src/array/iterator/local_iterator/zip.rs @@ -5,6 +5,16 @@ pub struct Zip { a: A, b: B, } + +impl IterClone for Zip { + fn iter_clone(&self, _: Sealed) -> Self { + Zip { + a: self.a.iter_clone(Sealed), + b: self.b.iter_clone(Sealed), + } + } +} + impl Zip where A: IndexedLocalIterator, diff --git a/src/array/iterator/mod.rs b/src/array/iterator/mod.rs index 62cb0113..a172db5c 100644 --- a/src/array/iterator/mod.rs +++ b/src/array/iterator/mod.rs @@ -19,6 +19,13 @@ pub trait IterRequest { fn wait(self: Box) -> Self::Output; } +pub(crate) mod private { + pub struct Sealed; + pub trait IterClone: Sized { + fn iter_clone(&self, _: Sealed) -> Self; + } +} + /// The Schedule type controls how elements of a LamellarArray are distributed to threads when /// calling `for_each_with_schedule` on a local or distributed iterator. /// diff --git a/src/array/local_lock_atomic/iteration.rs b/src/array/local_lock_atomic/iteration.rs index 6de62980..c231e4e4 100644 --- a/src/array/local_lock_atomic/iteration.rs +++ b/src/array/local_lock_atomic/iteration.rs @@ -5,7 +5,9 @@ use crate::array::iterator::local_iterator::{ IndexedLocalIterator, LocalIterator, LocalIteratorLauncher, }; use crate::array::iterator::one_sided_iterator::OneSidedIter; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule}; +use crate::array::iterator::{ + private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule, +}; use crate::array::local_lock_atomic::*; use crate::array::private::LamellarArrayPrivate; use crate::array::*; @@ -26,6 +28,18 @@ pub struct LocalLockDistIter<'a, T: Dist> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist> IterClone for LocalLockDistIter<'a, T> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockDistIter { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIter<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -48,6 +62,18 @@ pub struct LocalLockLocalIter<'a, T: Dist> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist> IterClone for LocalLockLocalIter<'a, T> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockLocalIter { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIter<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -155,7 +181,6 @@ impl IndexedLocalIterator for LocalLockLocalIter<'static, T> } } -#[derive(Clone)] pub struct LocalLockDistIterMut<'a, T: Dist> { data: LocalLockArray, lock: Arc>>, @@ -164,6 +189,18 @@ pub struct LocalLockDistIterMut<'a, T: Dist> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist> IterClone for LocalLockDistIterMut<'a, T> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockDistIterMut { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIterMut<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -176,7 +213,6 @@ impl<'a, T: Dist> std::fmt::Debug for LocalLockDistIterMut<'a, T> { } } -#[derive(Clone)] pub struct LocalLockLocalIterMut<'a, T: Dist> { data: LocalLockArray, lock: Arc>>, @@ -185,6 +221,18 @@ pub struct LocalLockLocalIterMut<'a, T: Dist> { _marker: PhantomData<&'a T>, } +impl<'a, T: Dist> IterClone for LocalLockLocalIterMut<'a, T> { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockLocalIterMut { + data: self.data.clone(), + lock: self.lock.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + _marker: PhantomData, + } + } +} + impl<'a, T: Dist> std::fmt::Debug for LocalLockLocalIterMut<'a, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/local_lock_atomic/local_chunks.rs b/src/array/local_lock_atomic/local_chunks.rs index 9640f552..1f5d41d1 100644 --- a/src/array/local_lock_atomic/local_chunks.rs +++ b/src/array/local_lock_atomic/local_chunks.rs @@ -1,23 +1,13 @@ -use crate::array::iterator::local_iterator::{ - IndexedLocalIterator, LocalIterator, LocalIteratorLauncher, -}; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule}; +use crate::array::iterator::local_iterator::{IndexedLocalIterator, LocalIterator}; +use crate::array::iterator::private::*; use crate::array::local_lock_atomic::*; -use crate::array::{operations::ArrayOps, AtomicArray, Distribution, LamellarArray, TeamFrom}; +use crate::array::LamellarArray; use crate::memregion::Dist; -use crate::LamellarTeamRT; -use crate::active_messaging::SyncSend; - -use enum_dispatch::enum_dispatch; -use futures::Future; -use std::marker::PhantomData; -use std::pin::Pin; use std::sync::Arc; #[derive(Clone)] pub struct LocalLockLocalChunks { - // data: &'a [T], chunk_size: usize, index: usize, //global index within the array local data end_index: usize, //global index within the array local data @@ -26,7 +16,19 @@ pub struct LocalLockLocalChunks { lock_guard: Arc>>, } -#[derive(Clone)] +impl IterClone for LocalLockLocalChunks { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockLocalChunks { + chunk_size: self.chunk_size, + index: self.index, + end_index: self.end_index, + array: self.array.clone(), + lock: self.lock.clone(), + lock_guard: self.lock_guard.clone(), + } + } +} + pub struct LocalLockLocalChunksMut { // data: &'a mut [T], chunk_size: usize, @@ -37,6 +39,19 @@ pub struct LocalLockLocalChunksMut { lock_guard: Arc>>, } +impl IterClone for LocalLockLocalChunksMut { + fn iter_clone(&self, _: Sealed) -> Self { + LocalLockLocalChunksMut { + chunk_size: self.chunk_size, + index: self.index, + end_index: self.end_index, + array: self.array.clone(), + lock: self.lock.clone(), + lock_guard: self.lock_guard.clone(), + } + } +} + #[derive(Debug)] pub struct LocalLockMutChunkLocalData<'a, T: Dist> { data: &'a mut [T], diff --git a/src/array/native_atomic/iteration.rs b/src/array/native_atomic/iteration.rs index b1775322..7c9fb394 100644 --- a/src/array/native_atomic/iteration.rs +++ b/src/array/native_atomic/iteration.rs @@ -3,7 +3,9 @@ use crate::array::iterator::distributed_iterator::{ }; use crate::array::iterator::local_iterator::{LocalIterator, LocalIteratorLauncher}; use crate::array::iterator::one_sided_iterator::OneSidedIter; -use crate::array::iterator::{LamellarArrayIterators, LamellarArrayMutIterators, Schedule}; +use crate::array::iterator::{ + private::*, LamellarArrayIterators, LamellarArrayMutIterators, Schedule, +}; use crate::array::native_atomic::*; // use crate::array::private::LamellarArrayPrivate; use crate::array::*; @@ -21,6 +23,16 @@ pub struct NativeAtomicDistIter { end_i: usize, } +impl IterClone for NativeAtomicDistIter { + fn iter_clone(&self, _: Sealed) -> Self { + NativeAtomicDistIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for NativeAtomicDistIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( @@ -41,6 +53,16 @@ pub struct NativeAtomicLocalIter { end_i: usize, } +impl IterClone for NativeAtomicLocalIter { + fn iter_clone(&self, _: Sealed) -> Self { + NativeAtomicLocalIter { + data: self.data.clone(), + cur_i: self.cur_i, + end_i: self.end_i, + } + } +} + impl std::fmt::Debug for NativeAtomicLocalIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/array/unsafe/iteration/distributed.rs b/src/array/unsafe/iteration/distributed.rs index a677969a..fc351d04 100644 --- a/src/array/unsafe/iteration/distributed.rs +++ b/src/array/unsafe/iteration/distributed.rs @@ -1,5 +1,6 @@ use crate::active_messaging::SyncSend; use crate::array::iterator::distributed_iterator::*; +use crate::array::iterator::private::*; use crate::array::r#unsafe::UnsafeArray; use crate::array::{ArrayOps, Distribution, LamellarArray, TeamFrom}; @@ -57,7 +58,7 @@ impl DistIteratorLauncher for UnsafeArray { F: Fn(I::Item) + SyncSend + Clone + 'static, { let for_each = ForEach { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op, }; self.barrier(); @@ -91,7 +92,7 @@ impl DistIteratorLauncher for UnsafeArray { Fut: Future + Send + 'static, { let for_each = ForEachAsync { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op, }; self.barrier(); @@ -125,7 +126,7 @@ impl DistIteratorLauncher for UnsafeArray { F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static, { let reduce = Reduce { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op, }; match sched { @@ -158,7 +159,7 @@ impl DistIteratorLauncher for UnsafeArray { A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { let collect = Collect { - iter: iter.clone().monotonic(), + iter: iter.iter_clone(Sealed).monotonic(), distribution: d, _phantom: PhantomData, }; @@ -198,7 +199,7 @@ impl DistIteratorLauncher for UnsafeArray { A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { let collect = CollectAsync { - iter: iter.clone().monotonic(), + iter: iter.iter_clone(Sealed).monotonic(), distribution: d, _phantom: PhantomData, }; @@ -226,7 +227,9 @@ impl DistIteratorLauncher for UnsafeArray { where I: DistributedIterator + 'static, { - let count = Count { iter: iter.clone() }; + let count = Count { + iter: iter.iter_clone(Sealed), + }; match sched { Schedule::Static => self.sched_static(count), Schedule::Dynamic => self.sched_dynamic(count), @@ -253,7 +256,9 @@ impl DistIteratorLauncher for UnsafeArray { I: DistributedIterator + 'static, I::Item: Dist + ArrayOps + std::iter::Sum, { - let sum = Sum { iter: iter.clone() }; + let sum = Sum { + iter: iter.iter_clone(Sealed), + }; match sched { Schedule::Static => self.sched_static(sum), Schedule::Dynamic => self.sched_dynamic(sum), diff --git a/src/array/unsafe/iteration/local.rs b/src/array/unsafe/iteration/local.rs index 1ad136ee..267c7901 100644 --- a/src/array/unsafe/iteration/local.rs +++ b/src/array/unsafe/iteration/local.rs @@ -1,5 +1,6 @@ use crate::active_messaging::SyncSend; use crate::array::iterator::local_iterator::*; +use crate::array::iterator::private::*; use crate::array::r#unsafe::UnsafeArray; use crate::array::{ArrayOps, Distribution, TeamFrom}; @@ -49,7 +50,7 @@ impl LocalIteratorLauncher for UnsafeArray { F: Fn(I::Item) + SyncSend + Clone + 'static, { let for_each = ForEach { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op, }; match sched { @@ -82,7 +83,7 @@ impl LocalIteratorLauncher for UnsafeArray { Fut: Future + Send + 'static, { let for_each = ForEachAsync { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op: op.clone(), }; match sched { @@ -115,7 +116,7 @@ impl LocalIteratorLauncher for UnsafeArray { F: Fn(I::Item, I::Item) -> I::Item + SyncSend + Clone + 'static, { let reduce = Reduce { - iter: iter.clone(), + iter: iter.iter_clone(Sealed), op, }; match sched { @@ -179,7 +180,7 @@ impl LocalIteratorLauncher for UnsafeArray { A: for<'a> TeamFrom<(&'a Vec, Distribution)> + SyncSend + Clone + 'static, { let collect = Collect { - iter: iter.clone().monotonic(), + iter: iter.iter_clone(Sealed).monotonic(), distribution: d, _phantom: PhantomData, }; @@ -238,7 +239,9 @@ impl LocalIteratorLauncher for UnsafeArray { where I: LocalIterator + 'static, { - let count = Count { iter: iter.clone() }; + let count = Count { + iter: iter.iter_clone(Sealed), + }; match sched { Schedule::Static => self.sched_static(count), Schedule::Dynamic => self.sched_dynamic(count), @@ -265,7 +268,9 @@ impl LocalIteratorLauncher for UnsafeArray { I: LocalIterator + 'static, I::Item: SyncSend + std::iter::Sum, { - let sum = Sum { iter: iter.clone() }; + let sum = Sum { + iter: iter.iter_clone(Sealed), + }; match sched { Schedule::Static => self.sched_static(sum), Schedule::Dynamic => self.sched_dynamic(sum),