Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Support PyCapsule Interface in DataFrame & Series constructors #17693

Merged
merged 12 commits into from
Jul 25, 2024
17 changes: 16 additions & 1 deletion py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
from polars.selectors import _expand_selector_dicts, _expand_selectors

with contextlib.suppress(ImportError): # Module not available when building docs
from polars.polars import PyDataFrame
from polars.polars import PyDataFrame, PySeries
from polars.polars import dtype_str_repr as _dtype_str_repr
from polars.polars import write_clipboard_string as _write_clipboard_string

Expand Down Expand Up @@ -414,6 +414,21 @@ def __init__(
self._df = dataframe_to_pydf(
data, schema=schema, schema_overrides=schema_overrides, strict=strict
)

elif hasattr(data, "__arrow_c_array__"):
# This uses the fact that PySeries.from_arrow_c_array will create a
# struct-typed Series. Then we unpack that to a DataFrame.
tmp_col_name = ""
s = wrap_s(PySeries.from_arrow_c_array(data))
self._df = s.to_frame(tmp_col_name).unnest(tmp_col_name)._df

elif hasattr(data, "__arrow_c_stream__"):
# This uses the fact that PySeries.from_arrow_c_stream will create a
# struct-typed Series. Then we unpack that to a DataFrame.
tmp_col_name = ""
s = wrap_s(PySeries.from_arrow_c_stream(data))
self._df = s.to_frame(tmp_col_name).unnest(tmp_col_name)._df

else:
msg = (
f"DataFrame constructor called with unsupported type {type(data).__name__!r}"
Expand Down
6 changes: 6 additions & 0 deletions py-polars/polars/series/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,12 @@ def __init__(
original_name, values, dtype=dtype, strict=strict
)

elif hasattr(values, "__arrow_c_array__"):
self._s = PySeries.from_arrow_c_array(values)

elif hasattr(values, "__arrow_c_stream__"):
self._s = PySeries.from_arrow_c_stream(values)

else:
msg = (
f"Series constructor called with unsupported type {type(values).__name__!r}"
Expand Down
122 changes: 122 additions & 0 deletions py-polars/src/series/import.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use polars::export::arrow;
use polars::export::arrow::array::Array;
use polars::export::arrow::ffi;
use polars::export::arrow::ffi::{
ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema,
};
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use super::*;

/// Validate PyCapsule has provided name
fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
let capsule_name = capsule.name()?;
if let Some(capsule_name) = capsule_name {
let capsule_name = capsule_name.to_str()?;
if capsule_name != expected_name {
return Err(PyValueError::new_err(format!(
"Expected name '{}' in PyCapsule, instead got '{}'",
expected_name, capsule_name
)));
}
} else {
return Err(PyValueError::new_err(
"Expected schema PyCapsule to have name set.",
));
}

Ok(())
}

/// Import `__arrow_c_array__` across Python boundary
pub(crate) fn call_arrow_c_array<'py>(
ob: &'py Bound<PyAny>,
) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
if !ob.hasattr("__arrow_c_array__")? {
return Err(PyValueError::new_err(
"Expected an object with dunder __arrow_c_array__",
));
}

let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
if !tuple.is_instance_of::<PyTuple>() {
return Err(PyTypeError::new_err(
"Expected __arrow_c_array__ to return a tuple.",
));
}

let schema_capsule = tuple.get_item(0)?.downcast_into()?;
let array_capsule = tuple.get_item(1)?.downcast_into()?;
Ok((schema_capsule, array_capsule))
}

pub(crate) fn import_array_pycapsules(
schema_capsule: &Bound<PyCapsule>,
array_capsule: &Bound<PyCapsule>,
) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
validate_pycapsule_name(schema_capsule, "arrow_schema")?;
validate_pycapsule_name(array_capsule, "arrow_array")?;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a // SAFETY comment explaining which invariants must hold here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take a look and see if those safety comments are ok

let schema_ptr = unsafe { schema_capsule.reference::<ArrowSchema>() };
let array_ptr = unsafe { std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty()) };

let (field, array) = unsafe {
kylebarron marked this conversation as resolved.
Show resolved Hide resolved
let field = ffi::import_field_from_c(schema_ptr).unwrap();
let array = ffi::import_array_from_c(array_ptr, field.data_type().clone()).unwrap();
(field, array)
};

Ok((field, array))
}

/// Import `__arrow_c_stream__` across Python boundary.
fn call_arrow_c_stream<'py>(ob: &'py Bound<PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
if !ob.hasattr("__arrow_c_stream__")? {
return Err(PyValueError::new_err(
"Expected an object with dunder __arrow_c_stream__",
));
}

let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast_into()?;
Ok(capsule)
}

pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
validate_pycapsule_name(capsule, "arrow_array_stream")?;

// Takes ownership of the pointed to ArrowArrayStream
// This acts to move the data out of the capsule pointer, setting the release callback to NULL
let stream_ptr =
Box::new(unsafe { std::ptr::replace(capsule.pointer() as _, ArrowArrayStream::empty()) });

let mut stream = unsafe {
ArrowArrayStreamReader::try_new(stream_ptr)
.map_err(|err| PyValueError::new_err(err.to_string()))?
};

let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
while let Some(array) = unsafe { stream.next() } {
produced_arrays.push(array.unwrap());
}

let s = Series::try_from((stream.field(), produced_arrays)).unwrap();
Ok(PySeries::new(s))
}
#[pymethods]
impl PySeries {
#[classmethod]
pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
let s = Series::try_from((&field, array)).unwrap();
Ok(PySeries::new(s))
}

#[classmethod]
pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
let capsule = call_arrow_c_stream(ob)?;
import_stream_pycapsule(&capsule)
}
}
1 change: 1 addition & 0 deletions py-polars/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod c_interface;
mod comparison;
mod construction;
mod export;
mod import;
mod numpy_ufunc;
mod scatter;

Expand Down
5 changes: 0 additions & 5 deletions py-polars/tests/unit/constructors/test_constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,11 +958,6 @@ def _constructor(self) -> type:
assert df.schema == {"colx": pl.Datetime("us")}
assert df.rows() == [(datetime(2022, 10, 31, 10, 30, 45, 123456),)]

# pandas is not available
monkeypatch.setattr(pl.dataframe.frame, "_check_for_pandas", lambda x: False)
with pytest.raises(TypeError):
pl.DataFrame(pandas_df)

kylebarron marked this conversation as resolved.
Show resolved Hide resolved

def test_init_errors() -> None:
# Length mismatch
Expand Down