Skip to content

Commit

Permalink
Respect total maximum results across DHT and IPNI. Report where provi…
Browse files Browse the repository at this point in the history
…der found.
  • Loading branch information
gammazero committed Sep 17, 2024
1 parent 89f17d3 commit 105c360
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 30 deletions.
64 changes: 40 additions & 24 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ type daemon struct {
promRegistry *prometheus.Registry
}

// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
var MaxProvidersCount = 10
const (
defaultIndexerURL = "https://cid.contact"
// number of providers at which to stop looking for providers in the DHT
// When doing a check only with a CID
maxProvidersCount = 10

ipniSource = "IPNI"
dhtSource = "Amino DHT"
)

func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) {
rm, err := NewResourceManager()
Expand Down Expand Up @@ -137,33 +143,40 @@ type providerOutput struct {
Addrs []string
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
Source string
}

// runCidCheck finds providers of a given CID, using the DHT and IPNI
// concurrently. A check of connectivity and Bitswap availability is performed
// for each provider found.
func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) {
if ipniURL == "" {
ipniURL = defaultIndexerURL
}

crClient, err := client.New(ipniURL, client.WithStreamResultsRequired())
if err != nil {
return nil, fmt.Errorf("failed to creat content router client: %w", err)

Check warning on line 159 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L159

Added line #L159 was not covered by tests
}
routerClient := contentrouter.NewContentRoutingClient(crClient)

queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
queryCtx, cancelQuery := context.WithCancel(ctx)
defer cancelQuery()

// Find providers with DHT and IPNI concurrently.
provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount)
ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount)
provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)
ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount)

out := make([]providerOutput, 0, MaxProvidersCount)
out := make([]providerOutput, 0, maxProvidersCount)
var wg sync.WaitGroup
var mu sync.Mutex

var providersCount int
var done bool

for !done {
var provider peer.AddrInfo
var open bool
var source string

select {
case provider, open = <-provsCh:
Expand All @@ -174,6 +187,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
}
continue
}
source = dhtSource
case provider, open = <-ipniProvsCh:
if !open {
ipniProvsCh = nil
Expand All @@ -182,10 +196,15 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
}
continue
}
source = ipniSource

Check warning on line 199 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L199

Added line #L199 was not covered by tests
}
providersCount++
if providersCount == maxProvidersCount {
done = true

Check warning on line 203 in daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon.go#L203

Added line #L203 was not covered by tests
}

wg.Add(1)
go func(provider peer.AddrInfo) {
go func(provider peer.AddrInfo, src string) {
defer wg.Done()

outputAddrs := []string{}
Expand Down Expand Up @@ -213,6 +232,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
ID: provider.ID.String(),
Addrs: outputAddrs,
DataAvailableOverBitswap: BitswapCheckOutput{},
Source: src,
}

testHost, err := d.createTestHost()
Expand Down Expand Up @@ -245,8 +265,9 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
mu.Lock()
out = append(out, provOutput)
mu.Unlock()
}(provider)
}(provider, source)
}
cancelQuery()

// Wait for all goroutines to finish
wg.Wait()
Expand All @@ -260,6 +281,7 @@ type peerCheckOutput struct {
ProviderRecordFromPeerInDHT bool
ConnectionMaddrs []string
DataAvailableOverBitswap BitswapCheckOutput
Source string
}

// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
Expand All @@ -274,20 +296,17 @@ func (d *daemon) runPeerCheck(ctx context.Context, maStr string, c cid.Cid) (*pe
return nil, err
}

// User has only passed a PeerID without any maddrs
onlyPeerID := len(ai.Addrs) == 0

out := &peerCheckOutput{}

connectionFailed := false
addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)

out.ProviderRecordFromPeerInDHT = ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID)
out := &peerCheckOutput{
ProviderRecordFromPeerInDHT: ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID),
PeerFoundInDHT: addrMap,
}

addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID)
out.PeerFoundInDHT = addrMap
var connectionFailed bool

// If peerID given,but no addresses check the DHT
if onlyPeerID {
if len(ai.Addrs) == 0 {
if peerAddrDHTErr != nil {
// PeerID is not resolvable via the DHT
connectionFailed = true
Expand Down Expand Up @@ -368,9 +387,6 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe
return nil, err
}

wg := sync.WaitGroup{}
wg.Add(len(closestPeers))

resCh := make(chan *peer.AddrInfo, len(closestPeers))

numSuccessfulResponses := execOnMany(ctx, 0.3, time.Second*3, func(ctx context.Context, peerToQuery peer.ID) error {
Expand Down
5 changes: 0 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func main() {

const (
defaultCheckTimeout = 60 * time.Second
defaultIndexerURL = "https://cid.contact/routing/v1/providers/"
)

func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, metricPassword string) error {
Expand Down Expand Up @@ -120,10 +119,6 @@ func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, m
}
}

if ipniURL == "" {
ipniURL = defaultIndexerURL
}

log.Printf("Checking %s with timeout %s seconds", cidStr, checkTimeout.String())
withTimeout, cancel := context.WithTimeout(r.Context(), checkTimeout)
defer cancel()
Expand Down
3 changes: 2 additions & 1 deletion web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,15 @@ <h2 class="f4">What does it mean if I get an error?</h2>
}
})

outText += `${successfulProviders > 0 ? '✅' : '❌'} Found ${successfulProviders} working providers (out of ${resp.length} provider records sampled from Amino DHT) that could be connected to and had the CID available over Bitswap:`
outText += `${successfulProviders > 0 ? '✅' : '❌'} Found ${successfulProviders} working providers (out of ${resp.length} provider records sampled from Amino DHT and IPNI) that could be connected to and had the CID available over Bitswap:`
for (const provider of resp) {
const couldConnect = provider.ConnectionError === ''

outText += `\n\t${provider.ID}\n\t\tConnected: ${couldConnect ? "✅" : `❌ ${provider.ConnectionError.replaceAll('\n', '\n\t\t')}` }`
outText += couldConnect ? `\n\t\tBitswap Check: ${provider.DataAvailableOverBitswap.Found ? `✅` : "❌"} ${provider.DataAvailableOverBitswap.Error || ''}` : ''
outText += (couldConnect && provider.ConnectionMaddrs) ? `\n\t\tSuccessful Connection Multiaddr${provider.ConnectionMaddrs.length > 1 ? 's' : ''}:\n\t\t\t${provider.ConnectionMaddrs?.join('\n\t\t\t') || ''}` : ''
outText += (provider.Addrs.length > 0) ? `\n\t\tPeer Multiaddrs:\n\t\t\t${provider.Addrs.join('\n\t\t\t')}` : ''
outText += `\n\t\tFound in: ${provider.Source}`
}

return outText
Expand Down

0 comments on commit 105c360

Please sign in to comment.