Skip to content

Commit

Permalink
chain: fix data races in NeutrinoClient
Browse files Browse the repository at this point in the history
Implements a concurrent test on Rescan and NotifyReceived.
Modifies the rescanQuit prop to be a channel of channels with
a buffer of 1.  If a rescan is running and Rescan is called,
the channel in the buffer will be dequeued and closed.  A new
channel will be put on the channel in it's place.

Wraps the handlers in closures that use the new channel.  Avoids
a data race in constantly switching out rescanQuit because
channels are concurrent safe.

Uses a rescannerCh to hold the rescanner and prevent panic calls
when concurrently accessing a single rescanner.  Updates the
TestNeutrinoClientNotifyReceivedRescan function to also make
calls to NotifyBlocks periodically.

Adds a createRescanner method to simplify the NotifyReceived
and Rescan methods.  Adds consistency in the recreation of
a rescanner.  Updates a mock test to allow use of actual
errors.

Ensures that upstream clients that expect a *neutrino.ChainService
to be the CS in a NeutrinoClient have access to all public
methods of the *neutrino.ChainService by expanding the interface
definition.
  • Loading branch information
MStreet3 committed Oct 17, 2022
1 parent 58c2680 commit 4ae09a5
Show file tree
Hide file tree
Showing 3 changed files with 614 additions and 136 deletions.
180 changes: 180 additions & 0 deletions chain/mocks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package chain

import (
"container/list"
"errors"
"testing"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/gcs"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/neutrino"
"github.com/lightninglabs/neutrino/banman"
"github.com/lightninglabs/neutrino/headerfs"
)

var (
ErrNotImplemented = errors.New("not implemented")
_ NeutrinoRescanner = (*mockRescanner)(nil)
_ NeutrinoChainService = (*mockChainService)(nil)
testBestBlock = &headerfs.BlockStamp{
Height: 42,
}
)

func newMockNeutrinoClient(t *testing.T,
opts ...func(*mockRescanner)) *NeutrinoClient {
t.Helper()
var (
chainParams = &chaincfg.Params{}
chainSvc = &mockChainService{}
newRescanner = func(ro ...neutrino.RescanOption) NeutrinoRescanner {
mrs := &mockRescanner{
updateArgs: list.New(),
}
for _, o := range opts {
o(mrs)
}
return mrs
}
)
return &NeutrinoClient{
CS: chainSvc,
chainParams: chainParams,
newRescanner: newRescanner,
rescanQuitCh: make(chan chan struct{}, 1),
rescannerCh: make(chan NeutrinoRescanner, 1),
}
}

type mockRescanner struct {
updateArgs *list.List
errs []error
rescanQuit <-chan struct{}
}

func (m *mockRescanner) Start() <-chan error {
errs := make(chan error)
go func() {
defer close(errs)
for _, err := range m.errs {
err := err
select {
case <-m.rescanQuit:
return
case errs <- err:
}
}
}()
return errs
}

func (m *mockRescanner) WaitForShutdown() {
// no-op
}

func (m *mockRescanner) Update(opts ...neutrino.UpdateOption) error {
m.updateArgs.PushBack(opts)
return nil
}

type mockChainService struct{}

func (m *mockChainService) Start() error {
return nil
}

func (m *mockChainService) GetBlock(
chainhash.Hash,
...neutrino.QueryOption,
) (*btcutil.Block, error) {
return nil, ErrNotImplemented
}

func (m *mockChainService) GetBlockHeight(*chainhash.Hash) (int32, error) {
return 0, ErrNotImplemented
}

func (m *mockChainService) BestBlock() (*headerfs.BlockStamp, error) {
return m.getBestBlock(), nil
}

func (m *mockChainService) getBestBlock() *headerfs.BlockStamp {
return testBestBlock
}

func (m *mockChainService) GetBlockHash(int64) (*chainhash.Hash, error) {
return nil, ErrNotImplemented
}

func (m *mockChainService) GetBlockHeader(
*chainhash.Hash,
) (*wire.BlockHeader, error) {
return &wire.BlockHeader{}, nil
}

func (m *mockChainService) IsCurrent() bool {
return false
}

func (m *mockChainService) SendTransaction(*wire.MsgTx) error {
return ErrNotImplemented
}

func (m *mockChainService) GetCFilter(
chainhash.Hash,
wire.FilterType,
...neutrino.QueryOption,
) (*gcs.Filter, error) {
return nil, ErrNotImplemented
}

func (m *mockChainService) GetUtxo(
_ ...neutrino.RescanOption,
) (*neutrino.SpendReport, error) {
return nil, ErrNotImplemented
}

func (m *mockChainService) BanPeer(string, banman.Reason) error {
return ErrNotImplemented
}

func (m *mockChainService) IsBanned(addr string) bool {
panic(ErrNotImplemented)
}

func (m *mockChainService) AddPeer(*neutrino.ServerPeer) {
panic(ErrNotImplemented)
}

func (m *mockChainService) AddBytesSent(uint64) {
panic(ErrNotImplemented)
}

func (m *mockChainService) AddBytesReceived(uint64) {
panic(ErrNotImplemented)
}

func (m *mockChainService) NetTotals() (uint64, uint64) {
panic(ErrNotImplemented)
}

func (m *mockChainService) UpdatePeerHeights(
*chainhash.Hash, int32, *neutrino.ServerPeer,
) {
panic(ErrNotImplemented)
}

func (m *mockChainService) ChainParams() chaincfg.Params {
panic(ErrNotImplemented)
}

func (m *mockChainService) Stop() error {
panic(ErrNotImplemented)
}

func (m *mockChainService) PeerByAddr(string) *neutrino.ServerPeer {
panic(ErrNotImplemented)
}
Loading

0 comments on commit 4ae09a5

Please sign in to comment.