From 89f17d35c65c0e356b176cf51ca08cc32ca9c9a2 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 16 Sep 2024 16:17:43 -0700 Subject: [PATCH] Query DHT and IPNI concurrently --- daemon.go | 59 +++++++++++++++++-------- go.mod | 4 +- go.sum | 7 ++- ipni.go | 128 ------------------------------------------------------ main.go | 61 +++++++++++++------------- 5 files changed, 80 insertions(+), 179 deletions(-) delete mode 100644 ipni.go diff --git a/daemon.go b/daemon.go index 9e43a13..8e1e3a3 100644 --- a/daemon.go +++ b/daemon.go @@ -9,6 +9,8 @@ import ( vole "github.com/ipfs-shipyard/vole/lib" "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/client" + "github.com/ipfs/boxo/routing/http/contentrouter" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" @@ -137,23 +139,51 @@ type providerOutput struct { DataAvailableOverBitswap BitswapCheckOutput } -// runCidCheck looks up the DHT for providers of a given CID and then checks their connectivity and Bitswap availability -func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput, error) { - cid, err := cid.Decode(cidStr) +// runCidCheck finds providers of a given CID, using the DHT and IPNI +// concurrently. A check of connectivity and Bitswap availability is performed +// for each provider found. +func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string) (cidCheckOutput, error) { + crClient, err := client.New(ipniURL, client.WithStreamResultsRequired()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to creat content router client: %w", err) } - - out := make([]providerOutput, 0, MaxProvidersCount) + routerClient := contentrouter.NewContentRoutingClient(crClient) queryCtx, cancel := context.WithCancel(ctx) defer cancel() - provsCh := d.dht.FindProvidersAsync(queryCtx, cid, MaxProvidersCount) + // Find providers with DHT and IPNI concurrently. + provsCh := d.dht.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount) + ipniProvsCh := routerClient.FindProvidersAsync(queryCtx, cidKey, MaxProvidersCount) + + out := make([]providerOutput, 0, MaxProvidersCount) var wg sync.WaitGroup var mu sync.Mutex - for provider := range provsCh { + var done bool + for !done { + var provider peer.AddrInfo + var open bool + + select { + case provider, open = <-provsCh: + if !open { + provsCh = nil + if ipniProvsCh == nil { + done = true + } + continue + } + case provider, open = <-ipniProvsCh: + if !open { + ipniProvsCh = nil + if provsCh == nil { + done = true + } + continue + } + } + wg.Add(1) go func(provider peer.AddrInfo) { defer wg.Done() @@ -196,7 +226,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*15) defer dialCancel() - testHost.Connect(dialCtx, provider) + _ = testHost.Connect(dialCtx, provider) // Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714 _, connErr := testHost.NewStream(dialCtx, provider.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") @@ -205,7 +235,7 @@ func (d *daemon) runCidCheck(ctx context.Context, cidStr string) (cidCheckOutput } else { // since we pass a libp2p host that's already connected to the peer the actual connection maddr we pass in doesn't matter p2pAddr, _ := multiaddr.NewMultiaddr("/p2p/" + provider.ID.String()) - provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cid, p2pAddr) + provOutput.DataAvailableOverBitswap = checkBitswapCID(ctx, testHost, cidKey, p2pAddr) for _, c := range testHost.Network().ConnsToPeer(provider.ID) { provOutput.ConnectionMaddrs = append(provOutput.ConnectionMaddrs, c.RemoteMultiaddr().String()) @@ -233,7 +263,7 @@ type peerCheckOutput struct { } // runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr) -func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerCheckOutput, error) { +func (d *daemon) runPeerCheck(ctx context.Context, maStr string, c cid.Cid) (*peerCheckOutput, error) { ma, err := multiaddr.NewMultiaddr(maStr) if err != nil { return nil, err @@ -247,11 +277,6 @@ func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerC // User has only passed a PeerID without any maddrs onlyPeerID := len(ai.Addrs) == 0 - c, err := cid.Decode(cidStr) - if err != nil { - return nil, err - } - out := &peerCheckOutput{} connectionFailed := false @@ -288,7 +313,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, maStr, cidStr string) (*peerC // Test Is the target connectable dialCtx, dialCancel := context.WithTimeout(ctx, time.Second*120) - testHost.Connect(dialCtx, *ai) + _ = testHost.Connect(dialCtx, *ai) // Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714 _, connErr := testHost.NewStream(dialCtx, ai.ID, "/ipfs/bitswap/1.2.0", "/ipfs/bitswap/1.1.0", "/ipfs/bitswap/1.0.0", "/ipfs/bitswap") dialCancel() diff --git a/go.mod b/go.mod index b9c0f66..b8b5971 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 - github.com/ipni/go-libipni v0.6.13 github.com/libp2p/go-libp2p v0.36.3 github.com/libp2p/go-libp2p-kad-dht v0.26.1 github.com/libp2p/go-libp2p-mplex v0.9.0 @@ -56,6 +55,7 @@ require ( github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -143,8 +143,10 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/samber/lo v1.39.0 // indirect github.com/sanity-io/litter v1.5.5 // indirect github.com/sergi/go-diff v1.3.1 // indirect + github.com/smartystreets/assertions v1.13.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.55.0 // indirect diff --git a/go.sum b/go.sum index d6a98a0..d14f0a8 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE0 github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk= github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= @@ -253,8 +255,6 @@ github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6 github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= -github.com/ipni/go-libipni v0.6.13 h1:6fQU6ZFu8fi0DZIs4VXZrIFbT9r97dNmNl7flWMVblE= -github.com/ipni/go-libipni v0.6.13/go.mod h1:+hNohg7Tx8ML2a/Ei19zUxCnSqtqXiHySlqHIwPhQyQ= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= @@ -529,6 +529,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA= +github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQo= github.com/sanity-io/litter v1.5.5/go.mod h1:9gzJgR2i4ZpjZHsKvUXIRQVk7P+yM3e+jAF7bU2UI5U= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -581,6 +583,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/ipni.go b/ipni.go deleted file mode 100644 index 7f79ab8..0000000 --- a/ipni.go +++ /dev/null @@ -1,128 +0,0 @@ -package main - -import ( - "context" - "encoding/base64" - "fmt" - "strings" - - "github.com/ipfs/go-cid" - "github.com/ipni/go-libipni/find/client" - "github.com/ipni/go-libipni/find/model" - "github.com/ipni/go-libipni/metadata" - // "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multihash" - // "github.com/urfave/cli/v2" -) - -func FindIPNIProviders(ctx context.Context, cidArg string, indexer string) error { - // mhArgs := cctx.StringSlice("mh") - // cidArgs := cctx.StringSlice("cid") - // if len(mhArgs) == 0 && len(cidArgs) == 0 { - // return fmt.Errorf("must specify at least one multihash or CID") - // } - - mhs := make([]multihash.Multihash, 0, 1) - // for i := range mhArgs { - // m, err := multihash.FromB58String(mhArgs[i]) - // if err != nil { - // return err - // } - // mhs = append(mhs, m) - // } - c, err := cid.Decode(cidArg) - if err != nil { - return err - } - mhs = append(mhs, c.Hash()) - - // if cctx.Bool("no-priv") { - // return clearFind(cctx, mhs) - // } - return dhFind(ctx, mhs, indexer) -} - -func dhFind(ctx context.Context, mhs []multihash.Multihash, indexer string) error { - // cl, err := client.NewDHashClient( - // client.WithProvidersURL(indexers...), - // client.WithDHStoreURL(''), - // client.WithPcacheTTL(0), - // ) - - cl, err := client.New(indexer) - if err != nil { - return err - } - if err != nil { - return err - } - - resp, err := client.FindBatch(ctx, cl, mhs) - if err != nil { - return err - } - // if resp == nil && cctx.Bool("fallback") { - // return clearFind(cctx, mhs) - // } - fmt.Println("🔒 Reader privacy enabled") - return printResults(ctx, resp) -} - -func printResults(ctx context.Context, resp *model.FindResponse) error { - if resp == nil || len(resp.MultihashResults) == 0 { - fmt.Println("index not found") - return nil - } - - // if cctx.Bool("id-only") { - // seen := make(map[peer.ID]struct{}) - // for i := range resp.MultihashResults { - // for _, pr := range resp.MultihashResults[i].ProviderResults { - // if _, ok := seen[pr.Provider.ID]; ok { - // continue - // } - // seen[pr.Provider.ID] = struct{}{} - // fmt.Println(pr.Provider.ID.String()) - // } - // } - // return nil - // } - - for i := range resp.MultihashResults { - fmt.Println("Multihash:", resp.MultihashResults[i].Multihash.B58String()) - if len(resp.MultihashResults[i].ProviderResults) == 0 { - fmt.Println(" index not found") - continue - } - // Group results by provider. - providers := make(map[string][]model.ProviderResult) - for _, pr := range resp.MultihashResults[i].ProviderResults { - provStr := pr.Provider.String() - providers[provStr] = append(providers[provStr], pr) - } - for provStr, prs := range providers { - fmt.Println(" Provider:", provStr) - for _, pr := range prs { - fmt.Println(" ContextID:", base64.StdEncoding.EncodeToString(pr.ContextID)) - fmt.Println(" Metadata:", decodeMetadata(pr.Metadata)) - } - } - } - return nil -} - -func decodeMetadata(metaBytes []byte) string { - if len(metaBytes) == 0 { - return "nil" - } - meta := metadata.Default.New() - err := meta.UnmarshalBinary(metaBytes) - if err != nil { - return fmt.Sprint("error: ", err.Error()) - } - protoStrs := make([]string, meta.Len()) - for i, p := range meta.Protocols() { - protoStrs[i] = p.String() - } - return strings.Join(protoStrs, ", ") -} diff --git a/main.go b/main.go index 03f1c3d..dbe1401 100644 --- a/main.go +++ b/main.go @@ -5,18 +5,16 @@ import ( "crypto/subtle" "embed" "encoding/json" - "errors" "log" "net" "net/http" "os" - "strconv" "time" + "github.com/ipfs/go-cid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/urfave/cli/v2" ) @@ -60,6 +58,7 @@ func main() { if err != nil { return err } + return startServer(ctx, d, cctx.String("address"), cctx.String("metrics-auth-username"), cctx.String("metrics-auth-password")) } @@ -69,8 +68,10 @@ func main() { } } -const DEFAULT_CHECK_TIMEOUT = 60 -const DEFAULT_IPNI_INDEXER = "https://cid.contact" +const ( + defaultCheckTimeout = 60 * time.Second + defaultIndexerURL = "https://cid.contact/routing/v1/providers/" +) func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, metricPassword string) error { log.Printf("Starting %s %s\n", name, version) @@ -97,46 +98,48 @@ func startServer(ctx context.Context, d *daemon, tcpListener, metricsUsername, m maStr := r.URL.Query().Get("multiaddr") cidStr := r.URL.Query().Get("cid") timeoutStr := r.URL.Query().Get("timeoutSeconds") - ipniIndexer := r.URL.Query().Get("ipniIndexer") + ipniURL := r.URL.Query().Get("ipniIndexer") if cidStr == "" { - err = errors.New("missing 'cid' query parameter") + http.Error(w, "missing 'cid' query parameter", http.StatusBadRequest) + return + } + + cidKey, err := cid.Decode(cidStr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - timeout := DEFAULT_CHECK_TIMEOUT + checkTimeout := defaultCheckTimeout if timeoutStr != "" { - timeout, err = strconv.Atoi(timeoutStr) + checkTimeout, err = time.ParseDuration(timeoutStr + "s") if err != nil { http.Error(w, "Invalid timeout value (in seconds)", http.StatusBadRequest) return } } - if ipniIndexer == "" { - ipniIndexer = DEFAULT_IPNI_INDEXER + if ipniURL == "" { + ipniURL = defaultIndexerURL } - FindIPNIProviders(ctx, cidStr, ipniIndexer) - - log.Printf("Checking %s with timeout %d seconds", cidStr, timeout) - withTimeout, cancel := context.WithTimeout(r.Context(), time.Duration(timeout)*time.Second) + log.Printf("Checking %s with timeout %s seconds", cidStr, checkTimeout.String()) + withTimeout, cancel := context.WithTimeout(r.Context(), checkTimeout) defer cancel() - var err error - var data interface{} + var data interface{} if maStr == "" { - data, err = d.runCidCheck(withTimeout, cidStr) + data, err = d.runCidCheck(withTimeout, cidKey, ipniURL) } else { - data, err = d.runPeerCheck(withTimeout, maStr, cidStr) + data, err = d.runPeerCheck(withTimeout, maStr, cidKey) } - - if err == nil { - w.Header().Add("Content-Type", "application/json") - _ = json.NewEncoder(w).Encode(data) - } else { - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte(err.Error())) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + w.Header().Add("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(data) } // Register the default Go collector @@ -239,11 +242,7 @@ func getWebAddress(l net.Listener) string { return addr } switch host { - case "": - fallthrough - case "0.0.0.0": - fallthrough - case "::": + case "", "0.0.0.0", "::": return net.JoinHostPort("localhost", port) default: return addr