GPU-Aware Communication #208

Draft · wants to merge 113 commits into base: main

Commits (113) · changes shown from all commits
d8ef62c
Start editing benchmark
ZwFink Dec 28, 2020
6804a96
Creation of standalone GPU pingpong file
ZwFink Dec 30, 2020
28203e1
Added CPU-only pingpong
ZwFink Dec 30, 2020
681129c
Added correct print format, pinned memory now used for staging host m…
ZwFink Dec 30, 2020
5fdfee3
Stage script
ZwFink Dec 30, 2020
5412634
Reset changes to pingpong
ZwFink Dec 30, 2020
2d4b2c4
Unify API between the CPU/GPU benchmarks
ZwFink Dec 30, 2020
76c50c7
Add macro test
ZwFink Jan 4, 2021
805db5f
Merge branch 'feature-gpu_direct' of https://github.com/UIUC-PPL/char…
ZwFink Jan 4, 2021
ba4a9f1
add methods for Charm++ CUDA interface
ZwFink Jan 6, 2021
3f34671
sender-side GPU direct
ZwFink Jan 9, 2021
4048015
add method to get GPU data
ZwFink Jan 12, 2021
fc535bb
add methods to support receiver-side GPU Direct
ZwFink Jan 12, 2021
67ed75b
hook up to Charm++ GPUDirect functionality
ZwFink Jan 15, 2021
0f8bf70
hooks into lib
ZwFink Jan 15, 2021
18b4e13
fix syntax error
ZwFink Jan 15, 2021
22297d2
fix libcharm call
ZwFink Jan 17, 2021
0b316d6
gpu_recv_bufs now correctly retrieved
ZwFink Jan 17, 2021
01a31d0
fix incorrect check for CUDA data
ZwFink Jan 17, 2021
683a1c0
add more API calls
ZwFink Jan 17, 2021
91a1737
add call to register future deposit, add size of ck device buffer in …
ZwFink Jan 17, 2021
e012722
WIP
ZwFink Jan 17, 2021
fa20725
fix datatype passed to CkGetGPUDirectData
ZwFink Jan 18, 2021
a952d25
Temporary fix for requiring libmpi.so
minitu Jan 18, 2021
266b19f
Remove break from GPU pingpong benchmark
minitu Jan 19, 2021
25d3cce
fix datatype passed to CkGetGPUDirectData
ZwFink Jan 18, 2021
663ac5a
Debugging, fixed future ID and passing pointers to CkGetGPUDirectData
minitu Jan 20, 2021
9000ec3
call local charm object, not charm remote when depositing GPU recv fu…
ZwFink Jan 20, 2021
baa180b
Remove debugging print statements
minitu Jan 21, 2021
1061a11
put the pong back in the benchmark
ZwFink Jan 21, 2021
a6c3684
Separate send and recv buffers in GPU pingpong
minitu Jan 21, 2021
bc5e3ad
Merge remote-tracking branch 'origin/jchoi/feature-gpu_direct' into f…
ZwFink Jan 22, 2021
3d2fd15
use pinned memory by default, add warmup iterations
ZwFink Jan 23, 2021
3677738
remove warmup for only the first size
ZwFink Jan 23, 2021
aac9651
send, recv can now have the pointers set directly
ZwFink Jan 23, 2021
8e1c02f
add calls to CkArraySendWithDeviceDataFromPointers when applicable
ZwFink Jan 23, 2021
7100495
update DirectCopy API so buffer info is not always gathered
ZwFink Jan 23, 2021
61603ad
update CkGetGPUDirectData to use future value
ZwFink Jan 23, 2021
30a5340
GPUDirect buffers can now come from device pointers
ZwFink Jan 23, 2021
882eb98
add benchmark with new address optimization
ZwFink Jan 23, 2021
4be40c9
update low/high iter breakpoint threshold according to experimental m…
ZwFink Jan 24, 2021
8ef56e5
add hooks to cuda copy functions
ZwFink Jan 25, 2021
f058ab3
don't use slow Numba transfer functionality when using host-staging
ZwFink Jan 25, 2021
6cad70b
host-staging bandwidth test
ZwFink Jan 25, 2021
9e0ff31
add gpudirect bw test
ZwFink Jan 25, 2021
1752446
remove comment
ZwFink Jan 26, 2021
9582b2a
fix benchmark for non gpu-direct
ZwFink Jan 26, 2021
6f95507
separate channels for data/ack
ZwFink Jan 26, 2021
c62c621
fix indentation of partner_ack
ZwFink Jan 26, 2021
99d4398
initialize jacobi object
ZwFink Jan 26, 2021
121076d
add CLI arguments
ZwFink Jan 26, 2021
b202bab
create file containing cuda kernels
ZwFink Jan 26, 2021
d22cdfb
initialized a few kernels
ZwFink Jan 26, 2021
bc82174
include time as well
ZwFink Jan 26, 2021
a4ba3cf
include boundary, pack kernels
ZwFink Jan 27, 2021
7652d80
invoke pack/unpack kernels
ZwFink Jan 27, 2021
8a7253e
use integer division
ZwFink Jan 27, 2021
0d614be
add program initialization
ZwFink Jan 27, 2021
cf97f37
globals are now broadcast to chares
ZwFink Jan 27, 2021
9cdd749
fix Block import, turn args into vars
ZwFink Jan 27, 2021
123d54f
enum members no longer tuple
ZwFink Jan 27, 2021
731c96a
enum members no longer tuple
ZwFink Jan 27, 2021
cc07901
fix some names, print statements
ZwFink Jan 27, 2021
29baab2
add default types for arguments
ZwFink Jan 27, 2021
fbd08cc
fix python syntax errors
ZwFink Jan 27, 2021
b5d7bff
include chare file
ZwFink Jan 27, 2021
e4338c8
fix spelling, corrected tuple access
ZwFink Jan 27, 2021
6e07660
add chare initialization
ZwFink Jan 27, 2021
b2d9c6e
add neighbor channel initialization
ZwFink Jan 27, 2021
457a9fa
finish impl
ZwFink Jan 27, 2021
a65c2dd
chares now run
ZwFink Jan 27, 2021
e5c6aa0
fix mispelled var
ZwFink Jan 27, 2021
a4dbe3c
correct ghost info now received, output runtime info
ZwFink Jan 28, 2021
b8eca90
Fix wrong assertion
minitu Jan 28, 2021
60a250c
use charmlib hooks for copying
ZwFink Jan 28, 2021
b700cba
temporary hard-code stream 0
ZwFink Jan 28, 2021
475540a
Charm4py Jacobi3D: Update timer outputs, make default iter 100
minitu Jan 28, 2021
004e0b4
Merge branch 'feature-gpu_direct' into jchoi/feature-gpu_direct
minitu Jan 28, 2021
c0e5398
Charm4py Jacobi3D: Revert to old host-staging method to avoid errors
minitu Jan 28, 2021
91be5a6
Revert back host-staging mechanism
minitu Jan 28, 2021
3069ce2
Fix recvGhost to work with more than 2 processes
minitu Jan 28, 2021
12483a0
Fix chare dimension calculation error
minitu Jan 28, 2021
b02d6d7
Add scripts
minitu Jan 28, 2021
20913a9
Update Charm4py script, need to use source activate instead of conda …
minitu Jan 28, 2021
581dae9
Add iwait_map function to fix the issue
ZwFink Mar 17, 2021
bab3d20
include np and greenlet
ZwFink Mar 17, 2021
65ad611
update to use charm.iwait_map
ZwFink Mar 31, 2021
69dd6f5
Fix charm.iwait_map bug
ZwFink Mar 31, 2021
304a452
include bug fix for charm.iwait_map
ZwFink Mar 31, 2021
afeaa6b
no need for channel assertion
ZwFink Mar 31, 2021
959b038
Merge branch 'bug-iwait_apifix' into feature-gpu_direct
ZwFink Mar 31, 2021
0f34c1a
correctly differentiate between instances when GPU data sent with oth…
ZwFink Apr 21, 2021
92f2cba
numpy arrays can now be sent with GPU-direct data
ZwFink Apr 21, 2021
760e716
removed debug print
ZwFink Apr 22, 2021
c254008
no need for different function when buffers + gpu arrays sent in same…
ZwFink Apr 22, 2021
75f358d
tests sending one device array
ZwFink Apr 22, 2021
7ee684e
flake8 compliance
ZwFink Apr 22, 2021
4f84baa
fixed hangup when multiple device arrays are sent
ZwFink Apr 22, 2021
49c82a1
remove comment
ZwFink Apr 22, 2021
f9ffaf6
change != None to is not None
ZwFink Apr 22, 2021
46851e6
tests for multiple arrays
ZwFink Apr 22, 2021
e730942
sizes should be int to match charm++ side
ZwFink Apr 24, 2021
491de28
streams supported at charm4py layer
ZwFink Apr 26, 2021
fbcef04
use streams when provided
ZwFink Apr 26, 2021
f7d9083
device send function array-specific
ZwFink Apr 26, 2021
aae2ff1
let CkArraySendWithDeviceDataFromPointers determine number of buffers
ZwFink May 18, 2021
abf27e3
add general slower case for getting gpu pointers/sizes from memoryviews
ZwFink May 18, 2021
8188d29
use Charm++ functionality for GPU-direct group support
ZwFink May 19, 2021
895e619
Groups can now use GPU-direct functionality
ZwFink May 19, 2021
0abd443
test both arrays and groups
ZwFink May 19, 2021
53d35f2
make post/src buffers more kwargs more general
ZwFink May 20, 2021
52856ad
update tests with new kwarg names
ZwFink May 20, 2021
ba3e95c
update calls to match new API
ZwFink May 20, 2021
43 changes: 40 additions & 3 deletions charm4py/channel.py
@@ -1,4 +1,6 @@
from .threads import LocalFuture
from .charm import charm
import time


class Channel(object):
@@ -52,20 +54,55 @@ def ready(self):
def waitReady(self, f):
self.wait_ready = f

def send(self, *msg):
def send(self, *msg, **kwargs):
if not self.established:
self.established_fut = LocalFuture()
self.established_fut.get()
self.setEstablished()
self.remote._channelRecv__(self.remote_port, self.send_seqno, *msg)
self.remote._channelRecv__(self.remote_port, self.send_seqno, *msg, **kwargs)
self.send_seqno = (self.send_seqno + 1) % CHAN_BUF_SIZE

def recv(self):
def recv(self, *post_buffers, post_addresses = None, post_sizes = None, stream_ptrs = None):
if self.recv_seqno in self.data:
ret = self.data.pop(self.recv_seqno)
else:
self.recv_fut = LocalFuture()
ret = self.recv_fut.get()
self.recv_fut = None
self.recv_seqno = (self.recv_seqno + 1) % CHAN_BUF_SIZE

if post_buffers:
if isinstance(ret, tuple):
gpu_recv_bufs = ret[-1]
ret = ret[0:-1]
if len(ret) == 1:
ret = ret[0]
else:
gpu_recv_bufs = ret

assert len(post_buffers) == len(gpu_recv_bufs)

recv_future = charm.getGPUDirectData(post_buffers, gpu_recv_bufs, stream_ptrs)
recv_future.get()
elif post_addresses is not None:
if isinstance(ret, tuple):
gpu_recv_bufs = ret[-1]
ret = ret[0:-1]
if len(ret) == 1:
ret = ret[0]
else:
gpu_recv_bufs = ret

assert len(post_addresses) == len(gpu_recv_bufs)
assert post_sizes
recv_future = charm.getGPUDirectDataFromAddresses(post_addresses, post_sizes, gpu_recv_bufs, stream_ptrs)
recv_future.get()


return ret






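The extended Channel API above lets a coroutine exchange GPU-resident data directly: send() forwards device arrays (and any extra kwargs) to the remote endpoint, while recv() can post device buffers that the incoming data is copied into before it returns. Below is a minimal ping-pong sketch of that usage; it is not part of the PR, assumes a CUDA-enabled Charm++/charm4py build, and uses Numba device arrays (anything exposing __cuda_array_interface__ should behave the same). Names such as Ping and main are illustrative.

from charm4py import charm, Chare, Array, Channel, coro
import numpy as np
from numba import cuda

class Ping(Chare):
    @coro
    def run(self, done_fut):
        partner = self.thisProxy[1 - self.thisIndex[0]]
        ch = Channel(self, remote=partner)
        d_send = cuda.to_device(np.arange(128, dtype=np.float64))  # device source buffer
        d_recv = cuda.device_array(128, dtype=np.float64)          # device landing buffer
        if self.thisIndex[0] == 0:
            ch.send(d_send)   # device array is detected and sent via the GPU-direct path
            ch.recv(d_recv)   # post d_recv; returns once the partner's data has arrived
        else:
            ch.recv(d_recv)
            ch.send(d_send)
        self.reduce(done_fut)

def main(args):
    done = charm.Future()
    Array(Ping, 2).run(done)
    done.get()
    charm.exit()

charm.start(main)
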
60 changes: 55 additions & 5 deletions charm4py/chare.py
@@ -461,6 +461,9 @@ def proxy_entry_method(proxy, *args, **kwargs):
for i in range(num_args, argcount):
argname = argnames[i]
# first look for argument in kwargs
# TODO: Should stream_ptrs be skipped?
if argname in {'stream_ptrs', 'src_ptrs', 'src_sizes'}:
continue
if argname in kwargs:
args.append(kwargs[argname])
else:
@@ -485,8 +488,28 @@ def proxy_entry_method(proxy, *args, **kwargs):
gid = proxy.gid
if Options.local_msg_optim and (elemIdx == charm._myPe) and (len(args) > 0):
destObj = charm.groups[gid]
msg = charm.packMsg(destObj, args, header)
charm.CkGroupSend(gid, elemIdx, ep, msg)
should_pack_gpu = True
if 'src_ptrs' in kwargs:
should_pack_gpu = False
msg = charm.packMsg(destObj, args, header, pack_gpu=should_pack_gpu)
if msg[1] or not should_pack_gpu:
if 'stream_ptrs' in kwargs:
stream_ptrs = kwargs['stream_ptrs']
else:
stream_ptrs = None
if should_pack_gpu:
charm.CkGroupSendWithDeviceData(gid, elemIdx, ep,
msg, stream_ptrs
)
else:
charm.CkGroupSendWithDeviceDataFromPointers(gid, elemIdx, ep,
msg, kwargs['src_ptrs'],
kwargs['src_sizes'],
stream_ptrs
)

else:
charm.CkGroupSend(gid, elemIdx, ep, msg)
else:
root, sid = proxy.section
header[b'sid'] = sid
@@ -721,7 +744,10 @@ def proxy_entry_method(proxy, *args, **kwargs):
for i in range(num_args, argcount):
argname = argnames[i]
# first look for argument in kwargs
if argname in kwargs:
# TODO: Should stream_ptrs be skipped?
if argname in {'stream_ptrs', 'src_ptrs', 'src_sizes'}:
continue
if argname in kwargs and argname:
args.append(kwargs[argname])
else:
# if not there, see if there is a default value
@@ -741,15 +767,39 @@ def proxy_entry_method(proxy, *args, **kwargs):
if elemIdx == ():
header[b'bcast'] = True
if not proxy.issec or elemIdx != ():
# TODO: Check that this is channel proxy method?
destObj = None
aid = proxy.aid
if Options.local_msg_optim and (len(args) > 0):
array = charm.arrays[aid]
if elemIdx in array:
destObj = array[elemIdx]
msg = charm.packMsg(destObj, args, header)
charm.CkArraySend(aid, elemIdx, ep, msg)
should_pack_gpu = True
if 'src_ptrs' in kwargs:
should_pack_gpu = False
msg = charm.packMsg(destObj, args, header, pack_gpu = should_pack_gpu)
if msg[1] or not should_pack_gpu:
if 'stream_ptrs' in kwargs:
stream_ptrs = kwargs['stream_ptrs']
else:
stream_ptrs = None
if should_pack_gpu:
charm.CkArraySendWithDeviceData(aid, elemIdx, ep,
msg, stream_ptrs
)
else:
charm.CkArraySendWithDeviceDataFromPointers(aid, elemIdx, ep,
msg, kwargs['src_ptrs'],
kwargs['src_sizes'],
stream_ptrs
)



else:
charm.CkArraySend(aid, elemIdx, ep, msg)
else:
# TODO: Error if trying to send ZC data
root, sid = proxy.section
header[b'sid'] = sid
if Options.local_msg_optim and root == charm._myPe:
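
The proxy changes above add a second, pointer-based path: when a call supplies src_ptrs/src_sizes (and optionally stream_ptrs) as kwargs, packMsg skips GPU packing and the message goes through CkArraySendWithDeviceDataFromPointers or CkGroupSendWithDeviceDataFromPointers instead. A rough sketch of how an array element might drive that path follows; it is not code from the PR. The array.array typecodes ('L' for addresses/streams, 'i' for byte sizes) and the use of 0 for the default stream are assumptions, and the receiving entry method is assumed to get the remote device-buffer handles appended as its last argument (per recvGPUDirectArrayMsg in charm.py below).

import array
import numpy as np
from numba import cuda
from charm4py import charm, Chare, Array, coro

class Block(Chare):
    def __init__(self, done_fut):
        self.done = done_fut

    def send_ghost(self):
        partner = self.thisProxy[1 - self.thisIndex[0]]
        # keep a reference so the source buffer stays alive until the transfer completes
        self.d_send = cuda.to_device(np.full(256, float(self.thisIndex[0])))
        ptrs    = array.array('L', [self.d_send.__cuda_array_interface__['data'][0]])
        sizes   = array.array('i', [self.d_send.nbytes])
        streams = array.array('L', [0])
        partner.recv_ghost(self.thisIndex[0],
                           src_ptrs=ptrs, src_sizes=sizes, stream_ptrs=streams)

    @coro
    def recv_ghost(self, src_rank, dev_bufs):
        # dev_bufs: handles to the sender's device buffers, appended by the runtime
        d_recv = cuda.device_array(256, dtype=np.float64)
        charm.getGPUDirectData([d_recv], dev_bufs, None).get()  # block until the copy lands
        self.reduce(self.done)

def main(args):
    done = charm.Future()
    blocks = Array(Block, 2, args=[done])
    blocks.send_ghost()
    done.get()
    charm.exit()

charm.start(main)
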
137 changes: 136 additions & 1 deletion charm4py/charm.py
@@ -28,6 +28,8 @@
from . import reduction
from . import wait
import array
import numpy as np
import greenlet
try:
import numpy
except ImportError:
@@ -40,6 +42,14 @@ class NumpyDummy:
def SECTION_ALL(obj):
return 0

def getDeviceDataInfo(devArray):
return devArray.__cuda_array_interface__['data']

def getDeviceDataAddress(devArray):
return getDeviceDataInfo(devArray)[0]

def getDeviceDataSizeInBytes(devArray):
return devArray.nbytes

class Options(object):

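The device-data helpers added above (getDeviceDataInfo, getDeviceDataAddress, getDeviceDataSizeInBytes) simply pull the device pointer and size out of the standard CUDA array interface. A quick illustration with a Numba array follows (CuPy arrays expose the same interface); importing them from charm4py.charm is an assumption about where they live after this PR.

import numpy as np
from numba import cuda
from charm4py.charm import (getDeviceDataInfo, getDeviceDataAddress,
                            getDeviceDataSizeInBytes)

d = cuda.to_device(np.zeros(4, dtype=np.float32))
info = getDeviceDataInfo(d)            # (device_pointer, read_only_flag)
addr = getDeviceDataAddress(d)         # integer device pointer, i.e. info[0]
nbytes = getDeviceDataSizeInBytes(d)   # 16 for four float32 elements
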
@@ -106,11 +116,13 @@ def __init__(self):
self.options.interactive.verbose = 1
self.options.interactive.broadcast_imports = True

'''
if 'OMPI_COMM_WORLD_SIZE' in os.environ:
# this is needed for OpenMPI, see:
# https://svn.open-mpi.org/trac/ompi/wiki/Linkers
import ctypes
self.__libmpi__ = ctypes.CDLL('libmpi.so', mode=ctypes.RTLD_GLOBAL)
'''
self.lib = load_charm_library(self)
self.ReducerType = self.lib.ReducerType
self.CkContributeToChare = self.lib.CkContributeToChare
@@ -120,6 +132,13 @@ def __init__(self):
self.CkChareSend = self.lib.CkChareSend
self.CkGroupSend = self.lib.CkGroupSend
self.CkArraySend = self.lib.CkArraySend
self.CkArraySendWithDeviceData = self.lib.CkArraySendWithDeviceData
self.CkGroupSendWithDeviceData = self.lib.CkGroupSendWithDeviceData
self.CkArraySendWithDeviceDataFromPointersArray = self.lib.CkArraySendWithDeviceDataFromPointersArray
self.CkArraySendWithDeviceDataFromPointersOther = self.lib.CkArraySendWithDeviceDataFromPointersOther
self.CkGroupSendWithDeviceDataFromPointersArray = self.lib.CkGroupSendWithDeviceDataFromPointersArray
self.CkGroupSendWithDeviceDataFromPointersOther = self.lib.CkGroupSendWithDeviceDataFromPointersOther
self.CkCudaEnabled = self.lib.CkCudaEnabled
self.reducers = reduction.ReducerContainer(self)
self.redMgr = reduction.ReductionManager(self, self.reducers)
self.mainchareRegistered = False
@@ -305,6 +324,21 @@ def recvArrayMsg(self, aid, index, ep, msg, dcopy_start):
self.arrays[aid][index] = obj
em.run(obj, header, args) # now call the user's array element __init__

def recvGPUDirectArrayMsg(self, aid, index, ep,
devBuf_ptrs, msg, dcopy_start
):
obj = self.arrays[aid][index]
header, args = self.unpackMsg(msg, dcopy_start, obj)
args.append(devBuf_ptrs)

self.invokeEntryMethod(obj, ep, header, args)

def recvGPUDirectGroupMsg(self, gid, ep, devBuf_ptrs, msg, dcopy_start):
obj = self.groups[gid]
header, args = self.unpackMsg(msg, dcopy_start, obj)
args.append(devBuf_ptrs)
self.invokeEntryMethod(obj, ep, header, args)

def recvArrayBcast(self, aid, indexes, ep, msg, dcopy_start):
header, args = self.unpackMsg(msg, dcopy_start, None)
array = self.arrays[aid]
@@ -332,6 +366,70 @@ def unpackMsg(self, msg, dcopy_start, dest_obj):

return header, args

def getGPUDirectData(self, post_buffers, remote_bufs, stream_ptrs):
# this future will only be satisfied when all buffers have been received
return_fut = self.Future()
post_buf_data = [getDeviceDataAddress(buf) for buf in post_buffers]
post_buf_sizes = [getDeviceDataSizeInBytes(buf) for buf in post_buffers]
if not stream_ptrs:
stream_ptrs = [0] * len(post_buffers)
self.lib.getGPUDirectData(post_buf_data, post_buf_sizes, remote_bufs, stream_ptrs, return_fut)
return return_fut

def getGPUDirectDataFromAddresses(self, post_buf_ptrs, post_buf_sizes, remote_bufs, stream_ptrs):
# this future will only be satisfied when all buffers have been received
return_fut = self.Future()
if not stream_ptrs:
stream_ptrs = array.array('L', [0] * len(post_buf_ptrs))
self.lib.getGPUDirectDataFromAddresses(post_buf_ptrs, post_buf_sizes, remote_bufs, stream_ptrs, return_fut)
return return_fut

def CkArraySendWithDeviceDataFromPointers(self, array_id, index, ep,
msg, gpu_src_ptrs,
gpu_src_sizes,
stream_ptrs
):
if isinstance(gpu_src_ptrs, array.array):
assert isinstance(gpu_src_sizes, array.array), \
"GPU source pointers and sizes must be of the same type."
self.CkArraySendWithDeviceDataFromPointersArray(array_id, index, ep,
msg, gpu_src_ptrs,
gpu_src_sizes,
stream_ptrs,
len(gpu_src_ptrs)
)
else:
self.CkArraySendWithDeviceDataFromPointersOther(array_id, index, ep,
msg, gpu_src_ptrs,
gpu_src_sizes,
stream_ptrs,
len(gpu_src_ptrs)
)

def CkGroupSendWithDeviceDataFromPointers(self, gid, elemIdx, ep,
msg, gpu_src_ptrs, gpu_src_sizes,
stream_ptrs):
if isinstance(gpu_src_ptrs, array.array):
assert isinstance(gpu_src_sizes, array.array), \
"GPU source pointers and sizes must be of the same type."
self.CkGroupSendWithDeviceDataFromPointersArray(gid, elemIdx, ep, msg,
gpu_src_ptrs,
gpu_src_sizes,
stream_ptrs,
len(gpu_src_ptrs)
)
else:
self.CkGroupSendWithDeviceDataFromPointersOther(gid, elemIdx, ep, msg,
gpu_src_ptrs,
gpu_src_sizes,
stream_ptrs,
len(gpu_src_ptrs)
)

# deposit value of one of the futures that was created on this PE
def _future_deposit_result(self, fid, result=None):
self.threadMgr.depositFuture(fid, result)

def packMsg(self, destObj, msgArgs, header):
"""Prepares a message for sending, given arguments to an entry method invocation.

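getGPUDirectDataFromAddresses above is the receive-side counterpart of the pointer-based send path: instead of handing over whole device arrays, the caller pre-extracts raw addresses and byte sizes, which is useful when the same landing buffers are reused every iteration. A sketch of preparing those arguments follows; the 'L'/'i' typecodes mirror the array.array('L') used for stream_ptrs and are otherwise an assumption.

import array
import numpy as np
from numba import cuda

d_recv = cuda.device_array(512, dtype=np.float64)
post_addrs = array.array('L', [d_recv.__cuda_array_interface__['data'][0]])
post_sizes = array.array('i', [d_recv.nbytes])
# inside a @coro entry method holding an established Channel `ch`:
#     ch.recv(post_addresses=post_addrs, post_sizes=post_sizes)
# or directly, given remote buffer handles `remote_bufs` delivered by the runtime:
#     charm.getGPUDirectDataFromAddresses(post_addrs, post_sizes, remote_bufs, None).get()
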
@@ -815,6 +913,7 @@ def triggerCallable(self, tag):
def iwait(self, objs):
n = len(objs)
f = LocalFuture()

for obj in objs:
if obj.ready():
n -= 1
@@ -826,6 +925,43 @@ def iwait(self, objs):
n -= 1
yield obj

def iwait_map(self, func, objs):
n = len(objs)
f = LocalFuture()
remaining_grs = [n]

def map_func(remaining, obj):
gr = greenlet.getcurrent()
gr.notify = gr.parent.notify
gr.obj = gr.parent.obj
gr.fu = 1
func(obj)
remaining[0] -= 1

def gr_func():
return map_func(remaining_grs, obj)

for obj in objs:
if obj.ready():
new_gr = greenlet.greenlet(gr_func)
n -= 1
obj = new_gr.switch()
while obj:
new_gr = greenlet.greenlet(gr_func)
n -= 1
obj = new_gr.switch()
else:
obj.waitReady(f)
while n > 0:
obj = self.threadMgr.pauseThread()
while obj:
new_gr = greenlet.greenlet(gr_func)
n -= 1
obj = new_gr.switch()

while remaining_grs[0]:
self.threadMgr.pauseThread()

def wait(self, objs):
for o in self.iwait(objs):
pass
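
iwait_map above is a callback-driven variant of iwait: each object is handed to the supplied function in its own greenlet as soon as it becomes ready, and the call returns only after every callback has finished. A usage sketch with illustrative names (handle, neighbor_channels):

def handle(ch):
    ghosts = ch.recv()   # each callback runs in its own greenlet and may block on recv
    # ... unpack ghost data into the local domain ...

# inside a @coro entry method of a chare holding established channels:
#     charm.iwait_map(handle, self.neighbor_channels)
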
@@ -1156,6 +1292,5 @@ def rebuildNumpyArray(data, shape, dt):
a.shape = shape
return a.copy()


charm = Charm()
readonlies = __ReadOnlies()