Changes by Mikaela
mikelkou committed Feb 5, 2025
1 parent ca9b8f6 commit 0482e75
Showing 4 changed files with 78 additions and 28 deletions.
5 changes: 4 additions & 1 deletion benchmarks/asv.conf.json
@@ -2,6 +2,8 @@
// The version of the config file format. Do not change, unless
// you know what you are doing.
"version": 1,
"timeout": 1200,
"install_timeout": 1200,

// The name of the project being benchmarked
"project": "scanpy",
@@ -43,11 +45,12 @@
// If missing or the empty string, the tool will be automatically
// determined by looking for tools on the PATH environment
// variable.
"timeout": 1200, // Timeout for each benchmark in seconds
"environment_type": "conda",

// timeout in seconds for installing any dependencies in environment
// defaults to 10 min
//"install_timeout": 600,
"install_timeout": 1200,

// the base URL to show a commit for the project.
"show_commit_url": "https://github.com/scverse/scanpy/commit/",
19 changes: 19 additions & 0 deletions benchmarks/benchmarks/_utils.py
@@ -117,6 +117,23 @@ def lung93k() -> AnnData:
return _lung93k().copy()



@cache
def _musmus_11m() -> AnnData:
# Define the path to the dataset
path = "/sc/arion/projects/psychAD/mikaela/scanpy/scanpy/benchmarks/data/MusMus_4M_cells_cellxgene.h5ad"
adata = sc.read_h5ad(path)
#assert isinstance(adata.X, sparse.csr_matrix)
# Add counts layer
#adata.layers["counts"] = adata.X.astype(np.int32, copy=True)
sc.pp.log1p(adata)
return adata


def musmus_11m() -> AnnData:
return _musmus_11m().copy()


@cache
def _large_synthetic_dataset(n_obs: int = 500_000, n_vars: int = 5_000, density: float = 0.01) -> AnnData:
"""
@@ -176,6 +193,8 @@ def _get_dataset_raw(dataset: Dataset) -> tuple[AnnData, str | None]:
adata, batch_key = lung93k(), "PatientNumber"
case "large_synthetic":
adata, batch_key = large_synthetic_dataset(), None
case "musmus_11m":
adata, batch_key = musmus_11m(), None
case _:
msg = f"Unknown dataset {dataset}"
raise AssertionError(msg)
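
For context, a minimal sketch of how the new dataset flows through these helpers. The functions `musmus_11m` and `_get_dataset_raw` and the `"musmus_11m"` case come from the diff above; the import path and module-level usage are illustrative assumptions, not part of the commit.

# Hypothetical usage sketch; assumes benchmarks/benchmarks/_utils.py is importable as benchmarks._utils.
from benchmarks._utils import _get_dataset_raw, musmus_11m

adata, batch_key = _get_dataset_raw("musmus_11m")  # log1p-transformed AnnData, no batch key
assert batch_key is None

adata_again = musmus_11m()  # each call hands back a fresh copy of the cached AnnData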
8 changes: 4 additions & 4 deletions benchmarks/benchmarks/preprocessing_counts.py
@@ -32,8 +32,8 @@ def setup(dataset: Dataset, layer: KeyCount, *_):
# ASV suite

params: tuple[list[Dataset], list[KeyCount]] = (
["pbmc3k"],
# ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"],
["pbmc3k", "pbmc68k_reduced", "bmmc", "musmus_11m"],
# ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic", "musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]
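
ASV expands these parameter lists into a cartesian product, so every `time_*`/`peakmem_*` function in this module runs once per (dataset, layer) pair. A rough sketch of the resulting matrix (list contents copied from the params above):

from itertools import product

datasets = ["pbmc3k", "pbmc68k_reduced", "bmmc", "musmus_11m"]
layers = ["counts", "counts-off-axis"]
cases = list(product(datasets, layers))
print(len(cases))  # 8 (dataset, layer) combinations per benchmark function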
@@ -79,8 +79,8 @@ class FastSuite:
"""Suite for fast preprocessing operations."""

params: tuple[list[Dataset], list[KeyCount]] = (
# ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic"],
["pbmc3k"],
# ["pbmc3k", "pbmc68k_reduced", "bmmc", "lung93k", "large_synthetic", "musmus_11m"],
["pbmc3k", "pbmc68k_reduced", "bmmc", "musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]
74 changes: 51 additions & 23 deletions benchmarks/benchmarks/preprocessing_counts_dask.py
Expand Up @@ -23,36 +23,57 @@ def setup(dataset: Dataset, layer: KeyCount, *_):
assert "log1p" not in adata.uns


#def setup_dask_cluster():
# """Set up a local Dask cluster for benchmarking."""
# cluster = LocalCluster(n_workers=4, threads_per_worker=2)
# client = Client(cluster)
# return client

def setup_dask_cluster():
"""Set up a local Dask cluster for benchmarking."""
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
cluster = LocalCluster(n_workers=5, threads_per_worker=2, memory_limit="60GB", timeout="1200s")
client = Client(cluster)
return client
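
The `optimal_chunks` expression used throughout the benchmarks below splits the matrix into four row chunks per worker while keeping all genes in every chunk. A small worked sketch under that assumption (the matrix shape is illustrative, not the real dataset size):

client = setup_dask_cluster()
try:
    # client.nthreads() maps each worker address to its thread count,
    # e.g. {"tcp://127.0.0.1:42001": 2, ...}, so its length is the worker count (5 here).
    n_workers = len(client.nthreads())
    n_row_chunks = 4 * n_workers                      # 20 row chunks
    n_obs, n_vars = 11_000_000, 30_000                # illustrative shape
    optimal_chunks = (n_obs // n_row_chunks, n_vars)  # (550_000, 30_000): rows split, genes whole
finally:
    client.close()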


# ASV suite
params: tuple[list[Dataset], list[KeyCount]] = (
["pbmc68k_reduced"],
["musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]

### Dask-Based Benchmarks ###

def time_filter_cells_dask(*_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
adata.X = adata.X.map_blocks(sparse.csr_matrix) # Ensure sparse chunks
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks).persist()
sc.pp.filter_cells(adata, min_genes=100)
assert adata.n_obs > 0 # Ensure cells are retained
finally:
client.close()

#def time_filter_cells_dask(*_):
# client = setup_dask_cluster()
# try:
# # Compute optimal chunks based on Dask cluster
# optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
# adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
# adata.X = adata.X.persist() # Persist to avoid recomputation
# sc.pp.filter_cells(adata, min_genes=100)
# finally:
# client.close()


def peakmem_filter_cells_dask(*_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.filter_cells(adata, min_genes=100)
finally:
client.close()
@@ -61,7 +82,9 @@ def time_filter_genes_dask(*_):
def time_filter_genes_dask(*_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.filter_genes(adata, min_cells=3)
finally:
client.close()
@@ -70,7 +93,8 @@ def time_filter_genes_dask(*_):
def peakmem_filter_genes_dask(*_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.filter_genes(adata, min_cells=3)
finally:
client.close()
@@ -82,44 +106,45 @@ class FastSuite:
"""Suite for benchmarking preprocessing operations with Dask."""

params: tuple[list[Dataset], list[KeyCount]] = (
["pbmc68k_reduced"],
["musmus_11m"],
["counts", "counts-off-axis"],
)
param_names = ["dataset", "layer"]

def time_calculate_qc_metrics_dask(self, *_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
adata.X = adata.X.map_blocks(sparse.csr_matrix)
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
)
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
finally:
client.close()

def peakmem_calculate_qc_metrics_dask(self, *_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 10, adata.X.shape[1] // 10))
sc.pp.calculate_qc_metrics(
adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True
)
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.calculate_qc_metrics(adata, qc_vars=["mt"], percent_top=None, log1p=False, inplace=True)
finally:
client.close()

def time_normalize_total_dask(self, *_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50))
sc.pp.normalize_total(adata, target_sum=1e4)
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.map_blocks(lambda x: x / x.sum(axis=1), dtype=float)  # row-normalize each block directly, in place of sc.pp.normalize_total
adata.X = adata.X.persist()
finally:
client.close()
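
Note that the `map_blocks` call above is meant to scale each row to sum to 1, while `sc.pp.normalize_total(adata, target_sum=1e4)` scales rows to a fixed total. A minimal sketch of a block function matching the target-sum behaviour; `_normalize_block` is a hypothetical helper, not part of this commit, and it relies on each chunk holding complete rows (all genes), as the chunking above guarantees.

import numpy as np
from scipy import sparse

def _normalize_block(x, target_sum=1e4):
    """Scale each row of one block so its counts sum to target_sum."""
    counts = np.asarray(x.sum(axis=1)).ravel()
    counts[counts == 0] = 1.0  # avoid division by zero for empty rows
    scale = target_sum / counts
    if sparse.issparse(x):
        return sparse.diags(scale) @ x  # keeps sparse blocks sparse
    return x * scale[:, None]

It could be dropped in via adata.X.map_blocks(_normalize_block, dtype=float) in place of the lambda above.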

def peakmem_normalize_total_dask(self, *_):
client = setup_dask_cluster()
try:
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0], adata.X.shape[1]))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.normalize_total(adata, target_sum=1e4)
finally:
client.close()
@@ -128,7 +153,9 @@ def time_log1p_dask(self, *_):
client = setup_dask_cluster()
try:
adata.uns.pop("log1p", None)
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 50, adata.X.shape[1] // 50))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
adata.X = adata.X.persist()
sc.pp.log1p(adata)
finally:
client.close()
@@ -137,7 +164,8 @@ def peakmem_log1p_dask(self, *_):
client = setup_dask_cluster()
try:
adata.uns.pop("log1p", None)
adata.X = dd.from_array(adata.X, chunks=(adata.X.shape[0] // 100, adata.X.shape[1] // 100))
optimal_chunks = (adata.X.shape[0] // (4 * len(client.nthreads())), adata.X.shape[1])
adata.X = dd.from_array(adata.X, chunks=optimal_chunks)
sc.pp.log1p(adata)
finally:
client.close()
