Skip to content

Commit

Permalink
tapas_loopout
Browse files Browse the repository at this point in the history
  • Loading branch information
sputn1ck committed Nov 21, 2024
1 parent 0fe952a commit 731271a
Show file tree
Hide file tree
Showing 36 changed files with 6,619 additions and 134 deletions.
421 changes: 421 additions & 0 deletions assets/actions.go

Large diffs are not rendered by default.

134 changes: 134 additions & 0 deletions assets/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package assets

import (
"context"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/tappsbt"
"github.com/lightninglabs/taproot-assets/taprpc"
wrpc "github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
"github.com/lightninglabs/taproot-assets/taprpc/tapdevrpc"
"github.com/lightninglabs/taproot-assets/taprpc/universerpc"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)

const (
// DefaultSwapCSVExpiry is the default expiry for a swap in blocks.
DefaultSwapCSVExpiry = int32(24)

defaultHtlcFeeConfTarget = 3
defaultHtlcConfRequirement = 2

AssetKeyFamily = 696969
)

// TapdClient is an interface that groups the methods required to interact with
// the taproot-assets server and the wallet.
type AssetClient interface {
taprpc.TaprootAssetsClient
wrpc.AssetWalletClient
mintrpc.MintClient
universerpc.UniverseClient
tapdevrpc.TapDevClient

// FundAndSignVpacket funds ands signs a vpacket.
FundAndSignVpacket(ctx context.Context,
vpkt *tappsbt.VPacket) (*tappsbt.VPacket, error)

// PrepareAndCommitVirtualPsbts prepares and commits virtual psbts.
PrepareAndCommitVirtualPsbts(ctx context.Context,
vpkt *tappsbt.VPacket, feeRateSatPerKVByte chainfee.SatPerVByte) (
*psbt.Packet, []*tappsbt.VPacket, []*tappsbt.VPacket,
*wrpc.CommitVirtualPsbtsResponse, error)

// LogAndPublish logs and publishes the virtual psbts.
LogAndPublish(ctx context.Context, btcPkt *psbt.Packet,
activeAssets []*tappsbt.VPacket, passiveAssets []*tappsbt.VPacket,
commitResp *wrpc.CommitVirtualPsbtsResponse) (*taprpc.SendAssetResponse,
error)

// CheckBalanceById checks the balance of an asset by its id.
CheckBalanceById(ctx context.Context, assetId []byte,
requestedBalance btcutil.Amount) error

// DeriveNewKeys derives a new internal and script key.
DeriveNewKeys(ctx context.Context) (asset.ScriptKey,
keychain.KeyDescriptor, error)
}

// SwapStore is an interface that groups the methods required to store swap
// information.
type SwapStore interface {
// CreateAssetSwapOut creates a new swap out in the store.
CreateAssetSwapOut(ctx context.Context, swap *SwapOut) error

// UpdateAssetSwapHtlcOutpoint updates the htlc outpoint of a swap out.
UpdateAssetSwapHtlcOutpoint(ctx context.Context, swapHash lntypes.Hash,
outpoint *wire.OutPoint, confirmationHeight int32) error

// UpdateAssetSwapOutProof updates the proof of a swap out.
UpdateAssetSwapOutProof(ctx context.Context, swapHash lntypes.Hash,
rawProof []byte) error

// UpdateAssetSwapOutSweepTx updates the sweep tx of a swap out.
UpdateAssetSwapOutSweepTx(ctx context.Context,
swapHash lntypes.Hash, sweepTxid chainhash.Hash,
confHeight int32, sweepPkscript []byte) error

// InsertAssetSwapUpdate inserts a new swap update in the store.
InsertAssetSwapUpdate(ctx context.Context,
swapHash lntypes.Hash, state fsm.StateType) error

UpdateAssetSwapOutPreimage(ctx context.Context,
swapHash lntypes.Hash, preimage lntypes.Preimage) error
}

// BlockHeightSubscriber is responsible for subscribing to the expiry height
// of a swap, as well as getting the current block height.
type BlockHeightSubscriber interface {
// SubscribeExpiry subscribes to the expiry of a swap. It returns true
// if the expiry is already past. Otherwise, it returns false and calls
// the expiryFunc when the expiry height is reached.
SubscribeExpiry(swapHash [32]byte,
expiryHeight int32, expiryFunc func()) bool
// GetBlockHeight returns the current block height.
GetBlockHeight() int32
}

// InvoiceSubscriber is responsible for subscribing to an invoice.
type InvoiceSubscriber interface {
// SubscribeInvoice subscribes to an invoice. The update callback is
// called when the invoice is updated and the error callback is called
// when an error occurs.
SubscribeInvoice(ctx context.Context, invoiceHash lntypes.Hash,
updateCallback func(lndclient.InvoiceUpdate, error)) error
}

// TxConfirmationSubscriber is responsible for subscribing to the confirmation
// of a transaction.
type TxConfirmationSubscriber interface {

// SubscribeTxConfirmation subscribes to the confirmation of a
// pkscript on the chain. The callback is called when the pkscript is
// confirmed or when an error occurs.
SubscribeTxConfirmation(ctx context.Context, swapHash lntypes.Hash,
txid *chainhash.Hash, pkscript []byte, numConfs int32,
eightHint int32, cb func(*chainntnfs.TxConfirmation, error)) error
}

// ExchangeRateProvider is responsible for providing the exchange rate between
// assets.
type ExchangeRateProvider interface {
// GetSatsPerAssetUnit returns the amount of satoshis per asset unit.
GetSatsPerAssetUnit(assetId []byte) (btcutil.Amount, error)
}
26 changes: 26 additions & 0 deletions assets/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package assets

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the sub system name of this package.
const Subsystem = "ASSETS"

// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
203 changes: 203 additions & 0 deletions assets/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package assets

import (
"context"
"sync"
"time"

"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/fsm"
loop_rpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightninglabs/taproot-assets/taprpc"
"github.com/lightningnetwork/lnd/lntypes"
)

const (
ClientKeyFamily = 696969
)

type Config struct {
AssetClient *TapdClient
Wallet lndclient.WalletKitClient
// ExchangeRateProvider is the exchange rate provider.
ExchangeRateProvider *FixedExchangeRateProvider
Signer lndclient.SignerClient
ChainNotifier lndclient.ChainNotifierClient
Router lndclient.RouterClient
LndClient lndclient.LightningClient
Store *PostgresStore
ServerClient loop_rpc.AssetsSwapServerClient
}

type AssetsSwapManager struct {
cfg *Config

expiryManager *utils.ExpiryManager
txConfManager *utils.TxSubscribeConfirmationManager

blockHeight int32
runCtx context.Context
activeSwapOuts map[lntypes.Hash]*OutFSM

sync.Mutex
}

func NewAssetSwapServer(config *Config) *AssetsSwapManager {
return &AssetsSwapManager{
cfg: config,

activeSwapOuts: make(map[lntypes.Hash]*OutFSM),
}
}

func (m *AssetsSwapManager) Run(ctx context.Context, blockHeight int32) error {
m.runCtx = ctx
m.blockHeight = blockHeight

// Get our tapd client info.
tapdInfo, err := m.cfg.AssetClient.GetInfo(
ctx, &taprpc.GetInfoRequest{},
)
if err != nil {
return err
}
log.Infof("Tapd info: %v", tapdInfo)

// Create our subscriptionManagers.
m.expiryManager = utils.NewExpiryManager(m.cfg.ChainNotifier)
m.txConfManager = utils.NewTxSubscribeConfirmationManager(
m.cfg.ChainNotifier,
)

// Start the expiry manager.
errChan := make(chan error, 1)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := m.expiryManager.Start(ctx, blockHeight)
if err != nil {
log.Errorf("Expiry manager failed: %v", err)
errChan <- err
log.Errorf("Gude1")
}
}()

// Recover all the active asset swap outs from the database.
err = m.recoverSwapOuts(ctx)
if err != nil {
return err
}

for {
select {
case err := <-errChan:
log.Errorf("Gude2")
return err

case <-ctx.Done():
log.Errorf("Gude3")
// wg.Wait()
log.Errorf("Gude4")
return nil
}
}
}

func (m *AssetsSwapManager) NewSwapOut(ctx context.Context,
amt btcutil.Amount, asset []byte) (*OutFSM, error) {

// Create a new out fsm.
outFSM := NewOutFSM(m.runCtx, m.getFSMOutConfig())

// Send the initial event to the fsm.
err := outFSM.SendEvent(
ctx, OnRequestAssetOut, &InitSwapOutContext{
Amount: amt,
AssetId: asset,
},
)
if err != nil {
return nil, err
}
// Check if the fsm has an error.
if outFSM.LastActionError != nil {
return nil, outFSM.LastActionError
}

// Wait for the fsm to be in the state we expect.
err = outFSM.DefaultObserver.WaitForState(
ctx, time.Second*15, PayPrepay,
fsm.WithAbortEarlyOnErrorOption(),
)
if err != nil {
return nil, err
}

// Add the swap to the active swap outs.
m.Lock()
m.activeSwapOuts[outFSM.SwapOut.SwapHash] = outFSM
m.Unlock()

return outFSM, nil
}

// recoverSwapOuts recovers all the active asset swap outs from the database.
func (m *AssetsSwapManager) recoverSwapOuts(ctx context.Context) error {
// Fetch all the active asset swap outs from the database.
activeSwapOuts, err := m.cfg.Store.GetActiveAssetOuts(ctx)
if err != nil {
return err
}

for _, swapOut := range activeSwapOuts {
log.Debugf("Recovering asset out %v with state %v",
swapOut.SwapHash, swapOut.State)

swapOutFSM := NewOutFSMFromSwap(
ctx, m.getFSMOutConfig(), swapOut,
)

m.Lock()
m.activeSwapOuts[swapOut.SwapHash] = swapOutFSM
m.Unlock()

// As SendEvent can block, we'll start a goroutine to process
// the event.
go func() {
err := swapOutFSM.SendEvent(ctx, OnRecover, nil)
if err != nil {
log.Errorf("FSM %v Error sending recover "+
"event %v, state: %v",
swapOutFSM.SwapOut.SwapHash,
err, swapOutFSM.SwapOut.State)
}
}()
}

return nil
}

// getFSMOutConfig returns a fsmconfig from the manager.
func (m *AssetsSwapManager) getFSMOutConfig() *FSMConfig {
return &FSMConfig{
TapdClient: m.cfg.AssetClient,
AssetClient: m.cfg.ServerClient,
BlockHeightSubscriber: m.expiryManager,
TxConfSubscriber: m.txConfManager,
ExchangeRateProvider: m.cfg.ExchangeRateProvider,
Wallet: m.cfg.Wallet,
Router: m.cfg.Router,

Store: m.cfg.Store,
Signer: m.cfg.Signer,
}
}

func (m *AssetsSwapManager) ListSwapOutoutputs(ctx context.Context) ([]*SwapOut,
error) {

return m.cfg.Store.GetAllAssetOuts(ctx)
}
Loading

0 comments on commit 731271a

Please sign in to comment.