Skip to content

Commit

Permalink
Use cat_ranges in fsspec source (#1162)
Browse files Browse the repository at this point in the history
This allows filesystem implementations to use possibly more optimal
request strategies, such as fsspec-xrootd's use of vector_read.

Co-authored-by: Jim Pivarski <[email protected]>
  • Loading branch information
nsmith- and jpivarski authored Mar 20, 2024
1 parent cdaf9a7 commit e1cc99c
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@
import uproot.source.futures


class PartFuture:
"""For splitting the result of fs._cat_ranges into its components"""

def __init__(self, parent_future: concurrent.futures.Future, part_index: int):
self._parent = parent_future
self._part_index = part_index

def add_done_callback(self, callback, *, context=None):
self._parent.add_done_callback(callback)

def result(self, timeout=None):
return self._parent.result(timeout=timeout)[self._part_index]


class FSSpecSource(uproot.source.chunk.Source):
"""
Args:
Expand Down Expand Up @@ -157,21 +171,27 @@ async def async_wrapper_thread(blocking_func, *args, **kwargs):
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
return await to_thread(blocking_func, *args, **kwargs)

chunks = []
for start, stop in ranges:
# _cat_file is async while cat_file is not.
coroutine = (
self._fs._cat_file(self._file_path, start=start, end=stop)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_file, self._file_path, start=start, end=stop
)
paths = [self._file_path] * len(ranges)
starts = [start for start, _ in ranges]
ends = [stop for _, stop in ranges]
# _cat_ranges is async while cat_ranges is not.
coroutine = (
self._fs._cat_ranges(paths=paths, starts=starts, ends=ends)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_ranges, paths=paths, starts=starts, ends=ends
)
)

future = self._executor.submit(coroutine)
future = self._executor.submit(coroutine)

chunk = uproot.source.chunk.Chunk(self, start, stop, future)
future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
chunks = []
for index, (start, stop) in enumerate(ranges):
chunk_future = PartFuture(future, index)
chunk = uproot.source.chunk.Chunk(self, start, stop, chunk_future)
chunk_future.add_done_callback(
uproot.source.chunk.notifier(chunk, notifications)
)
chunks.append(chunk)
return chunks

Expand Down

0 comments on commit e1cc99c

Please sign in to comment.