feat: upgrade DataFusion, Arrow, PyO3, ObjectStore (#2594)
BREAKING CHANGE:

* Removed access to `MultipartId` and `cleanup_partial_writes`. Incomplete
  uploads are now cleaned up automatically.
  * `FragmentWriteProgress` methods no longer take a `multipart_id` argument
    (see the sketch below).
* `CommitHandler` and `ManifestWriter` APIs now take a Lance
`ObjectStore` rather than `object_store::ObjectStore`.

New features:

* For S3 and Azure (these features were already implemented for GCS):
  * Max object size is now 2.5 TB, up from 50 GB.
  * Retries are now performed for `Connection reset` network errors.
  * Reduced the number of IOPS when writing small objects (<5 MB).
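
For downstream implementations, the `FragmentWriteProgress` change is just the removal of the `multipart_id` parameter from `begin()`. A minimal sketch under the new signatures (the subclass name and prints are illustrative only; the import paths and the `fragment.id` attribute are taken from the diffs below):

    from lance.fragment import FragmentMetadata
    from lance.progress import FragmentWriteProgress


    class PrintingProgress(FragmentWriteProgress):
        """Hypothetical tracker built on the new begin()/complete() signatures."""

        def begin(self, fragment: FragmentMetadata, **kwargs) -> None:
            # No multipart_id parameter any more; incomplete uploads are now
            # cleaned up automatically by the object store layer.
            print(f"started fragment {fragment.id}")

        def complete(self, fragment: FragmentMetadata, **kwargs) -> None:
            print(f"finished fragment {fragment.id}")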
wjones127 authored Jul 16, 2024
1 parent ac3f75a commit 30af1d8
Showing 54 changed files with 896 additions and 1,192 deletions.
73 changes: 37 additions & 36 deletions Cargo.toml
@@ -20,7 +20,7 @@ exclude = ["python"]
resolver = "2"

[workspace.package]
version = "0.14.2"
version = "0.15.0"
edition = "2021"
authors = ["Lance Devs <[email protected]>"]
license = "Apache-2.0"
@@ -43,33 +43,33 @@ categories = [
rust-version = "1.78"

[workspace.dependencies]
lance = { version = "=0.14.2", path = "./rust/lance" }
lance-arrow = { version = "=0.14.2", path = "./rust/lance-arrow" }
lance-core = { version = "=0.14.2", path = "./rust/lance-core" }
lance-datafusion = { version = "=0.14.2", path = "./rust/lance-datafusion" }
lance-datagen = { version = "=0.14.2", path = "./rust/lance-datagen" }
lance-encoding = { version = "=0.14.2", path = "./rust/lance-encoding" }
lance-encoding-datafusion = { version = "=0.14.2", path = "./rust/lance-encoding-datafusion" }
lance-file = { version = "=0.14.2", path = "./rust/lance-file" }
lance-index = { version = "=0.14.2", path = "./rust/lance-index" }
lance-io = { version = "=0.14.2", path = "./rust/lance-io" }
lance-linalg = { version = "=0.14.2", path = "./rust/lance-linalg" }
lance-table = { version = "=0.14.2", path = "./rust/lance-table" }
lance-test-macros = { version = "=0.14.2", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.14.2", path = "./rust/lance-testing" }
lance = { version = "=0.15.0", path = "./rust/lance" }
lance-arrow = { version = "=0.15.0", path = "./rust/lance-arrow" }
lance-core = { version = "=0.15.0", path = "./rust/lance-core" }
lance-datafusion = { version = "=0.15.0", path = "./rust/lance-datafusion" }
lance-datagen = { version = "=0.15.0", path = "./rust/lance-datagen" }
lance-encoding = { version = "=0.15.0", path = "./rust/lance-encoding" }
lance-encoding-datafusion = { version = "=0.15.0", path = "./rust/lance-encoding-datafusion" }
lance-file = { version = "=0.15.0", path = "./rust/lance-file" }
lance-index = { version = "=0.15.0", path = "./rust/lance-index" }
lance-io = { version = "=0.15.0", path = "./rust/lance-io" }
lance-linalg = { version = "=0.15.0", path = "./rust/lance-linalg" }
lance-table = { version = "=0.15.0", path = "./rust/lance-table" }
lance-test-macros = { version = "=0.15.0", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.15.0", path = "./rust/lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
arrow = { version = "51.0.0", optional = false, features = ["prettyprint"] }
arrow-arith = "51.0"
arrow-array = "51.0"
arrow-buffer = "51.0"
arrow-cast = "51.0"
arrow-data = "51.0"
arrow-ipc = { version = "51.0", features = ["zstd"] }
arrow-ord = "51.0"
arrow-row = "51.0"
arrow-schema = "51.0"
arrow-select = "51.0"
arrow = { version = "52.1", optional = false, features = ["prettyprint"] }
arrow-arith = "52.1"
arrow-array = "52.1"
arrow-buffer = "52.1"
arrow-cast = "52.1"
arrow-data = "52.1"
arrow-ipc = { version = "52.1", features = ["zstd"] }
arrow-ord = "52.1"
arrow-row = "52.1"
arrow-schema = "52.1"
arrow-select = "52.1"
async-recursion = "1.0"
async-trait = "0.1"
aws-config = "0.57"
@@ -93,17 +93,18 @@ criterion = { version = "0.5", features = [
"html_reports",
] }
crossbeam-queue = "0.3"
datafusion = { version = "37.1", default-features = false, features = [
datafusion = { version = "40.0", default-features = false, features = [
"array_expressions",
"regex_expressions",
"unicode_expressions",
] }
datafusion-common = "37.1"
datafusion-functions = { version = "37.1", features = ["regex_expressions"] }
datafusion-sql = "37.1"
datafusion-expr = "37.1"
datafusion-execution = "37.1"
datafusion-optimizer = "37.1"
datafusion-physical-expr = { version = "37.1", features = [
datafusion-common = "40.0"
datafusion-functions = { version = "40.0", features = ["regex_expressions"] }
datafusion-sql = "40.0"
datafusion-expr = "40.0"
datafusion-execution = "40.0"
datafusion-optimizer = "40.0"
datafusion-physical-expr = { version = "40.0", features = [
"regex_expressions",
] }
deepsize = "0.2.0"
@@ -119,8 +120,8 @@ mock_instant = { version = "0.3.1", features = ["sync"] }
moka = "0.11"
num-traits = "0.2"
num_cpus = "1.0"
object_store = { version = "0.9.0" }
parquet = "51.0"
object_store = { version = "0.10.1" }
parquet = "52.0"
pin-project = "1.0"
path_abs = "0.5"
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
2 changes: 2 additions & 0 deletions ci/check_versions.py
@@ -37,10 +37,12 @@ def parse_version(version: str) -> tuple[int, int, int]:

if __name__ == "__main__":
    new_version = parse_version(get_versions())
+    print(f"New version: {new_version}")

    repo = Github().get_repo(os.environ["GITHUB_REPOSITORY"])
    latest_release = repo.get_latest_release()
    last_version = parse_version(latest_release.tag_name[1:])
+    print(f"Last version: {last_version}")

    # Check for a breaking-change label in the PRs between the last release and the current commit.
    commits = repo.compare(latest_release.tag_name, os.environ["GITHUB_SHA"]).commits
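
For context, `parse_version` (whose signature appears in the hunk header above) presumably splits a `major.minor.patch` string into an integer tuple so the last and new releases can be compared. A rough sketch of that assumed behavior, not the script's actual implementation:

    def parse_version(version: str) -> tuple[int, int, int]:
        # Assumed behavior: "0.15.0" -> (0, 15, 0); the real helper may treat
        # pre-release suffixes differently.
        major, minor, patch = version.split(".")
        return (int(major), int(minor), int(patch))


    # The script strips the leading "v" from the release tag before parsing:
    assert parse_version("v0.15.0"[1:]) == (0, 15, 0)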
17 changes: 7 additions & 10 deletions python/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pylance"
version = "0.14.2"
version = "0.15.0"
edition = "2021"
authors = ["Lance Devs <[email protected]>"]
rust-version = "1.65"
@@ -12,11 +12,11 @@ name = "lance"
crate-type = ["cdylib"]

[dependencies]
arrow = { version = "51.0.0", features = ["pyarrow"] }
arrow-array = "51.0"
arrow-data = "51.0"
arrow-schema = "51.0"
object_store = "0.9.0"
arrow = { version = "52.1", features = ["pyarrow"] }
arrow-array = "52.1"
arrow-data = "52.1"
arrow-schema = "52.1"
object_store = "0.10.1"
async-trait = "0.1"
chrono = "0.4.31"
env_logger = "0.10"
@@ -42,7 +42,7 @@ lance-table = { path = "../rust/lance-table" }
lazy_static = "1"
log = "0.4"
prost = "0.12.2"
pyo3 = { version = "0.20", features = ["extension-module", "abi3-py39"] }
pyo3 = { version = "0.21", features = ["extension-module", "abi3-py39", "gil-refs"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
@@ -55,9 +55,6 @@ tracing-subscriber = "0.3.17"
tracing = "0.1.37"
url = "2.5.0"

-# Prevent dynamic linking of lzma, which comes from datafusion
-lzma-sys = { version = "*", features = ["static"] }
-
[features]
datagen = ["lance-datagen"]
fp16kernels = ["lance/fp16kernels"]
20 changes: 0 additions & 20 deletions python/python/lance/cleanup.py

This file was deleted.

76 changes: 5 additions & 71 deletions python/python/lance/progress.py
@@ -10,8 +10,6 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Dict, Optional

-from .lance import _cleanup_partial_writes
-
if TYPE_CHECKING:
    # We don't import directly because of circular import
    from .fragment import FragmentMetadata
@@ -25,29 +25,22 @@ class FragmentWriteProgress(ABC):
    This tracking class is experimental and may change in the future.
    """

-    def _do_begin(
-        self, fragment_json: str, multipart_id: Optional[str] = None, **kwargs
-    ):
+    def _do_begin(self, fragment_json: str, **kwargs):
        """Called when a new fragment is created"""
        from .fragment import FragmentMetadata

        fragment = FragmentMetadata.from_json(fragment_json)
-        return self.begin(fragment, multipart_id, **kwargs)
+        return self.begin(fragment, **kwargs)

    @abstractmethod
-    def begin(
-        self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kwargs
-    ) -> None:
+    def begin(self, fragment: "FragmentMetadata", **kwargs) -> None:
        """Called when a new fragment is about to be written.
        Parameters
        ----------
        fragment : FragmentMetadata
            The fragment that is open to write to. The fragment id might not
            yet be assigned at this point.
-        multipart_id : str, optional
-            The multipart id that will be uploaded to cloud storage. This may be
-            used later to abort incomplete uploads if this fragment write fails.
        kwargs: dict, optional
            Extra keyword arguments to pass to the implementation.
@@ -84,9 +75,7 @@ class NoopFragmentWriteProgress(FragmentWriteProgress):
    This is the default implementation.
    """

-    def begin(
-        self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kargs
-    ):
+    def begin(self, fragment: "FragmentMetadata", **kargs):
        pass

    def complete(self, fragment: "FragmentMetadata", **kwargs):
@@ -135,25 +124,20 @@ def _in_progress_path(self, fragment: "FragmentMetadata") -> str:
    def _fragment_file(self, fragment: "FragmentMetadata") -> str:
        return os.path.join(self._base_path, f"fragment_{fragment.id}.json")

-    def begin(
-        self, fragment: "FragmentMetadata", multipart_id: Optional[str] = None, **kwargs
-    ):
+    def begin(self, fragment: "FragmentMetadata", **kwargs):
        """Called when a new fragment is created.
        Parameters
        ----------
        fragment : FragmentMetadata
            The fragment that is open to write to.
-        multipart_id : str, optional
-            The multipart id to upload this fragment to cloud storage.
        """

        self._fs.create_dir(self._base_path, recursive=True)

        with self._fs.open_output_stream(self._in_progress_path(fragment)) as out:
            progress_data = {
                "fragment_id": fragment.id,
-                "multipart_id": multipart_id if multipart_id else "",
                "metadata": self._metadata,
            }
            out.write(json.dumps(progress_data).encode("utf-8"))
@@ -164,53 +148,3 @@ def complete(self, fragment: "FragmentMetadata", **kwargs):
    def complete(self, fragment: "FragmentMetadata", **kwargs):
        """Called when a fragment is completed"""
        self._fs.delete_file(self._in_progress_path(fragment))
-
-    def cleanup_partial_writes(self, dataset_uri: str) -> int:
-        """
-        Finds all in-progress files and cleans up any partially written data
-        files. This is useful for cleaning up after a failed write.
-        Parameters
-        ----------
-        dataset_uri : str
-            The URI of the table to clean up.
-        Returns
-        -------
-        int
-            The number of partial writes cleaned up.
-        """
-        from pyarrow.fs import FileSelector
-
-        from .fragment import FragmentMetadata
-
-        marker_paths = []
-        objects = []
-        selector = FileSelector(self._base_path)
-        for info in self._fs.get_file_info(selector):
-            path = info.path
-            if path.endswith(self.PROGRESS_EXT):
-                marker_paths.append(path)
-                with self._fs.open_input_stream(path) as f:
-                    progress_data = json.loads(f.read().decode("utf-8"))
-
-                json_path = path.rstrip(self.PROGRESS_EXT) + ".json"
-                with self._fs.open_input_stream(json_path) as f:
-                    fragment_metadata = FragmentMetadata.from_json(
-                        f.read().decode("utf-8")
-                    )
-                objects.append(
-                    (
-                        fragment_metadata.data_files()[0].path(),
-                        progress_data["multipart_id"],
-                    )
-                )
-
-        _cleanup_partial_writes(dataset_uri, objects)
-
-        for path in marker_paths:
-            self._fs.delete_file(path)
-            json_path = path.rstrip(self.PROGRESS_EXT) + ".json"
-            self._fs.delete_file(json_path)
-
-        return len(objects)
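
Given the `begin()` implementation above, an `.in_progress` marker now presumably carries only the fragment id and caller-supplied metadata; illustrative contents (key names taken from `begin()` and the test below):

    import json

    # Hypothetical contents of a "fragment_1.in_progress" marker after this change:
    progress_data = {
        "fragment_id": 1,
        "metadata": {"test_key": "test_value"},
    }
    print(json.dumps(progress_data))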
5 changes: 1 addition & 4 deletions python/python/tests/helper.py
@@ -1,7 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

-from typing import Optional

from lance.fragment import FragmentMetadata
from lance.progress import FragmentWriteProgress
@@ -13,9 +12,7 @@ def __init__(self):
        self.begin_called = 0
        self.complete_called = 0

-    def begin(
-        self, fragment: FragmentMetadata, multipart_id: Optional[str] = None, **kwargs
-    ):
+    def begin(self, fragment: FragmentMetadata, **kwargs):
        self.begin_called += 1

    def complete(self, fragment: FragmentMetadata):
6 changes: 0 additions & 6 deletions python/python/tests/test_fragment.py
@@ -217,7 +217,6 @@ def test_dataset_progress(tmp_path: Path):
    with open(progress_uri / "fragment_1.in_progress") as f:
        progress_data = json.load(f)
    assert progress_data["fragment_id"] == 1
-    assert isinstance(progress_data["multipart_id"], str)
    # progress contains custom metadata
    assert progress_data["metadata"]["test_key"] == "test_value"

@@ -226,11 +225,6 @@ def test_dataset_progress(tmp_path: Path):
        metadata = json.load(f)
    assert metadata["id"] == 1

-    progress.cleanup_partial_writes(str(dataset_uri))
-
-    assert not (progress_uri / "fragment_1.json").exists()
-    assert not (progress_uri / "fragment_1.in_progress").exists()
-

def test_fragment_meta():
    # Intentionally leaving off column_indices / version fields to make sure