Skip to content

Commit

Permalink
Optimize Relay (#108)
Browse files Browse the repository at this point in the history
* Optimize Relay

Signed-off-by: Dongri Jin <[email protected]>

* Fix curerntTime

Signed-off-by: Dongri Jin <[email protected]>

* Add src, dst current time

Signed-off-by: Dongri Jin <[email protected]>

* Add optimizeRelay

Signed-off-by: Dongri Jin <[email protected]>

* Refactoring optimize relay

Signed-off-by: Dongri Jin <[email protected]>

* * Add src, dst parameter
* Fix start time variable
* Change relay parameter 

Signed-off-by: Dongri Jin <[email protected]>

* Fix update clients

Signed-off-by: Dongri Jin <[email protected]>

* Fix src, dst parameters


Signed-off-by: Dongri Jin <[email protected]>

* Fix src, dst

Signed-off-by: Dongri Jin <[email protected]>

* Fix tm2tm test

Signed-off-by: Dongri Jin <[email protected]>

* Refactoring shouldExecuteRelay

Signed-off-by: Dongri Jin <[email protected]>

* Refactoring test-service

Signed-off-by: Dongri Jin <[email protected]>

* Add ack test to service

Signed-off-by: Dongri Jin <[email protected]>

* Fix error log

Signed-off-by: Dongri Jin <[email protected]>

* Fix service interval


Signed-off-by: Dongri Jin <[email protected]>

* Fix test sleep


Signed-off-by: Dongri Jin <[email protected]>

* Fix service test

Signed-off-by: Dongri Jin <[email protected]>

* Remove startTime
Use eventHeight


Signed-off-by: Dongri Jin <[email protected]>

* Add logger


Signed-off-by: Dongri Jin <[email protected]>

* * Fix Error messages
* Fix check src, dst timestamp
* Fix test case comments


Signed-off-by: Dongri Jin <[email protected]>

* Fix error logs

Signed-off-by: Dongri Jin <[email protected]>

* Fix error message


Signed-off-by: Dongri Jin <[email protected]>

* Fix error message

Signed-off-by: Dongri Jin <[email protected]>

---------

Signed-off-by: Dongri Jin <[email protected]>
  • Loading branch information
dongrie authored Jan 31, 2024
1 parent f9ed49d commit bf144b0
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 100 deletions.
22 changes: 11 additions & 11 deletions chains/tendermint/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,22 @@ func (c *Chain) QueryUnreceivedPackets(ctx core.QueryContext, seqs []uint64) ([]
PacketCommitmentSequences: seqs,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query unreceived packets: error=%w height=%v", err, ctx.Height())
}
return res.Sequences, nil
}

func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) {
res, err := c.queryPacketCommitments(ctx, 0, 1000)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query packet commitments: error=%w height=%v", err, ctx.Height())
}

var packets core.PacketInfoList
for _, ps := range res.Commitments {
packet, height, err := c.querySentPacket(ctx, ps.Sequence)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query sent packet: error=%w height=%v", err, ctx.Height())
}
packets = append(packets, &core.PacketInfo{
Packet: *packet,
Expand All @@ -210,14 +210,14 @@ func (c *Chain) QueryUnfinalizedRelayPackets(ctx core.QueryContext, counterparty

var counterpartyCtx core.QueryContext
if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil {
return nil, err
return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height())
} else {
counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight())
}

seqs, err := counterparty.QueryUnreceivedPackets(counterpartyCtx, packets.ExtractSequenceList())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query counterparty for unreceived packets: error=%w, height=%v", err, counterpartyCtx.Height())
}
packets = packets.Filter(seqs)

Expand All @@ -233,26 +233,26 @@ func (c *Chain) QueryUnreceivedAcknowledgements(ctx core.QueryContext, seqs []ui
PacketAckSequences: seqs,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query unreceived acks: : error=%w height=%v", err, ctx.Height())
}
return res.Sequences, nil
}

func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, counterparty core.LightClientICS04Querier) (core.PacketInfoList, error) {
res, err := c.queryPacketAcknowledgementCommitments(ctx, 0, 1000)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query packet acknowledgement commitments: error=%w height=%v", err, ctx.Height())
}

var packets core.PacketInfoList
for _, ps := range res.Acknowledgements {
packet, rpHeight, err := c.queryReceivedPacket(ctx, ps.Sequence)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query received packet: error=%w height=%v", err, ctx.Height())
}
ack, _, err := c.queryWrittenAcknowledgement(ctx, ps.Sequence)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query written acknowledgement: error=%w height=%v", err, ctx.Height())
}
packets = append(packets, &core.PacketInfo{
Packet: *packet,
Expand All @@ -263,14 +263,14 @@ func (c *Chain) QueryUnfinalizedRelayAcknowledgements(ctx core.QueryContext, cou

var counterpartyCtx core.QueryContext
if counterpartyH, err := counterparty.GetLatestFinalizedHeader(); err != nil {
return nil, err
return nil, fmt.Errorf("failed to get latest finalized header: error=%w height=%v", err, ctx.Height())
} else {
counterpartyCtx = core.NewQueryContext(context.TODO(), counterpartyH.GetHeight())
}

seqs, err := counterparty.QueryUnreceivedAcknowledgements(counterpartyCtx, packets.ExtractSequenceList())
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to query counterparty for unreceived acknowledgements: error=%w height=%v", err, counterpartyCtx.Height())
}
packets = packets.Filter(seqs)

Expand Down
30 changes: 25 additions & 5 deletions cmd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ func serviceCmd(ctx *config.Context) *cobra.Command {

func startCmd(ctx *config.Context) *cobra.Command {
const (
flagRelayInterval = "relay-interval"
flagPrometheusAddr = "prometheus-addr"
flagRelayInterval = "relay-interval"
flagPrometheusAddr = "prometheus-addr"
flagSrcRelayOptimizeInterval = "src-relay-optimize-interval"
flagSrcRelayOptimizeCount = "src-relay-optimize-count"
flagDstRelayOptimizeInterval = "dst-relay-optimize-interval"
flagDstRelayOptimizeCount = "dst-relay-optimize-count"
)
const (
defaultRelayInterval = 3 * time.Second
defaultPrometheusAddr = "localhost:2223"
defaultRelayInterval = 3 * time.Second
defaultPrometheusAddr = "localhost:2223"
defaultRelayOptimizeInterval = 10 * time.Second
defaultRelayOptimizeCount = 5
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -60,10 +66,24 @@ func startCmd(ctx *config.Context) *cobra.Command {
if err := st.SetupRelay(context.TODO(), c[src], c[dst]); err != nil {
return err
}
return core.StartService(context.Background(), st, c[src], c[dst], viper.GetDuration(flagRelayInterval))
return core.StartService(
context.Background(),
st,
c[src],
c[dst],
viper.GetDuration(flagRelayInterval),
viper.GetDuration(flagSrcRelayOptimizeInterval),
viper.GetUint64(flagSrcRelayOptimizeCount),
viper.GetDuration(flagDstRelayOptimizeInterval),
viper.GetUint64(flagDstRelayOptimizeCount),
)
},
}
cmd.Flags().Duration(flagRelayInterval, defaultRelayInterval, "time interval to perform relays")
cmd.Flags().String(flagPrometheusAddr, defaultPrometheusAddr, "host address to which the prometheus exporter listens")
cmd.Flags().Duration(flagSrcRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Uint64(flagSrcRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
cmd.Flags().Duration(flagDstRelayOptimizeInterval, defaultRelayOptimizeInterval, "maximum time interval to delay relays for optimization")
cmd.Flags().Uint64(flagDstRelayOptimizeCount, defaultRelayOptimizeCount, "maximum number of relays to delay for optimization")
return cmd
}
18 changes: 14 additions & 4 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,18 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {

msgs := core.NewRelayMsgs()

if m, err := st.UpdateClients(c[src], c[dst], sp, &core.RelayPackets{}, sh, viper.GetBool(flagDoRefresh)); err != nil {
doExecuteRelaySrc := len(sp.Dst) > 0
doExecuteRelayDst := len(sp.Src) > 0
doExecuteAckSrc := false
doExecuteAckDst := false

if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
}

if m, err := st.RelayPackets(c[src], c[dst], sp, sh); err != nil {
if m, err := st.RelayPackets(c[src], c[dst], sp, sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down Expand Up @@ -315,13 +320,18 @@ func relayAcksCmd(ctx *config.Context) *cobra.Command {

msgs := core.NewRelayMsgs()

if m, err := st.UpdateClients(c[src], c[dst], &core.RelayPackets{}, sp, sh, viper.GetBool(flagDoRefresh)); err != nil {
doExecuteRelaySrc := false
doExecuteRelayDst := false
doExecuteAckSrc := len(sp.Dst) > 0
doExecuteAckDst := len(sp.Src) > 0

if m, err := st.UpdateClients(c[src], c[dst], doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, sh, viper.GetBool(flagDoRefresh)); err != nil {
return err
} else {
msgs.Merge(m)
}

if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh); err != nil {
if m, err := st.RelayAcknowledgements(c[src], c[dst], sp, sh, doExecuteAckSrc, doExecuteAckDst); err != nil {
return err
} else {
msgs.Merge(m)
Expand Down
66 changes: 34 additions & 32 deletions core/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
now := time.Now()
srcPackets, err = src.QueryUnfinalizedRelayPackets(srcCtx, dst)
if err != nil {
return err
return fmt.Errorf("failed to query unfinalized relay packets on src chain: %w", err)
}
logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "src", "num_packets", len(srcPackets))
return nil
Expand All @@ -123,7 +123,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
now := time.Now()
dstPackets, err = dst.QueryUnfinalizedRelayPackets(dstCtx, src)
if err != nil {
return err
return fmt.Errorf("failed to query unfinalized relay packets on dst chain: %w", err)
}
logger.TimeTrack(now, "QueryUnfinalizedRelayPackets", "queried_chain", "dst", "num_packets", len(dstPackets))
return nil
Expand Down Expand Up @@ -168,7 +168,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
now := time.Now()
seqs, err := dst.QueryUnreceivedPackets(dstCtx, srcPackets.ExtractSequenceList())
if err != nil {
return err
return fmt.Errorf("failed to query unreceived packets on dst chain: %w", err)
}
logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "dst", "num_seqs", len(seqs))
srcPackets = srcPackets.Filter(seqs)
Expand All @@ -179,7 +179,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
now := time.Now()
seqs, err := src.QueryUnreceivedPackets(srcCtx, dstPackets.ExtractSequenceList())
if err != nil {
return err
return fmt.Errorf("failed to query unreceived packets on src chain: %w", err)
}
logger.TimeTrack(now, "QueryUnreceivedPackets", "queried_chain", "src", "num_seqs", len(seqs))
dstPackets = dstPackets.Filter(seqs)
Expand All @@ -199,7 +199,7 @@ func (st *NaiveStrategy) UnrelayedPackets(src, dst *ProvableChain, sh SyncHeader
}, nil
}

func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -225,21 +225,26 @@ func (st *NaiveStrategy) RelayPackets(src, dst *ProvableChain, rp *RelayPackets,
return nil, err
}

msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
if err != nil {
logger.Error(
"error collecting packets",
err,
)
return nil, err
if doExecuteRelayDst {
msgs.Dst, err = collectPackets(srcCtx, src, rp.Src, dstAddress)
if err != nil {
logger.Error(
"error collecting packets",
err,
)
return nil, err
}
}
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
logger.Error(
"error collecting packets",
err,
)
return nil, err

if doExecuteRelaySrc {
msgs.Src, err = collectPackets(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
logger.Error(
"error collecting packets",
err,
)
return nil, err
}
}

if len(msgs.Dst) == 0 && len(msgs.Src) == 0 {
Expand Down Expand Up @@ -281,7 +286,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
now := time.Now()
srcAcks, err = src.QueryUnfinalizedRelayAcknowledgements(srcCtx, dst)
if err != nil {
return err
return fmt.Errorf("failed to query unfinalized relay acknowledgements on src chain: %w", err)
}
logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "src", "num_packets", len(srcAcks))
return nil
Expand All @@ -306,7 +311,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
now := time.Now()
dstAcks, err = dst.QueryUnfinalizedRelayAcknowledgements(dstCtx, src)
if err != nil {
return err
return fmt.Errorf("failed to query unfinalized relay acknowledgements on dst chain: %w", err)
}
logger.TimeTrack(now, "QueryUnfinalizedRelayAcknowledgements", "queried_chain", "dst", "num_packets", len(dstAcks))
return nil
Expand Down Expand Up @@ -350,7 +355,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
now := time.Now()
seqs, err := dst.QueryUnreceivedAcknowledgements(dstCtx, srcAcks.ExtractSequenceList())
if err != nil {
return err
return fmt.Errorf("failed to query unreceived acknowledgements on dst chain: %w", err)
}
logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "dst", "num_seqs", len(seqs))
srcAcks = srcAcks.Filter(seqs)
Expand All @@ -363,7 +368,7 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(src, dst *ProvableChain, sh S
now := time.Now()
seqs, err := src.QueryUnreceivedAcknowledgements(srcCtx, dstAcks.ExtractSequenceList())
if err != nil {
return err
return fmt.Errorf("failed to query unreceived acknowledgements on src chain: %w", err)
}
logger.TimeTrack(now, "QueryUnreceivedAcknowledgements", "queried_chain", "src", "num_seqs", len(seqs))
dstAcks = dstAcks.Filter(seqs)
Expand Down Expand Up @@ -415,7 +420,7 @@ func logPacketsRelayed(src, dst Chain, num int, obj string, dir string) {
)
}

func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders) (*RelayMsgs, error) {
func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteAckSrc, doExecuteAckDst bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayAcknowledgements", "num_src", len(rp.Src), "num_dst", len(rp.Dst))

Expand All @@ -440,13 +445,13 @@ func (st *NaiveStrategy) RelayAcknowledgements(src, dst *ProvableChain, rp *Rela
return nil, err
}

if !st.dstNoAck {
if !st.dstNoAck && doExecuteAckDst {
msgs.Dst, err = collectAcks(srcCtx, src, rp.Src, dstAddress)
if err != nil {
return nil, err
}
}
if !st.srcNoAck {
if !st.srcNoAck && doExecuteAckSrc {
msgs.Src, err = collectAcks(dstCtx, dst, rp.Dst, srcAddress)
if err != nil {
return nil, err
Expand Down Expand Up @@ -490,16 +495,13 @@ func collectAcks(ctx QueryContext, chain *ProvableChain, packets PacketInfoList,
return msgs, nil
}

func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, rpForRecv, rpForAck *RelayPackets, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
func (st *NaiveStrategy) UpdateClients(src, dst *ProvableChain, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst bool, sh SyncHeaders, doRefresh bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)

msgs := NewRelayMsgs()

// check if unrelayed packets or acks exist
needsUpdateForSrc := len(rpForRecv.Dst) > 0 ||
!st.srcNoAck && len(rpForAck.Dst) > 0
needsUpdateForDst := len(rpForRecv.Src) > 0 ||
!st.dstNoAck && len(rpForAck.Src) > 0
needsUpdateForSrc := doExecuteRelaySrc || (doExecuteAckSrc && !st.srcNoAck)
needsUpdateForDst := doExecuteRelayDst || (doExecuteAckDst && !st.dstNoAck)

// check if LC refresh is needed
if !needsUpdateForSrc && doRefresh {
Expand Down
Loading

0 comments on commit bf144b0

Please sign in to comment.