Skip to content

Commit

Permalink
Node use correct stores (#710)
Browse files Browse the repository at this point in the history
* don't pass erasure

* use correct stores and construct erasure inside the node

* fix tests to match new constructor

* remove prover argument

* review commets

* revert failing on no-prover for now

* small cleanup

* comment out invalid proofs broken test
  • Loading branch information
dryajov authored Feb 19, 2024
1 parent d70ab59 commit 3e88443
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 183 deletions.
10 changes: 8 additions & 2 deletions codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import ./contracts/clock
import ./contracts/deployment
import ./utils/addrutils
import ./namespaces
import ./codextypes
import ./logutils

logScope:
Expand Down Expand Up @@ -261,8 +262,13 @@ proc new*(
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery)

codexNode = CodexNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
discovery = discovery)

restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore),
initTAddress(config.apiBindAddress , config.apiPort),
Expand Down
126 changes: 65 additions & 61 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import ./clock
import ./blocktype as bt
import ./manifest
import ./merkletree
import ./stores/blockstore
import ./stores
import ./blockexchange
import ./streams
import ./erasure
Expand All @@ -43,6 +43,7 @@ import ./utils
import ./errors
import ./logutils
import ./utils/poseidon2digest
import ./utils/asynciter

export logutils

Expand All @@ -61,9 +62,9 @@ type
CodexNode* = object
switch: Switch
networkId: PeerId
blockStore: BlockStore
networkStore: NetworkStore
engine: BlockExcEngine
erasure: Erasure
prover: ?Prover
discovery: Discovery
contracts*: Contracts
clock*: Clock
Expand All @@ -78,14 +79,11 @@ func switch*(self: CodexNodeRef): Switch =
return self.switch

func blockStore*(self: CodexNodeRef): BlockStore =
return self.blockStore
return self.networkStore

func engine*(self: CodexNodeRef): BlockExcEngine =
return self.engine

func erasure*(self: CodexNodeRef): Erasure =
return self.erasure

func discovery*(self: CodexNodeRef): Discovery =
return self.discovery

Expand All @@ -100,7 +98,7 @@ proc storeManifest*(
trace "Unable to create block from manifest"
return failure(error)

if err =? (await self.blockStore.putBlock(blk)).errorOption:
if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store manifest block", cid = blk.cid, err = err.msg
return failure(err)

Expand All @@ -117,7 +115,7 @@ proc fetchManifest*(

trace "Retrieving manifest for cid", cid

without blk =? await self.blockStore.getBlock(BlockAddress.init(cid)), err:
without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
trace "Error retrieve manifest block", cid, err = err.msg
return failure err

Expand Down Expand Up @@ -158,7 +156,8 @@ proc updateExpiry*(
let
ensuringFutures = Iter
.fromSlice(0..<manifest.blocksCount)
.mapIt(self.blockStore.ensureExpiry( manifest.treeCid, it, expiry ))
.mapIt(
self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
Expand All @@ -176,16 +175,17 @@ proc fetchBatched*(
## Fetch blocks in batches of `batchSize`
##

let
iter = iter.map(
(i: int) => self.blockStore.getBlock(BlockAddress.init(cid, i))
)
# TODO: doesn't work if callee is annotated with async
# let
# iter = iter.map(
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
# )

while not iter.finished:
let blocks = collect:
for i in 0..<batchSize:
if not iter.finished:
iter.next()
self.networkStore.getBlock(BlockAddress.init(cid, iter.next()))

if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr)
Expand Down Expand Up @@ -216,7 +216,7 @@ proc retrieve*(
## Retrieve by Cid a single block or an entire dataset described by manifest
##

if local and not await (cid in self.blockStore):
if local and not await (cid in self.networkStore):
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))

if manifest =? (await self.fetchManifest(cid)):
Expand All @@ -226,7 +226,12 @@ proc retrieve*(
proc erasureJob(): Future[void] {.async.} =
try:
# Spawn an erasure decoding job
without res =? (await self.erasure.decode(manifest)), error:
let
erasure = Erasure.new(
self.networkStore,
leoEncoderProvider,
leoDecoderProvider)
without _ =? (await erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", cid, exc = error.msg
except CatchableError as exc:
trace "Exception decoding manifest", cid, exc = exc.msg
Expand All @@ -235,12 +240,12 @@ proc retrieve*(

# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", cid
LPStream(StoreStream.new(self.blockStore, manifest, pad = false)).success
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
else:
let
stream = BufferStream.new()

without blk =? (await self.blockStore.getBlock(BlockAddress.init(cid))), err:
without blk =? (await self.networkStore.getBlock(BlockAddress.init(cid))), err:
return failure(err)

proc streamOneBlock(): Future[void] {.async.} =
Expand Down Expand Up @@ -289,7 +294,7 @@ proc store*(

cids.add(cid)

if err =? (await self.blockStore.putBlock(blk)).errorOption:
if err =? (await self.networkStore.putBlock(blk)).errorOption:
trace "Unable to store block", cid = blk.cid, err = err.msg
return failure(&"Unable to store block {blk.cid}")
except CancelledError as exc:
Expand All @@ -308,7 +313,7 @@ proc store*(
for index, cid in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =? (await self.blockStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
if err =? (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)

Expand Down Expand Up @@ -336,13 +341,13 @@ proc store*(
return manifestBlk.cid.success

proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
without cids =? await self.blockStore.listBlocks(BlockType.Manifest):
without cids =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"
return

for c in cids:
if cid =? await c:
without blk =? await self.blockStore.getBlock(cid):
without blk =? await self.networkStore.getBlock(cid):
warn "Failed to get manifest block by cid", cid
return

Expand Down Expand Up @@ -388,11 +393,17 @@ proc setupRequest(
return failure error

# Erasure code the dataset according to provided parameters
without encoded =? (await self.erasure.encode(manifest, ecK, ecM)), error:
let
erasure = Erasure.new(
self.networkStore.localStore,
leoEncoderProvider,
leoDecoderProvider)

without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset"
return failure(error)

without builder =? Poseidon2Builder.new(self.blockStore, encoded), err:
without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err:
trace "Unable to create slot builder"
return failure(err)

Expand All @@ -411,12 +422,6 @@ proc setupRequest(
else:
builder.verifyRoot.get.toBytes

slotRoots =
if builder.slotRoots.len <= 0:
return failure("Slots are empty")
else:
builder.slotRoots.mapIt( it.toBytes )

request = StorageRequest(
ask: StorageAsk(
slots: verifiable.numSlots.uint64,
Expand Down Expand Up @@ -496,7 +501,7 @@ proc onStore(
cid = request.content.cid
slotIdx = slotIdx

trace "Received a request to store a slot!"
trace "Received a request to store a slot"

without cid =? Cid.init(request.content.cid).mapFailure, err:
trace "Unable to parse Cid", cid
Expand All @@ -506,7 +511,7 @@ proc onStore(
trace "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)

without builder =? Poseidon2Builder.new(self.blockStore, manifest), err:
without builder =? Poseidon2Builder.new(self.networkStore, manifest), err:
trace "Unable to create slots builder", err = err.msg
return failure(err)

Expand All @@ -521,7 +526,7 @@ proc onStore(
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len

let ensureExpiryFutures = blocks.mapIt(self.blockStore.ensureExpiry(it.cid, expiry))
let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)

Expand Down Expand Up @@ -555,6 +560,8 @@ proc onStore(
trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()
return failure(newException(CodexError, "Slot root mismatch"))

trace "Slot successfully retrieved and reconstructed"

return success()

proc onProve(
Expand All @@ -575,25 +582,28 @@ proc onProve(

trace "Received proof challenge"

without cid =? Cid.init(cidStr).mapFailure, err:
error "Unable to parse Cid", cid, err = err.msg
return failure(err)
if prover =? self.prover:
trace "Prover enabled"

without manifest =? await self.fetchManifest(cid), err:
error "Unable to fetch manifest for cid", err = err.msg
return failure(err)
without cid =? Cid.init(cidStr).mapFailure, err:
error "Unable to parse Cid", cid, err = err.msg
return failure(err)

without builder =? Poseidon2Builder.new(self.blockStore, manifest), err:
error "Unable to create slots builder", err = err.msg
return failure(err)
without manifest =? await self.fetchManifest(cid), err:
error "Unable to fetch manifest for cid", err = err.msg
return failure(err)

without sampler =? DataSampler.new(slotIdx, self.blockStore, builder), err:
error "Unable to create data sampler", err = err.msg
return failure(err)
without builder =? Poseidon2Builder.new(self.networkStore.localStore, manifest), err:
error "Unable to create slots builder", err = err.msg
return failure(err)

without proofInput =? await sampler.getProofInput(challenge, nSamples = 3), err:
error "Unable to get proof input for slot", err = err.msg
return failure(err)
without sampler =? DataSampler.new(slotIdx, self.networkStore.localStore, builder), err:
error "Unable to create data sampler", err = err.msg
return failure(err)

without proofInput =? await sampler.getProofInput(challenge, nSamples = 3), err:
error "Unable to get proof input for slot", err = err.msg
return failure(err)

# Todo: send proofInput to circuit. Get proof. (Profit, repeat.)

Expand Down Expand Up @@ -625,9 +635,6 @@ proc start*(self: CodexNodeRef) {.async.} =
if not self.engine.isNil:
await self.engine.start()

if not self.erasure.isNil:
await self.erasure.start()

if not self.discovery.isNil:
await self.discovery.start()

Expand Down Expand Up @@ -684,9 +691,6 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.engine.isNil:
await self.engine.stop()

if not self.erasure.isNil:
await self.erasure.stop()

if not self.discovery.isNil:
await self.discovery.stop()

Expand All @@ -702,24 +706,24 @@ proc stop*(self: CodexNodeRef) {.async.} =
if validatorContracts =? self.contracts.validator:
await validatorContracts.stop()

if not self.blockStore.isNil:
await self.blockStore.close
if not self.networkStore.isNil:
await self.networkStore.close

proc new*(
T: type CodexNodeRef,
switch: Switch,
store: BlockStore,
networkStore: NetworkStore,
engine: BlockExcEngine,
erasure: Erasure,
discovery: Discovery,
prover = Prover.none,
contracts = Contracts.default): CodexNodeRef =
## Create new instance of a Codex self, call `start` to run it
##

CodexNodeRef(
switch: switch,
blockStore: store,
networkStore: networkStore,
engine: engine,
erasure: erasure,
prover: prover,
discovery: discovery,
contracts: contracts)
4 changes: 1 addition & 3 deletions tests/codex/node/helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ template setupAndTearDown*() {.dirty.} =
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
erasure: Erasure

let
path = currentSourcePath().parentDir
Expand All @@ -92,8 +91,7 @@ template setupAndTearDown*() {.dirty.} =
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
node = CodexNodeRef.new(switch, store, engine, erasure, blockDiscovery)
node = CodexNodeRef.new(switch, store, engine, blockDiscovery)

await node.start()

Expand Down
1 change: 1 addition & 0 deletions tests/codex/node/testcontracts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ asyncchecksuite "Test Node - Host contracts":
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)

manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid)
Expand Down
2 changes: 1 addition & 1 deletion tests/codex/node/testnode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ asyncchecksuite "Test Node - Basic":
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()

erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
protected = (await erasure.encode(manifest, 3, 2)).tryGet()
builder = Poseidon2Builder.new(localStore, protected).tryGet()
verifiable = (await builder.buildManifest()).tryGet()
Expand Down
Loading

0 comments on commit 3e88443

Please sign in to comment.