diff --git a/daemon.go b/daemon.go index 8e1e3a3..83dcdb1 100644 --- a/daemon.go +++ b/daemon.go @@ -40,9 +40,15 @@ type daemon struct { promRegistry *prometheus.Registry } -// number of providers at which to stop looking for providers in the DHT -// When doing a check only with a CID -var MaxProvidersCount = 10 +const ( + defaultIndexerURL = "https://cid.contact" + // number of providers at which to stop looking for providers in the DHT + // When doing a check only with a CID + maxProvidersCount = 10 + + ipniSource = "IPNI" + dhtSource = "Amino DHT" +) func newDaemon(ctx context.Context, acceleratedDHT bool) (*daemon, error) { rm, err := NewResourceManager() @@ -137,33 +143,40 @@ type providerOutput struct { Addrs []string ConnectionMaddrs []string DataAvailableOverBitswap BitswapCheckOutput + Source string } // runCidCheck finds providers of a given CID, using the DHT and IPNI // concurrently. A check of connectivity and Bitswap availability is performed // for each provider found. func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) { + if ipniURL == "" { + ipniURL = defaultIndexerURL + } + crClient, err := client.New(ipniURL, client.WithStreamResultsRequired()) if err != nil { return nil, fmt.Errorf("failed to creat content router client: %w", err) } routerClient := contentrouter.NewContentRoutingClient(crClient) - queryCtx, cancel := context.WithCancel(ctx) - defer cancel() + queryCtx, cancelQuery := context.WithCancel(ctx) + defer cancelQuery() // Find providers with DHT and IPNI concurrently. - provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount) - ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount) + provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount) + ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, maxProvidersCount) - out := make([]providerOutput, 0, MaxProvidersCount) + out := make([]providerOutput, 0, maxProvidersCount) var wg sync.WaitGroup var mu sync.Mutex - + var providersCount int var done bool + for !done { var provider peer.AddrInfo var open bool + var source string select { case provider, open = <-provsCh: @@ -174,6 +187,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string } continue } + source = dhtSource case provider, open = <-ipniProvsCh: if !open { ipniProvsCh = nil @@ -182,10 +196,15 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string } continue } + source = ipniSource + } + providersCount++ + if providersCount == maxProvidersCount { + done = true } wg.Add(1) - go func(provider peer.AddrInfo) { + go func(provider peer.AddrInfo, src string) { defer wg.Done() outputAddrs := []string{} @@ -213,6 +232,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string ID: provider.ID.String(), Addrs: outputAddrs, DataAvailableOverBitswap: BitswapCheckOutput{}, + Source: src, } testHost, err := d.createTestHost() @@ -245,8 +265,9 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string mu.Lock() out = append(out, provOutput) mu.Unlock() - }(provider) + }(provider, source) } + cancelQuery() // Wait for all goroutines to finish wg.Wait() @@ -260,6 +281,7 @@ type peerCheckOutput struct { ProviderRecordFromPeerInDHT bool ConnectionMaddrs []string DataAvailableOverBitswap BitswapCheckOutput + Source string } // runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr) @@ -274,20 +296,17 @@ func (d *daemon) runPeerCheck(ctx context.Context, maStr string, c cid.Cid) (*pe return nil, err } - // User has only passed a PeerID without any maddrs - onlyPeerID := len(ai.Addrs) == 0 - - out := &peerCheckOutput{} - - connectionFailed := false + addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID) - out.ProviderRecordFromPeerInDHT = ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID) + out := &peerCheckOutput{ + ProviderRecordFromPeerInDHT: ProviderRecordFromPeerInDHT(ctx, d.dht, c, ai.ID), + PeerFoundInDHT: addrMap, + } - addrMap, peerAddrDHTErr := peerAddrsInDHT(ctx, d.dht, d.dhtMessenger, ai.ID) - out.PeerFoundInDHT = addrMap + var connectionFailed bool // If peerID given,but no addresses check the DHT - if onlyPeerID { + if len(ai.Addrs) == 0 { if peerAddrDHTErr != nil { // PeerID is not resolvable via the DHT connectionFailed = true @@ -368,9 +387,6 @@ func peerAddrsInDHT(ctx context.Context, d kademlia, messenger *dhtpb.ProtocolMe return nil, err } - wg := sync.WaitGroup{} - wg.Add(len(closestPeers)) - resCh := make(chan *peer.AddrInfo, len(closestPeers)) numSuccessfulResponses := execOnMany(ctx, 0.3, time.Second*3, func(ctx context.Context, peerToQuery peer.ID) error { diff --git a/main.go b/main.go index dbe1401..aabad12 100644 --- a/main.go +++ b/main.go @@ -70,7 +70,6 @@ func main() { const ( defaultCheckTimeout = 60 * time.Second - defaultIndexerURL = "https://cid.contact/routing/v1/providers/" ) func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, metricPassword string) error { @@ -120,10 +119,6 @@ func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, m } } - if ipniURL == "" { - ipniURL = defaultIndexerURL - } - log.Printf("Checking %s with timeout %s seconds", cidStr, checkTimeout.String()) withTimeout, cancel := context.WithTimeout(r.Context(), checkTimeout) defer cancel() diff --git a/web/index.html b/web/index.html index d20f7f0..9a2b6f2 100644 --- a/web/index.html +++ b/web/index.html @@ -317,7 +317,7 @@