Support Pandas 2.2, unpin Pyarrow, and fail on warnings in tests (#143)
hoxbro authored Aug 13, 2024
1 parent dbdf154 commit e7798e3
Showing 12 changed files with 232 additions and 90 deletions.
5 changes: 3 additions & 2 deletions pixi.toml
@@ -24,9 +24,10 @@ lint = ["py311", "lint"]
dask-core = "*"
fsspec = "*"
numba = "*"
pandas = "<2.2" # FIX: Temporary upper pin
packaging = "*"
pandas = "*"
pip = "*"
pyarrow = ">=10,<15" # FIX: Temporary upper pin
pyarrow = ">=10"
retrying = "*"

[feature.py39.dependencies]
15 changes: 13 additions & 2 deletions pyproject.toml
@@ -26,7 +26,15 @@ classifiers = [
"Topic :: Scientific/Engineering",
"Topic :: Software Development :: Libraries",
]
dependencies = ['dask', 'fsspec >=2022.8', 'numba', 'pandas', 'pyarrow >=10', 'retrying']
dependencies = [
'dask',
'fsspec >=2022.8',
'numba',
'packaging',
'pandas',
'pyarrow >=10',
'retrying',
]

[project.urls]
Homepage = "https://github.com/holoviz/spatialpandas"
@@ -61,7 +69,10 @@ addopts = [
minversion = "7"
xfail_strict = true
log_cli_level = "INFO"
filterwarnings = []
filterwarnings = [
"error",
"ignore:datetime.datetime.utcnow():DeprecationWarning:botocore", # https://github.com/boto/boto3/issues/3889
]

[tool.ruff]
fix = true
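
A minimal sketch of what the new filterwarnings setting does (test and function names here are invented for illustration): "error" promotes any warning not matched by a later "ignore:" entry into an exception, so the test that triggered it fails, while the botocore utcnow() DeprecationWarning stays exempt.

import warnings

import pytest


def emit_future_warning():
    warnings.warn("pandas 2.2 deprecates this usage", FutureWarning)


def test_unignored_warning_fails():
    with warnings.catch_warnings():
        warnings.simplefilter("error")  # same effect as filterwarnings = ["error"]
        with pytest.raises(FutureWarning):
            emit_future_warning()
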
7 changes: 6 additions & 1 deletion spatialpandas/dask.py
@@ -9,6 +9,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from retrying import retry
from packaging.version import Version

import dask
import dask.dataframe as dd
@@ -190,7 +191,11 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):

# Set index to distance. This will trigger an expensive shuffle
# sort operation
ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, shuffle=shuffle)
if Version(dask.__version__) >= Version('2024.1'):
shuffle_kwargs = {'shuffle_method': shuffle}
else:
shuffle_kwargs = {'shuffle': shuffle}
ddf = ddf.set_index('hilbert_distance', npartitions=npartitions, **shuffle_kwargs)

if ddf.npartitions != npartitions:
# set_index doesn't change the number of partitions if the partitions
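
For reference, the compatibility pattern introduced above isolated into a standalone sketch (the helper name is invented): dask 2024.1 renamed set_index's shuffle keyword to shuffle_method, so the keyword is chosen from the installed version via packaging, which is also why packaging joins the dependency lists.

from packaging.version import Version

import dask


def set_index_shuffle_kwargs(method="tasks"):
    # Select the keyword name understood by the installed dask version.
    if Version(dask.__version__) >= Version("2024.1"):
        return {"shuffle_method": method}
    return {"shuffle": method}


# Usage (hypothetical dask DataFrame `ddf`):
# ddf = ddf.set_index("hilbert_distance", npartitions=8, **set_index_shuffle_kwargs())
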
16 changes: 16 additions & 0 deletions spatialpandas/geodataframe.py
@@ -63,10 +63,26 @@ def __init__(self, data=None, index=None, geometry=None, **kwargs):
def _constructor(self):
return _MaybeGeoDataFrame

def _constructor_from_mgr(self, mgr, axes):
if not any(isinstance(block.dtype, GeometryDtype) for block in mgr.blocks):
return pd.DataFrame._from_mgr(mgr, axes)

gdf = GeoDataFrame._from_mgr(mgr, axes)
if (gdf.columns == "geometry").sum() == 1: # only if "geometry" is single col
gdf._geometry = "geometry"
return gdf

@property
def _constructor_sliced(self):
return _MaybeGeoSeries

def _constructor_sliced_from_mgr(self, mgr, axes):
is_row_proxy = mgr.index.is_(self.columns)

if isinstance(mgr.blocks[0].dtype, GeometryDtype) and not is_row_proxy:
return GeoSeries._from_mgr(mgr, axes)
return pd.Series._from_mgr(mgr, axes)

def set_geometry(self, geometry, inplace=False):
if (geometry not in self or
not isinstance(self[geometry].dtype, GeometryDtype)):
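
A small behavioural sketch of what the two new hooks preserve under pandas 2.x block-manager internals (the PointArray constructor arguments are assumed for illustration): selecting a geometry column yields a GeoSeries, selecting a plain column yields an ordinary pandas Series, and a two-dimensional slice containing a geometry column stays a GeoDataFrame.

import pandas as pd

from spatialpandas import GeoDataFrame, GeoSeries
from spatialpandas.geometry import PointArray


gdf = GeoDataFrame({
    "geometry": PointArray([[0, 0], [1, 1]]),
    "value": [10, 20],
})

assert isinstance(gdf["geometry"], GeoSeries)                # geometry column -> GeoSeries
assert isinstance(gdf["value"], pd.Series)                   # plain column -> pandas Series
assert not isinstance(gdf["value"], GeoSeries)
assert isinstance(gdf[["value", "geometry"]], GeoDataFrame)  # 2D slice keeps the subclass
assert not isinstance(gdf.loc[0], GeoSeries)                 # row proxy -> pandas Series
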
35 changes: 18 additions & 17 deletions spatialpandas/geoseries.py
@@ -1,5 +1,4 @@
import pandas as pd
from packaging.version import Version

from .geometry import GeometryDtype, Geometry

@@ -36,24 +35,13 @@ def __init__(self, data, index=None, name=None, dtype=None, **kwargs):
super().__init__(data, index=index, name=name, **kwargs)

def _constructor_from_mgr(self, mgr, axes):
if Version(pd.__version__) < Version('2.1'):
return super()._constructor_from_mgr(mgr, axes)
from pandas.core.internals import SingleBlockManager
assert isinstance(mgr, SingleBlockManager)

# Copied from pandas.Series._constructor_from_mgr
# https://github.com/pandas-dev/pandas/blob/80a1a8bc3e07972376284ffce425a2abd1e58604/pandas/core/series.py#L582-L590
# Changed if statement to use GeoSeries instead of Series.
# REF: https://github.com/pandas-dev/pandas/pull/52132
# REF: https://github.com/pandas-dev/pandas/pull/53871
if not isinstance(mgr.blocks[0].dtype, GeometryDtype):
return pd.Series._from_mgr(mgr, axes)

ser = self._from_mgr(mgr, axes=axes)
ser._name = None # caller is responsible for setting real name

if type(self) is GeoSeries: # Changed line
# fastpath avoiding constructor call
return ser
else:
assert axes is mgr.axes
return self._constructor(ser, copy=False)
return GeoSeries._from_mgr(mgr, axes)

@property
def _constructor(self):
@@ -64,6 +52,19 @@ def _constructor_expanddim(self):
from .geodataframe import GeoDataFrame
return GeoDataFrame

def _constructor_expanddim_from_mgr(self, mgr, axes):
from .geodataframe import GeoDataFrame
df = pd.DataFrame._from_mgr(mgr, axes)
geo_cols = [col for col in df.columns if isinstance(df[col].dtype, GeometryDtype)]
if geo_cols:
geo_col_name = geo_cols if len(geo_cols) == 1 else None

if geo_col_name is None or not isinstance(getattr(df[geo_col_name], "dtype", None), GeometryDtype):
df = GeoDataFrame(df)
else:
df = df.set_geometry(geo_col_name)
return df

@property
def bounds(self):
return pd.DataFrame(
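
And a matching sketch for the new _constructor_expanddim_from_mgr hook (again with assumed PointArray arguments): expanding a GeoSeries back to two dimensions, for example via to_frame(), produces a GeoDataFrame rather than a plain DataFrame, and its geometry column keeps GeoSeries behaviour.

from spatialpandas import GeoDataFrame, GeoSeries
from spatialpandas.geometry import PointArray


s = GeoSeries(PointArray([[0, 0], [1, 1]]), name="geometry")
frame = s.to_frame()

assert isinstance(frame, GeoDataFrame)           # expanddim keeps the geo subclass
assert isinstance(frame["geometry"], GeoSeries)  # geometry column round-trips
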
75 changes: 15 additions & 60 deletions spatialpandas/io/parquet.py
@@ -8,7 +8,6 @@

import fsspec
import pandas as pd
import pyarrow as pa
from dask import delayed
from dask.dataframe import from_delayed, from_pandas
from dask.dataframe import read_parquet as dd_read_parquet
@@ -29,9 +28,6 @@
# improve pandas compatibility, based on geopandas _compat.py
PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0")

# When we drop support for pyarrow < 5 all code related to this can be removed.
LEGACY_PYARROW = Version(pa.__version__) < Version("5.0.0")


def _load_parquet_pandas_metadata(
path,
@@ -45,35 +41,20 @@ def _load_parquet_pandas_metadata(
raise ValueError("Path not found: " + path)

if filesystem.isdir(path):
if LEGACY_PYARROW:
basic_kwargs = dict(validate_schema=False)
else:
basic_kwargs = dict(use_legacy_dataset=False)

pqds = ParquetDataset(
path,
filesystem=filesystem,
**basic_kwargs,
**engine_kwargs,
)

if LEGACY_PYARROW:
common_metadata = pqds.common_metadata
if common_metadata is None:
# Get metadata for first piece
piece = pqds.pieces[0]
metadata = piece.get_metadata().metadata
else:
metadata = pqds.common_metadata.metadata
else:
filename = "/".join([_get_parent_path(pqds.files[0]), "_common_metadata"])
try:
common_metadata = _read_metadata(filename, filesystem=filesystem)
except FileNotFoundError:
# Common metadata doesn't exist, so get metadata for first piece instead
filename = pqds.files[0]
common_metadata = _read_metadata(filename, filesystem=filesystem)
metadata = common_metadata.metadata
filename = "/".join([_get_parent_path(pqds.files[0]), "_common_metadata"])
try:
common_metadata = _read_metadata(filename, filesystem=filesystem)
except FileNotFoundError:
# Common metadata doesn't exist, so get metadata for first piece instead
filename = pqds.files[0]
common_metadata = _read_metadata(filename, filesystem=filesystem)
metadata = common_metadata.metadata
else:
with filesystem.open(path) as f:
pf = ParquetFile(f)
@@ -128,29 +109,15 @@ def read_parquet(
engine_kwargs = engine_kwargs or {}
filesystem = validate_coerce_filesystem(path, filesystem, storage_options)

if LEGACY_PYARROW:
basic_kwargs = dict(validate_schema=False)
else:
basic_kwargs = dict(use_legacy_dataset=False)

# Load using pyarrow to handle parquet files and directories across filesystems
dataset = ParquetDataset(
path,
filesystem=filesystem,
**basic_kwargs,
**engine_kwargs,
**kwargs,
)

if LEGACY_PYARROW:
metadata = _load_parquet_pandas_metadata(
path,
filesystem=filesystem,
storage_options=storage_options,
engine_kwargs=engine_kwargs,
)
else:
metadata = dataset.schema.pandas_metadata
metadata = dataset.schema.pandas_metadata

# If columns specified, prepend index columns to it
if columns is not None:
@@ -338,11 +305,6 @@ def _perform_read_parquet_dask(
filesystem,
storage_options,
)
if LEGACY_PYARROW:
basic_kwargs = dict(validate_schema=False)
else:
basic_kwargs = dict(use_legacy_dataset=False)

datasets = []
for path in paths:
if filesystem.isdir(path):
@@ -353,7 +315,6 @@
d = ParquetDataset(
path,
filesystem=filesystem,
**basic_kwargs,
**engine_kwargs,
)
datasets.append(d)
@@ -420,10 +381,7 @@ def _perform_read_parquet_dask(
else:
cols_no_index = None

if LEGACY_PYARROW:
files = paths
else:
files = getattr(datasets[0], "files", paths)
files = getattr(datasets[0], "files", paths)

meta = dd_read_parquet(
files[0],
@@ -524,14 +482,11 @@ def _read_metadata(filename, filesystem):
def _load_partition_bounds(pqds, filesystem=None):
partition_bounds = None

if LEGACY_PYARROW:
common_metadata = pqds.common_metadata
else:
filename = "/".join([_get_parent_path(pqds.files[0]), "_common_metadata"])
try:
common_metadata = _read_metadata(filename, filesystem=filesystem)
except FileNotFoundError:
common_metadata = None
filename = "/".join([_get_parent_path(pqds.files[0]), "_common_metadata"])
try:
common_metadata = _read_metadata(filename, filesystem=filesystem)
except FileNotFoundError:
common_metadata = None

if common_metadata is not None and b'spatialpandas' in common_metadata.metadata:
spatial_metadata = json.loads(
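
The removed LEGACY_PYARROW branches all collapse onto the modern pyarrow dataset API, roughly as below (the path is a placeholder): ParquetDataset is opened without the retired validate_schema/use_legacy_dataset flags, pandas-level metadata is read straight off the Arrow schema, and the _common_metadata file is consulted only when it exists.

import pyarrow.parquet as pq

# Placeholder path; in spatialpandas the path arrives via fsspec-validated filesystems.
dataset = pq.ParquetDataset("data/points.parquet")

# pandas metadata stored under the b"pandas" schema key (None if absent).
pandas_metadata = dataset.schema.pandas_metadata

print(dataset.files[0], sorted(pandas_metadata or {}))
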
3 changes: 2 additions & 1 deletion spatialpandas/tests/geometry/strategies.py
@@ -1,3 +1,4 @@
import os
import numpy as np
from geopandas import GeoSeries
from geopandas.array import from_shapely
@@ -11,7 +12,7 @@

hyp_settings = settings(
deadline=None,
max_examples=100,
max_examples=int(os.environ.get('HYPOTHESIS_MAX_EXAMPLES', 100)),
suppress_health_check=[HealthCheck.too_slow],
)

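
The same HYPOTHESIS_MAX_EXAMPLES override is applied in the two spatialindex test modules below; a condensed sketch of the pattern (the test itself is invented):

import os

from hypothesis import given, settings, strategies as st

hyp_settings = settings(
    deadline=None,
    max_examples=int(os.environ.get('HYPOTHESIS_MAX_EXAMPLES', 100)),
)


@hyp_settings
@given(st.integers(min_value=0, max_value=2**16))
def test_int_str_roundtrip(value):
    assert int(str(value)) == value


# e.g. HYPOTHESIS_MAX_EXAMPLES=10 pytest spatialpandas/tests (invocation assumed)
# keeps property-based runs short locally, while CI uses the default of 100.
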
3 changes: 2 additions & 1 deletion spatialpandas/tests/spatialindex/test_hilbert_curve.py
@@ -1,3 +1,4 @@
import os
from itertools import product

import hypothesis.strategies as st
@@ -18,7 +19,7 @@
distances_from_coordinates,
)

hyp_settings = settings(deadline=None)
hyp_settings = settings(deadline=None, max_examples=int(os.environ.get('HYPOTHESIS_MAX_EXAMPLES', 100)))

# ### strategies ###
st_p = st.integers(min_value=1, max_value=5)
3 changes: 2 additions & 1 deletion spatialpandas/tests/spatialindex/test_rtree.py
@@ -1,3 +1,4 @@
import os
import pickle

import hypothesis.strategies as st
@@ -9,7 +10,7 @@
from spatialpandas.spatialindex import HilbertRtree

# ### hypothesis settings ###
hyp_settings = settings(deadline=None)
hyp_settings = settings(deadline=None, max_examples=int(os.environ.get('HYPOTHESIS_MAX_EXAMPLES', 100)))


# ### Custom strategies ###