Skip to content

Commit

Permalink
Merge branch 'develop' into release/MAPL-v3
Browse files Browse the repository at this point in the history
  • Loading branch information
mathomp4 committed Oct 10, 2024
2 parents 327e6a6 + 5c484f2 commit 525aa8a
Show file tree
Hide file tree
Showing 15 changed files with 62 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Trapped more errors from Extdata's i-server

### Removed

### Deprecated
Expand Down
4 changes: 2 additions & 2 deletions gridcomps/ExtData/ExtDataGridCompMod.F90
Original file line number Diff line number Diff line change
Expand Up @@ -1475,8 +1475,8 @@ SUBROUTINE Run_ ( GC, IMPORT, EXPORT, CLOCK, rc )
_VERIFY(STATUS)
call MAPL_TimerOn(MAPLSTATE,"---IclientDone")

call i_Clients%done_collective_prefetch()
call i_Clients%wait()
call i_Clients%done_collective_prefetch(_RC)
call i_Clients%wait(_RC)

call MAPL_TimerOff(MAPLSTATE,"---IclientDone")
_VERIFY(STATUS)
Expand Down
4 changes: 2 additions & 2 deletions gridcomps/ExtData2G/ExtDataGridCompNG.F90
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,8 @@ SUBROUTINE Run_ ( GC, IMPORT, EXPORT, CLOCK, rc )
call MAPL_TimerOff(MAPLSTATE,"---prefetch")
call MAPL_TimerOn(MAPLSTATE,"---IclientDone")

call i_Clients%done_collective_prefetch()
call i_Clients%wait()
call i_Clients%done_collective_prefetch(_RC)
call i_Clients%wait(_RC)

call MAPL_TimerOff(MAPLSTATE,"---IclientDone")

Expand Down
4 changes: 2 additions & 2 deletions griddedio/FieldBundleRead.F90
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ subroutine MAPL_read_bundle(bundle,file_tmpl,time,only_vars,regrid_method,noread
end if
call cfio%request_data_from_file(trim(file_name),timeindex=time_index,rc=status)
_VERIFY(status)
call i_clients%done_collective_prefetch()
call i_clients%wait()
call i_clients%done_collective_prefetch(_RC)
call i_clients%wait(_RC)
call cfio%process_data_from_file(rc=status)
_VERIFY(status)

Expand Down
2 changes: 1 addition & 1 deletion griddedio/GriddedIO.F90
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ subroutine request_data_from_file(this,filename,timeindex,rc)
end if
call i_Clients%collective_prefetch_data( &
this%read_collection_id, fileName, trim(names(i)), &
& ref, start=localStart, global_start=globalStart, global_count=globalCount)
& ref, start=localStart, global_start=globalStart, global_count=globalCount, _RC)
deallocate(localStart,globalStart,globalCount)
enddo
deallocate(gridLocalStart,gridGlobalStart,gridGlobalCount)
Expand Down
2 changes: 1 addition & 1 deletion griddedio/TileIO.F90
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ subroutine request_data_from_file(this,filename,timeindex,rc)
end if
ref = ArrayReference(this%tile_buffer(i)%ptr)
call i_clients%collective_prefetch_data(this%read_collection_id, filename, trim(names(i)), ref, &
start=local_start, global_start=global_start, global_count = global_count)
start=local_start, global_start=global_start, global_count = global_count, _RC)
deallocate(local_start,global_start,global_count)
else
_FAIL("rank >1 tile fields not supported")
Expand Down
8 changes: 5 additions & 3 deletions pfio/AbstractServer.F90
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ subroutine start(this, rc)
integer, optional, intent(out) :: rc
end subroutine start

subroutine clear_RequestHandle(this)
subroutine clear_RequestHandle(this, rc)
import AbstractServer
class(AbstractServer),target,intent(inout) :: this
integer, optional, intent(out) :: rc
end subroutine clear_RequestHandle

subroutine set_collective_request(this, request, have_done)
Expand Down Expand Up @@ -224,7 +225,7 @@ subroutine update_status(this, rc)
! status ==0, means the last server thread in the backlog

call this%clear_DataReference()
call this%clear_RequestHandle()
call this%clear_RequestHandle(_RC)
call this%set_status(UNALLOCATED)
call this%set_AllBacklogIsEmpty(.true.)

Expand All @@ -248,11 +249,12 @@ subroutine clean_up(this, rc)
class(AbstractServer), target, intent(inout) :: this
integer, optional, intent(out) :: rc
type(StringInteger64MapIterator) :: iter
integer :: status

if (associated(ioserver_profiler)) call ioserver_profiler%start("clean_up")

call this%clear_DataReference()
call this%clear_RequestHandle()
Call this%clear_RequestHandle(_RC)
call this%set_AllBacklogIsEmpty(.true.)
this%serverthread_done_msgs(:) = .false.

Expand Down
10 changes: 5 additions & 5 deletions pfio/BaseServer.F90
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,19 @@ function get_dmessage(this, rc) result(dmessage)
_RETURN(_SUCCESS)
end function

subroutine clear_RequestHandle(this)
subroutine clear_RequestHandle(this, rc)
class (BaseServer), target, intent(inout) :: this
integer, optional, intent(out):: rc
class(ServerThread), pointer :: thread_ptr
integer :: i,n

integer :: i,n, status

n = this%threads%size()

do i = 1, n
thread_ptr => this%threads%at(i)
call thread_ptr%clear_RequestHandle()
call thread_ptr%clear_RequestHandle(_RC)
enddo

_RETURN(_SUCCESS)

end subroutine clear_RequestHandle

Expand Down
30 changes: 17 additions & 13 deletions pfio/ClientManager.F90
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,11 @@ subroutine shake_hand(this, unusable, rc)
class (ClientManager), intent(inout) :: this
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%shake_hand()
call clientPtr%shake_hand(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -357,11 +357,12 @@ subroutine done_prefetch(this, unusable, rc)
class (ClientManager), intent(inout) :: this
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc
integer :: status

class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%done_prefetch()
call clientPtr%done_prefetch(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -371,11 +372,11 @@ subroutine done_collective_prefetch(this, unusable, rc)
class (ClientManager), intent(inout) :: this
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%done_collective_prefetch()
call clientPtr%done_collective_prefetch(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -385,11 +386,11 @@ subroutine done_stage(this, unusable, rc)
class (ClientManager), intent(inout) :: this
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%done_stage()
call clientPtr%done_stage(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -414,11 +415,11 @@ subroutine wait(this, unusable, rc)
class (ClientManager), target, intent(inout) :: this
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%wait_all()
call clientPtr%wait_all(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -429,10 +430,11 @@ subroutine post_wait(this, unusable, rc)
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr

clientPtr =>this%current()
call clientPtr%post_wait_all()
call clientPtr%post_wait_all(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand All @@ -443,12 +445,13 @@ subroutine terminate(this, unusable, rc)
class (KeywordEnforcer), optional, intent(out) :: unusable
integer, optional, intent(out) :: rc

integer :: status
class (ClientThread), pointer :: clientPtr
integer :: i

do i = 1, this%size()
clientPtr =>this%clients%at(i)
call clientPtr%wait_all()
call clientPtr%wait_all(_RC)
call clientPtr%terminate()
enddo

Expand Down Expand Up @@ -492,6 +495,7 @@ subroutine set_optimal_server(this,nwriting,unusable,rc)
integer :: Cuttoff, ssize, lsize, tsize, ith
integer, allocatable :: nwritings_order(:)
real :: l_ratio, s_ratio
integer :: status

! if there is no "small" pool, then there is no "large" pool either, just get next
ssize = this%small_server_pool%size()
Expand All @@ -500,7 +504,7 @@ subroutine set_optimal_server(this,nwriting,unusable,rc)

if (ssize == 0) then
call this%next()
call this%wait()
call this%wait(_RC)
_RETURN(_SUCCESS)
endif

Expand Down Expand Up @@ -553,7 +557,7 @@ subroutine set_optimal_server(this,nwriting,unusable,rc)
nwritings_small(1:ssize-1) = nwritings_small(2:ssize)
nwritings_small(ssize) = nwriting
end if
call this%wait()
call this%wait(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(unusable)
Expand Down
22 changes: 15 additions & 7 deletions pfio/ClientThread.F90
Original file line number Diff line number Diff line change
Expand Up @@ -461,32 +461,40 @@ subroutine done_collective_stage(this, rc)
_RETURN(_SUCCESS)
end subroutine done_collective_stage

subroutine wait(this, request_id)
subroutine wait(this, request_id, rc)
use pFIO_AbstractRequestHandleMod
class (ClientThread), target, intent(inout) :: this
integer, intent(in) :: request_id
integer, optional, intent(out) :: rc
integer :: status
class(AbstractRequestHandle), pointer :: handle

handle => this%get_RequestHandle(request_id)
call handle%wait()
call handle%data_reference%deallocate()
call this%erase_RequestHandle(request_id)

_RETURN(_SUCCESS)

end subroutine wait

subroutine wait_all(this)
subroutine wait_all(this, rc)
use pFIO_AbstractRequestHandleMod
class (ClientThread), target, intent(inout) :: this

call this%clear_RequestHandle()
integer, optional, intent(out) :: rc
integer:: status
call this%clear_RequestHandle(_RC)
!call this%shake_hand()
_RETURN(_SUCCESS)

end subroutine wait_all

subroutine post_wait_all(this)
subroutine post_wait_all(this, rc)
use pFIO_AbstractRequestHandleMod
class (ClientThread), target, intent(inout) :: this
call this%wait_all()
integer, optional, intent(out):: rc
integer :: status
call this%wait_all(_RC)
_RETURN(_SUCCESS)
end subroutine post_wait_all

integer function get_unique_request_id(this) result(request_id)
Expand Down
3 changes: 2 additions & 1 deletion pfio/FastClientThread.F90
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ function stage_nondistributed_data(this, collection_id, file_name, var_name, dat
end function stage_nondistributed_data

! The data has been copied out and post no wait after isend
subroutine post_wait_all(this)
subroutine post_wait_all(this, rc)
use pFIO_AbstractRequestHandleMod
class (FastClientThread), target, intent(inout) :: this
integer, optional, intent(out) :: rc
! do nothing on purpose
_UNUSED_DUMMY(this)
end subroutine post_wait_all
Expand Down
2 changes: 1 addition & 1 deletion pfio/MultiCommServer.F90
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ subroutine clean_up(this, rc)


call this%clear_DataReference()
call this%clear_RequestHandle()
call this%clear_RequestHandle(_RC)
call this%set_AllBacklogIsEmpty(.true.)
this%serverthread_done_msgs(:) = .false.

Expand Down
3 changes: 2 additions & 1 deletion pfio/MultiGroupServer.F90
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ subroutine clean_up(this, rc)
class(MultiGroupServer), target, intent(inout) :: this
integer, optional, intent(out) :: rc
integer :: num_clients, n
integer :: status
class (ServerThread),pointer :: thread_ptr

if (this%front_Comm == MPI_COMM_NULL) then
Expand All @@ -289,7 +290,7 @@ subroutine clean_up(this, rc)
call thread_ptr%clear_hist_collections()
enddo ! threads

call this%clear_RequestHandle()
call this%clear_RequestHandle(_RC)
call this%set_AllBacklogIsEmpty(.true.)
this%serverthread_done_msgs(:) = .false.

Expand Down
4 changes: 2 additions & 2 deletions pfio/ServerThread.F90
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ recursive subroutine handle_Done_stage(this, message, rc)
iter = this%request_backlog%begin()
enddo

call this%clear_RequestHandle()
call this%clear_RequestHandle(_RC)
call this%clear_hist_collections()

_RETURN(_SUCCESS)
Expand Down Expand Up @@ -1068,7 +1068,7 @@ recursive subroutine handle_Done_prefetch(this, message, rc)
iter = this%request_backlog%begin()
enddo

call this%clear_RequestHandle()
call this%clear_RequestHandle(_RC)

_RETURN(_SUCCESS)
_UNUSED_DUMMY(message)
Expand Down
5 changes: 3 additions & 2 deletions pfio/tests/MockClientThread.F90
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ function new_MockClientThread(sckt) result(c)
if(present(sckt)) call c%set_connection(sckt)
end function new_MockClientThread

subroutine wait(this, request_id)
subroutine wait(this, request_id, rc)
use pFIO_AbstractRequestHandleMod
class (MockClientThread), target, intent(inout) :: this
integer, intent(in) :: request_id
integer, optional, intent(out) :: rc
class(AbstractRequestHandle), pointer :: handle

this%counter = this%counter + 1
handle => this%get_RequestHandle(request_id)
call handle%wait()
call this%erase_RequestHandle(request_id)

_RETURN(_SUCCESS)
end subroutine wait

end module pFIO_MockClientThreadMod

0 comments on commit 525aa8a

Please sign in to comment.