-
Notifications
You must be signed in to change notification settings - Fork 2
feat: switch to boxo and fix CAR fetch timeouts #68
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,11 +9,11 @@ import ( | |
"net/url" | ||
"time" | ||
|
||
ipfsblockstore "github.com/ipfs/boxo/blockstore" | ||
ipath "github.com/ipfs/boxo/coreiface/path" | ||
gateway "github.com/ipfs/boxo/gateway" | ||
blocks "github.com/ipfs/go-block-format" | ||
"github.com/ipfs/go-cid" | ||
ipfsblockstore "github.com/ipfs/go-ipfs-blockstore" | ||
blocks "github.com/ipfs/go-libipfs/blocks" | ||
gateway "github.com/ipfs/go-libipfs/gateway" | ||
ipath "github.com/ipfs/interface-go-ipfs-core/path" | ||
) | ||
|
||
type Config struct { | ||
|
@@ -75,11 +75,19 @@ type Config struct { | |
MaxNCoolOff int | ||
} | ||
|
||
const DefaultLoggingInterval = 5 * time.Second | ||
const DefaultSaturnLoggerRequestTimeout = 1 * time.Minute | ||
|
||
const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second | ||
|
||
const DefaultSaturnBlockRequestTimeout = 19 * time.Second | ||
const DefaultSaturnCarRequestTimeout = 30 * time.Minute | ||
|
||
const DefaultMaxRetries = 3 | ||
const DefaultPoolFailureDownvoteDebounce = 1 * time.Minute | ||
const DefaultPoolMembershipDebounce = 3 * DefaultPoolRefreshInterval | ||
const DefaultPoolLowWatermark = 5 | ||
const DefaultSaturnRequestTimeout = 19 * time.Second | ||
|
||
const maxBlockSize = 4194305 // 4 Mib + 1 byte | ||
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000" | ||
const DefaultPoolRefreshInterval = 5 * time.Minute | ||
|
@@ -122,7 +130,7 @@ type ErrCoolDown struct { | |
} | ||
|
||
func (e *ErrCoolDown) Error() string { | ||
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %s/Path %s, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ℹ️ This was producing very confusing log errors |
||
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q / Path %q, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter)) | ||
} | ||
|
||
func (e *ErrCoolDown) RetryAfter() time.Duration { | ||
|
@@ -188,7 +196,7 @@ func NewCaboose(config *Config) (*Caboose, error) { | |
|
||
if c.config.SaturnClient == nil { | ||
c.config.SaturnClient = &http.Client{ | ||
Timeout: DefaultSaturnRequestTimeout, | ||
Timeout: DefaultSaturnCarRequestTimeout, | ||
} | ||
} | ||
if c.config.OrchestratorEndpoint == nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,8 @@ import ( | |
"time" | ||
|
||
"github.com/google/uuid" | ||
blocks "github.com/ipfs/go-block-format" | ||
"github.com/ipfs/go-cid" | ||
blocks "github.com/ipfs/go-libipfs/blocks" | ||
) | ||
|
||
var saturnReqTmpl = "/ipfs/%s?format=raw" | ||
|
@@ -81,6 +81,11 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, | |
isCacheHit := false | ||
networkError := "" | ||
|
||
isBlockRequest := false | ||
if mime == "application/vnd.ipld.raw" { | ||
isBlockRequest = true | ||
} | ||
|
||
defer func() { | ||
var ttfbMs int64 | ||
durationSecs := time.Since(start).Seconds() | ||
|
@@ -92,15 +97,15 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, | |
ttfbMs = fb.Sub(start).Milliseconds() | ||
fetchTTFBPerBlockPerPeerSuccessMetric.Observe(float64(ttfbMs)) | ||
// track individual block metrics separately | ||
if mime == "application/vnd.ipld.raw" { | ||
if isBlockRequest { | ||
fetchDurationPerBlockPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds())) | ||
} else { | ||
fetchDurationPerCarPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds())) | ||
} | ||
fetchSpeedPerBlockPerPeerMetric.Observe(float64(received) / float64(durationMs)) | ||
} else { | ||
fetchTTFBPerBlockPerPeerFailureMetric.Observe(float64(ttfbMs)) | ||
if mime == "application/vnd.ipld.raw" { | ||
if isBlockRequest { | ||
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds())) | ||
} else { | ||
fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds())) | ||
|
@@ -145,7 +150,16 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string, | |
} | ||
}() | ||
|
||
reqCtx, cancel := context.WithTimeout(ctx, DefaultSaturnRequestTimeout) | ||
// TODO: Ideally, we would have additional "PerRequestInactivityTimeout" | ||
// which is the amount of time without any NEW data from the server, but | ||
// that can be added later. We need both because a slow trickle of data | ||
// could take a large amount of time. | ||
requestTimeout := DefaultSaturnCarRequestTimeout | ||
if isBlockRequest { | ||
requestTimeout = DefaultSaturnBlockRequestTimeout | ||
} | ||
|
||
reqCtx, cancel := context.WithTimeout(ctx, requestTimeout) | ||
Comment on lines
+153
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💭 Implementing "PerRequestInactivityTimeout" would help a lot – we could then have this 30m timeout as a hard ceiling (or even raise it), but then have the same timeout for block and for CAR when L1 did not send any new bytes for some time. |
||
defer cancel() | ||
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ℹ️ Had to add separate timeout for CARs.
Not feeling strongly about 30m, but that is for how long I was able to stream wikipedia DAG from the old gateway, so a good starting point.