Skip to content

Commit

Permalink
Pull request 309: 6480 imp load balance
Browse files Browse the repository at this point in the history
Updates AdguardTeam/AdGuardHome#6480.

Squashed commit of the following:

commit 053c830
Merge: 8f6b648 638288e
Author: Eugene Burkov <[email protected]>
Date:   Mon Dec 18 18:27:31 2023 +0300

    Merge branch 'master' into 6480-imp-load-balance

commit 8f6b648
Author: Eugene Burkov <[email protected]>
Date:   Mon Dec 18 15:30:28 2023 +0300

    proxy: imp docs

commit 7927132
Author: Eugene Burkov <[email protected]>
Date:   Mon Dec 18 15:17:41 2023 +0300

    proxy: unexport clock

commit 5cbb56e
Author: Eugene Burkov <[email protected]>
Date:   Mon Dec 18 15:01:01 2023 +0300

    proxy: imp code

commit df5863b
Author: Eugene Burkov <[email protected]>
Date:   Mon Dec 18 12:47:34 2023 +0300

    proxy: return algo

commit 2ad8274
Author: Eugene Burkov <[email protected]>
Date:   Fri Dec 15 19:57:14 2023 +0300

    proxy: imp code, algo

commit 068bcd8
Author: Eugene Burkov <[email protected]>
Date:   Fri Dec 15 16:18:48 2023 +0300

    proxy: imp code, change algo

commit 932f00f
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 14 18:50:02 2023 +0300

    proxy: imp docs

commit 62d9b57
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 14 16:36:25 2023 +0300

    proxy: imp test

commit 90aa395
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 14 15:00:37 2023 +0300

    proxy: imp tests

commit 69ba821
Author: Eugene Burkov <[email protected]>
Date:   Wed Dec 13 19:26:56 2023 +0300

    proxy: imp algo

commit d7bf7b9
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 7 19:28:53 2023 +0300

    all: fix nil deref

commit bb4ceb6
Merge: cc26532 d6ebaac
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 7 19:16:34 2023 +0300

    Merge branch 'master' into 6480-test-load-balance

commit cc26532
Author: Eugene Burkov <[email protected]>
Date:   Thu Dec 7 18:46:17 2023 +0300

    proxy: test load balancing
  • Loading branch information
EugeneOne1 committed Dec 18, 2023
1 parent 638288e commit 06d548f
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 131 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
gonum.org/v1/gonum v0.14.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
gonum.org/v1/gonum v0.14.0 h1:2NiG67LD1tEH0D7kM+ps2V+fXmsAnpUeec7n8tcr4S0=
gonum.org/v1/gonum v0.14.0/go.mod h1:AoWeoz0becf9QMWtE8iWXNXc27fK4fNeHNf/oMejGfU=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
Expand Down
7 changes: 4 additions & 3 deletions proxy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const testCacheSize = 4096

const testUpsAddr = "https://upstream.address"

var upstreamWithAddr = &funcUpstream{
exchangeFunc: func(m *dns.Msg) (resp *dns.Msg, err error) { panic("not implemented") },
addressFunc: func() (addr string) { return testUpsAddr },
var upstreamWithAddr = &fakeUpstream{
onExchange: func(m *dns.Msg) (resp *dns.Msg, err error) { panic("not implemented") },
onClose: func() (err error) { panic("not implemented") },
onAddress: func() (addr string) { return testUpsAddr },
}

func TestServeCached(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions proxy/clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package proxy

import "time"

// clock is the interface for provider of current time. It's used to simplify
// testing.
//
// TODO(e.burkov): Move to golibs.
type clock interface {
// Now returns the current local time.
Now() (now time.Time)
}

// type check
var _ clock = realClock{}

// realClock is the [clock] which actually uses the [time] package.
type realClock struct{}

// Now implements the [clock] interface for RealClock.
func (realClock) Now() (now time.Time) { return time.Now() }
2 changes: 1 addition & 1 deletion proxy/dns64.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (p *Proxy) performDNS64(
host := origReq.Question[0].Name
log.Debug("proxy: received an empty aaaa response for %q, checking dns64", host)

dns64Resp, u, err := p.exchange(dns64Req, upstreams)
dns64Resp, u, err := p.exchangeUpstreams(dns64Req, upstreams)
if err != nil {
log.Error("proxy: dns64 request failed: %s", err)

Expand Down
146 changes: 95 additions & 51 deletions proxy/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,144 @@ import (
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/miekg/dns"
"golang.org/x/exp/slices"
"gonum.org/v1/gonum/stat/sampleuv"
)

// exchange -- sends DNS query to the upstream DNS server and returns the response
func (p *Proxy) exchange(req *dns.Msg, upstreams []upstream.Upstream) (reply *dns.Msg, u upstream.Upstream, err error) {
qtype := req.Question[0].Qtype
if p.UpstreamMode == UModeFastestAddr && (qtype == dns.TypeA || qtype == dns.TypeAAAA) {
reply, u, err = p.fastestAddr.ExchangeFastest(req, upstreams)
return
}

if p.UpstreamMode == UModeParallel {
reply, u, err = upstream.ExchangeParallel(upstreams, req)
return
// exchangeUpstreams resolves req using the given upstreams. It returns the DNS
// response, the upstream that successfully resolved the request, and the error
// if any.
func (p *Proxy) exchangeUpstreams(
req *dns.Msg,
ups []upstream.Upstream,
) (resp *dns.Msg, u upstream.Upstream, err error) {
switch p.UpstreamMode {
case UModeParallel:
return upstream.ExchangeParallel(ups, req)
case UModeFastestAddr:
switch req.Question[0].Qtype {
case dns.TypeA, dns.TypeAAAA:
return p.fastestAddr.ExchangeFastest(req, ups)
default:
// Go on to the load-balancing mode.
}
default:
// Go on to the load-balancing mode.
}

// UModeLoadBalance goes below
if len(ups) == 1 {
u = ups[0]
resp, _, err = exchange(u, req, p.time)
// TODO(e.burkov): p.updateRTT(u.Address(), elapsed)

if len(upstreams) == 1 {
u = upstreams[0]
reply, _, err = exchangeWithUpstream(u, req)
return
return resp, u, err
}

// sort upstreams by rtt from fast to slow
sortedUpstreams := p.getSortedUpstreams(upstreams)
w := sampleuv.NewWeighted(p.calcWeights(ups), p.randSrc)
var errs []error
for i, ok := w.Take(); ok; i, ok = w.Take() {
u = ups[i]

errs := []error{}
for _, dnsUpstream := range sortedUpstreams {
var elapsed int
reply, elapsed, err = exchangeWithUpstream(dnsUpstream, req)
var elapsed time.Duration
resp, elapsed, err = exchange(u, req, p.time)
if err == nil {
p.updateRTT(dnsUpstream.Address(), elapsed)
p.updateRTT(u.Address(), elapsed)

return reply, dnsUpstream, err
return resp, u, nil
}

errs = append(errs, err)
p.updateRTT(dnsUpstream.Address(), int(defaultTimeout/time.Millisecond))

// TODO(e.burkov): Use the actual configured timeout or, perhaps, the
// actual measured elapsed time.
p.updateRTT(u.Address(), defaultTimeout)
}

// TODO(e.burkov): Use [errors.Join].
return nil, nil, errors.List("all upstreams failed to exchange request", errs...)
}

func (p *Proxy) getSortedUpstreams(u []upstream.Upstream) []upstream.Upstream {
// clone upstreams list to avoid race conditions
clone := slices.Clone(u)

p.rttLock.Lock()
defer p.rttLock.Unlock()
// exchange returns the result of the DNS request exchange with the given
// upstream and the elapsed time in milliseconds. It uses the given clock to
// measure the request duration.
func exchange(u upstream.Upstream, req *dns.Msg, c clock) (resp *dns.Msg, dur time.Duration, err error) {
startTime := c.Now()

slices.SortFunc(clone, func(a, b upstream.Upstream) (res int) {
// TODO(d.kolyshev): Use upstreams for sort comparing.
return p.upstreamRTTStats[a.Address()] - p.upstreamRTTStats[b.Address()]
})

return clone
}

// exchangeWithUpstream returns result of Exchange with elapsed time
func exchangeWithUpstream(u upstream.Upstream, req *dns.Msg) (*dns.Msg, int, error) {
startTime := time.Now()
reply, err := u.Exchange(req)
elapsed := time.Since(startTime)

// Don't use [time.Since] because it uses [time.Now].
dur = c.Now().Sub(startTime)

addr := u.Address()
if err != nil {
log.Error(
"dnsproxy: upstream %s failed to exchange %s in %s: %s",
addr,
req.Question[0].String(),
elapsed,
dur,
err,
)
} else {
log.Debug(
"dnsproxy: upstream %s successfully finished exchange of %s; elapsed %s",
addr,
req.Question[0].String(),
elapsed,
dur,
)
}

return reply, int(elapsed.Milliseconds()), err
return reply, dur, err
}

// upstreamRTTStats is the statistics for a single upstream's round-trip time.
type upstreamRTTStats struct {
// rttSum is the sum of all the round-trip times in microseconds. The
// float64 type is used since it's capable of representing about 285 years
// in microseconds.
rttSum float64

// reqNum is the number of requests to the upstream. The float64 type is
// used since to avoid unnecessary type conversions.
reqNum float64
}

// update returns updated stats after adding given RTT.
func (stats upstreamRTTStats) update(rtt time.Duration) (updated upstreamRTTStats) {
return upstreamRTTStats{
rttSum: stats.rttSum + float64(rtt.Microseconds()),
reqNum: stats.reqNum + 1,
}
}

// calcWeights returns the slice of weights, each corresponding to the upstream
// with the same index in the given slice.
func (p *Proxy) calcWeights(ups []upstream.Upstream) (weights []float64) {
weights = make([]float64, 0, len(ups))

p.rttLock.Lock()
defer p.rttLock.Unlock()

for _, u := range ups {
stat := p.upstreamRTTStats[u.Address()]
if stat.rttSum == 0 || stat.reqNum == 0 {
// Use 1 as the default weight.
weights = append(weights, 1)
} else {
weights = append(weights, 1/(stat.rttSum/stat.reqNum))
}
}

return weights
}

// updateRTT updates the round-trip time in upstreamRTTStats for given address.
func (p *Proxy) updateRTT(address string, rtt int) {
// updateRTT updates the round-trip time in [upstreamRTTStats] for given
// address.
func (p *Proxy) updateRTT(address string, rtt time.Duration) {
p.rttLock.Lock()
defer p.rttLock.Unlock()

if p.upstreamRTTStats == nil {
p.upstreamRTTStats = map[string]int{}
p.upstreamRTTStats = map[string]upstreamRTTStats{}
}

p.upstreamRTTStats[address] = (p.upstreamRTTStats[address] + rtt) / 2
p.upstreamRTTStats[address] = p.upstreamRTTStats[address].update(rtt)
}
Loading

0 comments on commit 06d548f

Please sign in to comment.