Add read/write of tags to fileset to restore tags, add fs index bootstrapping #590

Merged: 25 commits, May 8, 2018. Diff shown from 21 commits.
58 changes: 30 additions & 28 deletions docs/architecture/storage.md
@@ -2,7 +2,7 @@

## Overview

The primary unit of long-term storage for M3DB is the fileset file, which stores compressed streams of time series values, one fileset per shard per block time window.

They are flushed to disk once a block time window becomes unreachable, that is, once the end of the window has passed and the block can no longer be written to. If a process is killed before it has a chance to flush the data for the current time window to disk, that data must be restored from the commit log (or from a peer responsible for the same shard, if the replication factor is greater than 1).

@@ -12,43 +12,45 @@

A fileset has the following files:

* **Info file:** Stores the block time window start and size and other important metadata about the fileset volume.
* **Summaries file:** Stores a subset of the index file that can be kept in memory; a lookup jumps to the nearest summary entry and then linearly scans at most a few pages of the index file to find the series being looked up.
* **Index file:** Stores the series metadata, including tags if indexing is enabled, and the location of the compressed stream in the data file for retrieval.
* **Data file:** Stores the series compressed data streams.
* **Bloom filter file:** Stores a bloom filter bitset of all series contained in this fileset, so a reader can quickly decide whether it is worth attempting to retrieve a series from this fileset volume.
* **Digests file:** Stores the digest checksums of the info file, summaries file, index file, data file and bloom filter file in the fileset volume for integrity verification.
* **Checkpoint file:** Stores a digest of the digests file and is written on successful completion of persisting a fileset volume, allowing a quick check of whether a volume was completed.
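
Taken together, these files let a completed volume be verified and then iterated series by series. Below is a minimal sketch, not the PR's exact code, of reading one volume back with the reader API exercised by the integration helpers later in this diff; the reader and options type names here are assumptions:

```go
package example

import (
	"testing"

	"github.com/m3db/m3db/persist/fs"
	"github.com/stretchr/testify/require"
)

// dumpVolume iterates every series of one fileset volume. The Read
// signature (id, tags, data, checksum, err) matches the one used by the
// integration helpers in this PR; the reader/options type names are assumed.
func dumpVolume(t *testing.T, reader fs.FileSetReader, rOpts fs.ReaderOpenOptions) {
	require.NoError(t, reader.Open(rOpts))
	for i := 0; i < reader.Entries(); i++ {
		// Entries come back in index-file order, i.e. sorted by ID.
		id, tagsIter, data, _, err := reader.Read()
		require.NoError(t, err)

		data.IncRef()
		t.Logf("series=%s tags=%d bytes=%d",
			id.String(), tagsIter.Remaining(), len(data.Bytes()))
		data.DecRef()
	}
	require.NoError(t, reader.Close())
}
```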

**richardartoul** (Contributor) commented on May 8, 2018:

Since you're updating this, can you add the new fields I added recently:

```go
type IndexInfo struct {
	MajorVersion int64
	BlockStart   int64
	BlockSize    int64
	Entries      int64
	Summaries    IndexSummariesInfo
	BloomFilter  IndexBloomFilterInfo
	SnapshotTime int64
	FileType     persist.FileSetType
}
```

**Author** (Collaborator): Sure thing.

**Author** (Collaborator): Done.

The updated diagram:

```
                                                      ┌─────────────────────┐
┌─────────────────────┐   ┌─────────────────────┐     │     Index File      │
│      Info File      │   │   Summaries File    │     │   (sorted by ID)    │
├─────────────────────┤   │   (sorted by ID)    │     ├─────────────────────┤
│- Block Start        │   ├─────────────────────┤  ┌─>│- Idx                │
│- Block Size         │   │- Idx                │  │  │- ID                 │
│- Entries (Num)      │   │- ID                 │  │  │- Size               │
│- Major Version      │   │- Index Entry Offset ├──┘  │- Checksum           │
│- Summaries (Num)    │   └─────────────────────┘     │- Data Entry Offset  ├──┐
│- BloomFilter (K/M)  │                               │- Encoded Tags       │  │
│- Snapshot Time      │                               └─────────────────────┘  │
│- Type (Flush/Snap)  │                                                        │
└─────────────────────┘                                                        │
                                                                               │
                          ┌─────────────────────┐   ┌──────────────────────────┘
┌─────────────────────┐   │  Bloom Filter File  │   │
│    Digests File     │   ├─────────────────────┤   │  ┌─────────────────────┐
├─────────────────────┤   │- Bitset             │   │  │      Data File      │
│- Info file digest   │   └─────────────────────┘   │  ├─────────────────────┤
│- Summaries digest   │                             │  │List of:             │
│- Index digest       │                             └─>│ - Marker (16 bytes) │
│- Data digest        │                                │ - ID                │
│- Bloom filter digest│                                │ - Data (size bytes) │
└─────────────────────┘                                └─────────────────────┘

┌─────────────────────┐
│   Checkpoint File   │
├─────────────────────┤
│- Digests digest     │
└─────────────────────┘
```

In the diagram above you can see that the data file stores compressed blocks for a given shard / block start combination. The index file (which is sorted by ID and thus can be binary searched or scanned) can be used to find the offset of a specific ID.
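To make that lookup concrete, here is a small self-contained sketch of the binary search, using a hypothetical in-memory `IndexEntry` that simplifies the real on-disk record:

```go
package example

import "sort"

// IndexEntry is a hypothetical, simplified stand-in for one index file
// record (the real file is decoded incrementally, not held as a slice).
type IndexEntry struct {
	ID              string
	Size            int64
	Checksum        int64
	DataEntryOffset int64
	EncodedTags     []byte
}

// findOffset binary-searches entries, which are sorted by ID exactly as
// the index file is, and returns the series' offset into the data file.
func findOffset(entries []IndexEntry, id string) (int64, bool) {
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].ID >= id
	})
	if i < len(entries) && entries[i].ID == id {
		return entries[i].DataEntryOffset, true
	}
	return 0, false
}
```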

Fileset files are kept for every shard / block start combination that falls within the retention period. Once they fall outside the period defined in the configurable namespace retention options they are deleted.
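
The knobs involved are the namespace retention options; the tests below read them via `namesp.Options().RetentionOptions().BlockSize()`. A sketch of constructing them (setter names assumed from that options pattern, only the getter appears in this PR):

```go
package example

import (
	"time"

	"github.com/m3db/m3db/retention"
	"github.com/m3db/m3db/storage/namespace"
)

// newNamespaceOptions sketches a namespace whose filesets are written in
// two-hour blocks and deleted once older than two days. Setter names are
// assumptions, not confirmed by this diff.
func newNamespaceOptions() namespace.Options {
	rOpts := retention.NewOptions().
		SetRetentionPeriod(48 * time.Hour).
		SetBlockSize(2 * time.Hour)
	return namespace.NewOptions().SetRetentionOptions(rOpts)
}
```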
3 changes: 2 additions & 1 deletion glide.lock


4 changes: 2 additions & 2 deletions integration/admin_session_fetch_blocks_test.go
@@ -70,8 +70,8 @@ func TestAdminSessionFetchBlocksFromPeers(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start
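These mechanical changes switch `generate.BlockConfig` literals from positional to named fields, and the same pattern repeats in the test diffs below. The usage implies a shape roughly like this sketch (field types inferred from usage, not copied from the m3db source):

```go
package example

import "time"

// BlockConfig as implied by the named-field literals in these tests:
// which series IDs to generate, how many points, and the block start.
type BlockConfig struct {
	IDs       []string
	NumPoints int
	Start     time.Time
}
```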
4 changes: 2 additions & 2 deletions integration/cluster_add_one_node_test.go
@@ -152,8 +152,8 @@ func TestClusterAddOneNode(t *testing.T) {
now := setups[0].getNowFn()
blockSize := namesp.Options().RetentionOptions().BlockSize()
seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
- {[]string{ids[0].str, ids[1].str}, 180, now.Add(-blockSize)},
- {[]string{ids[0].str, ids[2].str}, 90, now},
+ {IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: now.Add(-blockSize)},
+ {IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: now},
})
err = writeTestDataToDisk(namesp, setups[0], seriesMaps)
require.NoError(t, err)
6 changes: 3 additions & 3 deletions integration/commitlog_bootstrap_merge_test.go
@@ -94,9 +94,9 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) {
t3 = t2.Add(ns1BlockSize)
)
blockConfigs := []generate.BlockConfig{
{[]string{"foo", "bar"}, 20, t0},
{[]string{"nah", "baz"}, 50, t1},
{[]string{"hax", "ord"}, 30, t2},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: t0},
{IDs: []string{"nah", "baz"}, NumPoints: 50, Start: t1},
{IDs: []string{"hax", "ord"}, NumPoints: 30, Start: t2},
}
log.Info("generating data")
seriesMaps := generate.BlocksByStart(blockConfigs)
14 changes: 7 additions & 7 deletions integration/commitlog_bootstrap_multi_ns_test.go
@@ -76,9 +76,9 @@ func TestCommitLogBootstrapMultipleNamespaces(t *testing.T) {
log.Info("generating data - ns1")
now := setup.getNowFn()
ns1SeriesMap := generate.BlocksByStart([]generate.BlockConfig{
{[]string{"foo", "bar"}, 20, now.Add(ns1BlockSize)},
{[]string{"bar", "baz"}, 50, now.Add(2 * ns1BlockSize)},
{[]string{"and", "one"}, 40, now.Add(3 * ns1BlockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: now.Add(ns1BlockSize)},
{IDs: []string{"bar", "baz"}, NumPoints: 50, Start: now.Add(2 * ns1BlockSize)},
{IDs: []string{"and", "one"}, NumPoints: 40, Start: now.Add(3 * ns1BlockSize)},
})

setup.namespaceMetadataOrFail(testNamespaces[0])
@@ -89,10 +89,10 @@
// Write test data for ns2
log.Info("generating data - ns2")
ns2SeriesMap := generate.BlocksByStart([]generate.BlockConfig{
{[]string{"abc", "def"}, 20, now.Add(ns2BlockSize)},
{[]string{"xyz", "lmn"}, 50, now.Add(2 * ns2BlockSize)},
{[]string{"cat", "hax"}, 80, now.Add(3 * ns2BlockSize)},
{[]string{"why", "this"}, 40, now.Add(4 * ns2BlockSize)},
{IDs: []string{"abc", "def"}, NumPoints: 20, Start: now.Add(ns2BlockSize)},
{IDs: []string{"xyz", "lmn"}, NumPoints: 50, Start: now.Add(2 * ns2BlockSize)},
{IDs: []string{"cat", "hax"}, NumPoints: 80, Start: now.Add(3 * ns2BlockSize)},
{IDs: []string{"why", "this"}, NumPoints: 40, Start: now.Add(4 * ns2BlockSize)},
})
setup.namespaceMetadataOrFail(testNamespaces[1])
log.Info("writing data - ns2")
20 changes: 19 additions & 1 deletion integration/data.go
@@ -39,9 +39,15 @@ import (

type readableSeries struct {
ID string
+ Tags []readableSeriesTag
Data []ts.Datapoint
}

+ type readableSeriesTag struct {
+     Name  string
+     Value string
+ }

type readableSeriesList []readableSeries

func toDatapoints(fetched *rpc.FetchResult_) []ts.Datapoint {
@@ -77,6 +83,7 @@ func verifySeriesMapForRange(
require.NoError(t, err)
actual[i] = generate.Series{
ID: s.ID,
+ Tags: s.Tags,
Data: fetched,
}
}
@@ -97,7 +104,18 @@ func writeVerifyDebugOutput(t *testing.T, filePath string, start, end time.Time,

list := make(readableSeriesList, 0, len(series))
for i := range series {
- list = append(list, readableSeries{ID: series[i].ID.String(), Data: series[i].Data})
+ tags := make([]readableSeriesTag, 0, len(series[i].Tags)) // zero length, capacity len: append fills it
+ for _, tag := range series[i].Tags {
+     tags = append(tags, readableSeriesTag{
+         Name:  tag.Name.String(),
+         Value: tag.Value.String(),
+     })
+ }
+ list = append(list, readableSeries{
+     ID:   series[i].ID.String(),
+     Tags: tags,
+     Data: series[i].Data,
+ })
}

data, err := json.MarshalIndent(struct {
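For illustration, the debug file written above contains one entry per series, shaped by the `readableSeries`/`readableSeriesTag` types added earlier in this diff; the values below are invented and the enclosing struct's field names are hypothetical:

```go
// Hypothetical example of what writeVerifyDebugOutput serializes, using
// the readableSeries/readableSeriesTag types added above. Invented values;
// ts.Datapoint field names assumed.
list := readableSeriesList{
	{
		ID:   "foo",
		Tags: []readableSeriesTag{{Name: "city", Value: "nyc"}},
		Data: []ts.Datapoint{{Timestamp: start, Value: 42.5}},
	},
}
// json.MarshalIndent then renders each series roughly as:
//   {"ID": "foo", "Tags": [{"Name": "city", "Value": "nyc"}], "Data": [...]}
```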
7 changes: 6 additions & 1 deletion integration/disk_flush_helpers.go
@@ -36,6 +36,7 @@ import (
"github.com/m3db/m3db/storage"
"github.com/m3db/m3db/ts"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/ident/testutil"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/require"
@@ -138,7 +139,10 @@ func verifyForTime(
}
require.NoError(t, reader.Open(rOpts))
for i := 0; i < reader.Entries(); i++ {
- id, data, _, err := reader.Read()
+ id, tagsIter, data, _, err := reader.Read()
require.NoError(t, err)

+ tags, err := testutil.NewTagsFromTagIterator(tagsIter)
+ require.NoError(t, err)

data.IncRef()
@@ -155,6 +159,7 @@

actual = append(actual, generate.Series{
ID: id,
+ Tags: tags,
Data: datapoints,
})

6 changes: 3 additions & 3 deletions integration/disk_flush_multi_ns_test.go
@@ -85,14 +85,14 @@ func TestDiskFlushMultipleNamespace(t *testing.T) {
// test data for ns1
ns1SeriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
ns1InputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(ns1BlockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(ns1BlockSize)},
}

// test data for ns2
ns2SeriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
ns2InputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 20, now},
{IDs: []string{"foo", "bar"}, NumPoints: 20, Start: now},
}

for _, ns1Input := range ns1InputData {
4 changes: 2 additions & 2 deletions integration/disk_flush_test.go
@@ -63,8 +63,8 @@ func TestDiskFlushSimple(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
testSetup.setNowFn(input.Start)
2 changes: 1 addition & 1 deletion integration/disk_snapshot_test.go
@@ -64,7 +64,7 @@ func TestDiskSnapshotSimple(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar", "baz"}, 100, now},
{IDs: []string{"foo", "bar", "baz"}, NumPoints: 100, Start: now},
}
for _, input := range inputData {
testSetup.setNowFn(input.Start)
4 changes: 2 additions & 2 deletions integration/dynamic_namespace_add_test.go
@@ -96,8 +96,8 @@ func TestDynamicNamespaceAdd(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start
4 changes: 2 additions & 2 deletions integration/dynamic_namespace_delete_test.go
@@ -105,8 +105,8 @@ func TestDynamicNamespaceDelete(t *testing.T) {
now := testSetup.getNowFn()
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{[]string{"foo", "bar"}, 100, now},
{[]string{"foo", "baz"}, 50, now.Add(blockSize)},
{IDs: []string{"foo", "bar"}, NumPoints: 100, Start: now},
{IDs: []string{"foo", "baz"}, NumPoints: 50, Start: now.Add(blockSize)},
}
for _, input := range inputData {
start := input.Start