Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context reloaded #163

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions autopilot/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package autopilot

import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -39,7 +40,8 @@ type Config struct {
// ConnectToPeer attempts to connect to the peer using one of its
// advertised addresses. The boolean returned signals whether the peer
// was already connected.
ConnectToPeer func(*btcec.PublicKey, []net.Addr) (bool, error)
ConnectToPeer func(context.Context, *btcec.PublicKey, []net.Addr) (bool,
error)

// DisconnectPeer attempts to disconnect the peer with the given public
// key.
Expand Down Expand Up @@ -199,20 +201,20 @@ func New(cfg Config, initialState []LocalChannel) (*Agent, error) {

// Start starts the agent along with any goroutines it needs to perform its
// normal duties.
func (a *Agent) Start() error {
func (a *Agent) Start(ctx context.Context) error {
var err error
a.started.Do(func() {
err = a.start()
err = a.start(ctx)
})
return err
}

func (a *Agent) start() error {
func (a *Agent) start(ctx context.Context) error {
rand.Seed(time.Now().Unix())
log.Infof("Autopilot Agent starting")

a.wg.Add(1)
go a.controller()
go a.controller(ctx)

return nil
}
Expand Down Expand Up @@ -401,7 +403,7 @@ func mergeChanState(pendingChans map[NodeID]LocalChannel,
// and external state changes as a result of decisions it makes w.r.t channel
// allocation, or attributes affecting its control loop being updated by the
// backing Lightning Node.
func (a *Agent) controller() {
func (a *Agent) controller(ctx context.Context) {
defer a.wg.Done()

// We'll start off by assigning our starting balance, and injecting
Expand Down Expand Up @@ -539,7 +541,7 @@ func (a *Agent) controller() {
log.Infof("Triggering attachment directive dispatch, "+
"total_funds=%v", a.totalBalance)

err := a.openChans(availableFunds, numChans, totalChans)
err := a.openChans(ctx, availableFunds, numChans, totalChans)
if err != nil {
log.Errorf("Unable to open channels: %v", err)
}
Expand All @@ -548,8 +550,8 @@ func (a *Agent) controller() {

// openChans queries the agent's heuristic for a set of channel candidates, and
// attempts to open channels to them.
func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
totalChans []LocalChannel) error {
func (a *Agent) openChans(ctx context.Context, availableFunds btcutil.Amount,
numChans uint32, totalChans []LocalChannel) error {

// As channel size we'll use the maximum channel size available.
chanSize := a.cfg.Constraints.MaxChanSize()
Expand Down Expand Up @@ -716,7 +718,7 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
a.pendingConns[nodeID] = struct{}{}

a.wg.Add(1)
go a.executeDirective(*chanCandidate)
go a.executeDirective(ctx, *chanCandidate)
}
return nil
}
Expand All @@ -725,7 +727,9 @@ func (a *Agent) openChans(availableFunds btcutil.Amount, numChans uint32,
// the given attachment directive, and open a channel of the given size.
//
// NOTE: MUST be run as a goroutine.
func (a *Agent) executeDirective(directive AttachmentDirective) {
func (a *Agent) executeDirective(ctx context.Context,
directive AttachmentDirective) {

defer a.wg.Done()

// We'll start out by attempting to connect to the peer in order to
Expand All @@ -746,7 +750,7 @@ func (a *Agent) executeDirective(directive AttachmentDirective) {
// TODO(halseth): use DialContext to cancel on transport level.
go func() {
alreadyConnected, err := a.cfg.ConnectToPeer(
pub, directive.Addrs,
ctx, pub, directive.Addrs,
)
if err != nil {
select {
Expand Down
5 changes: 3 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package autopilot

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -96,7 +97,7 @@ func (m *Manager) IsActive() bool {

// StartAgent creates and starts an autopilot agent from the Manager's
// config.
func (m *Manager) StartAgent() error {
func (m *Manager) StartAgent(ctx context.Context) error {
m.Lock()
defer m.Unlock()

Expand All @@ -119,7 +120,7 @@ func (m *Manager) StartAgent() error {
return err
}

if err := pilot.Start(); err != nil {
if err := pilot.Start(ctx); err != nil {
return err
}

Expand Down
16 changes: 9 additions & 7 deletions chanbackup/recover.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chanbackup

import (
"context"
"net"

"github.com/btcsuite/btcd/btcec/v2"
Expand Down Expand Up @@ -29,7 +30,8 @@ type PeerConnector interface {
// available addresses. Once this method returns with a non-nil error,
// the connector should attempt to persistently connect to the target
// peer in the background as a persistent attempt.
ConnectPeer(node *btcec.PublicKey, addrs []net.Addr) error
ConnectPeer(ctx context.Context, node *btcec.PublicKey,
addrs []net.Addr) error
}

// Recover attempts to recover the static channel state from a set of static
Expand All @@ -41,7 +43,7 @@ type PeerConnector interface {
// well, in order to expose the addressing information required to locate to
// and connect to each peer in order to initiate the recovery protocol.
// The number of channels that were successfully restored is returned.
func Recover(backups []Single, restorer ChannelRestorer,
func Recover(ctx context.Context, backups []Single, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

var numRestored int
Expand Down Expand Up @@ -70,7 +72,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
backup.FundingOutpoint)

err = peerConnector.ConnectPeer(
backup.RemoteNodePub, backup.Addresses,
ctx, backup.RemoteNodePub, backup.Addresses,
)
if err != nil {
return numRestored, err
Expand All @@ -95,7 +97,7 @@ func Recover(backups []Single, restorer ChannelRestorer,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverSingles(singles PackedSingles,
func UnpackAndRecoverSingles(ctx context.Context, singles PackedSingles,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -104,7 +106,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
return 0, err
}

return Recover(chanBackups, restorer, peerConnector)
return Recover(ctx, chanBackups, restorer, peerConnector)
}

// UnpackAndRecoverMulti is a one-shot method, that given a set of packed
Expand All @@ -114,7 +116,7 @@ func UnpackAndRecoverSingles(singles PackedSingles,
// established, then the PeerConnector will continue to attempt to re-establish
// a persistent connection in the background. The number of channels that were
// successfully restored is returned.
func UnpackAndRecoverMulti(packedMulti PackedMulti,
func UnpackAndRecoverMulti(ctx context.Context, packedMulti PackedMulti,
keyChain keychain.KeyRing, restorer ChannelRestorer,
peerConnector PeerConnector) (int, error) {

Expand All @@ -123,5 +125,5 @@ func UnpackAndRecoverMulti(packedMulti PackedMulti,
return 0, err
}

return Recover(chanBackups.StaticBackups, restorer, peerConnector)
return Recover(ctx, chanBackups.StaticBackups, restorer, peerConnector)
}
9 changes: 7 additions & 2 deletions chanrestore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lnd

import (
"context"
"fmt"
"math"
"net"
Expand Down Expand Up @@ -309,7 +310,9 @@ var _ chanbackup.ChannelRestorer = (*chanDBRestorer)(nil)
// as a persistent attempt.
//
// NOTE: Part of the chanbackup.PeerConnector interface.
func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
func (s *server) ConnectPeer(ctx context.Context, nodePub *btcec.PublicKey,
addrs []net.Addr) error {

// Before we connect to the remote peer, we'll remove any connections
// to ensure the new connection is created after this new link/channel
// is known.
Expand All @@ -333,7 +336,9 @@ func (s *server) ConnectPeer(nodePub *btcec.PublicKey, addrs []net.Addr) error {
// Attempt to connect to the peer using this full address. If
// we're unable to connect to them, then we'll try the next
// address in place of it.
err := s.ConnectToPeer(netAddr, true, s.cfg.ConnectionTimeout)
err := s.ConnectToPeer(
ctx, netAddr, true, s.cfg.ConnectionTimeout,
)

// If we're already connected to this peer, then we don't
// consider this an error, so we'll exit here.
Expand Down
Loading
Loading