Skip to content

Commit

Permalink
nsqd: new metadata filename without ID
Browse files Browse the repository at this point in the history
symlink old metadata filename to new
when loading, if both exist, ensure they match
this makes rollback possible without losing messages
(when rolling forward again after a rollback, some manual intervention is required)

on windows, Symlink() needs Administrator privs,
so just write plain old metadata file

includes tests
  • Loading branch information
ploxiln committed Jan 28, 2017
1 parent 34c7eb7 commit 3b4b6a5
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 21 deletions.
7 changes: 5 additions & 2 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,11 @@ func (p *program) Start() error {
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
Expand Down
108 changes: 91 additions & 17 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -12,6 +13,7 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -268,24 +270,73 @@ type meta struct {
} `json:"topics"`
}

func (n *NSQD) LoadMetadata() {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
// newMetadataFile returns the location of the metadata file whose name does
// not include the node ID: "nsqd.dat" inside opts.DataPath.
func newMetadataFile(opts *Options) string {
	const name = "nsqd.dat"
	return path.Join(opts.DataPath, name)
}

fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// oldMetadataFile returns the location of the legacy metadata file, whose
// name embeds the node ID: "nsqd.<ID>.dat" inside opts.DataPath.
func oldMetadataFile(opts *Options) string {
	name := fmt.Sprintf("nsqd.%d.dat", opts.ID)
	return path.Join(opts.DataPath, name)
}

// readOrEmpty reads the whole file fn.
//
// A missing file is not an error: it yields (nil, nil) so callers can tell
// "no file" (nil data) apart from a read failure. Any other read error is
// returned wrapped with the filename; the error is not logged here, leaving
// reporting to the caller.
func readOrEmpty(fn string) ([]byte, error) {
	data, err := ioutil.ReadFile(fn)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err)
	}
	return data, nil
}

// writeSyncFile writes data to the file fn and flushes it to stable storage.
//
// The file is created with mode 0600 if it does not exist and truncated if it
// does. The file is always closed before returning. The returned error is the
// first failure among open, write, sync and close; a close failure is
// reported rather than dropped, since it can indicate the data did not reach
// the file.
func writeSyncFile(fn string, data []byte) error {
	f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	_, err = f.Write(data)
	if err == nil {
		err = f.Sync()
	}
	// Always close, but never let a close error mask an earlier one.
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	return err
}

func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)

fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())

data, err := readOrEmpty(fn)
if err != nil {
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}

if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
return
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
}

var m meta
err = json.Unmarshal(data, &m)
if err != nil {
n.logf("ERROR: failed to parse metadata - %s", err)
return
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}

for _, t := range m.Topics {
Expand All @@ -309,12 +360,15 @@ func (n *NSQD) LoadMetadata() {
}
}
}
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have
// so that upon restart we can get back to the same state
fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := oldMetadataFile(n.getOpts())

n.logf("NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
Expand Down Expand Up @@ -353,23 +407,43 @@ func (n *NSQD) PersistMetadata() error {
}

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
f, err := os.OpenFile(tmpFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)

err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here

_, err = f.Write(data)
stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}

// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort

tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()

err = os.Rename(tmpFileName, fileName)
err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here

return nil
}
Expand Down
103 changes: 101 additions & 2 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"io/ioutil"
"net"
"os"
"path"
"strconv"
"sync/atomic"
"testing"
Expand All @@ -24,7 +23,7 @@ const (
)

func getMetadata(n *NSQD) (*meta, error) {
fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
fn := newMetadataFile(n.getOpts())
data, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,6 +157,106 @@ func TestStartup(t *testing.T) {
<-doneExitChan
}

// TestMetadataMigrate starts from a data directory that contains only the
// old-style (ID-suffixed) metadata file. After LoadMetadata followed by
// PersistMetadata it expects the old filename to be a symlink, and a second
// startup must still find topic "t1" with channel "c2".
func TestMetadataMigrate(t *testing.T) {
	const legacyMeta = `
{
"topics": [
{
"channels": [
{"name": "c1", "paused": false},
{"name": "c2", "paused": false}
],
"name": "t1",
"paused": false
}
],
"version": "1.0.0-alpha"
}`

	dataDir, err := ioutil.TempDir("", "nsq-test-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dataDir)

	opts := NewOptions()
	opts.Logger = test.NewTestLogger(t)
	opts.DataPath = dataDir

	// Seed only the old-style metadata file.
	legacyPath := oldMetadataFile(opts)
	if err = ioutil.WriteFile(legacyPath, []byte(legacyMeta), 0600); err != nil {
		panic(err)
	}

	// First run: load from the old file, then persist.
	_, _, daemon := mustStartNSQD(opts)
	err = daemon.LoadMetadata()
	test.Nil(t, err)
	err = daemon.PersistMetadata()
	test.Nil(t, err)
	daemon.Exit()

	// The old filename must now be a symlink.
	legacyInfo, err := os.Lstat(legacyPath)
	test.Nil(t, err)
	test.Equal(t, legacyInfo.Mode()&os.ModeType, os.ModeSymlink)

	// Second run: metadata must load and contain the seeded topic/channel.
	_, _, daemon = mustStartNSQD(opts)
	err = daemon.LoadMetadata()
	test.Nil(t, err)

	topic, err := daemon.GetExistingTopic("t1")
	test.Nil(t, err)
	test.NotNil(t, topic)
	channel, err := topic.GetExistingChannel("c2")
	test.Nil(t, err)
	test.NotNil(t, channel)

	daemon.Exit()
}

// TestMetadataConflict writes different contents to the old-style and
// new-style metadata files and expects LoadMetadata to return an error.
func TestMetadataConflict(t *testing.T) {
	const legacyMeta = `
{
"topics": [{
"name": "t1", "paused": false,
"channels": [{"name": "c1", "paused": false}]
}],
"version": "1.0.0-alpha"
}`
	const currentMeta = `
{
"topics": [{
"name": "t2", "paused": false,
"channels": [{"name": "c2", "paused": false}]
}],
"version": "1.0.0-alpha"
}`

	dataDir, err := ioutil.TempDir("", "nsq-test-")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dataDir)

	opts := NewOptions()
	opts.Logger = test.NewTestLogger(t)
	opts.DataPath = dataDir

	// Write the old file first, then the new one, with mismatching contents.
	fixtures := []struct {
		fn   string
		body string
	}{
		{oldMetadataFile(opts), legacyMeta},
		{newMetadataFile(opts), currentMeta},
	}
	for _, fx := range fixtures {
		if werr := ioutil.WriteFile(fx.fn, []byte(fx.body), 0600); werr != nil {
			panic(werr)
		}
	}

	_, _, daemon := mustStartNSQD(opts)
	err = daemon.LoadMetadata()
	test.NotNil(t, err)
	daemon.Exit()
}

func TestEphemeralTopicsAndChannels(t *testing.T) {
// ephemeral topics/channels are lazily removed after the last channel/client is removed
opts := NewOptions()
Expand Down

0 comments on commit 3b4b6a5

Please sign in to comment.