Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: remove --worker-id, replace with --node-id #844

Merged
merged 4 commits into from
Feb 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Bool("verbose", false, "enable verbose logging")
flagSet.String("config", "", "path to config file")
flagSet.String("log-prefix", "[nsqd] ", "log message prefix")
flagSet.Int64("worker-id", opts.ID, "unique seed for message ID generation (int) in range [0,4096) (will default to a hash of hostname)")
flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)")
flagSet.Bool("worker-id", false, "do NOT use this, use --node-id")

flagSet.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
Expand Down Expand Up @@ -218,8 +219,11 @@ func (p *program) Start() error {
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, []*nsqlookupd.NSQL
nsqdOpts.BroadcastAddress = "127.0.0.1"
nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()}
nsqdOpts.Logger = lgr
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions nsqadmin/nsqadmin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"os/exec"
"testing"
"time"

"github.com/nsqio/nsq/internal/test"
"github.com/nsqio/nsq/nsqd"
Expand Down Expand Up @@ -103,7 +102,7 @@ func mustStartNSQD(opts *nsqd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqd.NSQD)
opts.HTTPAddress = "127.0.0.1:0"
opts.HTTPSAddress = "127.0.0.1:0"
if opts.DataPath == "" {
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
10 changes: 4 additions & 6 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (d *diskQueue) persistMetaData() error {
f.Close()

// atomically rename
return atomicRename(tmpFileName, fileName)
return os.Rename(tmpFileName, fileName)
}

func (d *diskQueue) metaDataFileName() string {
Expand Down Expand Up @@ -538,14 +538,12 @@ func (d *diskQueue) handleReadError() {
badFn := d.fileName(d.readFileNum)
badRenameFn := badFn + ".bad"

d.logf(
"NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
d.logf("NOTICE: diskqueue(%s) jump to next file and saving bad file as %s",
d.name, badRenameFn)

err := atomicRename(badFn, badRenameFn)
err := os.Rename(badFn, badRenameFn)
if err != nil {
d.logf(
"ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
d.logf("ERROR: diskqueue(%s) failed to rename bad diskqueue file %s to %s",
d.name, badFn, badRenameFn)
}

Expand Down
20 changes: 10 additions & 10 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestDiskQueue(t *testing.T) {
l := test.NewTestLogger(t)

dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand All @@ -42,7 +42,7 @@ func TestDiskQueue(t *testing.T) {
func TestDiskQueueRoll(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func assertFileNotExist(t *testing.T, fn string) {
func TestDiskQueueEmpty(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestDiskQueueEmpty(t *testing.T) {
func TestDiskQueueCorruption(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func readMetaDataFile(fileName string, retried int) md {
func TestDiskQueueSyncAfterRead(t *testing.T) {
l := test.NewTestLogger(t)
dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestDiskQueueTorture(t *testing.T) {

l := test.NewTestLogger(t)
dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func benchmarkDiskQueuePut(size int64, b *testing.B) {
b.StopTimer()
l := test.NewTestLogger(b)
dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func BenchmarkDiskWrite1048576(b *testing.B) {
func benchmarkDiskWrite(size int64, b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -480,7 +480,7 @@ func BenchmarkDiskWriteBuffered1048576(b *testing.B) {
func benchmarkDiskWriteBuffered(size int64, b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
b.StopTimer()
l := test.NewTestLogger(b)
dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
tmpDir, err := ioutil.TempDir("", "nsq-test-")
if err != nil {
panic(err)
}
Expand Down
14 changes: 7 additions & 7 deletions nsqd/guid.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
)

const (
workerIDBits = uint64(10)
nodeIDBits = uint64(10)
sequenceBits = uint64(12)
workerIDShift = sequenceBits
timestampShift = sequenceBits + workerIDBits
nodeIDShift = sequenceBits
timestampShift = sequenceBits + nodeIDBits
sequenceMask = int64(-1) ^ (int64(-1) << sequenceBits)

// ( 2012-10-28 16:23:42 UTC ).UnixNano() >> 20
Expand All @@ -36,15 +36,15 @@ type guid int64
type guidFactory struct {
sync.Mutex

workerID int64
nodeID int64
sequence int64
lastTimestamp int64
lastID guid
}

func NewGUIDFactory(workerID int64) *guidFactory {
func NewGUIDFactory(nodeID int64) *guidFactory {
return &guidFactory{
workerID: workerID,
nodeID: nodeID,
}
}

Expand Down Expand Up @@ -72,7 +72,7 @@ func (f *guidFactory) NewGUID() (guid, error) {
f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
(f.workerID << workerIDShift) |
(f.nodeID << nodeIDShift) |
f.sequence)

if id <= f.lastID {
Expand Down
110 changes: 92 additions & 18 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -12,6 +13,7 @@ import (
"net"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -102,7 +104,7 @@ func New(opts *Options) *NSQD {
}

if opts.ID < 0 || opts.ID >= 1024 {
n.logf("FATAL: --worker-id must be [0,1024)")
n.logf("FATAL: --node-id must be [0,1024)")
os.Exit(1)
}

Expand Down Expand Up @@ -268,24 +270,73 @@ type meta struct {
} `json:"topics"`
}

func (n *NSQD) LoadMetadata() {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)
// newMetadataFile returns the path of the ID-less metadata file,
// "nsqd.dat", located inside opts.DataPath.
func newMetadataFile(opts *Options) string {
	const baseName = "nsqd.dat"
	return path.Join(opts.DataPath, baseName)
}

fn := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// oldMetadataFile returns the path of the older, ID-qualified metadata
// file, "nsqd.<ID>.dat", located inside opts.DataPath.
func oldMetadataFile(opts *Options) string {
	baseName := fmt.Sprintf("nsqd.%d.dat", opts.ID)
	return path.Join(opts.DataPath, baseName)
}

// readOrEmpty reads the whole file named fn.
//
// A file that does not exist is not treated as an error: the function
// returns whatever ioutil.ReadFile produced (no data) together with a nil
// error, so callers can distinguish "nothing there yet" from a real read
// failure. Any other read error is returned with the file name attached,
// and the data result is nil in that case.
//
// The error is only returned, never logged here; reporting it is left to
// the caller so it is handled at exactly one layer.
func readOrEmpty(fn string) ([]byte, error) {
	data, err := ioutil.ReadFile(fn)
	if err != nil && !os.IsNotExist(err) {
		return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err)
	}
	return data, nil
}

// writeSyncFile writes data to the file named fn and flushes it to stable
// storage before returning.
//
// The file is opened write-only, created if absent and truncated if
// present, with permission bits 0600. After a successful write the file is
// fsynced. The file is always closed once it has been opened.
//
// The returned error is the first failure among open, write, sync and
// close; a close failure is reported only when nothing failed earlier, so
// it never masks the original cause.
func writeSyncFile(fn string, data []byte) error {
	f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
	if err != nil {
		return err
	}

	_, err = f.Write(data)
	if err == nil {
		err = f.Sync()
	}
	// Close unconditionally so the descriptor is never leaked, but keep an
	// earlier write/sync error in preference to the close error.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	return err
}

func (n *NSQD) LoadMetadata() error {
atomic.StoreInt32(&n.isLoading, 1)
defer atomic.StoreInt32(&n.isLoading, 0)

fn := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fnID := oldMetadataFile(n.getOpts())

data, err := readOrEmpty(fn)
if err != nil {
return err
}
dataID, errID := readOrEmpty(fnID)
if errID != nil {
return errID
}

if data == nil && dataID == nil {
return nil // fresh start
}
if data != nil && dataID != nil {
if bytes.Compare(data, dataID) != 0 {
return fmt.Errorf("metadata in %s and %s do not match (delete one)", fn, fnID)
}
return
}
if data == nil {
// only old metadata file exists, use it
fn = fnID
data = dataID
}

var m meta
err = json.Unmarshal(data, &m)
if err != nil {
n.logf("ERROR: failed to parse metadata - %s", err)
return
return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
}

for _, t := range m.Topics {
Expand All @@ -309,12 +360,15 @@ func (n *NSQD) LoadMetadata() {
}
}
}
return nil
}

func (n *NSQD) PersistMetadata() error {
// persist metadata about what topics/channels we have
// so that upon restart we can get back to the same state
fileName := fmt.Sprintf(path.Join(n.getOpts().DataPath, "nsqd.%d.dat"), n.getOpts().ID)
// persist metadata about what topics/channels we have, across restarts
fileName := newMetadataFile(n.getOpts())
// old metadata filename with ID, maintained in parallel to enable roll-back
fileNameID := oldMetadataFile(n.getOpts())

n.logf("NSQ: persisting topic/channel metadata to %s", fileName)

js := make(map[string]interface{})
Expand Down Expand Up @@ -353,23 +407,43 @@ func (n *NSQD) PersistMetadata() error {
}

tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
f, err := os.OpenFile(tmpFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)

err = writeSyncFile(tmpFileName, data)
if err != nil {
return err
}
err = os.Rename(tmpFileName, fileName)
if err != nil {
return err
}
// technically should fsync DataPath here

_, err = f.Write(data)
stat, err := os.Lstat(fileNameID)
if err == nil && stat.Mode()&os.ModeSymlink != 0 {
return nil
}

// if no symlink (yet), race condition:
// crash right here may cause next startup to see metadata conflict and abort

tmpFileNameID := fmt.Sprintf("%s.%d.tmp", fileNameID, rand.Int())

if runtime.GOOS != "windows" {
err = os.Symlink(fileName, tmpFileNameID)
} else {
// on Windows need Administrator privs to Symlink
// instead write copy every time
err = writeSyncFile(tmpFileNameID, data)
}
if err != nil {
f.Close()
return err
}
f.Sync()
f.Close()

err = atomicRename(tmpFileName, fileName)
err = os.Rename(tmpFileNameID, fileNameID)
if err != nil {
return err
}
// technically should fsync DataPath here

return nil
}
Expand Down
Loading