Skip to content

Commit

Permalink
ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory …
Browse files Browse the repository at this point in the history
…object

This PR implements the following new features:
1. `CudaContext::GetDeviceAddress` method
2. `pyarrow.cuda.Context.get_device_address` method
3. `pyarrow.cuda.Context.buffer_from_object` method to create a CudaBuffer view of an arbitrary device memory object that implements the `__cuda_array_interface__` attribute, or that is a `CudaBuffer`, `CudaHostBuffer`, or numba `MemoryPointer` object.
4. Tests for `buffer_from_object` method

and the following improvements:
1. `pyarrow.cuda.Context.foreign_buffer` ensures that the used device memory address is valid for the given context.

Author: Pearu Peterson <[email protected]>

Closes apache#3439 from pearu/arrow-4212 and squashes the following commits:

ba90573 <Pearu Peterson> Add 2-D and 3-D tests. buffer_from_object requires contiguous memory input.
355fc03 <Pearu Peterson> Apply more feedback fixes.
22134da <Pearu Peterson> Refactor. Add tests for negative strides and reduce test cases.
dbd1cec <Pearu Peterson> Moved some buffer_from_objects tests under test_cuda.
562c180 <Pearu Peterson> Change GetDeviceAddress API to use uint8_t, add C++ tests for it, update docs and apply feedback.
2726a91e <Pearu Peterson> Add buffer_from_object and get_device_address methods.
df8656f <Pearu Peterson> Add GetDeviceAddress method to CudaContext.
  • Loading branch information
pearu authored and pitrou committed Jan 24, 2019
1 parent 9460bb7 commit f0c464e
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 17 deletions.
14 changes: 14 additions & 0 deletions cpp/src/arrow/gpu/cuda-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,5 +343,19 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
CompareBatch(*batch, *cpu_batch);
}

// Fixture for the CudaContext tests. It declares no members of its own and
// relies entirely on the setup performed by TestCudaBufferBase.
class TestCudaContext : public TestCudaBufferBase {
 public:
  // Forward to the base fixture's SetUp.
  void SetUp() {
    TestCudaBufferBase::SetUp();
  }
};

// Allocates a buffer through the fixture's context and checks that
// GetDeviceAddress maps the buffer's own data pointer back to the very same
// address.
TEST_F(TestCudaContext, GetDeviceAddress) {
  const int64_t kSize = 100;
  std::shared_ptr<CudaBuffer> buffer;
  // Start from a null pointer so the final comparison cannot pass by reading
  // a stale value if GetDeviceAddress never wrote to the output.
  uint8_t* devptr = nullptr;
  ASSERT_OK(context_->Allocate(kSize, &buffer));
  ASSERT_OK(context_->GetDeviceAddress(buffer->mutable_data(), &devptr));
  ASSERT_EQ(buffer->mutable_data(), devptr);
}

} // namespace cuda
} // namespace arrow
7 changes: 7 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,5 +343,12 @@ void* CudaContext::handle() const { return impl_->context_handle(); }

int CudaContext::device_number() const { return impl_->device().device_num; }

/// Ask the CUDA driver for the CU_POINTER_ATTRIBUTE_DEVICE_POINTER attribute
/// of `addr` and store the result in `*devaddr`.
///
/// \param[in] addr address to look up
/// \param[out] devaddr receives the attribute value reported by the driver
/// \return Status::OK() when the driver call succeeds. On failure,
/// CU_RETURN_NOT_OK (defined elsewhere) is expected to return early with an
/// error Status.
Status CudaContext::GetDeviceAddress(uint8_t* addr, uint8_t** devaddr) {
// NOTE(review): ContextSaver is defined outside this view; presumably it makes
// this context current for the lifetime of `set_temporary` -- confirm.
ContextSaver set_temporary(reinterpret_cast<CUcontext>(handle()));
// The driver writes the attribute value through `devaddr`; `addr` is passed
// reinterpreted as a CUdeviceptr.
CU_RETURN_NOT_OK(cuPointerGetAttribute(devaddr, CU_POINTER_ATTRIBUTE_DEVICE_POINTER,
reinterpret_cast<CUdeviceptr>(addr)));
return Status::OK();
}

} // namespace cuda
} // namespace arrow
14 changes: 14 additions & 0 deletions cpp/src/arrow/gpu/cuda_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
/// \brief Return device number
int device_number() const;

/// \brief Return the device address that is reachable from kernels
/// running in the context
/// \param[in] addr device or host memory address
/// \param[out] devaddr the device address
/// \return Status
///
/// The device address is defined as a memory address accessible by
/// the device. While it is often a device memory address, it can
/// also be a host memory address, for instance, when the memory is
/// allocated as host memory (using cudaMallocHost or cudaHostAlloc),
/// as managed memory (using cudaMallocManaged), or when the host
/// memory is page-locked (using cudaHostRegister).
Status GetDeviceAddress(uint8_t* addr, uint8_t** devaddr);

private:
CudaContext();

Expand Down
99 changes: 91 additions & 8 deletions python/pyarrow/_cuda.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.


from pyarrow.compat import tobytes
from pyarrow.lib cimport *
from pyarrow.includes.libarrow_cuda cimport *
from pyarrow.lib import py_buffer, allocate_buffer, as_buffer
from pyarrow.lib import py_buffer, allocate_buffer, as_buffer, ArrowTypeError
from pyarrow.util import get_contiguous_span
cimport cpython as cp


Expand Down Expand Up @@ -137,10 +139,40 @@ cdef class Context:

@property
def bytes_allocated(self):
""" Return the number of allocated bytes.
"""Return the number of allocated bytes.
"""
return self.context.get().bytes_allocated()

def get_device_address(self, address):
    """Map a memory address to the address usable by kernels running in
    this context.

    Parameters
    ----------
    address : int
        Memory address value to look up.

    Returns
    -------
    device_address : int
        Address accessible from the device context.

    Notes
    -----
    A device address is any memory address the device can access. It
    is often a device memory address, but it can also be a host
    memory address, for instance when the memory is allocated as host
    memory (using cudaMallocHost or cudaHostAlloc), as managed memory
    (using cudaMallocManaged), or when the host memory is page-locked
    (using cudaHostRegister).
    """
    cdef uintptr_t c_input = address
    cdef uint8_t* c_resolved
    # check_status turns a failed C++ Status into a Python exception.
    check_status(
        self.context.get().GetDeviceAddress(<uint8_t*>c_input, &c_resolved))
    return <uintptr_t>c_resolved

def new_buffer(self, nbytes):
"""Return new device buffer.
Expand All @@ -159,26 +191,32 @@ cdef class Context:
return pyarrow_wrap_cudabuffer(cudabuf)

def foreign_buffer(self, address, size):
"""Create device buffer from device address and size as a view.
"""Create device buffer from address and size as a view.
The caller is responsible for allocating and freeing the
memory as well as ensuring that the memory belongs to the
CUDA context that this Context instance holds.
memory. When `address==size==0` then a new zero-sized buffer
is returned.
Parameters
----------
address : int
Specify the starting address of the buffer.
Specify the starting address of the buffer. The address can
refer to both device or host memory but it must be
accessible from device after mapping it with
`get_device_address` method.
size : int
Specify the size of device buffer in bytes.
Returns
-------
cbuf : CudaBuffer
Device buffer as a view of device memory.
Device buffer as a view of device reachable memory.
"""
if not address and size == 0:
return self.new_buffer(0)
cdef:
intptr_t c_addr = address
uintptr_t c_addr = self.get_device_address(address)
int64_t c_size = size
shared_ptr[CCudaBuffer] cudabuf
check_status(self.context.get().View(<uint8_t*>c_addr,
Expand Down Expand Up @@ -246,6 +284,49 @@ cdef class Context:
result.copy_from_device(buf, position=0, nbytes=size)
return result

def buffer_from_object(self, obj):
    """Create device buffer view of arbitrary object that references
    device accessible memory.

    When the object contains a non-contiguous view of device
    accessible memory then the returned device buffer will contain
    contiguous view of the memory, that is, including the
    intermediate data that is otherwise invisible to the input
    object.

    Parameters
    ----------
    obj : {object, Buffer, HostBuffer, CudaBuffer, ...}
        Specify an object that holds (device or host) address that
        can be accessed from device. This includes objects with
        types defined in pyarrow.cuda as well as arbitrary objects
        that implement the CUDA array interface as defined by numba.

    Returns
    -------
    cbuf : CudaBuffer
        Device buffer as a view of device accessible memory. A
        CudaBuffer input is returned as is.

    Raises
    ------
    ArrowTypeError
        If `obj` is none of the supported kinds of object.
    """
    if isinstance(obj, HostBuffer):
        return self.foreign_buffer(obj.address, obj.size)
    elif isinstance(obj, CudaBuffer):
        # CudaBuffer derives from Buffer, so this check has to come
        # before the generic Buffer check; otherwise it can never match.
        return obj
    elif isinstance(obj, Buffer):
        return CudaBuffer.from_buffer(obj)
    elif hasattr(obj, '__cuda_array_interface__'):
        desc = obj.__cuda_array_interface__
        addr = desc['data'][0]
        if addr is None:
            # No backing memory: hand back an empty device buffer.
            return self.new_buffer(0)
        import numpy as np
        # Byte offsets, relative to addr, of the span covered by the
        # (possibly strided) array described by the interface dict.
        start, end = get_contiguous_span(
            desc['shape'], desc.get('strides'),
            np.dtype(desc['typestr']).itemsize)
        return self.foreign_buffer(addr + start, end - start)
    raise ArrowTypeError('cannot create device buffer view from'
                         ' `%s` object' % (type(obj)))


cdef class IpcMemHandle:
"""A serializable container for a CUDA IPC handle.
Expand Down Expand Up @@ -343,6 +424,8 @@ cdef class CudaBuffer(Buffer):
Device buffer as a view of numba MemoryPointer.
"""
ctx = Context.from_numba(mem.context)
if mem.device_pointer.value is None and mem.size==0:
return ctx.new_buffer(0)
return ctx.foreign_buffer(mem.device_pointer.value, mem.size)

def to_numba(self):
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_cuda.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil:
int64_t bytes_allocated() const
const void* handle() const
int device_number() const
CStatus GetDeviceAddress(uint8_t* addr, uint8_t** devaddr)

cdef cppclass CCudaIpcMemHandle" arrow::cuda::CudaIpcMemHandle":
@staticmethod
Expand Down
50 changes: 41 additions & 9 deletions python/pyarrow/tests/test_cuda.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_Context():
cuda.Context(cuda.Context.get_num_devices())


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
buf = cuda.new_host_buffer(size)
arr = np.frombuffer(buf, dtype=np.uint8)
Expand Down Expand Up @@ -102,7 +102,7 @@ def make_random_buffer(size, target='host'):
raise ValueError('invalid target value')


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
# Creating device buffer from host buffer;
arr, buf = make_random_buffer(size)
Expand Down Expand Up @@ -230,7 +230,39 @@ def test_context_device_buffer(size):
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_from_object(size):
    """Exercise Context.buffer_from_object with supported and unsupported
    kinds of input objects."""
    ctx = global_context
    arr, cbuf = make_random_buffer(size, target='device')
    dtype = arr.dtype

    def assert_view_matches_reference(view):
        # The view must span as many bytes as the reference device buffer
        # and hold the reference data once copied back to the host.
        assert view.size == cbuf.size
        roundtrip = np.frombuffer(view.copy_to_host(), dtype=dtype)
        np.testing.assert_equal(arr, roundtrip)

    # A CUDA host buffer, pre-filled with the reference data, is accepted.
    hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize)
    np.frombuffer(hbuf, dtype=dtype)[:] = arr
    view = ctx.buffer_from_object(hbuf)
    assert_view_matches_reference(view)

    # The CudaBuffer obtained above is accepted as input too.
    view = ctx.buffer_from_object(view)
    assert_view_matches_reference(view)

    # A plain Buffer that is not backed by a CudaBuffer is rejected.
    with pytest.raises(pa.ArrowTypeError,
                       match=('buffer is not backed by a CudaBuffer')):
        ctx.buffer_from_object(pa.py_buffer(b"123"))

    # A numpy array does not expose the CUDA array interface and is rejected.
    with pytest.raises(pa.ArrowTypeError,
                       match=('cannot create device buffer view from'
                              ' `<class \'numpy.ndarray\'>` object')):
        ctx.buffer_from_object(np.array([1, 2, 3]))


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
arr, buf = make_random_buffer(size)
assert arr.tobytes() == buf.to_pybytes()
Expand All @@ -255,7 +287,7 @@ def test_CudaBuffer(size):
cuda.CudaBuffer()


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
arr, buf = make_random_buffer(size)
assert arr.tobytes() == buf.to_pybytes()
Expand All @@ -281,7 +313,7 @@ def test_HostBuffer(size):
cuda.HostBuffer()


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_to_host(size):

# Create a buffer in host containing range(size)
Expand All @@ -306,7 +338,7 @@ def test_copy_from_to_host(size):
np.testing.assert_equal(arr, arr2)


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_to_host(size):
arr, dbuf = make_random_buffer(size, target='device')

Expand Down Expand Up @@ -366,7 +398,7 @@ def test_copy_to_host(size):
dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_device(size):
arr, buf = make_random_buffer(size=size, target='device')
lst = arr.tolist()
Expand Down Expand Up @@ -410,7 +442,7 @@ def put(*args, **kwargs):
put(position=position, nbytes=nbytes)


@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_host(size):
arr, buf = make_random_buffer(size=size, target='host')
lst = arr.tolist()
Expand Down Expand Up @@ -617,7 +649,7 @@ def other_process_for_test_IPC(handle_buffer, expected_arr):

@cuda_ipc
@pytest.mark.skipif(sys.version_info[0] == 2, reason="test needs Python 3")
@pytest.mark.parametrize("size", [0, 1, 8, 1000])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_IPC(size):
import multiprocessing
ctx = multiprocessing.get_context('spawn')
Expand Down
Loading

0 comments on commit f0c464e

Please sign in to comment.