From 284600b44234d8a9b1e7a5b39069b7fe1e773a73 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Fri, 5 Jan 2024 16:42:09 +0100 Subject: [PATCH 1/7] Adding support for multiple topics --- api/execute.go | 7 +++- api/install.go | 7 ++-- api/node.go | 4 +- cmd/node/flags.go | 1 + cmd/node/main.go | 7 +++- config/model.go | 11 ++--- node/config.go | 14 +++---- node/config_internal_test.go | 8 ++-- node/head_execute.go | 11 +++-- node/message.go | 38 +++++++++++++---- node/node.go | 11 ++++- node/rest.go | 14 ++++--- node/roll_call.go | 16 ++++++-- node/run.go | 79 ++++++++++++++++++++---------------- 14 files changed, 149 insertions(+), 79 deletions(-) diff --git a/api/execute.go b/api/execute.go index 2165d2a2..595f52a2 100644 --- a/api/execute.go +++ b/api/execute.go @@ -14,7 +14,10 @@ import ( ) // ExecuteRequest describes the payload for the REST API request for function execution. -type ExecuteRequest execute.Request +type ExecuteRequest struct { + execute.Request + Topic string `json:"topic,omitempty"` +} // ExecuteResponse describes the REST API response for function execution. type ExecuteResponse struct { @@ -44,7 +47,7 @@ func (a *API) Execute(ctx echo.Context) error { } // Get the execution result. - code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req)) + code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req.Request), req.Topic) if err != nil { a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function") } diff --git a/api/install.go b/api/install.go index db993f8f..b2908360 100644 --- a/api/install.go +++ b/api/install.go @@ -17,8 +17,9 @@ const ( // InstallFunctionRequest describes the payload for the REST API request for function install. type InstallFunctionRequest struct { - CID string `json:"cid"` - URI string `json:"uri"` + CID string `json:"cid"` + URI string `json:"uri"` + Topic string `json:"topic"` } // InstallFunctionResponse describes the REST API response for the function install. @@ -46,7 +47,7 @@ func (a *API) Install(ctx echo.Context) error { // Start function install in a separate goroutine and signal when it's done. fnErr := make(chan error) go func() { - err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID) + err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID, req.Topic) fnErr <- err }() diff --git a/api/node.go b/api/node.go index 3e91d41e..27471902 100644 --- a/api/node.go +++ b/api/node.go @@ -8,7 +8,7 @@ import ( ) type Node interface { - ExecuteFunction(context.Context, execute.Request) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) + ExecuteFunction(ctx context.Context, req execute.Request, topic string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) ExecutionResult(id string) (execute.Result, bool) - PublishFunctionInstall(ctx context.Context, uri string, cid string) error + PublishFunctionInstall(ctx context.Context, uri string, cid string, topic string) error } diff --git a/cmd/node/flags.go b/cmd/node/flags.go index cd70e014..0c7532e1 100644 --- a/cmd/node/flags.go +++ b/cmd/node/flags.go @@ -35,6 +35,7 @@ func parseFlags() *config.Config { pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)") pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)") pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS") + pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to") // Host configuration. pflag.StringVar(&cfg.Host.PrivateKey, "private-key", "", "private key that the b7s host will use") diff --git a/cmd/node/main.go b/cmd/node/main.go index 06b37a10..0a452fbe 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -157,7 +157,7 @@ func run() int { executor, err := executor.New(log, execOptions...) if err != nil { log.Error(). - Err(err). + Err(err). Str("workspace", cfg.Workspace). Str("runtime_path", cfg.RuntimePath). Msg("could not create an executor") @@ -181,6 +181,11 @@ func run() int { // Create function store. fstore := fstore.New(log, functionStore, cfg.Workspace) + // If we have topics specified, use those. + if len(cfg.Topics) > 0 { + opts = append(opts, node.WithTopics(cfg.Topics)) + } + // Instantiate node. node, err := node.New(log, host, peerstore, fstore, opts...) if err != nil { diff --git a/config/model.go b/config/model.go index f64b96f4..10cba7c6 100644 --- a/config/model.go +++ b/config/model.go @@ -8,12 +8,13 @@ type Config struct { Role string BootNodes []string Concurrency uint + Topics []string - Host Host - API string - RuntimePath string - RuntimeCLI string - LoadAttributes bool + Host Host + API string + RuntimePath string + RuntimeCLI string + LoadAttributes bool CPUPercentage float64 MemoryMaxKB int64 diff --git a/node/config.go b/node/config.go index 9f676da4..9fbbe046 100644 --- a/node/config.go +++ b/node/config.go @@ -15,7 +15,7 @@ type Option func(*Config) // DefaultConfig represents the default settings for the node. var DefaultConfig = Config{ Role: blockless.WorkerNode, - Topic: DefaultTopic, + Topics: []string{DefaultTopic}, HealthInterval: DefaultHealthInterval, RollCallTimeout: DefaultRollCallTimeout, Concurrency: DefaultConcurrency, @@ -28,7 +28,7 @@ var DefaultConfig = Config{ // Config represents the Node configuration. type Config struct { Role blockless.NodeRole // Node role. - Topic string // Topic to subscribe to. + Topics []string // Topics to subscribe to. Execute blockless.Executor // Executor to use for running functions. HealthInterval time.Duration // How often should we emit the health ping. RollCallTimeout time.Duration // How long do we wait for roll call responses. @@ -47,8 +47,8 @@ func (n *Node) ValidateConfig() error { return errors.New("node role is not valid") } - if n.cfg.Topic == "" { - return errors.New("topic cannot be empty") + if len(n.cfg.Topics) == 0 { + return errors.New("topics cannot be empty") } // Worker specific validation. @@ -83,10 +83,10 @@ func WithRole(role blockless.NodeRole) Option { } } -// WithTopic specifies the p2p topic to which node should subscribe. -func WithTopic(topic string) Option { +// WithTopics specifies the p2p topics to which node should subscribe. +func WithTopics(topics []string) Option { return func(cfg *Config) { - cfg.Topic = topic + cfg.Topics = topics } } diff --git a/node/config_internal_test.go b/node/config_internal_test.go index c7606b54..d94919f2 100644 --- a/node/config_internal_test.go +++ b/node/config_internal_test.go @@ -24,14 +24,14 @@ func TestConfig_NodeRole(t *testing.T) { func TestConfig_Topic(t *testing.T) { - const topic = "super-secret-topic" + topics := []string{"super-secret-topic"} cfg := Config{ - Topic: "", + Topics: []string{}, } - WithTopic(topic)(&cfg) - require.Equal(t, topic, cfg.Topic) + WithTopics(topics)(&cfg) + require.Equal(t, topics, cfg.Topics) } func TestConfig_Executor(t *testing.T) { diff --git a/node/head_execute.go b/node/head_execute.go index 3ee45b34..840129f5 100644 --- a/node/head_execute.go +++ b/node/head_execute.go @@ -17,6 +17,7 @@ import ( "github.com/blocklessnetwork/b7s/models/response" ) +// TODO: Check - head node really accepts execution requests from the REST API. Should this message handling be cognizant of `topics`? func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []byte) error { // Unpack the request. @@ -34,7 +35,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b log := n.log.With().Str("request", req.RequestID).Str("peer", from.String()).Str("function", req.FunctionID).Logger() - code, results, cluster, err := n.headExecute(ctx, requestID, req.Request) + code, results, cluster, err := n.headExecute(ctx, requestID, req.Request, "") if err != nil { log.Error().Err(err).Msg("execution failed") } @@ -66,7 +67,11 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b // headExecute is called on the head node. The head node will publish a roll call and delegate an execution request to chosen nodes. // The returned map contains execution results, mapped to the peer IDs of peers who reported them. -func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request) (codes.Code, execute.ResultMap, execute.Cluster, error) { +func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, topic string) (codes.Code, execute.ResultMap, execute.Cluster, error) { + + if topic == "" { + topic = DefaultTopic + } nodeCount := 1 if req.Config.NodeCount > 1 { @@ -89,7 +94,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re log.Info().Msg("processing execution request") // Phase 1. - Issue roll call to nodes. - reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, req.Config.Attributes) + reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, topic, req.Config.Attributes) if err != nil { code := codes.Error if errors.Is(err, blockless.ErrRollCallTimeout) { diff --git a/node/message.go b/node/message.go index 47e3f14d..ea430db6 100644 --- a/node/message.go +++ b/node/message.go @@ -9,15 +9,30 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -func (n *Node) subscribe(ctx context.Context) (*pubsub.Subscription, error) { +type topicInfo struct { + handle *pubsub.Topic + subscription *pubsub.Subscription +} - topic, subscription, err := n.host.Subscribe(ctx, n.cfg.Topic) - if err != nil { - return nil, fmt.Errorf("could not subscribe to topic: %w", err) +func (n *Node) subscribeToTopics(ctx context.Context) error { + + // TODO: If some topics/subscriptions failed, cleanup those already subscribed to. + for _, topicName := range n.cfg.Topics { + + topic, subscription, err := n.host.Subscribe(ctx, topicName) + if err != nil { + return fmt.Errorf("could not subscribe to topic (name: %s): %w", topicName, err) + } + + ti := &topicInfo{ + handle: topic, + subscription: subscription, + } + + n.topics[topicName] = ti } - n.topic = topic - return subscription, nil + return nil } // send serializes the message and sends it to the specified peer. @@ -59,6 +74,10 @@ func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg interface{}) } func (n *Node) publish(ctx context.Context, msg interface{}) error { + return n.publishToTopic(ctx, DefaultTopic, msg) +} + +func (n *Node) publishToTopic(ctx context.Context, topic string, msg interface{}) error { // Serialize the message. payload, err := json.Marshal(msg) @@ -66,8 +85,13 @@ func (n *Node) publish(ctx context.Context, msg interface{}) error { return fmt.Errorf("could not encode record: %w", err) } + topicInfo, ok := n.topics[topic] + if !ok { + return fmt.Errorf("cannot publish to an unknown topic: %s", topic) + } + // Publish message. - err = n.host.Publish(ctx, n.topic, payload) + err = n.host.Publish(ctx, topicInfo.handle, payload) if err != nil { return fmt.Errorf("could not publish message: %w", err) } diff --git a/node/node.go b/node/node.go index 491f4c26..a118717a 100644 --- a/node/node.go +++ b/node/node.go @@ -3,10 +3,10 @@ package node import ( "context" "fmt" + "slices" "sync" "github.com/google/uuid" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" @@ -31,7 +31,7 @@ type Node struct { executor blockless.Executor fstore FStore - topic *pubsub.Topic + topics map[string]*topicInfo sema chan struct{} wg *sync.WaitGroup attributes *attributes.Attestation @@ -57,6 +57,12 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore option(&cfg) } + // Ensure default topic is included in the topic list. + defaultSubscription := slices.Contains(cfg.Topics, DefaultTopic) + if !defaultSubscription { + cfg.Topics = append(cfg.Topics, DefaultTopic) + } + n := &Node{ cfg: cfg, @@ -68,6 +74,7 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore wg: &sync.WaitGroup{}, sema: make(chan struct{}, cfg.Concurrency), + topics: make(map[string]*topicInfo), rollCall: newQueue(rollCallQueueBufferSize), clusters: make(map[string]consensusExecutor), executeResponses: waitmap.New(), diff --git a/node/rest.go b/node/rest.go index d9234d9e..2a8a23a2 100644 --- a/node/rest.go +++ b/node/rest.go @@ -12,7 +12,7 @@ import ( ) // ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node. -func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { +func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, topic string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { if !n.isHead() { return codes.NotAvailable, "", nil, execute.Cluster{}, fmt.Errorf("action not supported on this node type") @@ -23,7 +23,7 @@ func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request) (codes. return codes.Error, "", nil, execute.Cluster{}, fmt.Errorf("could not generate request ID: %w", err) } - code, results, cluster, err := n.headExecute(ctx, requestID, req) + code, results, cluster, err := n.headExecute(ctx, requestID, req, topic) if err != nil { n.log.Error().Str("request", requestID).Err(err).Msg("execution failed") } @@ -38,7 +38,7 @@ func (n *Node) ExecutionResult(id string) (execute.Result, bool) { } // PublishFunctionInstall publishes a function install message. -func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string) error { +func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, topic string) error { var req request.InstallFunction if uri != "" { @@ -51,9 +51,13 @@ func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid strin req = createInstallMessageFromCID(cid) } - n.log.Debug().Str("url", req.ManifestURL).Str("cid", req.CID).Msg("publishing function install message") + if topic == "" { + topic = DefaultTopic + } + + n.log.Debug().Str("topic", topic).Str("url", req.ManifestURL).Str("cid", req.CID).Msg("publishing function install message") - err := n.publish(ctx, req) + err := n.publishToTopic(ctx, topic, req) if err != nil { return fmt.Errorf("could not publish message: %w", err) } diff --git a/node/roll_call.go b/node/roll_call.go index 98d955f7..e22591e0 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -102,7 +102,15 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte return nil } -func (n *Node) executeRollCall(ctx context.Context, requestID string, functionID string, nodeCount int, consensus consensus.Type, attributes *execute.Attributes) ([]peer.ID, error) { +func (n *Node) executeRollCall( + ctx context.Context, + requestID string, + functionID string, + nodeCount int, + consensus consensus.Type, + topic string, + attributes *execute.Attributes, +) ([]peer.ID, error) { // Create a logger with relevant context. log := n.log.With().Str("request", requestID).Str("function", functionID).Int("node_count", nodeCount).Logger() @@ -112,7 +120,7 @@ func (n *Node) executeRollCall(ctx context.Context, requestID string, functionID n.rollCall.create(requestID) defer n.rollCall.remove(requestID) - err := n.publishRollCall(ctx, requestID, functionID, consensus, attributes) + err := n.publishRollCall(ctx, requestID, functionID, consensus, topic, attributes) if err != nil { return nil, fmt.Errorf("could not publish roll call: %w", err) } @@ -165,7 +173,7 @@ rollCallResponseLoop: // publishRollCall will create a roll call request for executing the given function. // On successful issuance of the roll call request, we return the ID of the issued request. -func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID string, consensus consensus.Type, attributes *execute.Attributes) error { +func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID string, consensus consensus.Type, topic string, attributes *execute.Attributes) error { // Create a roll call request. rollCall := request.RollCall{ @@ -178,7 +186,7 @@ func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID } // Publish the mssage. - err := n.publish(ctx, rollCall) + err := n.publishToTopic(ctx, topic, rollCall) if err != nil { return fmt.Errorf("could not publish to topic: %w", err) } diff --git a/node/run.go b/node/run.go index af863487..2d1eb45a 100644 --- a/node/run.go +++ b/node/run.go @@ -7,6 +7,7 @@ import ( "fmt" "io" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" "github.com/blocklessnetwork/b7s/models/blockless" @@ -15,8 +16,9 @@ import ( // Run will start the main loop for the node. func (n *Node) Run(ctx context.Context) error { - // Subscribe to the specified topic. - subscription, err := n.subscribe(ctx) + n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to") + + err := n.subscribeToTopics(ctx) if err != nil { return fmt.Errorf("could not subscribe to topic: %w", err) } @@ -31,7 +33,7 @@ func (n *Node) Run(ctx context.Context) error { // NOTE: Potentially signal any error here so that we abort the node // run loop if anything failed. go func() { - err = n.host.DiscoverPeers(ctx, n.cfg.Topic) + err = n.host.DiscoverPeers(ctx, n.cfg.Topics[0]) if err != nil { n.log.Error().Err(err).Msg("could not discover peers") } @@ -45,38 +47,47 @@ func (n *Node) Run(ctx context.Context) error { n.log.Info().Uint("concurrency", n.cfg.Concurrency).Msg("starting node main loop") - // Message processing loop. - for { - - // Retrieve next message. - msg, err := subscription.Next(ctx) - if err != nil { - // NOTE: Cancelling the context will lead us here. - n.log.Error().Err(err).Msg("could not receive message") - break - } - - // Skip messages we published. - if msg.ReceivedFrom == n.host.ID() { - continue - } - - n.log.Trace().Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("received message") - - // Try to get a slot for processing the request. - n.sema <- struct{}{} - n.wg.Add(1) - - go func() { - // Free up slot after we're done. - defer n.wg.Done() - defer func() { <-n.sema }() - - err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) - if err != nil { - n.log.Error().Err(err).Str("id", msg.ID).Str("peer_id", msg.ReceivedFrom.String()).Msg("could not process message") + // Process topic messages. + // TODO: Perhaps now using a buffered channel makes more sense, with goroutines filling up the channel + // and the main processing loop consumes it. + for name, topic := range n.topics { + + go func(name string, subscription *pubsub.Subscription) { + + // Message processing loops. + for { + + // Retrieve next message. + msg, err := subscription.Next(ctx) + if err != nil { + // NOTE: Cancelling the context will lead us here. + n.log.Error().Err(err).Msg("could not receive message") + break + } + + // Skip messages we published. + if msg.ReceivedFrom == n.host.ID() { + continue + } + + n.log.Trace().Str("topic", name).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("received message") + + // Try to get a slot for processing the request. + n.sema <- struct{}{} + n.wg.Add(1) + + go func() { + // Free up slot after we're done. + defer n.wg.Done() + defer func() { <-n.sema }() + + err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) + if err != nil { + n.log.Error().Err(err).Str("topic", name).Str("id", msg.ID).Str("peer_id", msg.ReceivedFrom.String()).Msg("could not process message") + } + }() } - }() + }(name, topic.subscription) } n.log.Debug().Msg("waiting for messages being processed") From c8be22e3b932c52a3774fde514e38374296bb4ee Mon Sep 17 00:00:00 2001 From: Maelkum Date: Sat, 6 Jan 2024 21:32:41 +0100 Subject: [PATCH 2/7] Rename 'topic' to 'subgroup' for better understanding --- api/execute.go | 4 ++-- api/install.go | 8 ++++---- api/node.go | 2 +- node/head_execute.go | 8 ++------ node/rest.go | 4 ++-- node/roll_call.go | 4 ++++ 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/api/execute.go b/api/execute.go index 595f52a2..0e1362fd 100644 --- a/api/execute.go +++ b/api/execute.go @@ -16,7 +16,7 @@ import ( // ExecuteRequest describes the payload for the REST API request for function execution. type ExecuteRequest struct { execute.Request - Topic string `json:"topic,omitempty"` + Subgroup string `json:"subgroup,omitempty"` } // ExecuteResponse describes the REST API response for function execution. @@ -47,7 +47,7 @@ func (a *API) Execute(ctx echo.Context) error { } // Get the execution result. - code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req.Request), req.Topic) + code, id, results, cluster, err := a.Node.ExecuteFunction(ctx.Request().Context(), execute.Request(req.Request), req.Subgroup) if err != nil { a.Log.Warn().Str("function", req.FunctionID).Err(err).Msg("node failed to execute function") } diff --git a/api/install.go b/api/install.go index b2908360..2e49f133 100644 --- a/api/install.go +++ b/api/install.go @@ -17,9 +17,9 @@ const ( // InstallFunctionRequest describes the payload for the REST API request for function install. type InstallFunctionRequest struct { - CID string `json:"cid"` - URI string `json:"uri"` - Topic string `json:"topic"` + CID string `json:"cid"` + URI string `json:"uri"` + Subgroup string `json:"subgroup"` } // InstallFunctionResponse describes the REST API response for the function install. @@ -47,7 +47,7 @@ func (a *API) Install(ctx echo.Context) error { // Start function install in a separate goroutine and signal when it's done. fnErr := make(chan error) go func() { - err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID, req.Topic) + err = a.Node.PublishFunctionInstall(reqCtx, req.URI, req.CID, req.Subgroup) fnErr <- err }() diff --git a/api/node.go b/api/node.go index 27471902..07534e94 100644 --- a/api/node.go +++ b/api/node.go @@ -8,7 +8,7 @@ import ( ) type Node interface { - ExecuteFunction(ctx context.Context, req execute.Request, topic string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) + ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) ExecutionResult(id string) (execute.Result, bool) PublishFunctionInstall(ctx context.Context, uri string, cid string, topic string) error } diff --git a/node/head_execute.go b/node/head_execute.go index 840129f5..b9879022 100644 --- a/node/head_execute.go +++ b/node/head_execute.go @@ -67,11 +67,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []b // headExecute is called on the head node. The head node will publish a roll call and delegate an execution request to chosen nodes. // The returned map contains execution results, mapped to the peer IDs of peers who reported them. -func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, topic string) (codes.Code, execute.ResultMap, execute.Cluster, error) { - - if topic == "" { - topic = DefaultTopic - } +func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, subgroup string) (codes.Code, execute.ResultMap, execute.Cluster, error) { nodeCount := 1 if req.Config.NodeCount > 1 { @@ -94,7 +90,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re log.Info().Msg("processing execution request") // Phase 1. - Issue roll call to nodes. - reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, topic, req.Config.Attributes) + reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, subgroup, req.Config.Attributes) if err != nil { code := codes.Error if errors.Is(err, blockless.ErrRollCallTimeout) { diff --git a/node/rest.go b/node/rest.go index 2a8a23a2..835182e5 100644 --- a/node/rest.go +++ b/node/rest.go @@ -12,7 +12,7 @@ import ( ) // ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node. -func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, topic string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { +func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { if !n.isHead() { return codes.NotAvailable, "", nil, execute.Cluster{}, fmt.Errorf("action not supported on this node type") @@ -23,7 +23,7 @@ func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, topic s return codes.Error, "", nil, execute.Cluster{}, fmt.Errorf("could not generate request ID: %w", err) } - code, results, cluster, err := n.headExecute(ctx, requestID, req, topic) + code, results, cluster, err := n.headExecute(ctx, requestID, req, subgroup) if err != nil { n.log.Error().Str("request", requestID).Err(err).Msg("execution failed") } diff --git a/node/roll_call.go b/node/roll_call.go index e22591e0..560ab167 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -185,6 +185,10 @@ func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID Attributes: attributes, } + if topic == "" { + topic = DefaultTopic + } + // Publish the mssage. err := n.publishToTopic(ctx, topic, rollCall) if err != nil { From db26122f6392e5f05e21f63c697e5b223236673d Mon Sep 17 00:00:00 2001 From: Maelkum Date: Sun, 7 Jan 2024 17:57:55 +0100 Subject: [PATCH 3/7] Fixing pubsub handling for multiple topics and tweaking message processing loop --- cmd/node/flags.go | 2 +- host/host.go | 3 +++ host/subscribe.go | 14 +++++++--- node/message.go | 7 ++++- node/node.go | 4 +-- node/run.go | 67 +++++++++++++++++++++++++++++------------------ 6 files changed, 63 insertions(+), 34 deletions(-) diff --git a/cmd/node/flags.go b/cmd/node/flags.go index 0c7532e1..351ffd95 100644 --- a/cmd/node/flags.go +++ b/cmd/node/flags.go @@ -33,7 +33,7 @@ func parseFlags() *config.Config { pflag.StringVar(&cfg.API, "rest-api", "", "address where the head node REST API will listen on") pflag.StringVar(&cfg.Workspace, "workspace", "./workspace", "directory that the node can use for file storage") pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)") - pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime path (used by the worker node)") + pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime CLI name (used by the worker node)") pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS") pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to") diff --git a/host/host.go b/host/host.go index f34682f7..a40a79a9 100644 --- a/host/host.go +++ b/host/host.go @@ -9,6 +9,7 @@ import ( "github.com/rs/zerolog" "github.com/libp2p/go-libp2p" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" ma "github.com/multiformats/go-multiaddr" @@ -20,6 +21,8 @@ type Host struct { log zerolog.Logger cfg Config + + pubsub *pubsub.PubSub } // New creates a new Host. diff --git a/host/subscribe.go b/host/subscribe.go index 64e9a9f1..6f145764 100644 --- a/host/subscribe.go +++ b/host/subscribe.go @@ -7,17 +7,23 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" ) -// Subscribe will have the host start listening to a specified gossipsub topic. -func (h *Host) Subscribe(ctx context.Context, topic string) (*pubsub.Topic, *pubsub.Subscription, error) { +func (h *Host) InitPubSub(ctx context.Context) error { // Get a new PubSub object with the default router. pubsub, err := pubsub.NewGossipSub(ctx, h) if err != nil { - return nil, nil, fmt.Errorf("could not create new gossipsub: %w", err) + return fmt.Errorf("could not create new gossipsub: %w", err) } + h.pubsub = pubsub + + return nil +} + +// Subscribe will have the host start listening to a specified gossipsub topic. +func (h *Host) Subscribe(topic string) (*pubsub.Topic, *pubsub.Subscription, error) { // Join the specified topic. - th, err := pubsub.Join(topic) + th, err := h.pubsub.Join(topic) if err != nil { return nil, nil, fmt.Errorf("could not join topic: %w", err) } diff --git a/node/message.go b/node/message.go index ea430db6..be95794d 100644 --- a/node/message.go +++ b/node/message.go @@ -16,10 +16,15 @@ type topicInfo struct { func (n *Node) subscribeToTopics(ctx context.Context) error { + err := n.host.InitPubSub(ctx) + if err != nil { + return fmt.Errorf("could not initialize pubsub: %w", err) + } + // TODO: If some topics/subscriptions failed, cleanup those already subscribed to. for _, topicName := range n.cfg.Topics { - topic, subscription, err := n.host.Subscribe(ctx, topicName) + topic, subscription, err := n.host.Subscribe(topicName) if err != nil { return fmt.Errorf("could not subscribe to topic (name: %s): %w", topicName, err) } diff --git a/node/node.go b/node/node.go index a118717a..f2f61146 100644 --- a/node/node.go +++ b/node/node.go @@ -32,7 +32,6 @@ type Node struct { fstore FStore topics map[string]*topicInfo - sema chan struct{} wg *sync.WaitGroup attributes *attributes.Attestation @@ -71,8 +70,7 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore fstore: fstore, executor: cfg.Execute, - wg: &sync.WaitGroup{}, - sema: make(chan struct{}, cfg.Concurrency), + wg: &sync.WaitGroup{}, topics: make(map[string]*topicInfo), rollCall: newQueue(rollCallQueueBufferSize), diff --git a/node/run.go b/node/run.go index 2d1eb45a..2e27297c 100644 --- a/node/run.go +++ b/node/run.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "sync" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" @@ -32,12 +33,15 @@ func (n *Node) Run(ctx context.Context) error { // Discover peers. // NOTE: Potentially signal any error here so that we abort the node // run loop if anything failed. - go func() { - err = n.host.DiscoverPeers(ctx, n.cfg.Topics[0]) - if err != nil { - n.log.Error().Err(err).Msg("could not discover peers") - } - }() + // TODO: SUS1 - DHT stuff inside gets multiplied. + for _, topic := range n.cfg.Topics { + go func(topic string) { + err = n.host.DiscoverPeers(ctx, topic) + if err != nil { + n.log.Error().Err(err).Msg("could not discover peers") + } + }(topic) + } // Start the health signal emitter in a separate goroutine. go n.HealthPing(ctx) @@ -47,12 +51,16 @@ func (n *Node) Run(ctx context.Context) error { n.log.Info().Uint("concurrency", n.cfg.Concurrency).Msg("starting node main loop") - // Process topic messages. - // TODO: Perhaps now using a buffered channel makes more sense, with goroutines filling up the channel - // and the main processing loop consumes it. + msgs := make(chan *pubsub.Message, n.cfg.Concurrency) + var topicWorkers sync.WaitGroup + + // Process topic messages - spin up a goroutine for each topic that will feed the main processing loop below. for name, topic := range n.topics { + topicWorkers.Add(1) + go func(name string, subscription *pubsub.Subscription) { + defer topicWorkers.Done() // Message processing loops. for { @@ -70,28 +78,37 @@ func (n *Node) Run(ctx context.Context) error { continue } - n.log.Trace().Str("topic", name).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("received message") - - // Try to get a slot for processing the request. - n.sema <- struct{}{} - n.wg.Add(1) + n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message") - go func() { - // Free up slot after we're done. - defer n.wg.Done() - defer func() { <-n.sema }() - - err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) - if err != nil { - n.log.Error().Err(err).Str("topic", name).Str("id", msg.ID).Str("peer_id", msg.ReceivedFrom.String()).Msg("could not process message") - } - }() + msgs <- msg } }(name, topic.subscription) } - n.log.Debug().Msg("waiting for messages being processed") + // Read and process messages. + go func() { + for msg := range msgs { + + n.log.Debug().Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("processing message") + + n.wg.Add(1) + go func(msg *pubsub.Message) { + defer n.wg.Done() + + err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) + if err != nil { + n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message") + } + }(msg) + } + }() + // Waiting for topic workers to stop (context canceled). + topicWorkers.Wait() + // Signal that no new messages will be incoming. + close(msgs) + + n.log.Debug().Msg("waiting for messages being processed") n.wg.Wait() return nil From 05c08b4abe04a34730f9b6b4a6c758618748fddc Mon Sep 17 00:00:00 2001 From: Maelkum Date: Sun, 7 Jan 2024 18:53:36 +0100 Subject: [PATCH 4/7] Head node joins a topic if it was previously unknown --- host/subscribe.go | 18 +++++++++++++++++- node/message.go | 16 +++++++++++++--- node/node.go | 11 ++++++++--- node/roll_call.go | 2 +- node/run.go | 7 +++++-- node/subgroups.go | 36 ++++++++++++++++++++++++++++++++++++ 6 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 node/subgroups.go diff --git a/host/subscribe.go b/host/subscribe.go index 6f145764..4c3551b1 100644 --- a/host/subscribe.go +++ b/host/subscribe.go @@ -2,6 +2,7 @@ package host import ( "context" + "errors" "fmt" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -19,11 +20,26 @@ func (h *Host) InitPubSub(ctx context.Context) error { return nil } +func (h *Host) JoinTopic(topic string) (*pubsub.Topic, error) { + + if h.pubsub == nil { + return nil, errors.New("pubsub is not initialized") + } + + // Join the specified topic. + th, err := h.pubsub.Join(topic) + if err != nil { + return nil, fmt.Errorf("could not join topic: %w", err) + } + + return th, nil +} + // Subscribe will have the host start listening to a specified gossipsub topic. func (h *Host) Subscribe(topic string) (*pubsub.Topic, *pubsub.Subscription, error) { // Join the specified topic. - th, err := h.pubsub.Join(topic) + th, err := h.JoinTopic(topic) if err != nil { return nil, nil, fmt.Errorf("could not join topic: %w", err) } diff --git a/node/message.go b/node/message.go index be95794d..03bd2186 100644 --- a/node/message.go +++ b/node/message.go @@ -34,7 +34,8 @@ func (n *Node) subscribeToTopics(ctx context.Context) error { subscription: subscription, } - n.topics[topicName] = ti + // No need for locking since this initialization is done once on start. + n.subgroups.topics[topicName] = ti } return nil @@ -90,9 +91,18 @@ func (n *Node) publishToTopic(ctx context.Context, topic string, msg interface{} return fmt.Errorf("could not encode record: %w", err) } - topicInfo, ok := n.topics[topic] + n.subgroups.RLock() + topicInfo, ok := n.subgroups.topics[topic] + n.subgroups.RUnlock() + if !ok { - return fmt.Errorf("cannot publish to an unknown topic: %s", topic) + n.log.Info().Str("topic", topic).Msg("unknown topic, joining now") + + var err error + topicInfo, err = n.joinTopic(topic) + if err != nil { + return fmt.Errorf("could not join topic (topic: %s): %w", topic, err) + } } // Publish message. diff --git a/node/node.go b/node/node.go index f2f61146..bf44cd95 100644 --- a/node/node.go +++ b/node/node.go @@ -31,7 +31,7 @@ type Node struct { executor blockless.Executor fstore FStore - topics map[string]*topicInfo + subgroups workSubgroups wg *sync.WaitGroup attributes *attributes.Attestation @@ -62,6 +62,11 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore cfg.Topics = append(cfg.Topics, DefaultTopic) } + subgroups := workSubgroups{ + RWMutex: &sync.RWMutex{}, + topics: make(map[string]*topicInfo), + } + n := &Node{ cfg: cfg, @@ -70,9 +75,9 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore fstore: fstore, executor: cfg.Execute, - wg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, + subgroups: subgroups, - topics: make(map[string]*topicInfo), rollCall: newQueue(rollCallQueueBufferSize), clusters: make(map[string]consensusExecutor), executeResponses: waitmap.New(), diff --git a/node/roll_call.go b/node/roll_call.go index 560ab167..9564a53b 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -113,7 +113,7 @@ func (n *Node) executeRollCall( ) ([]peer.ID, error) { // Create a logger with relevant context. - log := n.log.With().Str("request", requestID).Str("function", functionID).Int("node_count", nodeCount).Logger() + log := n.log.With().Str("request", requestID).Str("function", functionID).Int("node_count", nodeCount).Str("topic", topic).Logger() log.Info().Msg("performing roll call for request") diff --git a/node/run.go b/node/run.go index 2e27297c..d0bfa47c 100644 --- a/node/run.go +++ b/node/run.go @@ -33,13 +33,15 @@ func (n *Node) Run(ctx context.Context) error { // Discover peers. // NOTE: Potentially signal any error here so that we abort the node // run loop if anything failed. - // TODO: SUS1 - DHT stuff inside gets multiplied. for _, topic := range n.cfg.Topics { go func(topic string) { + + // TODO: Check DHT initialization, now that we're working with multiple topics, may not need to repeat ALL work per topic. err = n.host.DiscoverPeers(ctx, topic) if err != nil { n.log.Error().Err(err).Msg("could not discover peers") } + }(topic) } @@ -55,7 +57,8 @@ func (n *Node) Run(ctx context.Context) error { var topicWorkers sync.WaitGroup // Process topic messages - spin up a goroutine for each topic that will feed the main processing loop below. - for name, topic := range n.topics { + // No need for locking since we're still single threaded here and these (subscribed) topics will not be touched by other code. + for name, topic := range n.subgroups.topics { topicWorkers.Add(1) diff --git a/node/subgroups.go b/node/subgroups.go new file mode 100644 index 00000000..6d21fe39 --- /dev/null +++ b/node/subgroups.go @@ -0,0 +1,36 @@ +package node + +import ( + "fmt" + "sync" +) + +// Subgroups are (optional) groups of nodes that can work on specific things. +// Generally all nodes subscribe to the B7S general topic and can receive work from there. +// However, nodes can also be part of smaller groups, where they join a specific topic where +// some specific work (roll calls) may be published to. +type workSubgroups struct { + *sync.RWMutex + topics map[string]*topicInfo +} + +// wrapper around topic joining + housekeeping. +func (n *Node) joinTopic(topic string) (*topicInfo, error) { + + n.subgroups.Lock() + defer n.subgroups.Unlock() + + th, err := n.host.JoinTopic(topic) + if err != nil { + return nil, fmt.Errorf("could not join topic (topic: %s): %w", topic, err) + } + + // NOTE: No subscription, joining topic only. + ti := &topicInfo{ + handle: th, + } + + n.subgroups.topics[topic] = ti + + return ti, nil +} From 639e80001016a537a0393073db81982c61901426 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Sun, 7 Jan 2024 19:14:12 +0100 Subject: [PATCH 5/7] Updating tests --- api/execute_test.go | 4 ++-- api/install_test.go | 4 ++-- api/node.go | 2 +- node/execute_internal_test.go | 9 ++++++--- node/health_internal_test.go | 9 ++++++--- node/message_internal_test.go | 9 ++++++--- node/rest.go | 10 +++++----- node/rest_internal_test.go | 2 +- node/roll_call_internal_test.go | 9 ++++++--- testing/mocks/node.go | 16 ++++++++-------- 10 files changed, 43 insertions(+), 31 deletions(-) diff --git a/api/execute_test.go b/api/execute_test.go index e58bd5b7..1ce5ad06 100644 --- a/api/execute_test.go +++ b/api/execute_test.go @@ -31,7 +31,7 @@ func TestAPI_Execute(t *testing.T) { expectedCode := codes.OK node := mocks.BaselineNode(t) - node.ExecuteFunctionFunc = func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { + node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { res := execute.ResultMap{ mocks.GenericPeerID: executionResult, @@ -85,7 +85,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) { expectedCode := codes.Error node := mocks.BaselineNode(t) - node.ExecuteFunctionFunc = func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { + node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { res := execute.ResultMap{ mocks.GenericPeerID: executionResult, diff --git a/api/install_test.go b/api/install_test.go index ed91a9ed..13acd073 100644 --- a/api/install_test.go +++ b/api/install_test.go @@ -67,7 +67,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) { ) node := mocks.BaselineNode(t) - node.PublishFunctionInstallFunc = func(context.Context, string, string) error { + node.PublishFunctionInstallFunc = func(context.Context, string, string, string) error { time.Sleep(installDuration) return nil } @@ -99,7 +99,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) { t.Parallel() node := mocks.BaselineNode(t) - node.PublishFunctionInstallFunc = func(context.Context, string, string) error { + node.PublishFunctionInstallFunc = func(context.Context, string, string, string) error { return mocks.GenericError } diff --git a/api/node.go b/api/node.go index 07534e94..4a45d4ee 100644 --- a/api/node.go +++ b/api/node.go @@ -10,5 +10,5 @@ import ( type Node interface { ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error) ExecutionResult(id string) (execute.Result, bool) - PublishFunctionInstall(ctx context.Context, uri string, cid string, topic string) error + PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error } diff --git a/node/execute_internal_test.go b/node/execute_internal_test.go index 9ccf215e..6ddb2c79 100644 --- a/node/execute_internal_test.go +++ b/node/execute_internal_test.go @@ -277,7 +277,7 @@ func TestNode_HeadExecute(t *testing.T) { node := createNode(t, blockless.HeadNode) ctx := context.Background() - _, err := node.subscribe(ctx) + err := node.subscribeToTopics(ctx) require.NoError(t, err) // Create a host that will receive the execution response. @@ -331,7 +331,7 @@ func TestNode_HeadExecute(t *testing.T) { node.listenDirectMessages(ctx) defer cancel() - _, err := node.subscribe(ctx) + err := node.subscribeToTopics(ctx) require.NoError(t, err) // Create a host that will simulate a worker. @@ -340,7 +340,10 @@ func TestNode_HeadExecute(t *testing.T) { mockWorker, err := host.New(mocks.NoopLogger, loopback, 0) require.NoError(t, err) - _, subscription, err := mockWorker.Subscribe(ctx, topic) + err = mockWorker.InitPubSub(ctx) + require.NoError(t, err) + + _, subscription, err := mockWorker.Subscribe(topic) require.NoError(t, err) hostAddNewPeer(t, node.host, mockWorker) diff --git a/node/health_internal_test.go b/node/health_internal_test.go index edac6104..64257081 100644 --- a/node/health_internal_test.go +++ b/node/health_internal_test.go @@ -37,7 +37,7 @@ func TestNode_Health(t *testing.T) { nhost, err := host.New(logger, loopback, 0) require.NoError(t, err) - node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopic(topic)) + node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopics([]string{topic})) require.NoError(t, err) // Create a host that will listen on the the topic to verify health pings @@ -55,11 +55,14 @@ func TestNode_Health(t *testing.T) { err = node.host.Connect(ctx, *info) require.NoError(t, err) + err = receiver.InitPubSub(ctx) + require.NoError(t, err) + // Have both client and node subscribe to the same topic. - _, subscription, err := receiver.Subscribe(ctx, topic) + _, subscription, err := receiver.Subscribe(topic) require.NoError(t, err) - _, err = node.subscribe(ctx) + err = node.subscribeToTopics(ctx) require.NoError(t, err) go node.HealthPing(ctx) diff --git a/node/message_internal_test.go b/node/message_internal_test.go index d7967993..44182020 100644 --- a/node/message_internal_test.go +++ b/node/message_internal_test.go @@ -64,16 +64,19 @@ func TestNode_Messaging(t *testing.T) { ctx := context.Background() + err = client.InitPubSub(ctx) + require.NoError(t, err) + // Establish a connection between peers. clientInfo := hostGetAddrInfo(t, client) - err := node.host.Connect(ctx, *clientInfo) + err = node.host.Connect(ctx, *clientInfo) require.NoError(t, err) // Have both client and node subscribe to the same topic. - _, subscription, err := client.Subscribe(ctx, topic) + _, subscription, err := client.Subscribe(topic) require.NoError(t, err) - _, err = node.subscribe(ctx) + err = node.subscribeToTopics(ctx) require.NoError(t, err) time.Sleep(subscriptionDiseminationPause) diff --git a/node/rest.go b/node/rest.go index 835182e5..ac9bfb06 100644 --- a/node/rest.go +++ b/node/rest.go @@ -38,7 +38,7 @@ func (n *Node) ExecutionResult(id string) (execute.Result, bool) { } // PublishFunctionInstall publishes a function install message. -func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, topic string) error { +func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error { var req request.InstallFunction if uri != "" { @@ -51,13 +51,13 @@ func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid strin req = createInstallMessageFromCID(cid) } - if topic == "" { - topic = DefaultTopic + if subgroup == "" { + subgroup = DefaultTopic } - n.log.Debug().Str("topic", topic).Str("url", req.ManifestURL).Str("cid", req.CID).Msg("publishing function install message") + n.log.Debug().Str("subgroup", subgroup).Str("url", req.ManifestURL).Str("cid", req.CID).Msg("publishing function install message") - err := n.publishToTopic(ctx, topic, req) + err := n.publishToTopic(ctx, subgroup, req) if err != nil { return fmt.Errorf("could not publish message: %w", err) } diff --git a/node/rest_internal_test.go b/node/rest_internal_test.go index d204d637..542ac1bb 100644 --- a/node/rest_internal_test.go +++ b/node/rest_internal_test.go @@ -12,7 +12,7 @@ import ( func TestNode_RestExecuteNotSupportedOnWorker(t *testing.T) { node := createNode(t, blockless.WorkerNode) - _, _, _, _, err := node.ExecuteFunction(context.Background(), mocks.GenericExecutionRequest) + _, _, _, _, err := node.ExecuteFunction(context.Background(), mocks.GenericExecutionRequest, "") require.Error(t, err) } diff --git a/node/roll_call_internal_test.go b/node/roll_call_internal_test.go index f4da5a3a..16f288e9 100644 --- a/node/roll_call_internal_test.go +++ b/node/roll_call_internal_test.go @@ -245,6 +245,9 @@ func TestNode_RollCall(t *testing.T) { receiver, err := host.New(mocks.NoopLogger, loopback, 0) require.NoError(t, err) + err = receiver.InitPubSub(ctx) + require.NoError(t, err) + hostAddNewPeer(t, node.host, receiver) info := hostGetAddrInfo(t, receiver) @@ -252,10 +255,10 @@ func TestNode_RollCall(t *testing.T) { require.NoError(t, err) // Have both client and node subscribe to the same topic. - _, subscription, err := receiver.Subscribe(ctx, topic) + _, subscription, err := receiver.Subscribe(topic) require.NoError(t, err) - _, err = node.subscribe(ctx) + err = node.subscribeToTopics(ctx) require.NoError(t, err) time.Sleep(subscriptionDiseminationPause) @@ -263,7 +266,7 @@ func TestNode_RollCall(t *testing.T) { requestID, err := newRequestID() require.NoError(t, err) - err = node.publishRollCall(ctx, requestID, functionID, consensus.Type(0), nil) + err = node.publishRollCall(ctx, requestID, functionID, consensus.Type(0), "", nil) require.NoError(t, err) deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout) diff --git a/testing/mocks/node.go b/testing/mocks/node.go index 72165677..aa0d71be 100644 --- a/testing/mocks/node.go +++ b/testing/mocks/node.go @@ -10,16 +10,16 @@ import ( // Node implements the `Node` interface expected by the API. type Node struct { - ExecuteFunctionFunc func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) + ExecuteFunctionFunc func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) ExecutionResultFunc func(id string) (execute.Result, bool) - PublishFunctionInstallFunc func(ctx context.Context, uri string, cid string) error + PublishFunctionInstallFunc func(ctx context.Context, uri string, cid string, subgroup string) error } func BaselineNode(t *testing.T) *Node { t.Helper() node := Node{ - ExecuteFunctionFunc: func(context.Context, execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { + ExecuteFunctionFunc: func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { results := execute.ResultMap{ GenericPeerID: GenericExecutionResult, @@ -31,7 +31,7 @@ func BaselineNode(t *testing.T) *Node { ExecutionResultFunc: func(id string) (execute.Result, bool) { return GenericExecutionResult, true }, - PublishFunctionInstallFunc: func(ctx context.Context, uri string, cid string) error { + PublishFunctionInstallFunc: func(ctx context.Context, uri string, cid string, subgroup string) error { return nil }, } @@ -39,14 +39,14 @@ func BaselineNode(t *testing.T) *Node { return &node } -func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { - return n.ExecuteFunctionFunc(ctx, req) +func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { + return n.ExecuteFunctionFunc(ctx, req, subgroup) } func (n *Node) ExecutionResult(id string) (execute.Result, bool) { return n.ExecutionResultFunc(id) } -func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string) error { - return n.PublishFunctionInstallFunc(ctx, uri, cid) +func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error { + return n.PublishFunctionInstallFunc(ctx, uri, cid, subgroup) } From 613c95d46b6beb1fde660301e1a89c034f229865 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Sun, 7 Jan 2024 19:47:05 +0100 Subject: [PATCH 6/7] Update comment --- node/head_execute.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/head_execute.go b/node/head_execute.go index b9879022..a72a183a 100644 --- a/node/head_execute.go +++ b/node/head_execute.go @@ -17,7 +17,7 @@ import ( "github.com/blocklessnetwork/b7s/models/response" ) -// TODO: Check - head node really accepts execution requests from the REST API. Should this message handling be cognizant of `topics`? +// NOTE: head node typically receives execution requests from the REST API. This message handling is not cognizant of subgroups. func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, payload []byte) error { // Unpack the request. From ce3927baa7b0cc25c9a63d35df29fabab8c69459 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Wed, 10 Jan 2024 15:16:53 +0100 Subject: [PATCH 7/7] Revert concurrency setting and message processing loop --- node/message.go | 2 ++ node/node.go | 4 +++- node/run.go | 49 ++++++++++++++++++++----------------------------- 3 files changed, 25 insertions(+), 30 deletions(-) diff --git a/node/message.go b/node/message.go index 03bd2186..d2f2d17e 100644 --- a/node/message.go +++ b/node/message.go @@ -21,6 +21,8 @@ func (n *Node) subscribeToTopics(ctx context.Context) error { return fmt.Errorf("could not initialize pubsub: %w", err) } + n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to") + // TODO: If some topics/subscriptions failed, cleanup those already subscribed to. for _, topicName := range n.cfg.Topics { diff --git a/node/node.go b/node/node.go index bf44cd95..50d08a0f 100644 --- a/node/node.go +++ b/node/node.go @@ -31,8 +31,9 @@ type Node struct { executor blockless.Executor fstore FStore - subgroups workSubgroups + sema chan struct{} wg *sync.WaitGroup + subgroups workSubgroups attributes *attributes.Attestation rollCall *rollCallQueue @@ -76,6 +77,7 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore executor: cfg.Execute, wg: &sync.WaitGroup{}, + sema: make(chan struct{}, cfg.Concurrency), subgroups: subgroups, rollCall: newQueue(rollCallQueueBufferSize), diff --git a/node/run.go b/node/run.go index d0bfa47c..94f33212 100644 --- a/node/run.go +++ b/node/run.go @@ -17,11 +17,9 @@ import ( // Run will start the main loop for the node. func (n *Node) Run(ctx context.Context) error { - n.log.Info().Strs("topics", n.cfg.Topics).Msg("topics node will subscribe to") - err := n.subscribeToTopics(ctx) if err != nil { - return fmt.Errorf("could not subscribe to topic: %w", err) + return fmt.Errorf("could not subscribe to topics: %w", err) } // Sync functions now in case they were removed from the storage. @@ -53,17 +51,16 @@ func (n *Node) Run(ctx context.Context) error { n.log.Info().Uint("concurrency", n.cfg.Concurrency).Msg("starting node main loop") - msgs := make(chan *pubsub.Message, n.cfg.Concurrency) - var topicWorkers sync.WaitGroup + var workers sync.WaitGroup // Process topic messages - spin up a goroutine for each topic that will feed the main processing loop below. // No need for locking since we're still single threaded here and these (subscribed) topics will not be touched by other code. for name, topic := range n.subgroups.topics { - topicWorkers.Add(1) + workers.Add(1) go func(name string, subscription *pubsub.Subscription) { - defer topicWorkers.Done() + defer workers.Done() // Message processing loops. for { @@ -83,33 +80,27 @@ func (n *Node) Run(ctx context.Context) error { n.log.Trace().Str("topic", name).Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("received message") - msgs <- msg + // Try to get a slot for processing the request. + n.sema <- struct{}{} + n.wg.Add(1) + + go func(msg *pubsub.Message) { + // Free up slot after we're done. + defer n.wg.Done() + defer func() { <-n.sema }() + + err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) + if err != nil { + n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message") + } + }(msg) } }(name, topic.subscription) } - // Read and process messages. - go func() { - for msg := range msgs { - - n.log.Debug().Str("peer", msg.ReceivedFrom.String()).Str("id", msg.ID).Msg("processing message") - - n.wg.Add(1) - go func(msg *pubsub.Message) { - defer n.wg.Done() - - err = n.processMessage(ctx, msg.ReceivedFrom, msg.Data) - if err != nil { - n.log.Error().Err(err).Str("id", msg.ID).Str("peer", msg.ReceivedFrom.String()).Msg("could not process message") - } - }(msg) - } - }() + n.log.Debug().Msg("waiting for workers") - // Waiting for topic workers to stop (context canceled). - topicWorkers.Wait() - // Signal that no new messages will be incoming. - close(msgs) + workers.Wait() n.log.Debug().Msg("waiting for messages being processed") n.wg.Wait()