diff --git a/src/uproot/source/fsspec.py b/src/uproot/source/fsspec.py index 9141695b4..29718de32 100644 --- a/src/uproot/source/fsspec.py +++ b/src/uproot/source/fsspec.py @@ -14,6 +14,20 @@ import uproot.source.futures +class PartFuture: + """For splitting the result of fs._cat_ranges into its components""" + + def __init__(self, parent_future: concurrent.futures.Future, part_index: int): + self._parent = parent_future + self._part_index = part_index + + def add_done_callback(self, callback, *, context=None): + self._parent.add_done_callback(callback) + + def result(self, timeout=None): + return self._parent.result(timeout=timeout)[self._part_index] + + class FSSpecSource(uproot.source.chunk.Source): """ Args: @@ -157,21 +171,27 @@ async def async_wrapper_thread(blocking_func, *args, **kwargs): # TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above) return await to_thread(blocking_func, *args, **kwargs) - chunks = [] - for start, stop in ranges: - # _cat_file is async while cat_file is not. - coroutine = ( - self._fs._cat_file(self._file_path, start=start, end=stop) - if self._async_impl - else async_wrapper_thread( - self._fs.cat_file, self._file_path, start=start, end=stop - ) + paths = [self._file_path] * len(ranges) + starts = [start for start, _ in ranges] + ends = [stop for _, stop in ranges] + # _cat_ranges is async while cat_ranges is not. + coroutine = ( + self._fs._cat_ranges(paths=paths, starts=starts, ends=ends) + if self._async_impl + else async_wrapper_thread( + self._fs.cat_ranges, paths=paths, starts=starts, ends=ends ) + ) - future = self._executor.submit(coroutine) + future = self._executor.submit(coroutine) - chunk = uproot.source.chunk.Chunk(self, start, stop, future) - future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications)) + chunks = [] + for index, (start, stop) in enumerate(ranges): + chunk_future = PartFuture(future, index) + chunk = uproot.source.chunk.Chunk(self, start, stop, chunk_future) + chunk_future.add_done_callback( + uproot.source.chunk.notifier(chunk, notifications) + ) chunks.append(chunk) return chunks