Add read/write of tags to fileset to restore tags, add fs index bootstrapping (#590)
robskillington authored May 8, 2018
1 parent 76f42b9 commit 5a17284
Showing 91 changed files with 2,285 additions and 712 deletions.
58 changes: 30 additions & 28 deletions docs/architecture/storage.md
@@ -2,7 +2,7 @@

## Overview

The primary unit of long-term storage for M3DB is the fileset file, which stores compressed streams of time series values, one fileset per shard per block time window.

Filesets are flushed to disk once a block time window becomes unreachable, that is, once the end of the window has passed and the block can no longer be written to. If a process is killed before it has a chance to flush the data for the current time window to disk, the data must be restored from the commit log (or from a peer responsible for the same shard, if the replication factor is greater than 1).

@@ -12,43 +12,45 @@ A fileset has the following files:

* **Info file:** Stores the block time window start and size and other important metadata about the fileset volume.
* **Summaries file:** Stores a subset of the index file so the contents can be kept in memory, allowing a lookup to jump to a section of the index file from which a few pages of linear scanning will locate the series being looked up.
-* **Index file:** Stores the series metadata and location of compressed stream in the data file for retrieval.
+* **Index file:** Stores the series metadata, including tags if indexing is enabled, and the location of the compressed stream in the data file for retrieval.
* **Data file:** Stores the series compressed data streams.
* **Bloom filter file:** Stores a bloom filter bitset of all series contained in this fileset volume, used to quickly determine whether it is worth attempting to retrieve a series from this volume.
* **Digests file:** Stores the digest checksums of the info file, summaries file, index file, data file and bloom filter file in the fileset volume for integrity verification.
* **Checkpoint file:** Stores a digest of the digests file and is written upon successful completion of persisting a fileset volume, which allows quickly checking whether a volume was completed (see the sketch at the end of this section).

```
                                                       ┌─────────────────────┐
┌─────────────────────┐  ┌─────────────────────┐       │     Index File      │
│      Info File      │  │   Summaries File    │       │   (sorted by ID)    │
├─────────────────────┤  │   (sorted by ID)    │       ├─────────────────────┤
│- Block Start        │  ├─────────────────────┤    ┌─>│- Idx                │
│- Block Size         │  │- Idx                │    │  │- ID                 │
│- Entries (Num)      │  │- ID                 │    │  │- Size               │
│- Major Version      │  │- Index Entry Offset ├────┘  │- Checksum           │
│- Summaries (Num)    │  └─────────────────────┘       │- Data Entry Offset  ├──┐
│- BloomFilter (K/M)  │                                │- Encoded Tags       │  │
│- Snapshot Time      │                                └─────────────────────┘  │
│- Type (Flush/Snap)  │                                                         │
└─────────────────────┘                                                         │
                         ┌─────────────────────┐   ┌────────────────────────────┘
┌─────────────────────┐  │  Bloom Filter File  │   │
│    Digests File     │  ├─────────────────────┤   │   ┌─────────────────────┐
├─────────────────────┤  │- Bitset             │   │   │      Data File      │
│- Info file digest   │  └─────────────────────┘   │   ├─────────────────────┤
│- Summaries digest   │                            │   │List of:             │
│- Index digest       │                            └──>│ - Marker (16 bytes) │
│- Data digest        │                                │ - ID                │
│- Bloom filter digest│                                │ - Data (size bytes) │
└─────────────────────┘                                └─────────────────────┘

┌─────────────────────┐
│   Checkpoint File   │
├─────────────────────┤
│- Digests digest     │
└─────────────────────┘
```

In the diagram above you can see that the data file stores compressed blocks for a given shard / block start combination. The index file (which is sorted by ID and thus can be binary searched or scanned) can be used to find the offset of a specific ID.
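
Putting these files together, a read of a single series from a fileset volume conceptually proceeds bloom filter → summaries → index → data. Below is a minimal sketch of that flow; every type and method is a hypothetical stand-in for illustration, not the actual M3DB API:

```go
package fileset

// Hypothetical stand-ins for illustration; not the actual M3DB types.
type indexEntry struct {
	DataOffset int64
	Size       int
	Checksum   uint32
}

type volume interface {
	BloomFilterMayContain(id string) bool        // bloom filter file bitset
	SummariesNearestIndexOffset(id string) int64 // in-memory summaries lookup
	IndexScanForEntry(offset int64, id string) (indexEntry, bool, error)
	DataReadAt(entry indexEntry) ([]byte, error) // reads and checksums the stream
}

// readSeries sketches the fileset read path:
// bloom filter -> summaries -> index -> data.
func readSeries(vol volume, id string) ([]byte, bool, error) {
	// 1. Bloom filter: cheaply rule out IDs not present in this volume.
	if !vol.BloomFilterMayContain(id) {
		return nil, false, nil
	}
	// 2. Summaries: find the offset of the nearest preceding index entry.
	offset := vol.SummariesNearestIndexOffset(id)
	// 3. Index: linearly scan a few pages from that offset for the exact ID.
	entry, found, err := vol.IndexScanForEntry(offset, id)
	if err != nil || !found {
		return nil, found, err
	}
	// 4. Data: seek to the entry's data offset, read Size bytes and verify
	//    them against Checksum before returning the compressed stream.
	out, err := vol.DataReadAt(entry)
	return out, err == nil, err
}
```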

Fileset files are kept for every shard / block start combination that falls within the retention period. Once files fall outside the configured namespace retention period they are deleted.
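
As for the checkpoint file mentioned above, volume completeness can be checked without scanning any data: recompute the digest of the digests file and compare it against the digest recorded in the checkpoint file. A minimal sketch with hypothetical accessors follows (M3DB's digests are Adler-32 checksums, so the standard library's hash/adler32 stands in here):

```go
package fileset

import "hash/adler32"

// volumeFiles is a hypothetical accessor over a fileset volume's files.
type volumeFiles interface {
	CheckpointDigest() (uint32, bool) // digest recorded in the checkpoint file
	DigestsFileBytes() []byte         // raw contents of the digests file
}

// volumeComplete reports whether a fileset volume finished persisting: a
// checkpoint file exists and its stored digest matches a fresh digest of
// the digests file, proving every per-file digest was fully written.
func volumeComplete(files volumeFiles) bool {
	want, ok := files.CheckpointDigest()
	if !ok {
		return false // no checkpoint file: the flush never completed
	}
	return want == adler32.Checksum(files.DigestsFileBytes())
}
```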
3 changes: 2 additions & 1 deletion glide.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions integration/admin_session_fetch_blocks_test.go
@@ -70,8 +70,8 @@ func TestAdminSessionFetchBlocksFromPeers(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start
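
This hunk and the similar test diffs below switch `generate.BlockConfig` literals from positional to named fields. Judging purely from their usage in these tests, the struct presumably resembles the following sketch (field names are from the diff, types are inferred, not taken from the m3db source):

```go
package generate

import "time"

// BlockConfig as inferred from its usage in these tests; a sketch,
// not the definition from the m3db source tree.
type BlockConfig struct {
	IDs       []string  // series IDs to generate data for
	NumPoints int       // datapoints to generate per series in the block
	Start     time.Time // block start time
}
```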
4 changes: 2 additions & 2 deletions integration/cluster_add_one_node_test.go
@@ -152,8 +152,8 @@ func TestClusterAddOneNode(t *testing.T) {
now := setups[0].getNowFn()
blockSize := namesp.Options().RetentionOptions().BlockSize()
seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
-	{[]string{ids[0].str, ids[1].str}, 180, now.Add(-blockSize)},
-	{[]string{ids[0].str, ids[2].str}, 90, now},
+	{IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: now.Add(-blockSize)},
+	{IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: now},
})
err = writeTestDataToDisk(namesp, setups[0], seriesMaps)
require.NoError(t, err)
6 changes: 3 additions & 3 deletions integration/commitlog_bootstrap_merge_test.go
@@ -94,9 +94,9 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) {
t3 = t2.Add(ns1BlockSize)
)
blockConfigs := []generate.BlockConfig{
{[]string{"foo", "bar"}, 20, t0},
{[]string{"nah", "baz"}, 50, t1},
{[]string{"hax", "ord"}, 30, t2},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: t0},
{IDs: []string{"nah", "baz"}, NumPoints: 50, Start: t1},
{IDs: []string{"hax", "ord"}, NumPoints: 30, Start: t2},
}
log.Info("generating data")
seriesMaps := generate.BlocksByStart(blockConfigs)
14 changes: 7 additions & 7 deletions integration/commitlog_bootstrap_multi_ns_test.go
@@ -76,9 +76,9 @@ func TestCommitLogBootstrapMultipleNamespaces(t *testing.T) {
log.Info("generating data - ns1")
now := setup.getNowFn()
ns1SeriesMap := generate.BlocksByStart([]generate.BlockConfig{
{[]string{"foo", "bar"}, 20, now.Add(ns1BlockSize)},
{[]string{"bar", "baz"}, 50, now.Add(2 * ns1BlockSize)},
{[]string{"and", "one"}, 40, now.Add(3 * ns1BlockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: now.Add(ns1BlockSize)},
{IDs: []string{"bar", "baz"}, NumPoints: 50, Start: now.Add(2 * ns1BlockSize)},
{IDs: []string{"and", "one"}, NumPoints: 40, Start: now.Add(3 * ns1BlockSize)},
})

setup.namespaceMetadataOrFail(testNamespaces[0])
@@ -89,10 +89,10 @@ func TestCommitLogBootstrapMultipleNamespaces(t *testing.T) {
// Write test data for ns2
log.Info("generating data - ns2")
ns2SeriesMap := generate.BlocksByStart([]generate.BlockConfig{
{[]string{"abc", "def"}, 20, now.Add(ns2BlockSize)},
{[]string{"xyz", "lmn"}, 50, now.Add(2 * ns2BlockSize)},
{[]string{"cat", "hax"}, 80, now.Add(3 * ns2BlockSize)},
{[]string{"why", "this"}, 40, now.Add(4 * ns2BlockSize)},
{IDs: []string{"abc", "def"}, NumPoints: 20, Start: now.Add(ns2BlockSize)},
{IDs: []string{"xyz", "lmn"}, NumPoints: 50, Start: now.Add(2 * ns2BlockSize)},
{IDs: []string{"cat", "hax"}, NumPoints: 80, Start: now.Add(3 * ns2BlockSize)},
{IDs: []string{"why", "this"}, NumPoints: 40, Start: now.Add(4 * ns2BlockSize)},
})
setup.namespaceMetadataOrFail(testNamespaces[1])
log.Info("writing data - ns2")
20 changes: 19 additions & 1 deletion integration/data.go
@@ -39,9 +39,15 @@ import (

type readableSeries struct {
	ID   string
	Tags []readableSeriesTag
	Data []ts.Datapoint
}

type readableSeriesTag struct {
	Name  string
	Value string
}

type readableSeriesList []readableSeries

func toDatapoints(fetched *rpc.FetchResult_) []ts.Datapoint {
@@ -77,6 +83,7 @@ func verifySeriesMapForRange(
require.NoError(t, err)
actual[i] = generate.Series{
	ID:   s.ID,
	Tags: s.Tags,
	Data: fetched,
}
}
@@ -97,7 +104,18 @@ func writeVerifyDebugOutput(t *testing.T, filePath string, start, end time.Time,

list := make(readableSeriesList, 0, len(series))
for i := range series {
-	list = append(list, readableSeries{ID: series[i].ID.String(), Data: series[i].Data})
+	tags := make([]readableSeriesTag, 0, len(series[i].Tags))
+	for _, tag := range series[i].Tags {
+		tags = append(tags, readableSeriesTag{
+			Name:  tag.Name.String(),
+			Value: tag.Value.String(),
+		})
+	}
+	list = append(list, readableSeries{
+		ID:   series[i].ID.String(),
+		Tags: tags,
+		Data: series[i].Data,
+	})
}

data, err := json.MarshalIndent(struct {
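
For illustration, one entry of the debug JSON this helper writes would look roughly as follows. This is a self-contained sketch with local stand-in types; it assumes `ts.Datapoint` carries the usual `Timestamp`/`Value` fields and is not captured output:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local stand-ins mirroring the shapes in the diff above.
type readableSeriesTag struct {
	Name  string
	Value string
}

type datapoint struct {
	Timestamp time.Time
	Value     float64
}

type readableSeries struct {
	ID   string
	Tags []readableSeriesTag
	Data []datapoint
}

func main() {
	s := readableSeries{
		ID:   "foo",
		Tags: []readableSeriesTag{{Name: "city", Value: "new_york"}},
		Data: []datapoint{{Timestamp: time.Unix(0, 0).UTC(), Value: 42}},
	}
	out, _ := json.MarshalIndent(s, "", "  ")
	fmt.Println(string(out))
	// Output (abridged):
	// {
	//   "ID": "foo",
	//   "Tags": [{ "Name": "city", "Value": "new_york" }],
	//   "Data": [{ "Timestamp": "1970-01-01T00:00:00Z", "Value": 42 }]
	// }
}
```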
7 changes: 6 additions & 1 deletion integration/disk_flush_helpers.go
@@ -36,6 +36,7 @@ import (
"github.com/m3db/m3db/storage"
"github.com/m3db/m3db/ts"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/ident/testutil"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/require"
@@ -138,7 +139,10 @@ func verifyForTime(
}
require.NoError(t, reader.Open(rOpts))
for i := 0; i < reader.Entries(); i++ {
-	id, data, _, err := reader.Read()
+	id, tagsIter, data, _, err := reader.Read()
require.NoError(t, err)

tags, err := testutil.NewTagsFromTagIterator(tagsIter)
require.NoError(t, err)

data.IncRef()
@@ -155,6 +159,7 @@

actual = append(actual, generate.Series{
	ID:   id,
	Tags: tags,
	Data: datapoints,
})

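
With this change the fileset reader yields a tag iterator per series alongside the ID and data. A minimal sketch of draining a reader with the new signature; it assumes the reader interface is `fs.FileSetReader` as in this era of the codebase, and the m3x `ident.TagIterator` contract (`Next`/`Current`/`Err`/`Close`):

```go
package example

import (
	"fmt"

	"github.com/m3db/m3db/persist/fs"
)

// printSeriesAndTags drains an already-opened fileset reader, printing each
// series ID and its tags. A sketch against the Read() signature shown above,
// not a definitive implementation.
func printSeriesAndTags(reader fs.FileSetReader) error {
	for i := 0; i < reader.Entries(); i++ {
		id, tagsIter, data, _, err := reader.Read()
		if err != nil {
			return err
		}
		for tagsIter.Next() {
			tag := tagsIter.Current()
			fmt.Printf("series %s: %s=%s\n",
				id.String(), tag.Name.String(), tag.Value.String())
		}
		if err := tagsIter.Err(); err != nil {
			return err
		}
		tagsIter.Close()

		data.IncRef()
		_ = data.Bytes() // compressed stream for this series
		data.DecRef()
	}
	return nil
}
```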
6 changes: 3 additions & 3 deletions integration/disk_flush_multi_ns_test.go
@@ -85,14 +85,14 @@ func TestDiskFlushMultipleNamespace(t *testing.T) {
// test data for ns1
ns1SeriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
ns1InputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(ns1BlockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(ns1BlockSize)},
}

// test data for ns2
ns2SeriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
ns2InputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 20, now},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: now},
}

for _, ns1Input := range ns1InputData {
4 changes: 2 additions & 2 deletions integration/disk_flush_test.go
@@ -63,8 +63,8 @@ func TestDiskFlushSimple(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
testSetup.setNowFn(input.Start)
2 changes: 1 addition & 1 deletion integration/disk_snapshot_test.go
@@ -64,7 +64,7 @@ func TestDiskSnapshotSimple(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar", "baz"}, 100, now},
{IDs: []string{"foo", "bar", "baz"}, NumPoints: 100, Start: now},
}
for _, input := range inputData {
testSetup.setNowFn(input.Start)
4 changes: 2 additions & 2 deletions integration/dynamic_namespace_add_test.go
@@ -96,8 +96,8 @@ func TestDynamicNamespaceAdd(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start
4 changes: 2 additions & 2 deletions integration/dynamic_namespace_delete_test.go
@@ -105,8 +105,8 @@ func TestDynamicNamespaceDelete(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start