Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: formates codex/ and tests/ folder with nph 0.6.1
Browse files Browse the repository at this point in the history
AuHau committed Jan 21, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 9422d3f commit eb8824b
Showing 266 changed files with 7,218 additions and 7,575 deletions.
9 changes: 2 additions & 7 deletions codex/blockexchange.nim
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import ./blockexchange/[
network,
engine,
peers]
import ./blockexchange/[network, engine, peers]

import ./blockexchange/protobuf/[
blockexc,
presence]
import ./blockexchange/protobuf/[blockexc, presence]

export network, engine, blockexc, presence, peers
38 changes: 17 additions & 21 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
@@ -34,20 +34,19 @@ const
DefaultConcurrentAdvertRequests = 10
DefaultAdvertiseLoopSleep = 30.minutes

type
Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface
type Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface

advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
trackedFutures*: TrackedFutures # Advertise tasks futures
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
trackedFutures*: TrackedFutures # Advertise tasks futures

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests

proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
if cid notin b.advertiseQueue:
@@ -83,7 +82,6 @@ proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
@@ -94,20 +92,17 @@ proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
cid = await b.advertiseQueue.get()
let cid = await b.advertiseQueue.get()

if cid in b.inFlightAdvReqs:
continue

try:
let
request = b.discovery.provide(cid)
let request = b.discovery.provide(cid)

b.inFlightAdvReqs[cid] = request
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
await request

finally:
b.inFlightAdvReqs.del(cid)
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
@@ -125,7 +120,7 @@ proc start*(b: Advertiser) {.async.} =

trace "Advertiser start"

proc onBlock(cid: Cid) {.async.} =
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)

doAssert(b.localStore.onBlockStored.isNone())
@@ -136,7 +131,7 @@ proc start*(b: Advertiser) {.async.} =
return

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
@@ -166,7 +161,7 @@ proc new*(
localStore: BlockStore,
discovery: Discovery,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep,
): Advertiser =
## Create a advertiser instance
##
@@ -177,4 +172,5 @@ proc new*(
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep,
)
56 changes: 24 additions & 32 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
@@ -40,21 +40,21 @@ const
DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds

type
DiscoveryEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
peers*: PeerCtxStore # Peer context store
network*: BlockExcNetwork # Network interface
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
type DiscoveryEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
peers*: PeerCtxStore # Peer context store
network*: BlockExcNetwork # Network interface
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
# Inflight discovery requests

proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
@@ -81,36 +81,27 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =

while b.discEngineRunning:
try:
let
cid = await b.discoveryQueue.get()
let cid = await b.discoveryQueue.get()

if cid in b.inFlightDiscReqs:
trace "Discovery request already in progress", cid
continue

let
haves = b.peers.peersHave(cid)
let haves = b.peers.peersHave(cid)

if haves.len < b.minPeersPerBlock:
try:
let
request = b.discovery
.find(cid)
.wait(DefaultDiscoveryTimeout)
let request = b.discovery.find(cid).wait(DefaultDiscoveryTimeout)

b.inFlightDiscReqs[cid] = request
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
let
peers = await request
let peers = await request

let
dialed = await allFinished(
peers.mapIt( b.network.dialPeer(it.data) ))
let dialed = await allFinished(peers.mapIt(b.network.dialPeer(it.data)))

for i, f in dialed:
if f.failed:
await b.discovery.removeProvider(peers[i].data.peerId)

finally:
b.inFlightDiscReqs.del(cid)
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
@@ -146,7 +137,7 @@ proc start*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
for i in 0 ..< b.concurrentDiscReqs:
let fut = b.discoveryTaskLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
@@ -180,7 +171,7 @@ proc new*(
pendingBlocks: PendingBlocksManager,
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock
minPeersPerBlock = DefaultMinPeersPerBlock,
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
@@ -195,4 +186,5 @@ proc new*(
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)
minPeersPerBlock: minPeersPerBlock,
)
Loading

0 comments on commit eb8824b

Please sign in to comment.