Skip to content

Commit

Permalink
brings tracking of marketplace event back to validator integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinczenko committed Oct 17, 2024
1 parent 47e277b commit 7e06bd8
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 53 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@ jobs:
uses: fabiocaccamo/create-matrix-action@v4
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2}
build:
needs: matrix
Expand Down
6 changes: 3 additions & 3 deletions build.nims
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ task testContracts, "Build & run Codex Contract tests":

task testIntegration, "Run integration tests":
buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
test "testIntegration"
# test "testIntegration"
# use params to enable logging from the integration test executable
# test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
# "-d:chronicles_enabled_topics:integration:TRACE"
test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
"-d:chronicles_enabled_topics:integration:TRACE"

task build, "build codex binary":
codexTask()
Expand Down
173 changes: 128 additions & 45 deletions tests/integration/testvalidator.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ export logutils
logScope:
topics = "integration test validation"

template eventuallyS*(expression: untyped, timeout=10, step = 5): bool =
template eventuallyS*(expression: untyped, timeout=10, step = 5,
cancelWhenExpression: untyped = false): bool =
bind Moment, now, seconds

proc eventuallyS: Future[bool] {.async.} =
Expand All @@ -25,6 +26,8 @@ template eventuallyS*(expression: untyped, timeout=10, step = 5): bool =
echo (i*step).seconds
if endTime < Moment.now():
return false
if cancelWhenExpression:
return false
await sleepAsync(step.seconds)
return true

Expand All @@ -34,10 +37,89 @@ marketplacesuite "Validation":
let nodes = 3
let tolerance = 1
let proofProbability = 1
when defined(windows):
let providerUrl = "ws://localhost:8545"
else:
let providerUrl = "http://localhost:8545"

var slotsFilled: seq[SlotId]
var slotsFreed: seq[SlotId]
var requestsFailed: seq[RequestId]
var requestCancelled = false

var slotFilledSubscription: provider.Subscription
var requestFailedSubscription: provider.Subscription
var slotFreedSubscription: provider.Subscription
var requestCancelledSubscription: provider.Subscription

proc trackSlotsFilled(marketplace: Marketplace):
Future[provider.Subscription] {.async.} =
slotsFilled = newSeq[SlotId]()
proc onSlotFilled(event: SlotFilled) =
let slotId = slotId(event.requestId, event.slotIndex)
slotsFilled.add(slotId)
debug "SlotFilled", requestId = event.requestId, slotIndex = event.slotIndex,
slotId = slotId

let subscription = await marketplace.subscribe(SlotFilled, onSlotFilled)
subscription

proc trackRequestsFailed(marketplace: Marketplace):
Future[provider.Subscription] {.async.} =
requestsFailed = newSeq[RequestId]()
proc onRequestFailed(event: RequestFailed) =
requestsFailed.add(event.requestId)
debug "RequestFailed", requestId = event.requestId

let subscription = await marketplace.subscribe(RequestFailed, onRequestFailed)
subscription

proc trackRequestCancelled(marketplace: Marketplace, requestId: RequestId):
Future[provider.Subscription] {.async.} =
requestCancelled = false
proc onRequestCancelled(event: RequestCancelled) =
if requestId == event.requestId:
requestCancelled = true
debug "RequestCancelled", requestId = event.requestId

let subscription = await marketplace.subscribe(RequestCancelled, onRequestCancelled)
subscription

proc trackSlotsFreed(marketplace: Marketplace, requestId: RequestId):
Future[provider.Subscription] {.async.} =
slotsFreed = newSeq[SlotId]()
proc onSlotFreed(event: SlotFreed) =
if event.requestId == requestId:
let slotId = slotId(event.requestId, event.slotIndex)
slotsFreed.add(slotId)
debug "SlotFreed", requestId = event.requestId, slotIndex = event.slotIndex,
slotId = slotId, slotsFreed = slotsFreed.len

let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed)
subscription

proc startTrackingEvents(marketplace: Marketplace, requestId: RequestId) {.async.} =
slotFilledSubscription = await marketplace.trackSlotsFilled()
requestFailedSubscription = await marketplace.trackRequestsFailed()
slotFreedSubscription = await marketplace.trackSlotsFreed(requestId)
requestCancelledSubscription =
await marketplace.trackRequestCancelled(requestId)

proc stopTrackingEvents() {.async.} =
await slotFilledSubscription.unsubscribe()
await slotFreedSubscription.unsubscribe()
await requestFailedSubscription.unsubscribe()
await requestCancelledSubscription.unsubscribe()

proc checkSlotsFailed(marketplace: Marketplace, slotsFilled: seq[SlotId],
slotsFreed: seq[SlotId]) {.async.} =
let slotsNotFreed = slotsFilled.filter(
slotId => not slotsFreed.contains(slotId)
).toHashSet
var slotsFailed = initHashSet[SlotId]()
for slotId in slotsFilled:
let state = await marketplace.slotState(slotId)
if state == SlotState.Failed:
slotsFailed.incl(slotId)

debug "slots failed", slotsFailed = slotsFailed, slotsNotFreed = slotsNotFreed
check slotsNotFreed == slotsFailed

test "validator marks proofs as missing when using validation groups", NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
Expand All @@ -46,28 +128,25 @@ marketplacesuite "Validation":

clients:
CodexConfigs.init(nodes=1)
.withEthProvider(providerUrl)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("purchases", "onchain")
.some,

providers:
CodexConfigs.init(nodes=1)
.withSimulateProofFailures(idx=0, failEveryNProofs=1)
.withEthProvider(providerUrl)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("sales", "onchain")
.some,

validators:
CodexConfigs.init(nodes=2)
.withEthProvider(providerUrl)
.withValidationGroups(groups = 2)
.withValidationGroupIndex(idx = 0, groupIndex = 0)
.withValidationGroupIndex(idx = 1, groupIndex = 1)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator") # each topic as a separate string argument
.some
Expand All @@ -76,9 +155,7 @@ marketplacesuite "Validation":
let expiry = 5.periods
let duration = expiry + 10.periods

echo fmt"{providerUrl = }"

# for a good start
# let mine a block to sync the blocktime with the current clock
discard await ethProvider.send("evm_mine")

var currentTime = await ethProvider.currentTime()
Expand All @@ -91,7 +168,6 @@ marketplacesuite "Validation":
createAvailabilities(data.len * 2, duration)

let cid = client0.upload(data).get

let purchaseId = await client0.requestStorage(
cid,
expiry=expiry,
Expand All @@ -102,6 +178,8 @@ marketplacesuite "Validation":
)
let requestId = client0.requestId(purchaseId).get

await marketplace.startTrackingEvents(requestId)

debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId

echo fmt"expiry = {(expiry + 60).int.seconds}"
Expand All @@ -120,38 +198,39 @@ marketplacesuite "Validation":
fail()

discard await ethProvider.send("evm_mine")

currentTime = await ethProvider.currentTime()
let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int

debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds

# Because of Erasure Coding, the expected number of slots being freed
# is tolerance + 1. When more than tolerance slots are freed, the whole
# request will fail. Thus, awaiting for a failing state should
# be sufficient to conclude that validators did their job correctly.
# NOTICE: We actually have to wait for the "errored" state, because
# immediately after withdrawing the funds the purchasing state machine
# transitions to the "errored" state.
check eventuallyS(client0.purchaseStateIs(purchaseId, "errored"),
timeout = secondsTillRequestEnd + 60, step = 5)

# Because of erasure coding, after (tolerance + 1) slots are freed, the
# remaining nodes are be freed but marked as "Failed" as the whole
# request fails. A couple of checks to capture this:
let expectedSlotsFreed = tolerance + 1
check eventuallyS((slotsFreed.len == expectedSlotsFreed and
requestsFailed.contains(requestId)),
timeout = secondsTillRequestEnd + 60, step = 5,
cancelWhenExpression = requestCancelled)

# extra check
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)

await stopTrackingEvents()

test "validator uses historical state to mark missing proofs", NodeConfigs(
# Uncomment to start Hardhat automatically, typically so logs can be inspected locally
hardhat:
HardhatConfig.none,

clients:
CodexConfigs.init(nodes=1)
.withEthProvider(providerUrl)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("purchases", "onchain")
.some,

providers:
CodexConfigs.init(nodes=1)
.withEthProvider(providerUrl)
.withSimulateProofFailures(idx=0, failEveryNProofs=1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
Expand All @@ -162,9 +241,7 @@ marketplacesuite "Validation":
let expiry = 5.periods
let duration = expiry + 10.periods

echo fmt"{providerUrl = }"

# for a good start
# let mine a block to sync the blocktime with the current clock
discard await ethProvider.send("evm_mine")

var currentTime = await ethProvider.currentTime()
Expand All @@ -177,7 +254,6 @@ marketplacesuite "Validation":
createAvailabilities(data.len * 2, duration)

let cid = client0.upload(data).get

let purchaseId = await client0.requestStorage(
cid,
expiry=expiry,
Expand All @@ -188,6 +264,8 @@ marketplacesuite "Validation":
)
let requestId = client0.requestId(purchaseId).get

await marketplace.startTrackingEvents(requestId)

debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId

echo fmt"expiry = {(expiry + 60).int.seconds}"
Expand All @@ -205,16 +283,15 @@ marketplacesuite "Validation":
if purchaseState != "started":
fail()

# just to make sure we have a mined block that separates us
# from the block containing the last SlotFilled event
# extra block just to make sure we have one that separates us
# from the block containing the last (past) SlotFilled event
discard await ethProvider.send("evm_mine")

var validators = CodexConfigs.init(nodes=2)
.withEthProvider(providerUrl)
.withValidationGroups(groups = 2)
.withValidationGroupIndex(idx = 0, groupIndex = 0)
.withValidationGroupIndex(idx = 1, groupIndex = 1)
# .debug() # uncomment to enable console log output
.debug() # uncomment to enable console log output
.withLogFile() # uncomment to output log file to: # tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("validator") # each topic as a separate string argument

Expand All @@ -226,17 +303,23 @@ marketplacesuite "Validation":
node: node
)

discard await ethProvider.send("evm_mine")
currentTime = await ethProvider.currentTime()
let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int

debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds

# Because of Erasure Coding, the expected number of slots being freed
# is tolerance + 1. When more than tolerance slots are freed, the whole
# request will fail. Thus, awaiting for a failing state should
# be sufficient to conclude that validators did their job correctly.
# NOTICE: We actually have to wait for the "errored" state, because
# immediately after withdrawing the funds the purchasing state machine
# transitions to the "errored" state.
check eventuallyS(client0.purchaseStateIs(purchaseId, "errored"),
timeout = secondsTillRequestEnd + 60, step = 5)
# Because of erasure coding, after (tolerance + 1) slots are freed, the
# remaining nodes are be freed but marked as "Failed" as the whole
# request fails. A couple of checks to capture this:
let expectedSlotsFreed = tolerance + 1

check eventuallyS((slotsFreed.len == expectedSlotsFreed and
requestsFailed.contains(requestId)),
timeout = secondsTillRequestEnd + 60, step = 5,
cancelWhenExpression = requestCancelled)

# extra check
await marketplace.checkSlotsFailed(slotsFilled, slotsFreed)

await stopTrackingEvents()

0 comments on commit 7e06bd8

Please sign in to comment.