Skip to content

Commit

Permalink
Add payload event stream and fix topics
Browse files Browse the repository at this point in the history
  • Loading branch information
potuz committed Feb 12, 2025
1 parent edcc4c4 commit 7ccec2e
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 11 deletions.
7 changes: 7 additions & 0 deletions api/server/structs/endpoints_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ type BlockEvent struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
}

type PayloadEvent struct {
Slot string `json:"slot"`
BlockRoot string `json:"block_root"`
ExecutionBlockHash string `json:"execution_block_hash"`
ExecutionOptimistic bool `json:"execution_optimistic"`
}

type AggregatedAttEventSource struct {
Aggregate *Attestation `json:"aggregate"`
}
Expand Down
26 changes: 20 additions & 6 deletions beacon-chain/blockchain/receive_execution_payload_envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/epbs"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution"
Expand Down Expand Up @@ -85,14 +87,14 @@ func (s *Service) ReceiveExecutionPayloadEnvelope(ctx context.Context, signed in
log.WithError(err).Error("could not get headroot to compute attributes")
return nil
}
execution, err := envelope.Execution()
if err != nil {
log.WithError(err).Error("could not get execution data")
return nil
}
blockHash := [32]byte(execution.BlockHash())
if bytes.Equal(headRoot, root[:]) {
attr := s.getPayloadAttribute(ctx, preState, envelope.Slot()+1, headRoot)
execution, err := envelope.Execution()
if err != nil {
log.WithError(err).Error("could not get execution data")
return nil
}
blockHash := [32]byte(execution.BlockHash())
payloadID, err := s.notifyForkchoiceUpdateEPBS(ctx, blockHash, attr)
if err != nil {
if IsInvalidBlock(err) {
Expand Down Expand Up @@ -127,6 +129,18 @@ func (s *Service) ReceiveExecutionPayloadEnvelope(ctx context.Context, signed in
if err != nil {
return errors.Wrap(err, "could not get execution data")
}
// Send feed event
// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.PayloadProcessed,
Data: &statefeed.PayloadProcessedData{
Slot: envelope.Slot(),
BlockRoot: root,
ExecutionBlockHash: blockHash,
ExecutionOptimistic: !isValidPayload,
},
})

log.WithFields(logrus.Fields{
"slot": envelope.Slot(),
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(root[:])),
Expand Down
13 changes: 13 additions & 0 deletions beacon-chain/core/feed/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
LightClientOptimisticUpdate
// PayloadAttributes events are fired upon a missed slot or new head.
PayloadAttributes
// PayloadProcessed is fired when a payload is processed.
PayloadProcessed
)

// BlockProcessedData is the data sent with BlockProcessed events.
Expand All @@ -49,6 +51,17 @@ type BlockProcessedData struct {
Optimistic bool
}

type PayloadProcessedData struct {
// Slot is the slot of the processed block.
Slot primitives.Slot
// BlockRoot of the processed block.
BlockRoot [32]byte
// ExecutionBlockHash is the hash of the execution payload.
ExecutionBlockHash [32]byte
// ExecutionOptimistic is true if the execution payload is optimistic.
ExecutionOptimistic bool
}

// ChainStartedData is the data sent with ChainStarted events.
type ChainStartedData struct {
// StartTime is the time at which the chain started.
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/core/transition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//beacon-chain/core/fulu:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition/interop:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/core/transition/interop/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ go_library(
],
deps = [
"//beacon-chain/state:go_default_library",
"//config/features:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//io/file:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/topics_epbs.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package p2p

const (
GossipSignedExecutionPayloadHeader = "signed_execution_payload_header"
GossipSignedExecutionPayloadEnvelope = "signed_execution_payload_envelope"
GossipSignedExecutionPayloadHeader = "execution_payload_header"
GossipSignedExecutionPayloadEnvelope = "execution_payload"
GossipPayloadAttestationMessage = "payload_attestation_message"

SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader
Expand Down
15 changes: 15 additions & 0 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
LightClientFinalityUpdateTopic = "light_client_finality_update"
// LightClientOptimisticUpdateTopic represents a new light client optimistic update event topic.
LightClientOptimisticUpdateTopic = "light_client_optimistic_update"
// ExecutionPayloadTopic represents a new execution payload event topic. New in EIP-7732
ExecutionPayloadTopic = "execution_payload"
)

var (
Expand Down Expand Up @@ -108,6 +110,7 @@ var stateFeedEventTopics = map[feed.EventType]string{
statefeed.Reorg: ChainReorgTopic,
statefeed.BlockProcessed: BlockTopic,
statefeed.PayloadAttributes: PayloadAttributesTopic,
statefeed.PayloadProcessed: ExecutionPayloadTopic,
}

var topicsForStateFeed = topicsForFeed(stateFeedEventTopics)
Expand Down Expand Up @@ -441,6 +444,8 @@ func topicForEvent(event *feed.Event) string {
return BlockTopic
case payloadattribute.EventData:
return PayloadAttributesTopic
case *statefeed.PayloadProcessedData:
return ExecutionPayloadTopic
default:
return InvalidTopic
}
Expand Down Expand Up @@ -568,6 +573,16 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
}
return jsonMarshalReader(eventName, blk)
}, nil
case *statefeed.PayloadProcessedData:
return func() io.Reader {
pld := &structs.PayloadEvent{
Slot: fmt.Sprintf("%d", v.Slot),
BlockRoot: hexutil.Encode(v.BlockRoot[:]),
ExecutionBlockHash: hexutil.Encode(v.ExecutionBlockHash[:]),
ExecutionOptimistic: v.ExecutionOptimistic,
}
return jsonMarshalReader(eventName, pld)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "event data type %T unsupported", v)
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/core/transition/interop:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/filters:go_default_library",
Expand Down

0 comments on commit 7ccec2e

Please sign in to comment.