Validator historical state restoration #922
Changes from all commits
@@ -0,0 +1,126 @@
import pkg/ethers/provider
import pkg/chronos
import pkg/questionable

import ../logutils

from ../clock import SecondsSince1970

logScope:
  topics = "marketplace onchain provider"

proc raiseProviderError(message: string) {.raises: [ProviderError].} =
  raise newException(ProviderError, message)

proc blockNumberAndTimestamp*(provider: Provider, blockTag: BlockTag):
    Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} =
  without latestBlock =? await provider.getBlock(blockTag):
    raiseProviderError("Could not get latest block")

  without latestBlockNumber =? latestBlock.number:
    raiseProviderError("Could not get latest block number")

  return (latestBlockNumber, latestBlock.timestamp)

proc binarySearchFindClosestBlock(
    provider: Provider,
    epochTime: int,
    low: UInt256,
    high: UInt256): Future[UInt256] {.async: (raises: [ProviderError]).} =
  let (_, lowTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.init(low))
  let (_, highTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.init(high))
  if abs(lowTimestamp.truncate(int) - epochTime) <
      abs(highTimestamp.truncate(int) - epochTime):
    return low
  else:
    return high

proc binarySearchBlockNumberForEpoch(
    provider: Provider,
    epochTime: UInt256,
    latestBlockNumber: UInt256,
    earliestBlockNumber: UInt256): Future[UInt256]
    {.async: (raises: [ProviderError]).} =
  var low = earliestBlockNumber
  var high = latestBlockNumber

  while low <= high:
    if low == 0 and high == 0:
      return low
    let mid = (low + high) div 2
    let (midBlockNumber, midBlockTimestamp) =
      await provider.blockNumberAndTimestamp(BlockTag.init(mid))

    if midBlockTimestamp < epochTime:
      low = mid + 1
    elif midBlockTimestamp > epochTime:
      high = mid - 1
    else:
      return midBlockNumber
  # NOTICE that, by the way the binary search is implemented, when it
  # finishes low is always greater than high - this is why we use high
  # where intuitively we would use low:
  await provider.binarySearchFindClosestBlock(
    epochTime.truncate(int), low = high, high = low)

proc blockNumberForEpoch*(
    provider: Provider,
    epochTime: SecondsSince1970): Future[UInt256]
    {.async: (raises: [ProviderError]).} =
  let epochTimeUInt256 = epochTime.u256
  let (latestBlockNumber, latestBlockTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.latest)
  let (earliestBlockNumber, earliestBlockTimestamp) =
    await provider.blockNumberAndTimestamp(BlockTag.earliest)

  # Initially we used the average block time to predict the number of blocks
  # we need to look back in order to find the block number corresponding to
  # the given epoch time. That estimation can be highly inaccurate when the
  # block time changed in the past or is fluctuating, so we used it only to
  # find out whether the available history is long enough to perform an
  # effective search. It turns out we do not have to do even that - there is
  # an easier way.
  #
  # First we check whether the given epoch time equals the timestamp of
  # either the earliest or the latest block. If it does, we simply return
  # the number of that block.
  #
  # Otherwise, if the earliest available block is not the genesis block, we
  # check its timestamp; if that timestamp is greater than the epoch time,
  # we issue a warning and return the earliest block number.
  # In all other cases - when the earliest block is not the genesis block
  # but its timestamp is not greater than the requested epoch time, or when
  # the earliest available block is the genesis block (which means we have
  # the whole history available) - we proceed with the binary search.
  #
  # An additional benefit of this method is that we do not have to rely on
  # the average block time, which makes the whole thing not only more
  # reliable but also easier to test.

  # Are we lucky today?
  if earliestBlockTimestamp == epochTimeUInt256:
    return earliestBlockNumber
  if latestBlockTimestamp == epochTimeUInt256:
    return latestBlockNumber

  if earliestBlockNumber > 0 and earliestBlockTimestamp > epochTimeUInt256:
    let availableHistoryInDays =
      (latestBlockTimestamp - earliestBlockTimestamp) div
        1.days.secs.u256
    warn "Short block history detected.",
      earliestBlockTimestamp = earliestBlockTimestamp,
      days = availableHistoryInDays
    return earliestBlockNumber

  return await provider.binarySearchBlockNumberForEpoch(
    epochTimeUInt256, latestBlockNumber, earliestBlockNumber)

proc pastBlockTag*(provider: Provider,
                   blocksAgo: int):
                  Future[BlockTag] {.async: (raises: [ProviderError]).} =
  let head = await provider.getBlockNumber()
  return BlockTag.init(head - blocksAgo.abs.u256)
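To make the intended use of these helpers concrete, here is a small usage sketch. It is illustrative only: the lookbackRange helper and its wiring are assumptions, not code from this PR; only blockNumberForEpoch, blockNumberAndTimestamp and their behaviour (clamping to the earliest block and warning on short history) come from the file above.

```nim
import pkg/chronos
import pkg/ethers/provider

from ../clock import SecondsSince1970

# Hypothetical helper (not part of this PR): turn a wall-clock look-back
# period into a block range that can bound a historical event query.
proc lookbackRange(provider: Provider,
                   now: SecondsSince1970,
                   lookback: Duration): Future[(UInt256, UInt256)] {.async.} =
  # blockNumberForEpoch falls back to the earliest available block and
  # logs a warning when the node's history is shorter than requested.
  let fromBlock = await provider.blockNumberForEpoch(now - lookback.secs)
  let (latestBlockNumber, _) =
    await provider.blockNumberAndTimestamp(BlockTag.latest)
  return (fromBlock, latestBlockNumber)
```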
@@ -23,6 +23,9 @@ type
     proofTimeout: UInt256
     config: ValidationConfig

+const
+  MaxStorageRequestDuration = 30.days
+
 logScope:
   topics = "codex validator"

Comment on lines +26 to +27:

Review comment: This should be somewhere else (it is a more generic limit than something that applies only to validation), but I guess it will be clearer where to put it once the actual limit is implemented, so for now we can leave it here.

Reply: I agree.
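For context, a minimal sketch of how this constant is meant to be used, mirroring the epochForDurationBackFromNow helper added later in this diff (the lookbackStart name is an assumption for illustration): since a storage request is not expected to outlive MaxStorageRequestDuration, the validator only needs to scan events from that far back.

```nim
import pkg/chronos

const
  MaxStorageRequestDuration = 30.days  # same constant as in the diff above

# Hypothetical illustration: the earliest epoch (seconds since 1970) worth
# scanning when restoring historical state at wall-clock time `now`.
func lookbackStart(now: int64): int64 =
  now - MaxStorageRequestDuration.secs
```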
@@ -56,15 +59,15 @@ func maxSlotsConstraintRespected(validation: Validation): bool =
   validation.slots.len < validation.config.maxSlots

 func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
-  if (validationGroups =? validation.config.groups):
-    (groupIndexForSlotId(slotId, validationGroups) ==
-      validation.config.groupIndex) and
-      validation.maxSlotsConstraintRespected
-  else:
-    validation.maxSlotsConstraintRespected
+  without validationGroups =? validation.config.groups:
+    return true
+  groupIndexForSlotId(slotId, validationGroups) ==
+    validation.config.groupIndex

 proc subscribeSlotFilled(validation: Validation) {.async.} =
   proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
+    if not validation.maxSlotsConstraintRespected:
+      return
     let slotId = slotId(requestId, slotIndex)
     if validation.shouldValidateSlot(slotId):
       trace "Adding slot", slotId
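The rewritten shouldValidateSlot relies on `without` from pkg/questionable, which binds the unwrapped value when the Option holds one and otherwise runs its body (which must leave the scope). Note that the max-slots check moved out of shouldValidateSlot and into its call sites (onSlotFilled above, and the restore loop later in this diff). Below is a self-contained sketch of the `without` pattern, with illustrative names that are not from this PR.

```nim
import std/options
import pkg/questionable

func groupCount(maybeGroups: ?int): int =
  # `without g =? opt: body` binds g when opt has a value; otherwise it
  # executes the body, which here returns a default.
  without groups =? maybeGroups:
    return 0
  groups

assert groupCount(int.none) == 0
assert groupCount(3.some) == 3
```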
@@ -78,7 +81,7 @@ proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
   for slotId in slots:
     let state = await validation.market.slotState(slotId)
     if state != SlotState.Filled:
-      trace "Removing slot", slotId
+      trace "Removing slot", slotId, slotState = state
       ended.incl(slotId)
   validation.slots.excl(ended)

marcinczenko marked this conversation as resolved.
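The surrounding proc follows a collect-then-remove pattern: ended slots are gathered into a separate set during the scan and removed from the tracked set in one bulk excl afterwards. A toy, self-contained sketch of that pattern (not code from this PR):

```nim
import std/sets

var tracked = toHashSet(["a", "b", "c"])
var ended = initHashSet[string]()

for item in tracked:
  if item != "b":      # stand-in for `state != SlotState.Filled`
    ended.incl(item)   # collect first ...

tracked.excl(ended)    # ... then remove in bulk
assert tracked == toHashSet(["b"])
```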
@@ -119,14 +122,37 @@ proc run(validation: Validation) {.async.} =
   except CatchableError as e:
     error "Validation failed", msg = e.msg

+proc epochForDurationBackFromNow(validation: Validation,
+                                 duration: Duration): SecondsSince1970 =
+  return validation.clock.now - duration.secs
+
+proc restoreHistoricalState(validation: Validation) {.async.} =
+  trace "Restoring historical state..."
+  let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
+  let slotFilledEvents = await validation.market.queryPastSlotFilledEvents(
+    fromTime = startTimeEpoch)
+  for event in slotFilledEvents:
+    if not validation.maxSlotsConstraintRespected:
+      break
+    let slotId = slotId(event.requestId, event.slotIndex)
Review comment: 🟡 A bit of an optimisation: we could keep track of a loop index and break early when we hit maxSlots.

Reply: We would need to modify three procedures:

    func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
      without validationGroups =? validation.config.groups:
        return true
      groupIndexForSlotId(slotId, validationGroups) ==
        validation.config.groupIndex

    proc subscribeSlotFilled(validation: Validation) {.async.} =
      proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
        if not validation.maxSlotsConstraintRespected:
          return
        let slotId = slotId(requestId, slotIndex)
        if validation.shouldValidateSlot(slotId):
          trace "Adding slot", slotId, groups = validation.config.groups,
            groupIndex = validation.config.groupIndex
          validation.slots.incl(slotId)
      let subscription = await validation.market.subscribeSlotFilled(onSlotFilled)
      validation.subscriptions.add(subscription)

    proc restoreHistoricalState(validation: Validation) {.async.} =
      logScope:
        groups = validation.config.groups
        groupIndex = validation.config.groupIndex
      trace "Restoring historical state..."
      let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
      let slotFilledEvents = await validation.market.queryPastSlotFilledEvents(
        fromTime = startTimeEpoch)
      trace "Found filled slots", numberOfSlots = slotFilledEvents.len
      for event in slotFilledEvents:
        if not validation.maxSlotsConstraintRespected:
          break
        let slotId = slotId(event.requestId, event.slotIndex)
        if validation.shouldValidateSlot(slotId):
          trace "Adding slot [historical]", slotId
          validation.slots.incl(slotId)
      trace "Removing slots that have ended..."
      await removeSlotsThatHaveEnded(validation)
      trace "Historical state restored", numberOfSlots = validation.slots.len

Do you like it more this way? And @AuHau?

Reply: When restoring state, we could also directly check if the slot has ended in the meantime and avoid calling removeSlotsThatHaveEnded:

    proc restoreHistoricalState(validation: Validation) {.async.} =
      logScope:
        groups = validation.config.groups
        groupIndex = validation.config.groupIndex
      trace "Restoring historical state..."
      let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
      let slotFilledEvents = await validation.market.queryPastSlotFilledEvents(
        fromTime = startTimeEpoch)
      for event in slotFilledEvents:
        if not validation.maxSlotsConstraintRespected:
          break
        let slotId = slotId(event.requestId, event.slotIndex)
        let slotState = await validation.market.slotState(slotId)
        if slotState == SlotState.Filled and validation.shouldValidateSlot(slotId):
          trace "Adding slot [historical]", slotId
          validation.slots.incl(slotId)
      trace "Historical state restored", numberOfSlots = validation.slots.len

Reply: Yeah, I would be down for the last version. IMHO I would not stress much about ended slots, they will be removed in the first iteration of the validation loop.

Reply: Yes,

    proc restoreHistoricalState(validation: Validation) {.async.} =
      trace "Restoring historical state..."
      let startTimeEpoch = validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
      let slotFilledEvents = await validation.market.queryPastSlotFilledEvents(
        fromTime = startTimeEpoch)
      for event in slotFilledEvents:
        if not validation.maxSlotsConstraintRespected:
          break
        let slotId = slotId(event.requestId, event.slotIndex)
        let slotState = await validation.market.slotState(slotId)
        if slotState == SlotState.Filled and validation.shouldValidateSlot(slotId):
          trace "Adding slot [historical]", slotId
          validation.slots.incl(slotId)
      trace "Historical state restored", numberOfSlots = validation.slots.len

Reply: The issue I was hoping to optimise away is when there are a lot of slots being validated, but honestly it's not a big deal, hence it was just a suggestion with 🟡. It just means that potentially it'll consume the thread, but only for a short period of time, and it's probably not worth spending any more time on it until we know for sure that we have this problem. This is a problem we don't currently have, so maybe just leave it out and keep it as it is.

Reply: That's fine. The last example should take care of not including slots that have ended. And it is a no-brainer, so I am OK with using it. The change is marginal and, if I got it right, @AuHau is also fine with using that last optimized version.
+    let slotState = await validation.market.slotState(slotId)
+    if slotState == SlotState.Filled and validation.shouldValidateSlot(slotId):
+      trace "Adding slot [historical]", slotId
+      validation.slots.incl(slotId)
+  trace "Historical state restored", numberOfSlots = validation.slots.len
+
 proc start*(validation: Validation) {.async.} =
   trace "Starting validator", groups = validation.config.groups,
     groupIndex = validation.config.groupIndex
   validation.periodicity = await validation.market.periodicity()
   validation.proofTimeout = await validation.market.proofTimeout()
   await validation.subscribeSlotFilled()
+  await validation.restoreHistoricalState()
   validation.running = validation.run()

marcinczenko marked this conversation as resolved.

 proc stop*(validation: Validation) {.async.} =
-  await validation.running.cancelAndWait()
+  if not isNil(validation.running):
+    await validation.running.cancelAndWait()
   while validation.subscriptions.len > 0:
     let subscription = validation.subscriptions.pop()
     await subscription.unsubscribe()
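The restore loop above stops consuming historical events once maxSlotsConstraintRespected turns false, i.e. when validation.slots.len reaches validation.config.maxSlots. A toy, self-contained illustration of that capping behaviour (illustrative names, not code from this PR):

```nim
import std/sets

proc cappedInsert[T](tracked: var HashSet[T], items: seq[T], maxSlots: int) =
  for item in items:
    if tracked.len >= maxSlots:  # mirrors `not maxSlotsConstraintRespected`
      break
    tracked.incl(item)

var slots = initHashSet[string]()
cappedInsert(slots, @["a", "b", "c", "d"], maxSlots = 2)
assert slots.len == 2
```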
Review comment (on raiseProviderError in the provider module above): It might be beneficial to preserve the original exception, otherwise the underlying reason for the failure will be lost:
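The reviewer's snippet is not reproduced in this excerpt. As a rough sketch of the general shape being suggested (the optional parent parameter is an assumption, not the reviewer's actual code): Nim's newException accepts a parent exception, so the original failure can be carried along with the new ProviderError.

```nim
import pkg/ethers/provider

# Sketch only: re-raise as ProviderError while keeping a reference to the
# exception that caused the failure.
proc raiseProviderError(
    message: string,
    parent: ref CatchableError = nil) {.raises: [ProviderError].} =
  raise newException(ProviderError, message, parent)
```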
Used like:
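Again, only an illustration of how such a helper might be used (the snippet from the review is not preserved; getBlockNumber is used here simply because it appears elsewhere in this PR):

```nim
import pkg/chronos
import pkg/ethers/provider

proc latestBlockNumber(provider: Provider): Future[UInt256] {.async.} =
  try:
    return await provider.getBlockNumber()
  except ProviderError as e:
    # Wrap with context while keeping the original error reachable as the
    # parent (uses the sketched helper above).
    raiseProviderError("Could not get latest block number", e)
```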
Also, callers can pull out the parent exception msg using msdDetail like this: ...

Reply: We cannot use that, because (as you yourself pointed out earlier) getBlock returns an Option and not a Result. Right? So if there was an original ProviderError caused by getBlock, it will naturally bubble up, so we do not have to do anything about it, I suppose. But getBlock, which is defined as this: ..., can still return None, because eth_getBlockByNumber is defined like this: ... This is why I am raising ProviderError when this happens, but we have no data about the underlying exception that caused eth_getBlockByNumber to return None. So I think in this case everything can stay as it is, right?

Reply: Good point, sorry I missed that getBlock returns an Option 👍 I don't think it's an error really, just that the ethereum client can return null if the block doesn't exist. What you have looks good 👍