From 0f98a95cf813c0e4cb064eb3738ddbf9deef3555 Mon Sep 17 00:00:00 2001 From: Xiaoying Wang Date: Wed, 22 Jan 2025 12:20:51 -0800 Subject: [PATCH] fix memory leak --- connectorx-python/pyproject.toml | 3 ++- connectorx-python/src/pandas/destination.rs | 22 +++++++-------- .../src/pandas/pandas_columns/array.rs | 17 ++++++------ .../src/pandas/pandas_columns/boolean.rs | 24 +++++++++-------- .../src/pandas/pandas_columns/bytes.rs | 17 ++++++------ .../src/pandas/pandas_columns/datetime.rs | 16 +++++------ .../src/pandas/pandas_columns/float64.rs | 20 +++++++------- .../src/pandas/pandas_columns/int64.rs | 26 +++++++++--------- .../src/pandas/pandas_columns/mod.rs | 27 ++++++------------- .../src/pandas/pandas_columns/string.rs | 17 ++++++------ 10 files changed, 88 insertions(+), 101 deletions(-) diff --git a/connectorx-python/pyproject.toml b/connectorx-python/pyproject.toml index affb854ed..4a6c35bfd 100644 --- a/connectorx-python/pyproject.toml +++ b/connectorx-python/pyproject.toml @@ -18,13 +18,14 @@ license = "MIT" maintainers = ["Weiyuan Wu "] name = "connectorx" readme = "README.md" # Markdown files are supported +version = "0.4.1-alpha1" [project] name = "connectorx" # Target file name of maturin build readme = "README.md" license = { text = "MIT" } requires-python = ">=3.10" -version = "0.4.1-alpha1" +dynamic = ["version"] [tool.poetry.dependencies] dask = {version = "^2021", optional = true, extras = ["dataframe"]} diff --git a/connectorx-python/src/pandas/destination.rs b/connectorx-python/src/pandas/destination.rs index 36532beac..f50c9538e 100644 --- a/connectorx-python/src/pandas/destination.rs +++ b/connectorx-python/src/pandas/destination.rs @@ -1,7 +1,7 @@ use super::{ pandas_columns::{ - ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, Float64Block, HasPandasColumn, - Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock, + ArrayBlock, BooleanBlock, BytesBlock, DateTimeBlock, ExtractBlockFromBound, Float64Block, + HasPandasColumn, Int64Block, PandasColumn, PandasColumnObject, PyBytes, StringBlock, }, pystring::PyString, typesystem::{PandasArrayType, PandasBlockType, PandasTypeSystem}, @@ -215,7 +215,7 @@ impl<'py> Destination for PandasDestination<'py> { let buf = &self.block_datas[idx]; match block.dt { PandasBlockType::Boolean(_) => { - let bblock = buf.extract::()?; + let bblock = BooleanBlock::extract_block(buf)?; let bcols = bblock.split()?; for (&cid, bcol) in block.cids.iter().zip_eq(bcols) { @@ -227,7 +227,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::Float64 => { - let fblock = buf.extract::()?; + let fblock = Float64Block::extract_block(buf)?; let fcols = fblock.split()?; for (&cid, fcol) in block.cids.iter().zip_eq(fcols) { partitioned_columns[cid] = fcol @@ -238,7 +238,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::BooleanArray => { - let bblock = buf.extract::>()?; + let bblock = ArrayBlock::::extract_block(buf)?; let bcols = bblock.split()?; for (&cid, bcol) in block.cids.iter().zip_eq(bcols) { partitioned_columns[cid] = bcol @@ -249,7 +249,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::Float64Array => { - let fblock = buf.extract::>()?; + let fblock = ArrayBlock::::extract_block(buf)?; let fcols = fblock.split()?; for (&cid, fcol) in block.cids.iter().zip_eq(fcols) { partitioned_columns[cid] = fcol @@ -260,7 +260,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::Int64Array => { - let fblock = buf.extract::>()?; + let fblock = ArrayBlock::::extract_block(buf)?; let fcols = fblock.split()?; for (&cid, fcol) in block.cids.iter().zip_eq(fcols) { partitioned_columns[cid] = fcol @@ -271,7 +271,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::Int64(_) => { - let ublock = buf.extract::()?; + let ublock = Int64Block::extract_block(buf)?; let ucols = ublock.split()?; for (&cid, ucol) in block.cids.iter().zip_eq(ucols) { partitioned_columns[cid] = ucol @@ -282,7 +282,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::String => { - let sblock = buf.extract::()?; + let sblock = StringBlock::extract_block(buf)?; let scols = sblock.split()?; for (&cid, scol) in block.cids.iter().zip_eq(scols) { partitioned_columns[cid] = scol @@ -293,7 +293,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::Bytes => { - let bblock = buf.extract::()?; + let bblock = BytesBlock::extract_block(buf)?; let bcols = bblock.split()?; for (&cid, bcol) in block.cids.iter().zip_eq(bcols) { partitioned_columns[cid] = bcol @@ -304,7 +304,7 @@ impl<'py> Destination for PandasDestination<'py> { } } PandasBlockType::DateTime => { - let dblock = buf.extract::()?; + let dblock = DateTimeBlock::extract_block(buf)?; let dcols = dblock.split()?; for (&cid, dcol) in block.cids.iter().zip_eq(dcols) { partitioned_columns[cid] = dcol diff --git a/connectorx-python/src/pandas/pandas_columns/array.rs b/connectorx-python/src/pandas/pandas_columns/array.rs index c887d568d..9f38f7a5a 100644 --- a/connectorx-python/src/pandas/pandas_columns/array.rs +++ b/connectorx-python/src/pandas/pandas_columns/array.rs @@ -1,10 +1,13 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, + GIL_MUTEX, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use ndarray::{ArrayViewMut2, Axis, Ix2}; -use numpy::{Element, PyArray, PyArrayDescr}; -use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python, ToPyObject}; +use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods}; +use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python, ToPyObject}; use std::any::TypeId; use std::marker::PhantomData; @@ -30,8 +33,8 @@ pub struct ArrayBlock<'a, V> { _value_type: PhantomData, } -impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> { - fn extract(ob: &'a PyAny) -> PyResult { +impl<'a, V> ExtractBlockFromBound<'a> for ArrayBlock<'a, V> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { check_dtype(ob, "object")?; let array = ob.downcast::>()?; let data = unsafe { array.as_array_mut() }; @@ -41,10 +44,6 @@ impl<'a, V> FromPyObject<'a> for ArrayBlock<'a, V> { _value_type: PhantomData, }) } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a, V> ArrayBlock<'a, V> { diff --git a/connectorx-python/src/pandas/pandas_columns/boolean.rs b/connectorx-python/src/pandas/pandas_columns/boolean.rs index b61ed947f..ec71a5e62 100644 --- a/connectorx-python/src/pandas/pandas_columns/boolean.rs +++ b/connectorx-python/src/pandas/pandas_columns/boolean.rs @@ -1,10 +1,15 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2}; -use numpy::{PyArray, PyArray1}; -use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult}; +use numpy::{PyArray, PyArray1, PyArrayMethods}; +use pyo3::{ + types::{PyAnyMethods, PyTuple, PyTupleMethods}, + PyAny, PyResult, +}; use std::any::TypeId; // Boolean @@ -12,8 +17,9 @@ pub enum BooleanBlock<'a> { NumPy(ArrayViewMut2<'a, bool>), Extention(ArrayViewMut1<'a, bool>, ArrayViewMut1<'a, bool>), } -impl<'a> FromPyObject<'a> for BooleanBlock<'a> { - fn extract(ob: &'a PyAny) -> PyResult { + +impl<'a> ExtractBlockFromBound<'a> for BooleanBlock<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { if let Ok(array) = ob.downcast::>() { // if numpy array check_dtype(ob, "bool")?; @@ -22,8 +28,8 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> { } else { // if extension array let tuple = ob.downcast::()?; - let data = tuple.get_item(0)?; - let mask = tuple.get_item(1)?; + let data = tuple.as_slice().get(0).unwrap(); + let mask = tuple.as_slice().get(1).unwrap(); check_dtype(data, "bool")?; check_dtype(mask, "bool")?; @@ -33,10 +39,6 @@ impl<'a> FromPyObject<'a> for BooleanBlock<'a> { )) } } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> BooleanBlock<'a> { diff --git a/connectorx-python/src/pandas/pandas_columns/bytes.rs b/connectorx-python/src/pandas/pandas_columns/bytes.rs index b2ba2af35..5f9fd6121 100644 --- a/connectorx-python/src/pandas/pandas_columns/bytes.rs +++ b/connectorx-python/src/pandas/pandas_columns/bytes.rs @@ -1,10 +1,13 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, + GIL_MUTEX, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use ndarray::{ArrayViewMut2, Axis, Ix2}; -use numpy::{Element, PyArray, PyArrayDescr}; -use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, Python}; +use numpy::{Element, PyArray, PyArrayDescr, PyArrayMethods}; +use pyo3::{types::PyAnyMethods, Bound, Py, PyAny, PyResult, Python}; use std::any::TypeId; #[derive(Clone)] @@ -28,8 +31,8 @@ pub struct BytesBlock<'a> { buf_size_mb: usize, } -impl<'a> FromPyObject<'a> for BytesBlock<'a> { - fn extract(ob: &'a PyAny) -> PyResult { +impl<'a> ExtractBlockFromBound<'a> for BytesBlock<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { check_dtype(ob, "object")?; let array = ob.downcast::>()?; let data = unsafe { array.as_array_mut() }; @@ -38,10 +41,6 @@ impl<'a> FromPyObject<'a> for BytesBlock<'a> { buf_size_mb: 16, // in MB }) } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> BytesBlock<'a> { diff --git a/connectorx-python/src/pandas/pandas_columns/datetime.rs b/connectorx-python/src/pandas/pandas_columns/datetime.rs index 5374ea39f..025b7558b 100644 --- a/connectorx-python/src/pandas/pandas_columns/datetime.rs +++ b/connectorx-python/src/pandas/pandas_columns/datetime.rs @@ -1,11 +1,13 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use chrono::{DateTime, Utc}; use fehler::throws; use ndarray::{ArrayViewMut2, Axis, Ix2}; -use numpy::PyArray; -use pyo3::{FromPyObject, PyAny, PyResult}; +use numpy::{PyArray, PyArrayMethods}; +use pyo3::{types::PyAnyMethods, PyAny, PyResult}; use std::any::TypeId; // datetime64 is represented in int64 in numpy @@ -14,17 +16,13 @@ pub struct DateTimeBlock<'a> { data: ArrayViewMut2<'a, i64>, } -impl<'a> FromPyObject<'a> for DateTimeBlock<'a> { - fn extract(ob: &'a PyAny) -> PyResult { +impl<'a> ExtractBlockFromBound<'a> for DateTimeBlock<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { check_dtype(ob, "int64")?; let array = ob.downcast::>()?; let data = unsafe { array.as_array_mut() }; Ok(DateTimeBlock { data }) } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> DateTimeBlock<'a> { diff --git a/connectorx-python/src/pandas/pandas_columns/float64.rs b/connectorx-python/src/pandas/pandas_columns/float64.rs index b3c35ce21..0d5e3a985 100644 --- a/connectorx-python/src/pandas/pandas_columns/float64.rs +++ b/connectorx-python/src/pandas/pandas_columns/float64.rs @@ -1,10 +1,12 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use ndarray::{ArrayViewMut2, Axis, Ix2}; -use numpy::PyArray; -use pyo3::{FromPyObject, PyAny, PyResult}; +use numpy::{PyArray, PyArrayMethods}; +use pyo3::{types::PyAnyMethods, PyAny, PyResult}; use std::any::TypeId; // Float @@ -12,17 +14,13 @@ pub struct Float64Block<'a> { data: ArrayViewMut2<'a, f64>, } -impl<'a> FromPyObject<'a> for Float64Block<'a> { - fn extract(ob: &'a PyAny) -> PyResult { +impl<'a> ExtractBlockFromBound<'a> for Float64Block<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { check_dtype(ob, "float64")?; - let array = ob.downcast::>()?; - let data = unsafe { array.as_array_mut() }; + let array: &pyo3::Bound<'a, PyArray> = ob.downcast()?; + let data: ArrayViewMut2<'a, f64> = unsafe { array.as_array_mut() }; Ok(Float64Block { data }) } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> Float64Block<'a> { diff --git a/connectorx-python/src/pandas/pandas_columns/int64.rs b/connectorx-python/src/pandas/pandas_columns/int64.rs index a4a95328d..6a1616d17 100644 --- a/connectorx-python/src/pandas/pandas_columns/int64.rs +++ b/connectorx-python/src/pandas/pandas_columns/int64.rs @@ -1,39 +1,41 @@ -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, +}; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use ndarray::{ArrayViewMut1, ArrayViewMut2, Axis, Ix2}; -use numpy::{PyArray, PyArray1}; -use pyo3::{types::PyTuple, FromPyObject, PyAny, PyResult}; +use numpy::{PyArray, PyArray1, PyArrayMethods}; +use pyo3::{ + types::{PyAnyMethods, PyTuple, PyTupleMethods}, + PyAny, PyResult, +}; use std::any::TypeId; pub enum Int64Block<'a> { NumPy(ArrayViewMut2<'a, i64>), Extention(ArrayViewMut1<'a, i64>, ArrayViewMut1<'a, bool>), } -impl<'a> FromPyObject<'a> for Int64Block<'a> { - fn extract(ob: &'a PyAny) -> PyResult { + +impl<'a> ExtractBlockFromBound<'a> for Int64Block<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { if let Ok(array) = ob.downcast::>() { check_dtype(ob, "int64")?; let data = unsafe { array.as_array_mut() }; Ok(Int64Block::NumPy(data)) } else { let tuple = ob.downcast::()?; - let data = tuple.get_item(0)?; - let mask = tuple.get_item(1)?; + // let data = tuple.get_borrowed_item(0)?; + let data = tuple.as_slice().get(0).unwrap(); + let mask = tuple.as_slice().get(1).unwrap(); check_dtype(data, "int64")?; check_dtype(mask, "bool")?; - Ok(Int64Block::Extention( unsafe { data.downcast::>()?.as_array_mut() }, unsafe { mask.downcast::>()?.as_array_mut() }, )) } } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> Int64Block<'a> { diff --git a/connectorx-python/src/pandas/pandas_columns/mod.rs b/connectorx-python/src/pandas/pandas_columns/mod.rs index 9f9f653a0..65e830607 100644 --- a/connectorx-python/src/pandas/pandas_columns/mod.rs +++ b/connectorx-python/src/pandas/pandas_columns/mod.rs @@ -15,7 +15,7 @@ pub use datetime::DateTimeBlock; use fehler::throw; pub use float64::Float64Block; pub use int64::Int64Block; -use pyo3::{exceptions::PyRuntimeError, PyAny, PyResult}; +use pyo3::{exceptions::PyRuntimeError, intern, types::PyAnyMethods, PyAny, PyResult}; use std::any::TypeId; use std::sync::Mutex; pub use string::StringBlock; @@ -44,9 +44,13 @@ pub trait HasPandasColumn: Sized { type PandasColumn<'a>: PandasColumn; } -pub fn check_dtype(ob: &PyAny, expected_dtype: &str) -> PyResult<()> { - let dtype = ob.getattr("dtype")?.str()?; - let dtype = dtype.to_str()?; +pub trait ExtractBlockFromBound<'a>: Sized { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult; +} + +pub fn check_dtype<'py>(ob: &pyo3::Bound<'py, PyAny>, expected_dtype: &str) -> PyResult<()> { + let dtype = ob.getattr(intern!(ob.py(), "dtype"))?.str()?; + // https://pyo3.rs/main/doc/pyo3/types/struct.pystring#equality if dtype != expected_dtype { throw!(PyRuntimeError::new_err(format!( "expecting ndarray to be '{}' found '{}' at {}:{}", @@ -58,18 +62,3 @@ pub fn check_dtype(ob: &PyAny, expected_dtype: &str) -> PyResult<()> { } Ok(()) } - -// pub fn check_bound_dtype<'py>(ob: &pyo3::Bound<'py, PyAny>, expected_dtype: &str) -> PyResult<()> { -// let dtype = ob.getattr(intern!(ob.py(), "dtype"))?.str()?; -// // https://pyo3.rs/main/doc/pyo3/types/struct.pystring#equality -// if dtype != expected_dtype { -// throw!(PyRuntimeError::new_err(format!( -// "expecting ndarray to be '{}' found '{}' at {}:{}", -// expected_dtype, -// dtype, -// file!(), -// line!() -// ))); -// } -// Ok(()) -// } diff --git a/connectorx-python/src/pandas/pandas_columns/string.rs b/connectorx-python/src/pandas/pandas_columns/string.rs index 6c5d33bd7..f33d6eb63 100644 --- a/connectorx-python/src/pandas/pandas_columns/string.rs +++ b/connectorx-python/src/pandas/pandas_columns/string.rs @@ -1,13 +1,16 @@ use super::super::pystring::{PyString, StringInfo}; -use super::{check_dtype, HasPandasColumn, PandasColumn, PandasColumnObject, GIL_MUTEX}; +use super::{ + check_dtype, ExtractBlockFromBound, HasPandasColumn, PandasColumn, PandasColumnObject, + GIL_MUTEX, +}; use crate::constants::PYSTRING_BUFFER_SIZE; use crate::errors::ConnectorXPythonError; use anyhow::anyhow; use fehler::throws; use itertools::Itertools; use ndarray::{ArrayViewMut2, Axis, Ix2}; -use numpy::PyArray; -use pyo3::{FromPyObject, PyAny, PyResult, Python}; +use numpy::{PyArray, PyArrayMethods}; +use pyo3::{types::PyAnyMethods, PyAny, PyResult, Python}; use std::any::TypeId; pub struct StringBlock<'a> { @@ -15,8 +18,8 @@ pub struct StringBlock<'a> { buf_size_mb: usize, } -impl<'a> FromPyObject<'a> for StringBlock<'a> { - fn extract(ob: &'a PyAny) -> PyResult { +impl<'a> ExtractBlockFromBound<'a> for StringBlock<'a> { + fn extract_block<'b: 'a>(ob: &'b pyo3::Bound<'a, PyAny>) -> PyResult { check_dtype(ob, "object")?; let array = ob.downcast::>()?; let data = unsafe { array.as_array_mut() }; @@ -25,10 +28,6 @@ impl<'a> FromPyObject<'a> for StringBlock<'a> { buf_size_mb: PYSTRING_BUFFER_SIZE, // in MB }) } - - fn extract_bound(ob: &pyo3::Bound<'a, PyAny>) -> PyResult { - Self::extract(ob.clone().into_gil_ref()) - } } impl<'a> StringBlock<'a> {