From 684ef4b95786b6d778c336f81970093e79e7e983 Mon Sep 17 00:00:00 2001 From: Zolisa Bleki Date: Wed, 25 Dec 2024 20:18:25 +0200 Subject: [PATCH] Rename `Deferred` to `IO` to reflect that its an I/O monad. Also use destructive substituion where needed to make function signature return types more explicit. --- README.md | 2 +- zarr-eio/src/storage.ml | 17 ++++----- zarr-eio/src/storage.mli | 8 ++-- zarr-eio/test/test_eio.ml | 8 ++-- zarr-lwt/src/storage.ml | 63 +++++++++++++++---------------- zarr-lwt/src/storage.mli | 10 ++--- zarr-lwt/test/test_lwt.ml | 12 +++--- zarr-sync/src/storage.ml | 32 +++++++--------- zarr-sync/src/storage.mli | 8 ++-- zarr-sync/test/test_sync.ml | 6 +-- zarr/src/codecs.ml | 30 +++++++-------- zarr/src/codecs.mli | 13 +++---- zarr/src/storage/memory.ml | 65 ++++++++++++++------------------ zarr/src/storage/storage.ml | 54 +++++++++++++------------- zarr/src/storage/storage_intf.ml | 51 ++++++++++++------------- zarr/src/storage/zip.ml | 39 ++++++++++--------- zarr/src/types.ml | 31 +++++++-------- 17 files changed, 210 insertions(+), 239 deletions(-) diff --git a/README.md b/README.md index aac53936..82fa54e6 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ open Zarr open Zarr.Codecs open Zarr.Indexing open Zarr_sync.Storage -open Deferred.Infix (* opens infix operators >>= and >>| for monadic bind & map *) +open IO.Infix (* opens infix operators >>= and >>| for monadic bind & map *) let store = FilesystemStore.create "testdata.zarr";; ``` diff --git a/zarr-eio/src/storage.ml b/zarr-eio/src/storage.ml index ead57147..526a84c0 100644 --- a/zarr-eio/src/storage.ml +++ b/zarr-eio/src/storage.ml @@ -1,4 +1,4 @@ -module Deferred = struct +module IO = struct type 'a t = 'a let return = Fun.id let bind x f = f x @@ -19,14 +19,13 @@ module Deferred = struct end end -module ZipStore = Zarr.Zip.Make(Deferred) -module MemoryStore = Zarr.Memory.Make(Deferred) +module ZipStore = Zarr.Zip.Make(IO) +module MemoryStore = Zarr.Memory.Make(IO) module FilesystemStore = struct - module IO = struct - module Deferred = Deferred - + module S = struct type t = {root : Eio.Fs.dir_ty Eio.Path.t; perm : Eio.File.Unix_perm.t} + type 'a io = 'a IO.t let fspath_to_key t (path : Eio.Fs.dir_ty Eio.Path.t) = let s = snd path and pos = String.length (snd t.root) + 1 in @@ -121,12 +120,12 @@ module FilesystemStore = struct let create ?(perm=0o700) ~env dirname = Zarr.Util.create_parent_dir dirname perm; Sys.mkdir dirname perm; - IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm} + S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm} let open_store ?(perm=0o700) ~env dirname = if Sys.is_directory dirname - then IO.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm} + then S.{root = Eio.Path.(Eio.Stdenv.fs env / Zarr.Util.sanitize_dir dirname); perm} else raise (Zarr.Storage.Not_a_filesystem_store dirname) - include Zarr.Storage.Make(IO) + include Zarr.Storage.Make(IO)(S) end diff --git a/zarr-eio/src/storage.mli b/zarr-eio/src/storage.mli index a5b11bfa..19678c0b 100644 --- a/zarr-eio/src/storage.mli +++ b/zarr-eio/src/storage.mli @@ -1,14 +1,14 @@ -module Deferred : Zarr.Types.Deferred with type 'a t = 'a +module IO : Zarr.Types.IO with type 'a t = 'a (** An Eio-aware in-memory storage backend for Zarr v3 hierarchy. *) -module MemoryStore : Zarr.Memory.S with module Deferred = Deferred +module MemoryStore : Zarr.Memory.S with type 'a io := 'a (** An Eio-aware Zip file storage backend for a Zarr v3 hierarchy. *) -module ZipStore : Zarr.Zip.S with module Deferred = Deferred +module ZipStore : Zarr.Zip.S with type 'a io := 'a (** An Eio-aware local filesystem storage backend for a Zarr v3 hierarchy. *) module FilesystemStore : sig - include Zarr.Storage.STORE with module Deferred = Deferred + include Zarr.Storage.S with type 'a io := 'a val create : ?perm:Eio.File.Unix_perm.t -> env: -> string -> t (** [create ~perm ~env dir] returns a new filesystem store. diff --git a/zarr-eio/test/test_eio.ml b/zarr-eio/test/test_eio.ml index ab4fed8e..1e27194d 100644 --- a/zarr-eio/test/test_eio.ml +++ b/zarr-eio/test/test_eio.ml @@ -8,9 +8,7 @@ let string_of_list = [%show: string list] let print_node_pair = [%show: Node.Array.t list * Node.Group.t list] let print_int_array = [%show : int array] -module type EIO_STORE = sig - include Zarr.Storage.STORE with type 'a Deferred.t = 'a -end +module type EIO_STORE = Zarr.Storage.S with type 'a io := 'a let test_storage (type a) (module M : EIO_STORE with type t = a) (store : a) = @@ -68,7 +66,7 @@ let test_storage assert_equal exp got; match codecs with | [`ShardingIndexed _] -> Array.delete store anode - | _ -> Deferred.return_unit) + | _ -> IO.return_unit) [[`ShardingIndexed cfg]; [`Bytes BE]]; let child = Node.Group.of_path "/some/child/group" in @@ -147,7 +145,7 @@ let _ = let zpath = tmp_dir ^ ".zip" in ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z); (* test just opening the now exisitant archive created by the previous test. *) - ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit); + ZipStore.with_open `Read_only zpath (fun _ -> ()); test_storage (module MemoryStore) @@ MemoryStore.create (); test_storage (module FilesystemStore) s) ]) diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml index 7a568771..52e8cf92 100644 --- a/zarr-lwt/src/storage.ml +++ b/zarr-lwt/src/storage.ml @@ -1,4 +1,4 @@ -module Deferred = struct +module IO = struct type 'a t = 'a Lwt.t let return = Lwt.return let bind = Lwt.bind @@ -19,16 +19,16 @@ module Deferred = struct end end -module ZipStore = Zarr.Zip.Make(Deferred) -module MemoryStore = Zarr.Memory.Make(Deferred) +module ZipStore = Zarr.Zip.Make(IO) +module MemoryStore = Zarr.Memory.Make(IO) module FilesystemStore = struct - module IO = struct - module Deferred = Deferred - open Deferred.Infix - open Deferred.Syntax + module S = struct + open IO.Infix + open IO.Syntax type t = {dirname : string; perm : Lwt_unix.file_perm} + type 'a io = 'a IO.t let fspath_to_key t path = let pos = String.length t.dirname + 1 in @@ -49,7 +49,7 @@ module FilesystemStore = struct let size t key = let file_length path () = Lwt.map Int64.to_int (Lwt_io.file_length path) and filepath = key_to_fspath t key in - Lwt.catch (file_length filepath) (Fun.const @@ Deferred.return 0) + Lwt.catch (file_length filepath) (Fun.const @@ IO.return 0) let get t key = let* buf_size = size t key in @@ -148,22 +148,22 @@ module FilesystemStore = struct let create ?(perm=0o700) dirname = Zarr.Util.create_parent_dir dirname perm; Sys.mkdir dirname perm; - IO.{dirname = Zarr.Util.sanitize_dir dirname; perm} + S.{dirname = Zarr.Util.sanitize_dir dirname; perm} let open_store ?(perm=0o700) dirname = if Sys.is_directory dirname - then IO.{dirname = Zarr.Util.sanitize_dir dirname; perm} + then S.{dirname = Zarr.Util.sanitize_dir dirname; perm} else raise (Zarr.Storage.Not_a_filesystem_store dirname) - include Zarr.Storage.Make(IO) + include Zarr.Storage.Make(IO)(S) end module AmazonS3Store = struct module Credentials = Aws_s3_lwt.Credentials module S3 = Aws_s3_lwt.S3 - open Deferred.Infix - open Deferred.Syntax + open IO.Infix + open IO.Syntax exception Request_failed of S3.error @@ -178,7 +178,7 @@ module AmazonS3Store = struct let fold_or_catch ~not_found res = let return_or_raise r () = match r with - | Ok v -> Deferred.return v + | Ok v -> IO.return v | Error e -> raise (Request_failed e) and on_exception ~not_found = function | Request_failed S3.Not_found -> Lwt.return (not_found ()) @@ -190,19 +190,18 @@ module AmazonS3Store = struct let empty_Ls = Fun.const ([], S3.Ls.Done) let fold_continuation ~return ~more = function - | S3.Ls.Done -> Deferred.return return + | S3.Ls.Done -> IO.return return | S3.Ls.More continuation -> continuation () >>= fold_or_catch ~not_found:empty_Ls >>= fun (xs, cont) -> more xs cont - module IO = struct - module Deferred = Deferred - + module S = struct type t = {retries : int ;bucket : string ;cred : Credentials.t ;endpoint : Aws_s3.Region.endpoint} + type 'a io = 'a IO.t let size t key = let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in @@ -229,19 +228,19 @@ module AmazonS3Store = struct let* res = S3.retry ~retries:t.retries ~endpoint ~f () in Lwt.map (fun x -> [x]) (fold_or_catch ~not_found:(raise_not_found key) res) in - Deferred.concat_map (read_range t key) ranges + IO.concat_map (read_range t key) ranges let set t key data = let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in let f ~endpoint () = S3.put ~bucket ~credentials ~endpoint ~data ~key () in let* res = S3.retry ~retries:t.retries ~endpoint ~f () in let* _ = fold_or_catch ~not_found:(Fun.const String.empty) res in - Deferred.return_unit + IO.return_unit let set_partial_values t key ?(append=false) rsv = let* size = size t key in let* ov = match size with - | 0 -> Deferred.return String.empty + | 0 -> IO.return String.empty | _ -> get t key in let f = if append || ov = String.empty then @@ -259,7 +258,7 @@ module AmazonS3Store = struct S3.retry ~retries:t.retries ~endpoint ~f () >>= fold_or_catch ~not_found:(Fun.const ()) let rec delete_keys t cont () = - let del t xs c = Deferred.iter (delete_content t) xs >>= delete_keys t c in + let del t xs c = IO.iter (delete_content t) xs >>= delete_keys t c in fold_continuation ~return:() ~more:(del t) cont and delete_content t S3.{key; _} = erase t key @@ -269,7 +268,7 @@ module AmazonS3Store = struct let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in let* res = S3.retry ~retries:t.retries ~endpoint ~f () in let* xs, rest = fold_or_catch ~not_found:empty_Ls res in - Deferred.iter (delete_content t) xs >>= delete_keys t rest + IO.iter (delete_content t) xs >>= delete_keys t rest let rec list t = let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in @@ -284,32 +283,32 @@ module AmazonS3Store = struct let append acc xs c = accumulate_keys (acc @ List.map content_key xs) c in fold_continuation ~return:acc ~more:(append acc) cont - module S = Set.Make(String) + module M = Set.Make(String) let rec partition_keys prefix ((l, r) as acc) cont = let split ~acc ~prefix xs c = partition_keys prefix (List.fold_left (add prefix) acc xs) c in - fold_continuation ~return:(l, S.elements r) ~more:(split ~acc ~prefix) cont + fold_continuation ~return:(l, M.elements r) ~more:(split ~acc ~prefix) cont and add prefix (l, r) (c : S3.content) = let size = String.length prefix in if not (String.contains_from c.key size '/') then c.key :: l, r else - l, S.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r + l, M.add String.(sub c.key 0 @@ 1 + index_from c.key size '/') r and list_dir t prefix = let bucket = t.bucket and credentials = t.cred and endpoint = t.endpoint in let f ~endpoint () = S3.ls ~bucket ~credentials ~endpoint ~prefix () in let* res = S3.retry ~retries:t.retries ~endpoint ~f () in let* xs, rest = fold_or_catch ~not_found:empty_Ls res in - let init = List.fold_left (add prefix) ([], S.empty) xs in + let init = List.fold_left (add prefix) ([], M.empty) xs in partition_keys prefix init rest let rec rename t prefix new_prefix = let upload t (k, v) = set t k v in let* xs = list t in let to_delete = List.filter (String.starts_with ~prefix) xs in - let* data = Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in - let* () = Deferred.iter (upload t) data in - Deferred.iter (erase t) to_delete + let* data = IO.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] to_delete in + let* () = IO.iter (upload t) data in + IO.iter (erase t) to_delete and rename_and_add ~t ~prefix ~new_prefix acc k = let l = String.length prefix in @@ -321,7 +320,7 @@ module AmazonS3Store = struct let* res = Credentials.Helper.get_credentials ~profile () in let cred = Result.fold ~ok:Fun.id ~error:raise res in let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in - f IO.{bucket; cred; endpoint; retries} + f S.{bucket; cred; endpoint; retries} - include Zarr.Storage.Make(IO) + include Zarr.Storage.Make(IO)(S) end diff --git a/zarr-lwt/src/storage.mli b/zarr-lwt/src/storage.mli index 4326e6de..774a4e14 100644 --- a/zarr-lwt/src/storage.mli +++ b/zarr-lwt/src/storage.mli @@ -1,14 +1,14 @@ -module Deferred : Zarr.Types.Deferred with type 'a t = 'a Lwt.t +module IO : Zarr.Types.IO with type 'a t = 'a Lwt.t (** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *) -module MemoryStore : Zarr.Memory.S with module Deferred = Deferred +module MemoryStore : Zarr.Memory.S with type 'a io := 'a Lwt.t (** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *) -module ZipStore : Zarr.Zip.S with module Deferred = Deferred +module ZipStore : Zarr.Zip.S with type 'a io := 'a Lwt.t (** An Lwt-aware local filesystem storage backend for a Zarr V3 hierarchy. *) module FilesystemStore : sig - include Zarr.Storage.STORE with module Deferred = Deferred + include Zarr.Storage.S with type 'a io := 'a Lwt.t val create : ?perm:Unix.file_perm -> string -> t (** [create ~perm dir] returns a new filesystem store. @@ -25,7 +25,7 @@ end module AmazonS3Store : sig exception Request_failed of Aws_s3_lwt.S3.error - include Zarr.Storage.STORE with module Deferred = Deferred + include Zarr.Storage.S with type 'a io := 'a Lwt.t val with_open : ?scheme:[ `Http | `Https ] -> diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml index cf0b8932..3221fe95 100644 --- a/zarr-lwt/test/test_lwt.ml +++ b/zarr-lwt/test/test_lwt.ml @@ -8,14 +8,12 @@ let string_of_list = [%show: string list] let print_node_pair = [%show: Node.Array.t list * Node.Group.t list] let print_int_array = [%show : int array] -module type LWT_STORE = sig - include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t -end +module type LWT_STORE = Zarr.Storage.S with type 'a io := 'a Lwt.t let test_storage (type a) (module M : LWT_STORE with type t = a) (store : a) = let open M in - let open M.Deferred.Infix in + let open IO.Infix in let gnode = Node.Group.root in hierarchy store >>= fun nodes -> @@ -69,7 +67,7 @@ let test_storage assert_equal exp got; match codecs with | [`ShardingIndexed _] -> Array.delete store anode - | _ -> Deferred.return_unit) + | _ -> IO.return_unit) [[`ShardingIndexed cfg]; [`Bytes BE]] >>= fun () -> let child = Node.Group.of_path "/some/child/group" in @@ -114,7 +112,7 @@ let test_storage clear store >>= fun () -> hierarchy store >>= fun got -> assert_equal ~printer:print_node_pair ([], []) got; - Deferred.return_unit + IO.return_unit let _ = run_test_tt_main @@ ("Run Zarr Lwt API tests" >::: [ @@ -151,7 +149,7 @@ let _ = Lwt_main.run @@ Lwt.join [ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z) (* test just opening the now exisitant archive created by the previous test. *) - ;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit) + ;ZipStore.with_open `Read_only zpath (fun _ -> Lwt.return_unit) ;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store)) ;test_storage (module MemoryStore) @@ MemoryStore.create () ;test_storage (module FilesystemStore) s]) diff --git a/zarr-sync/src/storage.ml b/zarr-sync/src/storage.ml index 60f8036a..7b0b738e 100644 --- a/zarr-sync/src/storage.ml +++ b/zarr-sync/src/storage.ml @@ -1,4 +1,4 @@ -module Deferred = struct +module IO = struct type 'a t = 'a let return = Fun.id let bind x f = f x @@ -19,14 +19,13 @@ module Deferred = struct end end -module ZipStore = Zarr.Zip.Make(Deferred) -module MemoryStore = Zarr.Memory.Make(Deferred) +module ZipStore = Zarr.Zip.Make(IO) +module MemoryStore = Zarr.Memory.Make(IO) module FilesystemStore = struct - module IO = struct - module Deferred = Deferred - + module S = struct type t = {dirname : string; perm : int} + type 'a io = 'a IO.t let fspath_to_key t path = let pos = String.length t.dirname + 1 in @@ -35,8 +34,7 @@ module FilesystemStore = struct let key_to_fspath t key = Filename.concat t.dirname key let get t key = - let p = key_to_fspath t key in - try In_channel.(with_open_gen [Open_rdonly] t.perm p input_all) with + try In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) input_all) with | Sys_error _ -> raise (Zarr.Storage.Key_not_found key) let get_partial_values t key ranges = @@ -71,13 +69,9 @@ module FilesystemStore = struct List.iter (write ~oc) rvs; Out_channel.flush oc - let is_member t key = Sys.file_exists (key_to_fspath t key) - let erase t key = Sys.remove (key_to_fspath t key) - - let size t key = - match In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length) with - | exception Sys_error _ -> 0 - | s -> Int64.to_int s + let size t key = + try In_channel.(with_open_gen [Open_rdonly] t.perm (key_to_fspath t key) length) |> Int64.to_int with + | Sys_error _ -> 0 let rec walk t acc dir = let accumulate ~t a x = match Filename.concat dir x with @@ -98,19 +92,21 @@ module FilesystemStore = struct let list t = walk t [] (key_to_fspath t "") let list_prefix t prefix = walk t [] (key_to_fspath t prefix) + let erase t key = Sys.remove (key_to_fspath t key) let erase_prefix t pre = List.iter (erase t) (list_prefix t pre) let rename t k k' = Sys.rename (key_to_fspath t k) (key_to_fspath t k') + let is_member t key = Sys.file_exists (key_to_fspath t key) end let create ?(perm=0o700) dirname = Zarr.Util.create_parent_dir dirname perm; Sys.mkdir dirname perm; - IO.{dirname = Zarr.Util.sanitize_dir dirname; perm} + S.{dirname = Zarr.Util.sanitize_dir dirname; perm} let open_store ?(perm=0o700) dirname = if Sys.is_directory dirname - then IO.{dirname = Zarr.Util.sanitize_dir dirname; perm} + then S.{dirname = Zarr.Util.sanitize_dir dirname; perm} else raise (Zarr.Storage.Not_a_filesystem_store dirname) - include Zarr.Storage.Make(IO) + include Zarr.Storage.Make(IO)(S) end diff --git a/zarr-sync/src/storage.mli b/zarr-sync/src/storage.mli index 0c916043..fde585a9 100644 --- a/zarr-sync/src/storage.mli +++ b/zarr-sync/src/storage.mli @@ -1,14 +1,14 @@ -module Deferred : Zarr.Types.Deferred with type 'a t = 'a +module IO : Zarr.Types.IO with type 'a t = 'a (** A blocking I/O in-memory storage backend for Zarr v3 hierarchy. *) -module MemoryStore : Zarr.Memory.S with module Deferred = Deferred +module MemoryStore : Zarr.Memory.S with type 'a io := 'a (** A blocking I/O Zip file storage backend for a Zarr v3 hierarchy. *) -module ZipStore : Zarr.Zip.S with module Deferred = Deferred +module ZipStore : Zarr.Zip.S with type 'a io := 'a (** A blocking I/O local filesystem storage backend for a Zarr v3 hierarchy. *) module FilesystemStore : sig - include Zarr.Storage.STORE with module Deferred = Deferred + include Zarr.Storage.S with type 'a io := 'a val create : ?perm:int -> string -> t (** [create ~perm dir] returns a new filesystem store. diff --git a/zarr-sync/test/test_sync.ml b/zarr-sync/test/test_sync.ml index bd5f9e54..4d081b42 100644 --- a/zarr-sync/test/test_sync.ml +++ b/zarr-sync/test/test_sync.ml @@ -8,9 +8,7 @@ let string_of_list = [%show: string list] let print_node_pair = [%show: Node.Array.t list * Node.Group.t list] let print_int_array = [%show : int array] -module type SYNC_STORE = sig - include Zarr.Storage.STORE with type 'a Deferred.t = 'a -end +module type SYNC_STORE = Zarr.Storage.S with type 'a io := 'a let test_storage (type a) (module M : SYNC_STORE with type t = a) (store : a) = @@ -207,7 +205,7 @@ let _ = let zpath = tmp_dir ^ ".zip" in ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z); (* test just opening the now exisitant archive created by the previous test. *) - ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit); + ZipStore.with_open `Read_only zpath (fun _ -> ()); test_storage (module MemoryStore) @@ MemoryStore.create (); test_storage (module FilesystemStore) s) ]) diff --git a/zarr/src/codecs.ml b/zarr/src/codecs.ml index a85ff2cf..03b28fe9 100644 --- a/zarr/src/codecs.ml +++ b/zarr/src/codecs.ml @@ -191,13 +191,12 @@ module ArrayMap = Util.ArrayMap module RegularGrid = Extensions.RegularGrid module rec ArrayToBytes : sig - module Make (Io : Types.IO) : sig - open Io + module Make (IO : Types.IO) : sig type t = internal_shard_config - type get_partial_values = Types.range list -> string list Deferred.t - type set_fn = ?append:bool -> (int * string) list -> unit Deferred.t - val partial_encode : t -> get_partial_values -> set_fn -> int -> 'a array_repr -> (int array * 'a) list -> 'a -> unit Deferred.t - val partial_decode : t -> get_partial_values -> int -> 'a array_repr -> (int * int array) list -> 'a -> (int * 'a) list Deferred.t + type get_partial_values = Types.range list -> string list IO.t + type set_fn = ?append:bool -> (int * string) list -> unit IO.t + val partial_encode : t -> get_partial_values -> set_fn -> int -> 'a array_repr -> (int array * 'a) list -> 'a -> unit IO.t + val partial_decode : t -> get_partial_values -> int -> 'a array_repr -> (int * int array) list -> 'a -> (int * 'a) list IO.t end val parse : arraytobytes -> int array -> unit val encoded_size : int -> fixed_arraytobytes -> int @@ -207,14 +206,13 @@ module rec ArrayToBytes : sig val to_yojson : arraytobytes -> Yojson.Safe.t end = struct - module Make (Io : Types.IO) = struct - open Io - open Deferred.Syntax + module Make (IO : Types.IO) = struct + open IO.Syntax open ShardingIndexed type t = ShardingIndexed.t - type get_partial_values = (int * int option) list -> string list Deferred.t - type set_fn = ?append:bool -> (int * string) list -> unit Deferred.t + type get_partial_values = (int * int option) list -> string list IO.t + type set_fn = ?append:bool -> (int * string) list -> unit IO.t let add_binding ~grid acc (c, v) = let id, co = RegularGrid.index_coord_pair grid c in @@ -314,11 +312,11 @@ end = struct List.fold_left2 (accumulate_nonempty ~repr' ~idx_arr) (shardsize, [], []) xs nonempty' in let* () = match inplace with - | [] -> Deferred.return_unit + | [] -> IO.return_unit | rs -> set_partial rs in let* () = match nonempty_append with - | [] -> Deferred.return_unit + | [] -> IO.return_unit | rs -> set_partial ~append:true (List.rev rs) in (* new values that need to be written to previously empty inner chunks will @@ -328,7 +326,7 @@ end = struct List.fold_left (accumulate_empty ~repr' ~idx_arr ~fv) (bsize, []) empty in let* () = match empty_append with - | [] -> Deferred.return_unit + | [] -> IO.return_unit | rs -> set_partial ~append:true (List.rev rs) in let ib = encode_index_chain t.index_codecs idx_arr in @@ -813,8 +811,8 @@ module Chain = struct Error (Printf.sprintf "%s codec is unsupported or has invalid configuration." codec) end -module Make (Io : Types.IO) = struct - module M = ArrayToBytes.Make(Io) +module Make (IO : Types.IO) = struct + module M = ArrayToBytes.Make(IO) let is_just_sharding : Chain.t -> bool = function | {a2a = []; a2b = `ShardingIndexed _; b2b = []} -> true diff --git a/zarr/src/codecs.mli b/zarr/src/codecs.mli index cd254e12..5e39ed1d 100644 --- a/zarr/src/codecs.mli +++ b/zarr/src/codecs.mli @@ -107,8 +107,7 @@ end (** A functor for generating a Sharding Indexed codec that supports partial (en/de)coding via IO operations. *) -module Make (Io : Types.IO) : sig - open Io +module Make (IO : Types.IO) : sig (** [is_just_sharding t] is [true] if the codec chain [t] contains only the [sharding_indexed] codec. *) @@ -116,20 +115,20 @@ module Make (Io : Types.IO) : sig val partial_encode : Chain.t -> - ((int * int option) list -> string list Deferred.t) -> - (?append:bool -> (int * string) list -> unit Deferred.t) -> + (Types.range list -> string list IO.t) -> + (?append:bool -> (int * string) list -> unit IO.t) -> int -> 'a array_repr -> (int array * 'a) list -> 'a -> - unit Deferred.t + unit IO.t val partial_decode : Chain.t -> - ((int * int option) list -> string list Deferred.t) -> + (Types.range list -> string list IO.t) -> int -> 'a array_repr -> (int * int array) list -> 'a -> - (int * 'a) list Deferred.t + (int * 'a) list IO.t end diff --git a/zarr/src/storage/memory.ml b/zarr/src/storage/memory.ml index 1b0b9eb1..e25d7821 100644 --- a/zarr/src/storage/memory.ml +++ b/zarr/src/storage/memory.ml @@ -1,58 +1,55 @@ module type S = sig - include Storage.STORE + include Storage.S val create : unit -> t (** [create ()] returns a new In-memory Zarr store type.*) end -module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferred.t = struct - open Deferred.Syntax +module Make (IO : Types.IO) : S with type 'a io := 'a IO.t = struct + open IO.Syntax module M = Map.Make(String) - module IO = struct - module Deferred = Deferred - + module Store = struct type t = string M.t Atomic.t + type 'a io = 'a IO.t - let get : t -> string -> string Deferred.t = fun t key -> + let get : t -> string -> string io = fun t key -> match M.find_opt key (Atomic.get t) with | None -> raise (Storage.Key_not_found key) - | Some v -> Deferred.return v + | Some v -> IO.return v - let rec set : t -> string -> string -> unit Deferred.t = fun t key value -> + let rec set : t -> string -> string -> unit io = fun t key value -> let m = Atomic.get t in if Atomic.compare_and_set t m (M.add key value m) - then Deferred.return_unit else set t key value + then IO.return_unit else set t key value - let list : t -> string list Deferred.t = fun t -> + let list : t -> string list io = fun t -> let m = Atomic.get t in - Deferred.return @@ M.fold (fun k _ acc -> k :: acc) m [] + IO.return @@ M.fold (fun k _ acc -> k :: acc) m [] - let is_member : t -> string -> bool Deferred.t = fun t key -> + let is_member : t -> string -> bool io = fun t key -> let m = Atomic.get t in - Deferred.return (M.mem key m) + IO.return (M.mem key m) - let rec erase : t -> string -> unit Deferred.t = fun t key -> + let rec erase : t -> string -> unit io = fun t key -> let m = Atomic.get t in let m' = M.update key (Fun.const None) m in if Atomic.compare_and_set t m m' - then Deferred.return_unit else erase t key + then IO.return_unit else erase t key - let size : t -> string -> int Deferred.t = fun t key -> + let size : t -> string -> int io = fun t key -> match M.find_opt key (Atomic.get t) with - | None -> Deferred.return 0 - | Some e -> Deferred.return (String.length e) + | None -> IO.return 0 + | Some e -> IO.return (String.length e) - let rec erase_prefix : t -> string -> unit Deferred.t = fun t prefix -> + let rec erase_prefix : t -> string -> unit io = fun t prefix -> let pred ~prefix k v = if String.starts_with ~prefix k then None else Some v in let m = Atomic.get t in let m' = M.filter_map (pred ~prefix) m in if Atomic.compare_and_set t m m' - then Deferred.return_unit else erase_prefix t prefix + then IO.return_unit else erase_prefix t prefix - let get_partial_values : - t -> string -> (int * int option) list -> string list Deferred.t - = fun t key ranges -> + let get_partial_values t key (ranges : Types.range list) = let read_range ~data ~size (ofs, len) = match len with | Some l -> String.sub data ofs l | None -> String.sub data ofs (size - ofs) @@ -61,9 +58,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let size = String.length data in List.map (read_range ~data ~size) ranges - let rec set_partial_values : - t -> string -> ?append:bool -> (int * string) list -> unit Deferred.t - = fun t key ?(append=false) rv -> + let rec set_partial_values t key ?(append=false) (rv : (int * string) list) = let m = Atomic.get t in let ov = Option.fold ~none:String.empty ~some:Fun.id (M.find_opt key m) in let f = if append || ov = String.empty then @@ -75,9 +70,9 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre in let m' = M.add key (List.fold_left f ov rv) m in if Atomic.compare_and_set t m m' - then Deferred.return_unit else set_partial_values t key ~append rv + then IO.return_unit else set_partial_values t key ~append rv - let list_dir : t -> string -> (string list * string list) Deferred.t = fun t prefix -> + let list_dir : t -> string -> (string list * string list) io = fun t prefix -> let module S = Set.Make(String) in let add ~size ~prefix key _ ((l, r) as acc) = if not (String.starts_with ~prefix key) then acc else @@ -87,11 +82,9 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let size = String.length prefix in let m = Atomic.get t in let keys, prefixes = M.fold (add ~prefix ~size) m ([], S.empty) in - Deferred.return (keys, S.elements prefixes) + IO.return (keys, S.elements prefixes) - let rec rename : - t -> string -> string -> unit Deferred.t - = fun t prefix new_prefix -> + let rec rename : t -> string -> string -> unit io = fun t prefix new_prefix -> let add ~prefix ~new_prefix k v acc = if not (String.starts_with ~prefix k) then M.add k v acc else let l = String.length prefix in @@ -101,10 +94,10 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let m = Atomic.get t in let m' = M.fold (add ~prefix ~new_prefix) m M.empty in if Atomic.compare_and_set t m m' - then Deferred.return_unit else rename t prefix new_prefix + then IO.return_unit else rename t prefix new_prefix end - let create : unit -> IO.t = fun () -> Atomic.make M.empty + let create : unit -> Store.t = fun () -> Atomic.make M.empty - include Storage.Make(IO) + include Storage.Make(IO)(Store) end diff --git a/zarr/src/storage/storage.ml b/zarr/src/storage/storage.ml index 343fe596..136e3d07 100644 --- a/zarr/src/storage/storage.ml +++ b/zarr/src/storage/storage.ml @@ -1,14 +1,14 @@ include Storage_intf -module Make (Io : Types.IO) = struct - module Io_chain = Codecs.Make(Io) - module Deferred = Io.Deferred +module Make (IO : Types.IO) (Store : Types.Store with type 'a io = 'a IO.t) = struct + module IO_chain = Codecs.Make(IO) - open Io - open Deferred.Infix - open Deferred.Syntax + open IO.Infix + open IO.Syntax + open Store - type t = Io.t + type t = Store.t + type 'a io = 'a IO.t let maybe_rename t old_name new_name = function | false -> raise (Key_not_found old_name) @@ -27,29 +27,29 @@ module Make (Io : Types.IO) = struct let hierarchy t = let add ~t ((left, right) as acc) k = - if not (String.ends_with ~suffix:"zarr.json" k) then Deferred.return acc else + if not (String.ends_with ~suffix:"zarr.json" k) then IO.return acc else let path = if k = "zarr.json" then "/" else "/" ^ String.(sub k 0 (length k - 10)) in - Deferred.map (choose path left right) (node_kind t k) + IO.map (choose path left right) (node_kind t k) in - list t >>= Deferred.fold_left (add ~t) ([], []) + list t >>= IO.fold_left (add ~t) ([], []) let clear t = erase_prefix t "" module Group = struct let exists t node = is_member t (Node.Group.to_metakey node) let delete t node = erase_prefix t (Node.Group.to_prefix node) - let metadata t node = Deferred.map Metadata.Group.decode (get t @@ Node.Group.to_metakey node) + let metadata t node = IO.map Metadata.Group.decode (get t @@ Node.Group.to_metakey node) (* This recursively creates parent group nodes if they don't exist.*) let rec create ?(attrs=`Null) t node = let maybe_create ~attrs t node = function - | true -> Deferred.return_unit + | true -> IO.return_unit | false -> let key = Node.Group.to_metakey node and meta = Metadata.Group.(update_attributes default attrs) in let* () = set t key (Metadata.Group.encode meta) in match Node.Group.parent node with - | None -> Deferred.return_unit + | None -> IO.return_unit | Some p -> create t p in exists t node >>= maybe_create ~attrs t node @@ -57,13 +57,13 @@ module Make (Io : Types.IO) = struct let children t node = let add ~t (left, right) prefix = let path = "/" ^ String.sub prefix 0 (String.length prefix - 1) in - Deferred.map (choose path left right) (node_kind t @@ prefix ^ "zarr.json") + IO.map (choose path left right) (node_kind t @@ prefix ^ "zarr.json") in let maybe_enumerate t node = function - | false -> Deferred.return ([], []) + | false -> IO.return ([], []) | true -> let* _, ps = list_dir t (Node.Group.to_prefix node) in - Deferred.fold_left (add ~t) ([], []) ps + IO.fold_left (add ~t) ([], []) ps in exists t node >>= maybe_enumerate t node @@ -78,7 +78,7 @@ module Make (Io : Types.IO) = struct module Indexing = Ndarray.Indexing let exists t node = is_member t (Node.Array.to_metakey node) let delete t node = erase_prefix t (Node.Array.to_key node ^ "/") - let metadata t node = Deferred.map Metadata.Array.decode (get t @@ Node.Array.to_metakey node) + let metadata t node = IO.map Metadata.Array.decode (get t @@ Node.Array.to_metakey node) (* This recursively creates parent group nodes if they don't exist.*) let create ?(sep=`Slash) ?(dimension_names=[]) ?(attributes=`Null) ~codecs ~shape ~chunks kind fv node t = @@ -87,7 +87,7 @@ module Make (Io : Types.IO) = struct let value = Metadata.Array.encode m in let key = Node.Array.to_metakey node in let* () = set t key value in - Option.fold ~none:Deferred.return_unit ~some:(Group.create t) (Node.Array.parent node) + Option.fold ~none:IO.return_unit ~some:(Group.create t) (Node.Array.parent node) let write t node slice x = let update_ndarray ~arr (c, v) = Ndarray.set arr c v in @@ -97,10 +97,10 @@ module Make (Io : Types.IO) = struct in let update_chunk ~t ~meta ~prefix ~chain ~fv ~repr (idx, pairs) = let ckey = prefix ^ Metadata.Array.chunk_key meta idx in - if Io_chain.is_just_sharding chain then + if IO_chain.is_just_sharding chain then let pget = get_partial_values t ckey and pset = set_partial_values t ckey in let* shardsize = size t ckey in - Io_chain.partial_encode chain pget pset shardsize repr pairs fv + IO_chain.partial_encode chain pget pset shardsize repr pairs fv else is_member t ckey >>= function | true -> let* v = get t ckey in @@ -128,7 +128,7 @@ module Make (Io : Types.IO) = struct and prefix = Node.Array.to_key node ^ "/" and chain = Metadata.Array.codecs meta and bindings = ArrayMap.bindings m in - Deferred.iter (update_chunk ~t ~meta ~prefix ~chain ~fv ~repr) bindings + IO.iter (update_chunk ~t ~meta ~prefix ~chain ~fv ~repr) bindings let read (type a) t node slice (kind : a Ndarray.dtype) = let add_indexed_coord ~meta acc (i, y) = @@ -138,10 +138,10 @@ module Make (Io : Types.IO) = struct let read_chunk ~t ~meta ~prefix ~chain ~fv ~repr (idx, pairs) = let ckey = prefix ^ Metadata.Array.chunk_key meta idx in size t ckey >>= function - | 0 -> Deferred.return @@ List.map (fun (i, _) -> i, fv) pairs - | shardsize when Io_chain.is_just_sharding chain -> + | 0 -> IO.return @@ List.map (fun (i, _) -> i, fv) pairs + | shardsize when IO_chain.is_just_sharding chain -> let pget = get_partial_values t ckey in - Io_chain.partial_decode chain pget shardsize repr pairs fv + IO_chain.partial_decode chain pget shardsize repr pairs fv | _ -> let+ v = get t ckey in let arr = Codecs.Chain.decode chain repr v in @@ -159,7 +159,7 @@ module Make (Io : Types.IO) = struct and prefix = Node.Array.to_key node ^ "/" and fv = Metadata.Array.fillvalue_of_kind meta kind and repr = Codecs.{kind; shape = Metadata.Array.chunk_shape meta} in - let+ ps = Deferred.concat_map (read_chunk ~t ~meta ~prefix ~chain ~fv ~repr) (ArrayMap.bindings m) in + let+ ps = IO.concat_map (read_chunk ~t ~meta ~prefix ~chain ~fv ~repr) (ArrayMap.bindings m) in (* sorting restores the C-order of the decoded array coordinates.*) let sorted_pairs = List.fast_sort (fun (x, _) (y, _) -> Int.compare x y) ps in let vs = List.map snd sorted_pairs in @@ -172,7 +172,7 @@ module Make (Io : Types.IO) = struct end) in let maybe_erase t key = function - | false -> Deferred.return_unit + | false -> IO.return_unit | true -> erase t key in let remove ~t ~meta ~prefix v = @@ -186,7 +186,7 @@ module Make (Io : Types.IO) = struct and s' = S.of_list (Metadata.Array.chunk_indices meta new_shape) in let unreachable_chunks = S.elements (S.diff s s') and prefix = Node.Array.to_key node ^ "/" in - let* () = Deferred.iter (remove ~t ~meta ~prefix) unreachable_chunks in + let* () = IO.iter (remove ~t ~meta ~prefix) unreachable_chunks in set t (Node.Array.to_metakey node) Metadata.Array.(encode @@ update_shape meta new_shape) let rename t node str = diff --git a/zarr/src/storage/storage_intf.ml b/zarr/src/storage/storage_intf.ml index ba112098..c23312c5 100644 --- a/zarr/src/storage/storage_intf.ml +++ b/zarr/src/storage/storage_intf.ml @@ -4,39 +4,41 @@ exception Invalid_array_slice exception Key_not_found of string exception Not_a_filesystem_store of string -module type STORE = sig - module Deferred : Types.Deferred +module type S = sig type t (** The storage type. *) + type 'a io + (** The I/O monad type.*) + module Group : sig - val create : ?attrs:Yojson.Safe.t -> t -> Node.Group.t -> unit Deferred.t + val create : ?attrs:Yojson.Safe.t -> t -> Node.Group.t -> unit io (** [create ?attrs t node] creates a group node in store [t] containing attributes [attrs]. This is a no-op if [node] is already a member of this store. *) - val metadata : t -> Node.Group.t -> Metadata.Group.t Deferred.t + val metadata : t -> Node.Group.t -> Metadata.Group.t io (** [metadata node t] returns the metadata of group node [node]. @raise Key_not_found if node is not a member of store [t].*) - val children : t -> Node.Group.t -> (Node.Array.t list * Node.Group.t list) Deferred.t + val children : t -> Node.Group.t -> (Node.Array.t list * Node.Group.t list) io (** [children t n] returns a tuple of child nodes of group node [n]. This operation returns a pair of empty lists if node [n] has no children or is not a member of store [t]. @raise Parse_error if any child node has invalid [node_type] metadata.*) - val delete : t -> Node.Group.t -> unit Deferred.t + val delete : t -> Node.Group.t -> unit io (** [delete t n] erases group node [n] from store [t]. This also erases all child nodes of [n]. If node [n] is not a member of store [t] then this is a no-op. *) - val exists : t -> Node.Group.t -> bool Deferred.t + val exists : t -> Node.Group.t -> bool io (** [exists t n] returns [true] if group node [n] is a member of store [t] and [false] otherwise. *) - val rename : t -> Node.Group.t -> string -> unit Deferred.t + val rename : t -> Node.Group.t -> string -> unit io (** [rename t g name] changes the name of group node [g] in store [t] to [name]. @raise Key_not_found if [g] is not a member of store [t]. @@ -56,7 +58,7 @@ module type STORE = sig 'a -> Node.Array.t -> t -> - unit Deferred.t + unit io (** [create ~sep ~dimension_names ~attributes ~codecs ~shape ~chunks kind fill node t] creates an array node in store [t] where: - Separator [sep] is used in the array's chunk key encoding. @@ -74,20 +76,20 @@ module type STORE = sig if [codecs] contains a shardingindexed codec with an incorrect inner chunk shape. *) - val metadata : t -> Node.Array.t -> Metadata.Array.t Deferred.t + val metadata : t -> Node.Array.t -> Metadata.Array.t io (** [metadata node t] returns the metadata of array node [node]. @raise Key_not_found if node is not a member of store [t]. *) - val delete : t -> Node.Array.t -> unit Deferred.t + val delete : t -> Node.Array.t -> unit io (** [delete t n] erases array node [n] from store [t]. If node [n] is not a member of store [t] then this is a no-op. *) - val exists : t -> Node.Array.t -> bool Deferred.t + val exists : t -> Node.Array.t -> bool io (** [exists t n] returns [true] if array node [n] is a member of store [t] and [false] otherwise. *) - val write : t -> Node.Array.t -> Ndarray.Indexing.index array -> 'a Ndarray.t -> unit Deferred.t + val write : t -> Node.Array.t -> Ndarray.Indexing.index array -> 'a Ndarray.t -> unit io (** [write t n s x] writes n-dimensional array [x] to the slice [s] of array node [n] in store [t]. @@ -97,12 +99,7 @@ module type STORE = sig if the kind of [x] is not compatible with node [n]'s data type as described in its metadata document. *) - val read : - t -> - Node.Array.t -> - Ndarray.Indexing.index array -> - 'a Ndarray.dtype -> - 'a Ndarray.t Deferred.t + val read : t -> Node.Array.t -> Ndarray.Indexing.index array -> 'a Ndarray.dtype -> 'a Ndarray.t io (** [read t n s k] reads an n-dimensional array of size determined by slice [s] from array node [n]. @@ -112,7 +109,7 @@ module type STORE = sig @raise Invalid_array_slice if the slice [s] is not a valid slice of array node [n].*) - val reshape : t -> Node.Array.t -> int array -> unit Deferred.t + val reshape : t -> Node.Array.t -> int array -> unit io (** [reshape t n shape] resizes array node [n] of store [t] into new size [shape]. Note that when the resizing involves shrinking an array along any dimensions, any old unreachable chunks that fall outside of @@ -123,7 +120,7 @@ module type STORE = sig @raise Key_not_found if node [n] is not a member of store [t]. *) - val rename : t -> Node.Array.t -> string -> unit Deferred.t + val rename : t -> Node.Array.t -> string -> unit io (** [rename t n name] changes the name of array node [n] in store [t] to [name]. @raise Key_not_found if [g] is not a member of store [t]. @@ -131,7 +128,7 @@ module type STORE = sig @raise Node_invariant if [name] is an invalid node name.*) end - val hierarchy : t -> (Node.Array.t list * Node.Group.t list) Deferred.t + val hierarchy : t -> (Node.Array.t list * Node.Group.t list) io (** [hierarchy t] returns [p] where [p] is a pair of lists representing all nodes in store [t]. The first element of the pair is a list of all array nodes, and the second element is a list of @@ -140,7 +137,7 @@ module type STORE = sig @raise Parse_error if any node has invalid [node_type] metadata.*) - val clear : t -> unit Deferred.t + val clear : t -> unit io (** [clear t] clears the store [t] by deleting all nodes. If the store is already empty, this is a no-op. *) end @@ -167,11 +164,11 @@ module type Interface = sig exception Not_a_filesystem_store of string (** raised when opening a file that as if it was a Filesystem Zarr store. *) - module type STORE = STORE + module type S = S (** The module interface that all supported stores must implement. *) - module Make : functor (Io : Types.IO) -> STORE - with type t = Io.t and module Deferred = Io.Deferred + module Make : functor (IO : Types.IO) (Store : Types.Store with type 'a io = 'a IO.t) -> S + with type t = Store.t and type 'a io = 'a IO.t (** A functor for minting a new storage type as long as it's argument - module implements the {!STORE} interface. *) + module implements the {!Store} interface. *) end diff --git a/zarr/src/storage/zip.ml b/zarr/src/storage/zip.ml index 7774f582..78859b2c 100644 --- a/zarr/src/storage/zip.ml +++ b/zarr/src/storage/zip.ml @@ -1,13 +1,13 @@ module type S = sig - include Storage.STORE + include Storage.S val with_open : ?level:[ `None | `Fast | `Default | `Best ] -> ?perm:int -> [< `Read_only | `Read_write ] -> string -> - (t -> 'a Deferred.t) -> - 'a Deferred.t + (t -> 'a io) -> + 'a io (** [with_open mode p f] opens the zip archive at path [p] and applies function [f] to its open handle and writes any changes back to the zip archive if [mode] is [`Read_write], otherwise discards them at exit. @@ -28,8 +28,8 @@ module type S = sig } *) end -module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferred.t = struct - open Deferred.Syntax +module Make (IO : Types.IO) : S with type 'a io := 'a IO.t = struct + open IO.Syntax let fold_kind ~dir ~file = function | Zipc.Member.Dir -> dir @@ -37,14 +37,13 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let fold_result ~ok res = Result.fold ~error:failwith ~ok res - module IO = struct - module Deferred = Deferred - + module Store = struct type t = {ic : Zipc.t Atomic.t; level : Zipc_deflate.level} + type 'a io = 'a IO.t let is_member t key = let z = Atomic.get t.ic in - Deferred.return (Zipc.mem key z) + IO.return (Zipc.mem key z) let size t key = let decompressed_size = function @@ -54,7 +53,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre fold_kind ~dir:0 ~file:Zipc.File.decompressed_size entry_kind in let z = Atomic.get t.ic in - Deferred.return (decompressed_size @@ Zipc.find key z) + IO.return (decompressed_size @@ Zipc.find key z) let get t key = let to_string f = fold_result ~ok:Fun.id (Zipc.File.to_binary_string f) in @@ -65,7 +64,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre fold_kind ~dir:String.empty ~file:to_string entry_kind in let z = Atomic.get t.ic in - Deferred.return (decompressed_value @@ Zipc.find key z) + IO.return (decompressed_value @@ Zipc.find key z) let get_partial_values t key ranges = let read_range ~data ~size (ofs, len) = match len with @@ -79,7 +78,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let list t = let accumulate_path m acc = Zipc.Member.path m :: acc in let z = Atomic.get t.ic in - Deferred.return (Zipc.fold accumulate_path z []) + IO.return (Zipc.fold accumulate_path z []) let list_dir t prefix = let module S = Set.Make(String) in @@ -92,7 +91,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre in let z = Atomic.get t.ic in let ks, ps = Zipc.fold (accumulate ~prefix) z ([], S.empty) in - Deferred.return (ks, S.elements ps) + IO.return (ks, S.elements ps) let rec set t key value = let res = Zipc.File.deflate_of_binary_string ~level:t.level value in @@ -100,7 +99,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let m = fold_result ~ok:Fun.id Zipc.Member.(make ~path:key f) in let z = Atomic.get t.ic in if Atomic.compare_and_set t.ic z (Zipc.add m z) - then Deferred.return_unit else set t key value + then IO.return_unit else set t key value let rec set_partial_values t key ?(append=false) rv = let to_string f = fold_result ~ok:Fun.id (Zipc.File.to_binary_string f) in @@ -124,12 +123,12 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let file = Zipc.Member.File (fold_result ~ok:Fun.id res) in let m = fold_result ~ok:Fun.id Zipc.Member.(make ~path:key file) in if Atomic.compare_and_set t.ic z (Zipc.add m z) - then Deferred.return_unit else set_partial_values t key ~append rv + then IO.return_unit else set_partial_values t key ~append rv let rec erase t key = let z = Atomic.get t.ic in if Atomic.compare_and_set t.ic z (Zipc.remove key z) - then Deferred.return_unit else erase t key + then IO.return_unit else erase t key let rec erase_prefix t prefix = let accumulate ~prefix m acc = @@ -139,7 +138,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let z = Atomic.get t.ic in let z' = Zipc.fold (accumulate ~prefix) z Zipc.empty in if Atomic.compare_and_set t.ic z z' - then Deferred.return_unit else erase_prefix t prefix + then IO.return_unit else erase_prefix t prefix (* Adapted from: https://github.com/dbuenzli/zipc/issues/8#issuecomment-2392417890 *) let rec rename t prefix new_prefix = @@ -157,7 +156,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let z = Atomic.get t.ic in let z' = Zipc.fold (accumulate ~prefix ~new_prefix) z Zipc.empty in if Atomic.compare_and_set t.ic z z' - then Deferred.return_unit else rename t prefix new_prefix + then IO.return_unit else rename t prefix new_prefix end let with_open ?(level=`Default) ?(perm=0o700) mode path f = @@ -166,7 +165,7 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre let flags = [Open_wronly; Open_trunc; Open_creat] in Out_channel.with_open_gen flags perm path (write ~str) in - let make z = IO.{ic = Atomic.make z; level} in + let make z = Store.{ic = Atomic.make z; level} in let x = if not (Sys.file_exists path) then make Zipc.empty else let s = In_channel.(with_open_bin path input_all) in fold_result ~ok:make (Zipc.of_binary_string s) @@ -179,5 +178,5 @@ module Make (Deferred : Types.Deferred) : S with type 'a Deferred.t = 'a Deferre fold_result ~ok:(write_to_disk ~perm ~path) str; out - include Storage.Make(IO) + include Storage.Make(IO)(Store) end diff --git a/zarr/src/types.ml b/zarr/src/types.ml index a9f2f6cd..7599983f 100644 --- a/zarr/src/types.ml +++ b/zarr/src/types.ml @@ -1,4 +1,4 @@ -module type Deferred = sig +module type IO = sig type 'a t val return : 'a -> 'a t val bind : 'a t -> ('a -> 'b t) -> 'b t @@ -23,7 +23,7 @@ type value = string type range_start = int type prefix = string -module type IO = sig +module type Store = sig (** The abstract store interface that stores should implement. The store interface defines a set of operations involving keys and values. @@ -39,20 +39,17 @@ module type IO = sig operations involving prefixes. In the context of this interface, a prefix is a string containing only characters that are valid for use in keys and ending with a trailing / character. *) - - module Deferred : Deferred - type t - val size : t -> key -> int Deferred.t - val get : t -> key -> value Deferred.t - val get_partial_values : t -> string -> range list -> value list Deferred.t - val set : t -> key -> value -> unit Deferred.t - val set_partial_values : - t -> key -> ?append:bool -> (range_start * value) list -> unit Deferred.t - val erase : t -> key -> unit Deferred.t - val erase_prefix : t -> key -> unit Deferred.t - val list : t -> key list Deferred.t - val list_dir : t -> key -> (key list * prefix list) Deferred.t - val is_member : t -> key -> bool Deferred.t - val rename : t -> key -> key -> unit Deferred.t + type 'a io + val size : t -> key -> int io + val get : t -> key -> value io + val get_partial_values : t -> string -> range list -> value list io + val set : t -> key -> value -> unit io + val set_partial_values : t -> key -> ?append:bool -> (range_start * value) list -> unit io + val erase : t -> key -> unit io + val erase_prefix : t -> key -> unit io + val list : t -> key list io + val list_dir : t -> key -> (key list * prefix list) io + val is_member : t -> key -> bool io + val rename : t -> key -> key -> unit io end