Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Deferred to IO to reflect that its an I/O monad. #88

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ open Zarr
open Zarr.Codecs
open Zarr.Indexing
open Zarr_sync.Storage
open Deferred.Infix (* opens infix operators >>= and >>| for monadic bind & map *)
open IO.Infix (* opens infix operators >>= and >>| for monadic bind & map *)

let store = FilesystemStore.create "testdata.zarr";;
```
Expand Down
17 changes: 8 additions & 9 deletions zarr-eio/src/storage.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Deferred = struct
module IO = struct
type 'a t = 'a
let return = Fun.id
let bind x f = f x
Expand All @@ -19,14 +19,13 @@ module Deferred = struct
end
end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module ZipStore = Zarr.Zip.Make(IO)
module MemoryStore = Zarr.Memory.Make(IO)

module FilesystemStore = struct
module IO = struct
module Deferred = Deferred

module S = struct
type t = {root : Eio.Fs.dir_ty Eio.Path.t; perm : Eio.File.Unix_perm.t}
type 'a io = 'a IO.t

let fspath_to_key t (path : Eio.Fs.dir_ty Eio.Path.t) =
let s = snd path and pos = String.length (snd t.root) + 1 in
Expand Down Expand Up @@ -121,12 +120,12 @@ module FilesystemStore = struct
let create ?(perm=0o700) ~env dirname =
Zarr.Util.create_parent_dir dirname perm;
Sys.mkdir dirname perm;
IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}

let open_store ?(perm=0o700) ~env dirname =
if Sys.is_directory dirname
then IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
then S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm}
else raise (Zarr.Storage.Not_a_filesystem_store dirname)

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end
8 changes: 4 additions & 4 deletions zarr-eio/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module Deferred : Zarr.Types.Deferred with type 'a t = 'a
module IO : Zarr.Types.IO with type 'a t = 'a

(** An Eio-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
module MemoryStore : Zarr.Memory.S with type 'a io := 'a

(** An Eio-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred
module ZipStore : Zarr.Zip.S with type 'a io := 'a

(** An Eio-aware local filesystem storage backend for a Zarr v3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a

val create : ?perm:Eio.File.Unix_perm.t -> env:<fs : Eio.Fs.dir_ty Eio.Path.t; ..> -> string -> t
(** [create ~perm ~env dir] returns a new filesystem store.
Expand Down
8 changes: 3 additions & 5 deletions zarr-eio/test/test_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ let string_of_list = [%show: string list]
let print_node_pair = [%show: Node.Array.t list * Node.Group.t list]
let print_int_array = [%show : int array]

module type EIO_STORE = sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
end
module type EIO_STORE = Zarr.Storage.S with type 'a io := 'a

let test_storage
(type a) (module M : EIO_STORE with type t = a) (store : a) =
Expand Down Expand Up @@ -68,7 +66,7 @@ let test_storage
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
| _ -> IO.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]];

let child = Node.Group.of_path "/some/child/group" in
Expand Down Expand Up @@ -147,7 +145,7 @@ let _ =
let zpath = tmp_dir ^ ".zip" in
ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z);
(* test just opening the now exisitant archive created by the previous test. *)
ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit);
ZipStore.with_open `Read_only zpath (fun _ -> ());
test_storage (module MemoryStore) @@ MemoryStore.create ();
test_storage (module FilesystemStore) s)
])
63 changes: 31 additions & 32 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Deferred = struct
module IO = struct
type 'a t = 'a Lwt.t
let return = Lwt.return
let bind = Lwt.bind
Expand All @@ -19,16 +19,16 @@
end
end

module ZipStore = Zarr.Zip.Make(Deferred)
module MemoryStore = Zarr.Memory.Make(Deferred)
module ZipStore = Zarr.Zip.Make(IO)
module MemoryStore = Zarr.Memory.Make(IO)

module FilesystemStore = struct
module IO = struct
module Deferred = Deferred
open Deferred.Infix
open Deferred.Syntax
module S = struct
open IO.Infix
open IO.Syntax

type t = {dirname : string; perm : Lwt_unix.file_perm}
type 'a io = 'a IO.t

let fspath_to_key t path =
let pos = String.length t.dirname + 1 in
Expand All @@ -49,7 +49,7 @@
let size t key =
let file_length path () = Lwt.map Int64.to_int (Lwt_io.file_length path)
and filepath = key_to_fspath t key in
Lwt.catch (file_length filepath) (Fun.const @@ Deferred.return 0)
Lwt.catch (file_length filepath) (Fun.const @@ IO.return 0)

let get t key =
let* buf_size = size t key in
Expand Down Expand Up @@ -148,22 +148,22 @@
let create ?(perm=0o700) dirname =
Zarr.Util.create_parent_dir dirname perm;
Sys.mkdir dirname perm;
IO.{dirname = Zarr.Util.sanitize_dir dirname; perm}
S.{dirname = Zarr.Util.sanitize_dir dirname; perm}

let open_store ?(perm=0o700) dirname =
if Sys.is_directory dirname
then IO.{dirname = Zarr.Util.sanitize_dir dirname; perm}
then S.{dirname = Zarr.Util.sanitize_dir dirname; perm}
else raise (Zarr.Storage.Not_a_filesystem_store dirname)

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end

module AmazonS3Store = struct
module Credentials = Aws_s3_lwt.Credentials
module S3 = Aws_s3_lwt.S3

open Deferred.Infix
open Deferred.Syntax
open IO.Infix
open IO.Syntax

exception Request_failed of S3.error

Expand All @@ -178,7 +178,7 @@

let fold_or_catch ~not_found res =
let return_or_raise r () = match r with
| Ok v -> Deferred.return v
| Ok v -> IO.return v
| Error e -> raise (Request_failed e)
and on_exception ~not_found = function
| Request_failed S3.Not_found -> Lwt.return (not_found ())
Expand All @@ -190,19 +190,18 @@
let empty_Ls = Fun.const ([], S3.Ls.Done)

let fold_continuation ~return ~more = function
| S3.Ls.Done -> Deferred.return return
| S3.Ls.Done -> IO.return return
| S3.Ls.More continuation ->
continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) ->
more xs cont

module IO = struct
module Deferred = Deferred

module S = struct
type t =
{retries : int
;bucket : string
;cred : Credentials.t
;endpoint : Aws_s3.Region.endpoint}
type 'a io = 'a IO.t

let size t key =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
Expand All @@ -229,19 +228,19 @@
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
Lwt.map (fun x -> [x]) (fold_or_catch ~not_found:(raise_not_found key) res)
in
Deferred.concat_map (read_range t key) ranges
IO.concat_map (read_range t key) ranges

let set t key data =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in
Deferred.return_unit
IO.return_unit

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
| 0 -> Deferred.return String.empty
| 0 -> IO.return String.empty
| _ -> get t key
in
let f = if append || ov = String.empty then
Expand All @@ -259,7 +258,7 @@
S3.retry ~retries:t.retries ~endpoint ~f () >>= fold_or_catch ~not_found:(Fun.const ())

let rec delete_keys t cont () =
let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in
let del t xs c = IO.iter (delete_content t) xs >>= delete_keys t c in

Check warning on line 261 in zarr-lwt/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-lwt/src/storage.ml#L261

Added line #L261 was not covered by tests
fold_continuation ~return:() ~more:(del t) cont

and delete_content t S3.{key; _} = erase t key
Expand All @@ -269,7 +268,7 @@
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
Deferred.iter (delete_content t) xs >>= delete_keys t rest
IO.iter (delete_content t) xs >>= delete_keys t rest

let rec list t =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
Expand All @@ -284,32 +283,32 @@
let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in
fold_continuation ~return:acc ~more:(append acc) cont

module S = Set.Make(String)
module M = Set.Make(String)

let rec partition_keys prefix ((l, r) as acc) cont =
let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in
fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont
fold_continuation ~return:(l, M.elements r) ~more:(split ~acc ~prefix) cont

and add prefix (l, r) (c : S3.content) =
let size = String.length prefix in
if not (String.contains_from c.key size '/') then c.key :: l, r else
l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r
l, M.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r

and list_dir t prefix =
let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in
let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
let* res = S3.retry ~retries:t.retries ~endpoint ~f () in
let* xs, rest = fold_or_catch ~not_found:empty_Ls res in
let init = List.fold_left (add prefix) ([], S.empty) xs in
let init = List.fold_left (add prefix) ([], M.empty) xs in
partition_keys prefix init rest

let rec rename t prefix new_prefix =
let upload t (k, v) = set t k v in
let* xs = list t in
let to_delete = List.filter (String.starts_with ~prefix) xs in
let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = Deferred.iter (upload t) data in
Deferred.iter (erase t) to_delete
let* data = IO.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in
let* () = IO.iter (upload t) data in
IO.iter (erase t) to_delete

and rename_and_add ~t ~prefix ~new_prefix acc k =
let l = String.length prefix in
Expand All @@ -321,7 +320,7 @@
let* res = Credentials.Helper.get_credentials ~profile () in
let cred = Result.fold ~ok:Fun.id ~error:raise res in
let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
f IO.{bucket; cred; endpoint; retries}
f S.{bucket; cred; endpoint; retries}

include Zarr.Storage.Make(IO)
include Zarr.Storage.Make(IO)(S)
end
10 changes: 5 additions & 5 deletions zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
module Deferred : Zarr.Types.Deferred with type 'a t = 'a Lwt.t
module IO : Zarr.Types.IO with type 'a t = 'a Lwt.t

(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : Zarr.Memory.S with module Deferred = Deferred
module MemoryStore : Zarr.Memory.S with type 'a io := 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with module Deferred = Deferred
module ZipStore : Zarr.Zip.S with type 'a io := 'a Lwt.t

(** An Lwt-aware local filesystem storage backend for a Zarr V3 hierarchy. *)
module FilesystemStore : sig
include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a Lwt.t

val create : ?perm:Unix.file_perm -> string -> t
(** [create ~perm dir] returns a new filesystem store.
Expand All @@ -25,7 +25,7 @@ end
module AmazonS3Store : sig
exception Request_failed of Aws_s3_lwt.S3.error

include Zarr.Storage.STORE with module Deferred = Deferred
include Zarr.Storage.S with type 'a io := 'a Lwt.t

val with_open :
?scheme:[ `Http | `Https ] ->
Expand Down
12 changes: 5 additions & 7 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ let string_of_list = [%show: string list]
let print_node_pair = [%show: Node.Array.t list * Node.Group.t list]
let print_int_array = [%show : int array]

module type LWT_STORE = sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t
end
module type LWT_STORE = Zarr.Storage.S with type 'a io := 'a Lwt.t

let test_storage
(type a) (module M : LWT_STORE with type t = a) (store : a) =
let open M in
let open M.Deferred.Infix in
let open IO.Infix in
let gnode = Node.Group.root in

hierarchy store >>= fun nodes ->
Expand Down Expand Up @@ -69,7 +67,7 @@ let test_storage
assert_equal exp got;
match codecs with
| [`ShardingIndexed _] -> Array.delete store anode
| _ -> Deferred.return_unit)
| _ -> IO.return_unit)
[[`ShardingIndexed cfg]; [`Bytes BE]] >>= fun () ->

let child = Node.Group.of_path "/some/child/group" in
Expand Down Expand Up @@ -114,7 +112,7 @@ let test_storage
clear store >>= fun () ->
hierarchy store >>= fun got ->
assert_equal ~printer:print_node_pair ([], []) got;
Deferred.return_unit
IO.return_unit

let _ =
run_test_tt_main @@ ("Run Zarr Lwt API tests" >::: [
Expand Down Expand Up @@ -151,7 +149,7 @@ let _ =
Lwt_main.run @@ Lwt.join
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;ZipStore.with_open `Read_only zpath (fun _ -> Lwt.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module FilesystemStore) s])
Expand Down
Loading
Loading