Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose BlockKeyCacheSize and enable WriteThrough datastore options #10614

Merged
merged 13 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,21 @@ import (
"encoding/json"
)

// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
const DefaultDataStoreDirectory = "datastore"
const (
// DefaultDataStoreDirectory is the directory to store all the local IPFS data.
DefaultDataStoreDirectory = "datastore"

// DefaultBlockKeyCacheSize is the size for the blockstore two-queue
// cache which caches block keys and sizes.
DefaultBlockKeyCacheSize = 64 << 10

// DefaultWriteThrough specifies whether to use a "write-through"
// Blockstore and Blockservice. This means that they will write
// without performing any reads to check if the incoming blocks are
// already present in the datastore. Enable for datastores with fast
// writes and slower reads.
DefaultWriteThrough Flag = True
lidel marked this conversation as resolved.
Show resolved Hide resolved
)

// Datastore tracks the configuration of the datastore.
type Datastore struct {
Expand All @@ -21,8 +34,10 @@ type Datastore struct {

Spec map[string]interface{}

HashOnRead bool
BloomFilterSize int
HashOnRead bool
BloomFilterSize int
BlockKeyCacheSize OptionalInteger `json:",omitempty"`
WriteThrough Flag `json:",omitempty"`
}

// DataStorePath returns the default data store path given a configuration root
Expand Down
11 changes: 11 additions & 0 deletions config/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ const (
DefaultUnixFSRawLeaves = false
DefaultUnixFSChunker = "size-262144"
DefaultHashFunction = "sha2-256"

// DefaultBatchMaxNodes controls the maximum number of nodes in a
// write-batch. The total size of the batch is limited by
// BatchMaxnodes and BatchMaxSize.
DefaultBatchMaxNodes = 128
// DefaultBatchMaxSize controls the maximum size of a single
// write-batch. The total size of the batch is limited by
// BatchMaxnodes and BatchMaxSize.
DefaultBatchMaxSize = 100 << 20 // 20MiB
)

// Import configures the default options for ingesting data. This affects commands
Expand All @@ -14,4 +23,6 @@ type Import struct {
UnixFSRawLeaves Flag
UnixFSChunker OptionalString
HashFunction OptionalString
BatchMaxNodes OptionalInteger
BatchMaxSize OptionalInteger
}
1 change: 1 addition & 0 deletions config/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func DefaultDatastoreConfig() Datastore {
GCPeriod: "1h",
BloomFilterSize: 0,
Spec: flatfsSpec(),
WriteThrough: DefaultWriteThrough,
lidel marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
15 changes: 14 additions & 1 deletion core/commands/dag/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
cmds "github.com/ipfs/go-ipfs-cmds"
ipld "github.com/ipfs/go-ipld-format"
ipldlegacy "github.com/ipfs/go-ipld-legacy"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/coreiface/options"
gocarv2 "github.com/ipld/go-car/v2"

Expand All @@ -24,6 +25,11 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

cfg, err := node.Repo.Config()
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
Expand Down Expand Up @@ -55,7 +61,14 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())
batch := ipld.NewBatch(req.Context, api.Dag(),
// Default: 128. Means 128 file descriptors needed in flatfs
ipld.MaxNodesBatchOption(int(cfg.Import.BatchMaxNodes.WithDefault(config.DefaultBatchMaxNodes))),
// Default 100MiB. When setting block size to 1MiB, we can add
// ~100 nodes maximum. With default 256KiB block-size, we will
// hit the max nodes limit at 32MiB.p
ipld.MaxSizeBatchOption(int(cfg.Import.BatchMaxSize.WithDefault(config.DefaultBatchMaxSize))),
)

roots := cid.NewSet()
var blockCount, blockBytesCount uint64
Expand Down
14 changes: 8 additions & 6 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
return nil
}

if settings.Offline {
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}
cfg, err := n.Repo.Config()
if err != nil {
return nil, err
}

if settings.Offline {
cs := cfg.Ipns.ResolveCacheSize
if cs == 0 {
cs = node.DefaultIpnsCacheSize
Expand Down Expand Up @@ -244,7 +244,9 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e

if settings.Offline || !settings.FetchBlocks {
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange,
bserv.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(true)),
lidel marked this conversation as resolved.
Show resolved Hide resolved
)
subAPI.dag = dag.NewDAGService(subAPI.blocks)
}

Expand Down
6 changes: 4 additions & 2 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,15 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
if settings.OnlyHash {
// setup a /dev/null pipeline to simulate adding the data
dstore := dssync.MutexWrap(ds.NewNullDatastore())
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough())
bs := bstore.NewBlockstore(dstore, bstore.WriteThrough(true))
addblockstore = bstore.NewGCBlockstore(bs, nil) // gclocker will never be used
exch = nil // exchange will never be used
pinning = nil // pinner will never be used
}

bserv := blockservice.New(addblockstore, exch) // hash security 001
bserv := blockservice.New(addblockstore, exch,
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(true)),
lidel marked this conversation as resolved.
Show resolved Hide resolved
) // hash security 001
dserv := merkledag.NewDAGService(bserv)

// add a sync call to the DagService
Expand Down
25 changes: 15 additions & 10 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,26 @@ import (
dagpb "github.com/ipld/go-codec-dagpb"
"go.uber.org/fx"

"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
)

// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
func BlockService(cfg *config.Config) func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
return func(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
bsvc := blockservice.New(bs, rem,
blockservice.WriteThrough(cfg.Datastore.WriteThrough.WithDefault(true)),
lidel marked this conversation as resolved.
Show resolved Hide resolved
)

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bsvc.Close()
},
})

return bsvc
}
}

// Pinning creates new pinner which tells GC which blocks should be kept
Expand Down
6 changes: 3 additions & 3 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize
cacheOpts.HasTwoQueueCacheSize = int(cfg.Datastore.BlockKeyCacheSize.WithDefault(config.DefaultBlockKeyCacheSize))
if !bcfg.Permanent {
cacheOpts.HasBloomFilterSize = 0
}
Expand All @@ -201,7 +202,7 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {
return fx.Options(
fx.Provide(RepoConfig),
fx.Provide(Datastore),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead)),
fx.Provide(BaseBlockstoreCtor(cacheOpts, cfg.Datastore.HashOnRead, cfg.Datastore.WriteThrough.WithDefault(true))),
lidel marked this conversation as resolved.
Show resolved Hide resolved
finalBstore,
)
}
Expand Down Expand Up @@ -332,7 +333,6 @@ func Offline(cfg *config.Config) fx.Option {

// Core groups basic IPFS services
var Core = fx.Options(
fx.Provide(BlockService),
fx.Provide(Dag),
fx.Provide(FetcherConfig),
fx.Provide(PathResolverConfig),
Expand Down Expand Up @@ -387,7 +387,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
Identity(cfg),
IPNS,
Networked(bcfg, cfg, userResourceOverrides),

fx.Provide(BlockService(cfg)),
Core,
)
}
6 changes: 4 additions & 2 deletions core/node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ func Datastore(repo repo.Repo) datastore.Datastore {
type BaseBlocks blockstore.Blockstore

// BaseBlockstoreCtor creates cached blockstore backed by the provided datastore
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, hashOnRead bool, writeThrough bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) {
// hash security
bs = blockstore.NewBlockstore(repo.Datastore())
bs = blockstore.NewBlockstore(repo.Datastore(),
blockstore.WriteThrough(writeThrough),
)
bs = &verifbs.VerifBS{Blockstore: bs}
bs, err = blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, cacheOpts)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions docs/changelogs/v0.33.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ If you depended on removed ones, please fill an issue to add them to the upstrea

Onboarding files and directories with `ipfs add --to-files` now requires non-empty names. due to this, The `--to-files` and `--wrap` options are now mutually exclusive ([#10612](https://github.com/ipfs/kubo/issues/10612)).

#### New options for faster writes: `WriteThrough`, `BlockKeyCacheSize`, `BatchMaxNodes`, `BatchMaxSize`

Now that Kubo supports [`pebble`](../datastores.md#pebbleds) as a datastore backend, it becomes very useful to expose some additional configuration options for how the blockservice/blockstore/datastore combo behaves.

Usually, LSM-tree based datastore like Pebble or Badger have very fast write performance (blocks are streamed to disk) while incurring in read-amplification penalties (blocks need to be looked up in the index to know where they are on disk), specially noticiable on spinning disks.

Prior to this version, `BlockService` and `Blockstore` implementations performed a `Has(cid)` for every block that was going to be written, skipping the writes altogether if the block was already present in the datastore. The performance impact of this `Has()` call can vary. The `Datastore` implementation itself might include block-caching and things like bloom-filters to speed up lookups and mitigate read-penalties. Our `Blockstore` implementation also supports a bloom-filter (controlled by `BloomFilterSize` and disabled by default), and a two-queue cache for keys and block sizes. If we assume that most of the blocks added to Kubo are new blocks, not already present in the datastore, or that the datastore itself includes mechanisms to optimize writes and avoid writing the same data twice, the calls to `Has()` at both BlockService and Blockstore layers seem superflous to they point they even harm write performance.

For these reasons, from now on, the default is to use a "write-through" mode for the Blockservice and the Blockstore. We have added a new option `Datastore.WriteThrough`, which defaults to `true`. Previous behaviour can be obtained by manually setting it to `false`.

We have also made the size of the two-queue blockstore cache configurable with another option: `Datastore.BlockKeyCacheSize`, which defaults to `65536` (64KiB). Additionally, this caching layer can be disabled altogether by setting it to `0`. In particular, this option controls the size of a blockstore caching layer that records whether the blockstore has certain block and their sizes (but does not cache the contents, so it stays relativey small in general).

Finally, we have added two new options to the `Import` section to control the maximum size of write-batches: `BatchMaxNodes` and `BatchMaxSize`. These are set by default to `128` nodes and `20MiB`. Increasing them will batch more items together when importing data with `ipfs dag import`, which can speed things up. It is importance to find a balance between available memory (used to hold the batch), disk latencies (when writing the batch) and processing power (when preparing the batch, as nodes are sorted and duplicates removed).

As a reminder, details from all the options are explained in the [configuration documentation](../config.md).

We recommend users trying Pebble as a datastore backend to disable both blockstore bloom-filter and key caching layers and enable write through as a way to evaluate the raw performance of the underlying datastore, which includes its own bloom-filter and caching layers (default cache size is `8MiB` and can be configured in the [options](../datastores.md#pebbleds).


#### 📦️ Dependency updates

- update `boxo` to [v0.25.0](https://github.com/ipfs/boxo/releases/tag/v0.25.0)
Expand Down
58 changes: 58 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ config file at runtime.
- [`Datastore.GCPeriod`](#datastoregcperiod)
- [`Datastore.HashOnRead`](#datastorehashonread)
- [`Datastore.BloomFilterSize`](#datastorebloomfiltersize)
- [`Datastore.WriteTrhough`](#datastorewritethrough)
- [`Datastore.BlockKeyCacheSize`](#datastoreblockkeycachesize)
- [`Datastore.Spec`](#datastorespec)
- [`Discovery`](#discovery)
- [`Discovery.MDNS`](#discoverymdns)
Expand Down Expand Up @@ -176,6 +178,8 @@ config file at runtime.
- [`Import.UnixFSRawLeaves`](#importunixfsrawleaves)
- [`Import.UnixFSChunker`](#importunixfschunker)
- [`Import.HashFunction`](#importhashfunction)
- [`Import.BatchMaxNodes`](#importbatchmaxnodes)
- [`Import.BatchMaxSize`](#importbatchmaxsize)
- [`Version`](#version)
- [`Version.AgentSuffix`](#versionagentsuffix)
- [`Version.SwarmCheckEnabled`](#versionswarmcheckenabled)
Expand Down Expand Up @@ -629,10 +633,48 @@ we'd want to use 1199120 bytes. As of writing, [7 hash
functions](https://github.com/ipfs/go-ipfs-blockstore/blob/547442836ade055cc114b562a3cc193d4e57c884/caching.go#L22)
are used, so the constant `k` is 7 in the formula.

Enabling the BloomFilter can provide performance improvements specially when
responding to many requests for inexistant blocks. It however requires a full
sweep of all the datastore keys on daemon start. On very large datastores this
can be a very taxing operation, particulary if the datastore does not support
querying existing keys without reading their values at the same time (blocks).

Default: `0` (disabled)

Type: `integer` (non-negative, bytes)

### `Datastore.WriteThrough`

This option controls whether a block that already exist in the datastore
should be written to it. When set to `false`, a `Has()` call is performed
against the datastore prior to writing every block. If the block is already
stored, the write is skipped. This check happens both on the Blockservice and
the Blockstore layers and this setting affects both.

When set to `true`, no checks are performed and blocks are written to the
datastore, which depending on the implementation may perform its own checks.

This option can affect performance and the strategy should be taken in
conjunction with [`BlockKeyCacheSize`](#datastoreblockkeycachesize) and
[`BloomFilterSize`](#datastoreboomfiltersize`).

Default: `true`

Type: `bool`

### `Datastore.BlockKeyCacheSize`

A number representing the maximum size in bytes of the blockstore's Two-Queue
cache, which caches block-cids and their block-sizes. Use `0` to disable.

This cache, once primed, can greatly speed up operations like `ipfs repo stat`
as there is no need to read full blocks to know their sizes. Size should be
adjusted depending on the number of CIDs on disk (`NumObjects in `ipfs repo stat`).

Default: `65536` (64KiB)

Type: `optionalInteger` (non-negative, bytes)

### `Datastore.Spec`

Spec defines the structure of the ipfs datastore. It is a composable structure,
Expand Down Expand Up @@ -2421,6 +2463,22 @@ Default: `sha2-256`

Type: `optionalString`

### `Import.BatchMaxNodes

The maximum number of nodes in a write-batch. The total size of the batch is limited by `BatchMaxnodes` and `BatchMaxSize`.

Default: `128`

Type: `optionalInteger`

### `Import.BatchMaxSize`

The maximum size of a single write-batch (computed as the sum of the sizes of the blocks). The total size of the batch is limited by `BatchMaxnodes` and `BatchMaxSize`.

Default: `20971520` (20MiB)

Type: `optionalInteger`

## `Version`

Options to configure agent version announced to the swarm, and leveraging
Expand Down
Loading
Loading