Skip to content

Commit

Permalink
style: nph implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
AuHau committed Jan 14, 2025
1 parent 0cffa02 commit db6bd63
Show file tree
Hide file tree
Showing 274 changed files with 7,307 additions and 7,627 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ jobs:
matrix: ${{ needs.matrix.outputs.matrix }}
cache_nonce: ${{ needs.matrix.outputs.cache_nonce }}

linting:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v4
- name: Check `nph` formatting
uses: arnetheduck/nph-action@v1
with:
version: latest
options: "codex/ tests/"
fail: true
suggest: true

coverage:
# Force to stick to ubuntu 20.04 for coverage because
# lcov was updated to 2.x version in ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@
[submodule "vendor/nim-zippy"]
path = vendor/nim-zippy
url = https://github.com/status-im/nim-zippy.git
[submodule "vendor/nph"]
path = vendor/nph
url = https://github.com/arnetheduck/nph.git
39 changes: 39 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# version pinned by nimbus-build-system.
#PINNED_NIM_VERSION := 38640664088251bbc88917b4bacfd86ec53014b8 # 1.6.21
PINNED_NIM_VERSION := v2.0.14

ifeq ($(NIM_COMMIT),)
NIM_COMMIT := $(PINNED_NIM_VERSION)
else ifeq ($(NIM_COMMIT),pinned)
Expand Down Expand Up @@ -199,4 +200,42 @@ ifneq ($(USE_LIBBACKTRACE), 0)
+ $(MAKE) -C vendor/nim-libbacktrace clean $(HANDLE_OUTPUT)
endif

############
## Format ##
############
.PHONY: build-nph install-nph-hook clean-nph print-nph-path

# Default location for nph binary shall be next to nim binary to make it available on the path.
NPH:=$(shell dirname $(NIM_BINARY))/nph

build-nph:
ifeq ("$(wildcard $(NPH))","")
$(ENV_SCRIPT) nim c vendor/nph/src/nph.nim && \
mv vendor/nph/src/nph $(shell dirname $(NPH))
echo "nph utility is available at " $(NPH)
endif

GIT_PRE_COMMIT_HOOK := .git/hooks/pre-commit

install-nph-hook: build-nph
ifeq ("$(wildcard $(GIT_PRE_COMMIT_HOOK))","")
cp ./tools/scripts/git_pre_commit_format.sh $(GIT_PRE_COMMIT_HOOK)
else
echo "$(GIT_PRE_COMMIT_HOOK) already present, will NOT override"
exit 1
endif

nph/%: build-nph
echo -e $(FORMAT_MSG) "nph/$*" && \
$(NPH) $*

clean-nph:
rm -f $(NPH)

# To avoid hardcoding nph binary location in several places
print-nph-path:
echo "$(NPH)"

clean: | clean-nph

endif # "variables.mk" was not included
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Run the client with:
```bash
build/codex
```

## Configuration

It is possible to configure a Codex node in several ways:
Expand All @@ -51,3 +52,15 @@ To get acquainted with Codex, consider:
## API

The client exposes a REST API that can be used to interact with the clients. Overview of the API can be found on [api.codex.storage](https://api.codex.storage).

## Contributing and development

Feel free to dive in, contributions are welcomed! Open an issue or submit PRs.

### Linting and formatting

`nim-codex` uses [nph](https://github.com/arnetheduck/nph) for formatting our code and it is requrired to adhere to its styling.
If you are setting up fresh setup, in order to get `nph` run `make build-nph`.
In order to format files run `make nph/<file/folder you want to format>`.
If you want you can install Git pre-commit hook using `make install-nph-commit`, which will format modified files prior commiting them.
If you are using VSCode and the NimLang extension you can enable "Format On Save" that will format the files using `nph`.
9 changes: 2 additions & 7 deletions codex/blockexchange.nim
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import ./blockexchange/[
network,
engine,
peers]
import ./blockexchange/[network, engine, peers]

import ./blockexchange/protobuf/[
blockexc,
presence]
import ./blockexchange/protobuf/[blockexc, presence]

export network, engine, blockexc, presence, peers
38 changes: 17 additions & 21 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,19 @@ const
DefaultConcurrentAdvertRequests = 10
DefaultAdvertiseLoopSleep = 30.minutes

type
Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface
type Advertiser* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
discovery*: Discovery # Discovery interface

advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests
advertiserRunning*: bool # Indicates if discovery is running
concurrentAdvReqs: int # Concurrent advertise requests

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
trackedFutures*: TrackedFutures # Advertise tasks futures
advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
trackedFutures*: TrackedFutures # Advertise tasks futures

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests

proc addCidToQueue(b: Advertiser, cid: Cid) {.async.} =
if cid notin b.advertiseQueue:
Expand Down Expand Up @@ -83,7 +82,6 @@ proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
Expand All @@ -94,20 +92,17 @@ proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
cid = await b.advertiseQueue.get()
let cid = await b.advertiseQueue.get()

if cid in b.inFlightAdvReqs:
continue

try:
let
request = b.discovery.provide(cid)
let request = b.discovery.provide(cid)

b.inFlightAdvReqs[cid] = request
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
await request

finally:
b.inFlightAdvReqs.del(cid)
codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
Expand All @@ -125,7 +120,7 @@ proc start*(b: Advertiser) {.async.} =

trace "Advertiser start"

proc onBlock(cid: Cid) {.async.} =
proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)

doAssert(b.localStore.onBlockStored.isNone())
Expand All @@ -136,7 +131,7 @@ proc start*(b: Advertiser) {.async.} =
return

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
for i in 0 ..< b.concurrentAdvReqs:
let fut = b.processQueueLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
Expand Down Expand Up @@ -166,7 +161,7 @@ proc new*(
localStore: BlockStore,
discovery: Discovery,
concurrentAdvReqs = DefaultConcurrentAdvertRequests,
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep
advertiseLocalStoreLoopSleep = DefaultAdvertiseLoopSleep,
): Advertiser =
## Create a advertiser instance
##
Expand All @@ -177,4 +172,5 @@ proc new*(
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep,
)
56 changes: 24 additions & 32 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ const
DefaultMinPeersPerBlock = 3
DefaultDiscoveryLoopSleep = 3.seconds

type
DiscoveryEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
peers*: PeerCtxStore # Peer context store
network*: BlockExcNetwork # Network interface
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests
type DiscoveryEngine* = ref object of RootObj
localStore*: BlockStore # Local block store for this instance
peers*: PeerCtxStore # Peer context store
network*: BlockExcNetwork # Network interface
discovery*: Discovery # Discovery interface
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
discEngineRunning*: bool # Indicates if discovery is running
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
# Inflight discovery requests

proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
Expand All @@ -81,36 +81,27 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =

while b.discEngineRunning:
try:
let
cid = await b.discoveryQueue.get()
let cid = await b.discoveryQueue.get()

if cid in b.inFlightDiscReqs:
trace "Discovery request already in progress", cid
continue

let
haves = b.peers.peersHave(cid)
let haves = b.peers.peersHave(cid)

if haves.len < b.minPeersPerBlock:
try:
let
request = b.discovery
.find(cid)
.wait(DefaultDiscoveryTimeout)
let request = b.discovery.find(cid).wait(DefaultDiscoveryTimeout)

b.inFlightDiscReqs[cid] = request
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
let
peers = await request
let peers = await request

let
dialed = await allFinished(
peers.mapIt( b.network.dialPeer(it.data) ))
let dialed = await allFinished(peers.mapIt(b.network.dialPeer(it.data)))

for i, f in dialed:
if f.failed:
await b.discovery.removeProvider(peers[i].data.peerId)

finally:
b.inFlightDiscReqs.del(cid)
codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
Expand Down Expand Up @@ -146,7 +137,7 @@ proc start*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
for i in 0 ..< b.concurrentDiscReqs:
let fut = b.discoveryTaskLoop()
b.trackedFutures.track(fut)
asyncSpawn fut
Expand Down Expand Up @@ -180,7 +171,7 @@ proc new*(
pendingBlocks: PendingBlocksManager,
concurrentDiscReqs = DefaultConcurrentDiscRequests,
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
minPeersPerBlock = DefaultMinPeersPerBlock
minPeersPerBlock = DefaultMinPeersPerBlock,
): DiscoveryEngine =
## Create a discovery engine instance for advertising services
##
Expand All @@ -195,4 +186,5 @@ proc new*(
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)
minPeersPerBlock: minPeersPerBlock,
)
Loading

0 comments on commit db6bd63

Please sign in to comment.