diff --git a/.excludelint b/.excludelint new file mode 100644 index 0000000..c3b5be2 --- /dev/null +++ b/.excludelint @@ -0,0 +1,3 @@ +(vendor/) +(mock_.*.go) +(*.pb.go) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3e81234 --- /dev/null +++ b/.gitignore @@ -0,0 +1,59 @@ +### https://raw.github.com/github/gitignore/35c010258fc790ad769033e9ccfb1021178f2925/Go.gitignore + +# Glide vendor-ed deps +vendor/ + +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ + + +### https://raw.github.com/github/gitignore/35c010258fc790ad769033e9ccfb1021178f2925/Global/VisualStudioCode.gitignore + +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json + + +### https://raw.github.com/github/gitignore/35c010258fc790ad769033e9ccfb1021178f2925/Global/macOS.gitignore + +*.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns +.com.apple.timemachine.donotpresent + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f00dfa0 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,15 @@ +language: go +go: + - 1.7 +install: make install-ci +env: + # Set higher timeouts for Travis + - TEST_TIMEOUT_SCALE=20 PACKAGE=github.com/m3db/m3nsch +sudo: required +dist: trusty +script: + - make lint + - make test-ci-unit + - make m3nsch_client + - make m3nsch_server + diff --git a/LICENSE.md b/LICENSE.md index 721b703..858e024 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,4 +1,4 @@ -Copyright (c) 2016 Uber Technologies, Inc. +Copyright (c) 2017 Uber Technologies, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -16,4 +16,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. \ No newline at end of file +THE SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4dfff43 --- /dev/null +++ b/Makefile @@ -0,0 +1,80 @@ +SELF_DIR := $(dir $(lastword $(MAKEFILE_LIST))) +include $(SELF_DIR)/.ci/common.mk + +SHELL=/bin/bash -o pipefail + +html_report := coverage.html +test := .ci/test-cover.sh +test_ci_integration := .ci/test-integration.sh +convert-test-data := .ci/convert-test-data.sh +coverfile := cover.out +coverage_xml := coverage.xml +junit_xml := junit.xml +test_log := test.log +lint_check := .ci/lint.sh + +BUILD := $(abspath ./out) +LINUX_AMD64_ENV := GOOS=linux GOARCH=amd64 CGO_ENABLED=0 +VENDOR_ENV := GO15VENDOREXPERIMENT=1 + +SERVICES := \ + m3nsch_server \ + m3nsch_client \ + +setup: + mkdir -p $(BUILD) + +define SERVICE_RULES + +$(SERVICE): setup + @echo Building $(SERVICE) + $(VENDOR_ENV) go build -o $(BUILD)/$(SERVICE) ./$(SERVICE)/. + +$(SERVICE)-linux-amd64: + $(LINUX_AMD64_ENV) make $(SERVICE) + +endef + +services: $(SERVICES) +services-linux-amd64: + $(LINUX_AMD64_ENV) make services + +$(foreach SERVICE,$(SERVICES),$(eval $(SERVICE_RULES))) + +lint: + @which golint > /dev/null || go get -u github.com/golang/lint/golint + $(VENDOR_ENV) $(lint_check) + +test-internal: + @which go-junit-report > /dev/null || go get -u github.com/sectioneight/go-junit-report + @$(VENDOR_ENV) $(test) $(coverfile) | tee $(test_log) + +test-xml: test-internal + go-junit-report < $(test_log) > $(junit_xml) + gocov convert $(coverfile) | gocov-xml > $(coverage_xml) + @$(convert-test-data) $(coverage_xml) + @rm $(coverfile) &> /dev/null + +test: test-internal + gocov convert $(coverfile) | gocov report + +testhtml: test-internal + gocov convert $(coverfile) | gocov-html > $(html_report) && open $(html_report) + @rm -f $(test_log) &> /dev/null + +test-ci-unit: test-internal + @which goveralls > /dev/null || go get -u -f github.com/mattn/goveralls + goveralls -coverprofile=$(coverfile) -service=travis-ci || echo -e "\x1b[31mCoveralls failed\x1b[m" + +test-ci-integration: + @$(VENDOR_ENV) $(test_ci_integration) + +clean: + echo Cleaning build artifacts... + go clean + rm -rf $(BUILD) + @rm -f *.html *.xml *.out *.test + echo + +.DEFAULT_GOAL := test +.PHONY: test test-xml test-internal testhtml clean \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..744aa76 --- /dev/null +++ b/README.md @@ -0,0 +1,107 @@ +m3nsch +====== +m3nsch (pronounced `mensch`) is a load testing tool for M3DB. It has two components: + - `m3nsch_server`: long lived process which does the load generation + - `m3nsch_client`: cli wrapper which controls the myriad `m3nsch_server(s)` + +A typical deploy will have multiple hosts, each running a single `m3nsch_server` instance, +and a single `m3nsch_client` used to control them. + +### Build +``` +$ make prod-m3nsch_server +$ make prod-m3nsch_client +$ cat < server-conf.yaml +server: + listenAddress: "0.0.0.0:12321" + debugAddress: "0.0.0.0:13441" + cpuFactor: 0.9 + +metrics: + sampleRate: 0.1 + m3: + hostPort: "" + service: "m3nsch" + includeHost: true + env: development + +m3nsch: + concurrency: 2000 + numPointsPerDatum: 60 + +# any additional configs you may have +EOF +``` + +### Deploy +(1) Transfer the `m3nsch_server` binary, and `server-conf.yaml` to all the hosts to be used to generate hosts, e.g. Ingesters + +(2) On each host from (1), kick of the server process by running: +``` +./m3nsch_server -f server-conf.yaml +``` + +(3) Transfer `m3nsch_client` binary to a host with network connectivity to all the hosts. + +### Sample Usage +``` +# set an env var containing the host endpoints seperated by commas +$ export ENDPOINTS="host1:12321,host2:12321" + +# get the status of the various endpoints, make sure all are healthy +$ ./m3nsch_client --endpoints $ENDPOINTS status + +# investigate the various options available during initialization +$ ./m3nsch_client init --help +... +Flags: + -b, --basetime-offset duration offset from current time to use for load, e.g. -2m, -30s (default -2m0s) + -c, --cardinality int aggregate workload cardinality (default 10000) + -f, --force force initialization, stop any running workload + -i, --ingress-qps int aggregate workload ingress qps (default 1000) + -p, --metric-prefix string prefix added to each metric (default "m3nsch_") + -n, --namespace string target namespace (default "testmetrics") + -v, --target-env string target env for load test (default "test") + -z, --target-zone string target zone for load test (default "sjc1") + -t, --token string [required] unique identifier required for all subsequent interactions on this workload + +Global Flags: + -e, --endpoints stringSlice host:port for each of the agent process endpoints + +# initialize the servers, set any workload parameters, target env/zone +# the command below targets the production sjc1 m3db cluster with each `m3nsch_server` attempting to +# sustain a load of 100K writes/s from a set of 1M metrics +$ ./m3nsch_client --endpoints $ENDPOINTS init \ + --token prateek-sample \ + --target-env prod \ + --target-zone sjc1 \ + --ingress-qps 100000 \ + --cardinality 1000000 \ + +# start the load generation +$ ./m3nsch_client --endpoints $ENDPOINTS start + +# modifying the running load uses many of the same options as `init` +$ ./m3nsch_client modify --help +... +Flags: + -b, --basetime-offset duration offset from current time to use for load, e.g. -2m, -30s (default -2m0s) + -c, --cardinality int aggregate workload cardinality (default 10000) + -i, --ingress-qps int aggregate workload ingress qps (default 1000) + -p, --metric-prefix string prefix added to each metric (default "m3nsch_") + -n, --namespace string target namespace (default "testmetrics") + +Global Flags: + -e, --endpoints stringSlice host:port for each of the agent process endpoints + +# the command below bumps up the workload on each `m3nsch_server`, to sustain +# a load of 1M writes/s from a set of 10M metrics +$ ./m3nsch_client --endpoints $ENDPOINTS + --ingress-qps 1000000 \ + --cardinality 10000000 \ + +# finally, stop the load +$ ./m3nsch_client --endpoints $ENDPOINTS stop + +# probably want to teardown the running server processes on the various hosts +``` diff --git a/agent/agent.go b/agent/agent.go new file mode 100644 index 0000000..40793de --- /dev/null +++ b/agent/agent.go @@ -0,0 +1,360 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package agent + +import ( + "fmt" + "sync" + "time" + + "github.com/m3db/m3nsch" + "github.com/m3db/m3nsch/datums" + + "github.com/gavv/monotime" + "github.com/m3db/m3db/client" + "github.com/m3db/m3x/instrument" + xlog "github.com/m3db/m3x/log" + xtime "github.com/m3db/m3x/time" +) + +var ( + errCannotStartNotInitialized = fmt.Errorf("unable to start, agent is not initialized") + errCannotStopNotInitialized = fmt.Errorf("unable to stop, agent is not initialized") + errAlreadyInitialized = fmt.Errorf("unable to initialize, agent already initialized") +) + +type m3nschAgent struct { + sync.RWMutex + token string // workload token + workload m3nsch.Workload // workload to operate upon + registry datums.Registry // workload fake metric registry + session client.Session // m3db session to operate upon + agentStatus m3nsch.Status // agent status + opts m3nsch.AgentOptions // agent options + logger xlog.Logger // logger + metrics agentMetrics // agent performance metrics + workerChans workerChannels // worker-idx -> channel for worker notification + workerWg sync.WaitGroup // used to track when workers are finished + params workerParams // worker params +} + +type workerParams struct { + sync.RWMutex + fn workerFn // workerFn (read|write) + workingSet []generatedMetric // metrics corresponding to workload + ranges []workerRange // worker-idx -> workingSet idx range +} + +// New returns a new Agent. +func New( + registry datums.Registry, + opts m3nsch.AgentOptions, +) m3nsch.Agent { + ms := &m3nschAgent{ + registry: registry, + opts: opts, + logger: opts.InstrumentOptions().Logger(), + params: workerParams{ + fn: workerWriteFn, + }, + } + ms.metrics = agentMetrics{ + writeMethodMetrics: ms.newMethodMetrics("write"), + } + return ms + +} + +func newWorkerChannels(numWorkers int) workerChannels { + var chans workerChannels + for i := 0; i < numWorkers; i++ { + chans = append(chans, make(chan workerNotification)) + } + return chans +} + +func (ms *m3nschAgent) closeWorkerChannelsWithLock() { + ms.workerChans.close() + ms.workerChans = nil +} + +func (ms *m3nschAgent) notifyWorkersWithLock(msg workerNotification) { + ms.workerChans.notify(msg) +} + +func (ms *m3nschAgent) status() m3nsch.Status { + ms.RLock() + defer ms.RUnlock() + return ms.agentStatus +} + +func (ms *m3nschAgent) resetWithLock() { + if ms.workerChans != nil { + ms.closeWorkerChannelsWithLock() + } + if ms.session != nil { + ms.session.Close() + ms.session = nil + } + ms.workload = m3nsch.Workload{} + ms.agentStatus = m3nsch.StatusUninitialized + ms.params.workingSet = nil + ms.params.ranges = nil +} + +func (ms *m3nschAgent) setWorkerParams(workload m3nsch.Workload) { + ms.params.Lock() + defer ms.params.Unlock() + + var ( + current = ms.params.workingSet + cardinality = workload.Cardinality + ) + + if len(current) > cardinality { + current = current[:cardinality] + } + for i := len(current); i < cardinality; i++ { + idx := workload.MetricStartIdx + i + current = append(current, generatedMetric{ + name: fmt.Sprintf("%v.m%d", workload.MetricPrefix, idx), + timeseries: ms.registry.Get(i), + }) + } + ms.params.workingSet = current + + concurrency := ms.opts.Concurrency() + numMetricsPerWorker := len(current) / concurrency + workerRanges := make([]workerRange, concurrency) + for i := 0; i < concurrency; i++ { + workerRanges[i] = workerRange{ + startIdx: i * numMetricsPerWorker, + endIdx: (i+1)*numMetricsPerWorker - 1, + lastIdx: -1, + } + } + ms.params.ranges = workerRanges +} + +func (ms *m3nschAgent) Status() m3nsch.AgentStatus { + ms.RLock() + defer ms.RUnlock() + return m3nsch.AgentStatus{ + Status: ms.agentStatus, + Token: ms.token, + } +} + +func (ms *m3nschAgent) Workload() m3nsch.Workload { + ms.RLock() + defer ms.RUnlock() + return ms.workload +} + +func (ms *m3nschAgent) SetWorkload(w m3nsch.Workload) { + ms.Lock() + defer ms.Unlock() + ms.workload = w + ms.setWorkerParams(w) + + if ms.agentStatus == m3nsch.StatusRunning { + ms.notifyWorkersWithLock(workerNotification{update: true}) + } +} + +func (ms *m3nschAgent) Init( + token string, + w m3nsch.Workload, + force bool, + targetZone string, + targetEnv string, +) error { + ms.Lock() + defer ms.Unlock() + status := ms.agentStatus + if status != m3nsch.StatusUninitialized && !force { + return errAlreadyInitialized + } + + if status == m3nsch.StatusRunning { + if err := ms.stopWithLock(); err != nil { + return err + } + } + + session, err := ms.opts.NewSessionFn()(targetZone, targetEnv) + if err != nil { + return err + } + ms.session = session + ms.token = token + ms.workload = w + ms.setWorkerParams(w) + ms.agentStatus = m3nsch.StatusInitialized + return nil +} + +func (ms *m3nschAgent) Start() error { + ms.Lock() + defer ms.Unlock() + if ms.agentStatus != m3nsch.StatusInitialized { + return errCannotStartNotInitialized + } + concurrency := ms.opts.Concurrency() + ms.workerChans = newWorkerChannels(concurrency) + ms.agentStatus = m3nsch.StatusRunning + ms.workerWg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go ms.runWorker(i, ms.workerChans[i]) + } + return nil +} + +func (ms *m3nschAgent) stopWithLock() error { + status := ms.agentStatus + if status == m3nsch.StatusUninitialized { + return errCannotStopNotInitialized + } + + if status == m3nsch.StatusRunning { + ms.notifyWorkersWithLock(workerNotification{stop: true}) + ms.workerWg.Wait() + } + + ms.resetWithLock() + return nil +} + +func (ms *m3nschAgent) Stop() error { + ms.Lock() + defer ms.Unlock() + return ms.stopWithLock() +} + +func (ms *m3nschAgent) MaxQPS() int64 { + return int64(ms.opts.Concurrency()) * ms.opts.MaxWorkerQPS() +} + +func (ms *m3nschAgent) newMethodMetrics(method string) instrument.MethodMetrics { + subScope := ms.opts.InstrumentOptions().MetricsScope().SubScope("agent") + return instrument.NewMethodMetrics(subScope, method, ms.opts.InstrumentOptions().MetricsSamplingRate()) +} + +func (ms *m3nschAgent) tickPeriodWithLock() time.Duration { + var ( + qps = ms.workload.IngressQPS + numWorkers = ms.opts.Concurrency() + qpsPerWorker = float64(qps) / float64(numWorkers) + tickPeriod = time.Duration(1000*1000*1000/qpsPerWorker) * time.Nanosecond + ) + return tickPeriod +} + +func (ms *m3nschAgent) workerParams() (xtime.Unit, string, time.Time, time.Duration) { + ms.params.RLock() + defer ms.params.RUnlock() + return ms.opts.TimeUnit(), ms.workload.Namespace, ms.workload.BaseTime, ms.tickPeriodWithLock() +} + +func (ms *m3nschAgent) nextWorkerMetric(workerIdx int) generatedMetric { + ms.params.RLock() + defer ms.params.RUnlock() + metricIdx := ms.params.ranges[workerIdx].next() + return ms.params.workingSet[metricIdx] +} + +func (ms *m3nschAgent) runWorker(workerIdx int, workerCh chan workerNotification) { + defer ms.workerWg.Done() + var ( + methodMetrics = ms.metrics.writeMethodMetrics + timeUnit, namespace, fakeNow, tickPeriod = ms.workerParams() + tickLoop = time.NewTicker(tickPeriod) + ) + defer tickLoop.Stop() + for { + select { + case msg := <-workerCh: + if msg.stop { + return + } + if msg.update { + tickLoop.Stop() + timeUnit, namespace, fakeNow, tickPeriod = ms.workerParams() + tickLoop = time.NewTicker(tickPeriod) + } + + case <-tickLoop.C: + fakeNow = fakeNow.Add(tickPeriod) + metric := ms.nextWorkerMetric(workerIdx) + start := monotime.Now() + err := ms.params.fn(workerIdx, ms.session, namespace, metric, fakeNow, timeUnit) + elapsed := monotime.Since(start) + methodMetrics.ReportSuccessOrError(err, elapsed) + } + } +} + +type generatedMetric struct { + name string + timeseries datums.SyntheticTimeSeries +} + +type workerRange struct { + startIdx int // inclusive + endIdx int // exclusive + lastIdx int // last idx returned +} + +func (wr *workerRange) next() int { + i := wr.lastIdx + next := wr.startIdx + ((i + 1) % (wr.endIdx - wr.startIdx + 1)) + wr.lastIdx = next + return next +} + +type workerNotification struct { + stop bool + update bool +} + +type workerChannels []chan workerNotification + +func (c workerChannels) close() { + for _, ch := range c { + close(ch) + } +} + +func (c workerChannels) notify(msg workerNotification) { + for _, ch := range c { + ch <- msg + } +} + +type agentMetrics struct { + writeMethodMetrics instrument.MethodMetrics +} + +type workerFn func(workerIdx int, session client.Session, namespace string, metric generatedMetric, t time.Time, u xtime.Unit) error + +func workerWriteFn(_ int, session client.Session, namespace string, metric generatedMetric, t time.Time, u xtime.Unit) error { + return session.Write(namespace, metric.name, t, metric.timeseries.Next(), u, nil) +} diff --git a/agent/agent_options.go b/agent/agent_options.go new file mode 100644 index 0000000..216a95b --- /dev/null +++ b/agent/agent_options.go @@ -0,0 +1,106 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package agent + +import ( + "github.com/m3db/m3nsch" + + "github.com/m3db/m3x/instrument" + xtime "github.com/m3db/m3x/time" +) + +var ( + // avg latency per m3db write op is ~1 ms when the CPU is under load + // so liberally, we set MaxWorkerQPS at ~ 10K writes per sec + defaultMaxWorkerQPS = int64(10000) + + // defaultConcurrency is the default number of go routines used during + // load generation + defaultConcurrency = int(2000) + + // defaultTimeUnit is the default unit of time used during load operations + defaultTimeUnit = xtime.Second +) + +type agentOpts struct { + iopts instrument.Options + maxWorkerQPS int64 + concurrency int + newSessionFn m3nsch.NewSessionFn + timeUnit xtime.Unit +} + +// NewOptions returns a new AgentOptions object with default values +func NewOptions( + iopts instrument.Options, +) m3nsch.AgentOptions { + return &agentOpts{ + iopts: iopts, + maxWorkerQPS: defaultMaxWorkerQPS, + concurrency: defaultConcurrency, + timeUnit: defaultTimeUnit, + } +} + +func (so *agentOpts) SetInstrumentOptions(iopts instrument.Options) m3nsch.AgentOptions { + so.iopts = iopts + return so +} + +func (so *agentOpts) InstrumentOptions() instrument.Options { + return so.iopts +} + +func (so *agentOpts) SetMaxWorkerQPS(qps int64) m3nsch.AgentOptions { + so.maxWorkerQPS = qps + return so +} + +func (so *agentOpts) MaxWorkerQPS() int64 { + return so.maxWorkerQPS +} + +func (so *agentOpts) SetConcurrency(c int) m3nsch.AgentOptions { + so.concurrency = c + return so +} + +func (so *agentOpts) Concurrency() int { + return so.concurrency +} + +func (so *agentOpts) SetNewSessionFn(fn m3nsch.NewSessionFn) m3nsch.AgentOptions { + so.newSessionFn = fn + return so +} + +func (so *agentOpts) NewSessionFn() m3nsch.NewSessionFn { + return so.newSessionFn +} + +func (so *agentOpts) SetTimeUnit(u xtime.Unit) m3nsch.AgentOptions { + so.timeUnit = u + return so +} + +func (so *agentOpts) TimeUnit() xtime.Unit { + return so.timeUnit +} diff --git a/agent/agent_test.go b/agent/agent_test.go new file mode 100644 index 0000000..68e5747 --- /dev/null +++ b/agent/agent_test.go @@ -0,0 +1,320 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package agent + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/m3db/m3nsch" + "github.com/m3db/m3nsch/datums" + + "github.com/m3db/m3db/client" + "github.com/m3db/m3x/instrument" + xtime "github.com/m3db/m3x/time" + "github.com/stretchr/testify/require" +) + +const ( + testNumPointsPerDatum = 40 +) + +type testWrite struct { + metricName string + timestamp time.Time + value float64 +} + +func newTestOptions() m3nsch.AgentOptions { + iopts := instrument.NewOptions() + return NewOptions(iopts). + SetNewSessionFn(func(_, _ string) (client.Session, error) { + return nil, nil + }) +} + +func TestWorkloadMetricStartIdx(t *testing.T) { + var ( + reg = datums.NewDefaultRegistry(testNumPointsPerDatum) + opts = newTestOptions(). + SetConcurrency(1) + workload = m3nsch.Workload{ + Cardinality: 10, + IngressQPS: 100, + MetricStartIdx: 1000, + } + agent = New(reg, opts).(*m3nschAgent) + writes []testWrite + + token = "" + targetZone = "" + targetEnv = "" + ) + + agent.params.fn = func(_ int, _ client.Session, _ string, metric generatedMetric, t time.Time, _ xtime.Unit) error { + writes = append(writes, testWrite{ + metricName: metric.name, + timestamp: t, + value: metric.timeseries.Next(), + }) + return nil + } + + err := agent.Init(token, workload, false, targetZone, targetEnv) + require.NoError(t, err) + + err = agent.Start() + require.NoError(t, err) + + // let worker perform write ops for 1 second + time.Sleep(1 * time.Second) + + err = agent.Stop() + require.NoError(t, err) + + // ensure we've seen 90% of the writes we're expecting + eps := 0.1 + numExpectedWrites := workload.IngressQPS + require.InEpsilon(t, numExpectedWrites, len(writes), eps) + + // ensure the ordering of metric writes is correct + for i, wr := range writes { + metricIdx := workload.MetricStartIdx + i%workload.Cardinality + require.Equal(t, fmt.Sprintf(".m%d", metricIdx), wr.metricName) + } +} + +func TestNewSingleWriterAgent(t *testing.T) { + var ( + reg = datums.NewDefaultRegistry(testNumPointsPerDatum) + opts = newTestOptions(). + SetConcurrency(1) + workload = m3nsch.Workload{ + Cardinality: 10, + IngressQPS: 100, + } + agent = New(reg, opts).(*m3nschAgent) + writes []testWrite + + token = "" + targetZone = "" + targetEnv = "" + ) + + agent.params.fn = func(_ int, _ client.Session, _ string, metric generatedMetric, t time.Time, _ xtime.Unit) error { + writes = append(writes, testWrite{ + metricName: metric.name, + timestamp: t, + value: metric.timeseries.Next(), + }) + return nil + } + + err := agent.Init(token, workload, false, targetZone, targetEnv) + require.NoError(t, err) + + err = agent.Start() + require.NoError(t, err) + + // let worker perform write ops for 1 second + time.Sleep(1 * time.Second) + + err = agent.Stop() + require.NoError(t, err) + + // ensure we've seen 90% of the writes we're expecting + eps := 0.1 + numExpectedWrites := workload.IngressQPS + require.InEpsilon(t, numExpectedWrites, len(writes), eps) + + // ensure the ordering of metric writes is correct + for i, wr := range writes { + metricIdx := i % workload.Cardinality + require.Equal(t, fmt.Sprintf(".m%d", metricIdx), wr.metricName) + } + + // ensure the values written per metric are accurate + // first, group all values by metricIdx (which are the same as metricName due to assertion above) + valuesByMetricIdx := make(map[int][]testWrite) + for i, wr := range writes { + metricIdx := i % workload.Cardinality + current, ok := valuesByMetricIdx[metricIdx] + if !ok { + current = []testWrite{} + } + current = append(current, wr) + valuesByMetricIdx[metricIdx] = current + } + + // finally, go through the values per metric, and ensure + // they line up with expected values from registry + for idx, values := range valuesByMetricIdx { + datum := reg.Get(idx) + for i, wr := range values { + require.Equal(t, datum.Get(i), wr.value, + "metric: %s, idx: %d, i: %d, ts: %s", wr.metricName, idx, i, wr.timestamp.String()) + } + } +} + +func TestWorkerParams(t *testing.T) { + var ( + reg = datums.NewDefaultRegistry(testNumPointsPerDatum) + opts = newTestOptions(). + SetConcurrency(10) + agent = New(reg, opts).(*m3nschAgent) + t0 = time.Now() + testNs = "testNs" + workload = m3nsch.Workload{ + Cardinality: 1000, + IngressQPS: 100, + BaseTime: t0, + Namespace: testNs, + } + ) + agent.SetWorkload(workload) + + expectedWritesPerWorkerPerSec := workload.IngressQPS / opts.Concurrency() + expectedTickPeriodPerWorker := time.Duration(1000.0/float64(expectedWritesPerWorkerPerSec)) * time.Millisecond + + _, ns, baseTime, tickPeriod := agent.workerParams() + require.Equal(t, testNs, ns) + require.Equal(t, t0, baseTime) + require.Equal(t, expectedTickPeriodPerWorker, tickPeriod) +} + +func TestMultipleWriterAgent(t *testing.T) { + var ( + reg = datums.NewDefaultRegistry(testNumPointsPerDatum) + opts = newTestOptions(). + SetConcurrency(2) + workload = m3nsch.Workload{ + Cardinality: 4, + IngressQPS: 100, + } + token = "" + targetZone = "" + targetEnv = "" + agent = New(reg, opts).(*m3nschAgent) + + writesLock sync.Mutex + writesByWorkerIdx map[int][]testWrite + ) + + // initialize writesByWorkerIdx + writesByWorkerIdx = make(map[int][]testWrite) + for i := 0; i < opts.Concurrency(); i++ { + writesByWorkerIdx[i] = []testWrite{} + } + + agent.params.fn = func(wIdx int, _ client.Session, _ string, metric generatedMetric, t time.Time, _ xtime.Unit) error { + writesLock.Lock() + writesByWorkerIdx[wIdx] = append(writesByWorkerIdx[wIdx], testWrite{ + metricName: metric.name, + timestamp: t, + value: metric.timeseries.Next(), + }) + writesLock.Unlock() + return nil + } + + err := agent.Init(token, workload, false, targetZone, targetEnv) + require.NoError(t, err) + + err = agent.Start() + require.NoError(t, err) + + // let worker perform write ops for 1 second + time.Sleep(1 * time.Second) + + err = agent.Stop() + require.NoError(t, err) + + // ensure we've seen at least 10% of the expected writes + // NB(prateek): ideally, this would require a stricter number of writes + // but in testing, the mutex overhead induced due to protecting the + // observed writes map is large. and for the purposes of testing, + // 10% of the values are sufficient for testing ordering assertions + eps := 0.10 + numExpectedWritesPerWorker := int(float64(workload.IngressQPS) * eps) + for i := 0; i < opts.Concurrency(); i++ { + numSeenWrites := len(writesByWorkerIdx[i]) + require.True(t, numSeenWrites > numExpectedWritesPerWorker, + "worker: %d, expectedWrites: %d, seenWrites: %d", i) + } + + // helper to identify which metric we're expecting per worker + metricIdxFn := func(wIdx int, mIdx int) int { + numWorkers := opts.Concurrency() + numMetrics := workload.Cardinality + numMetricsPerWorker := numMetrics / numWorkers + idx := numMetricsPerWorker*wIdx + mIdx%numMetricsPerWorker + return idx + } + + // ensure the ordering of metric writes is correct + for workerIdx, writes := range writesByWorkerIdx { + for i, wr := range writes { + metricIdx := metricIdxFn(workerIdx, i) + require.Equal(t, fmt.Sprintf(".m%d", metricIdx), wr.metricName) + } + } +} + +// test for transition checks +func TestTransitions(t *testing.T) { + var ( + reg = datums.NewDefaultRegistry(testNumPointsPerDatum) + opts = newTestOptions(). + SetConcurrency(10) + agent = New(reg, opts).(*m3nschAgent) + workload = m3nsch.Workload{ + Cardinality: 1000, + IngressQPS: 100, + } + ) + agent.params.fn = func(_ int, _ client.Session, _ string, _ generatedMetric, _ time.Time, _ xtime.Unit) error { + return nil + } + + err := agent.Start() + require.Error(t, err) + + err = agent.Stop() + require.Error(t, err) + + err = agent.Init("", workload, false, "", "") + require.NoError(t, err) + + err = agent.Stop() + require.NoError(t, err) + + err = agent.Init("", workload, false, "", "") + require.NoError(t, err) + + err = agent.Start() + require.NoError(t, err) + + err = agent.Stop() + require.NoError(t, err) +} diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go new file mode 100644 index 0000000..13471c8 --- /dev/null +++ b/coordinator/coordinator.go @@ -0,0 +1,434 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package coordinator + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/m3db/m3nsch" + "github.com/m3db/m3nsch/rpc" + "github.com/m3db/m3nsch/rpc/convert" + + xerrors "github.com/m3db/m3x/errors" + "github.com/m3db/m3x/instrument" + xlog "github.com/m3db/m3x/log" + "google.golang.org/grpc" +) + +var ( + errNoEndpointsProvided = fmt.Errorf("no endpoints provided") +) + +type m3nschCoordinator struct { + sync.Mutex + opts m3nsch.CoordinatorOptions + clients map[string]*m3nschClient + newClientFn newM3nschClientFn +} + +// New returns a new Coordinator process with provided endpoints treated +// as Agent processes. +func New( + opts m3nsch.CoordinatorOptions, + endpoints []string, +) (m3nsch.Coordinator, error) { + if len(endpoints) == 0 { + return nil, errNoEndpointsProvided + } + coordinator := &m3nschCoordinator{ + opts: opts, + clients: make(map[string]*m3nschClient), + newClientFn: newM3nschClient, + } + return coordinator, coordinator.initializeConnections(endpoints) +} + +func (m *m3nschCoordinator) initializeConnections(endpoints []string) error { + var ( + wg sync.WaitGroup + numEndpoints = len(endpoints) + multiErr syncClientMultiErr + ) + + wg.Add(numEndpoints) + for i := 0; i < numEndpoints; i++ { + go func(idx int) { + defer wg.Done() + endpoint := endpoints[idx] + client, err := m.newClientFn(endpoint, m.opts.InstrumentOptions(), m.opts.Timeout()) + if err != nil { + multiErr.Add(endpoint, err) + return + } + m.Lock() + m.clients[endpoint] = client + m.Unlock() + }(i) + } + wg.Wait() + + return multiErr.FinalError() +} + +func (m *m3nschCoordinator) Teardown() error { + var err syncClientMultiErr + m.forEachClient(func(c *m3nschClient) { + err.Add(c.endpoint, c.close()) + }) + return err.FinalError() +} + +func (m *m3nschCoordinator) Status() (map[string]m3nsch.AgentStatus, error) { + var ( + ctx = context.Background() + multiErr syncClientMultiErr + + lock sync.Mutex + statuses = make(map[string]m3nsch.AgentStatus, len(m.clients)) + ) + + m.forEachClient(func(c *m3nschClient) { + response, err := c.client.Status(ctx, &rpc.StatusRequest{}) + if err != nil { + multiErr.Add(c.endpoint, err) + return + } + + status, err := convert.ToM3nschStatus(response.Status) + if err != nil { + multiErr.Add(c.endpoint, err) + return + } + + workload, err := convert.ToM3nschWorkload(response.GetWorkload()) + if err != nil { + multiErr.Add(c.endpoint, err) + return + } + + lock.Lock() + statuses[c.endpoint] = m3nsch.AgentStatus{ + Status: status, + Token: response.Token, + MaxQPS: response.MaxQPS, + Workload: workload, + } + lock.Unlock() + }) + + return statuses, nil +} + +func (m *m3nschCoordinator) Workload() (m3nsch.Workload, error) { + statuses, err := m.Status() + if err != nil { + return m3nsch.Workload{}, err + } + + // ensure all agents are initialized + var multiErr syncClientMultiErr + for endpoint, status := range statuses { + if status.Status == m3nsch.StatusUninitialized { + multiErr.Add(endpoint, fmt.Errorf("agent not initialized")) + } + } + if err = multiErr.FinalError(); err != nil { + return m3nsch.Workload{}, err + } + + // TODO(prateek): ensure all agent workloads are set the same + // TODO(prateek): ensure no agent have overlapping metric ranges + var ( + workload = m3nsch.Workload{} + first = true + ) + for _, status := range statuses { + if first { + workload.BaseTime = status.Workload.BaseTime + workload.Namespace = status.Workload.Namespace + workload.MetricPrefix = status.Workload.MetricPrefix + first = false + } + workload.Cardinality += status.Workload.Cardinality + workload.IngressQPS += status.Workload.IngressQPS + } + + return workload, nil +} + +func (m *m3nschCoordinator) SetWorkload(workload m3nsch.Workload) error { + statuses, err := m.Status() + if err != nil { + return err + } + + // ensure all agents are initialized + var multiErr syncClientMultiErr + for endpoint, status := range statuses { + if status.Status == m3nsch.StatusUninitialized { + multiErr.Add(endpoint, fmt.Errorf("agent not initialized")) + } + } + if err = multiErr.FinalError(); err != nil { + return err + } + + // split workload amongst the various agents + splitWorkload, err := m.splitWorkload(workload, statuses) + if err != nil { + return err + } + + // set the targetWorkload on each agent + ctx := context.Background() + multiErr = syncClientMultiErr{} + m.forEachClient(func(c *m3nschClient) { + endpoint := c.endpoint + targetWorkload, ok := splitWorkload[endpoint] + if !ok { + multiErr.Add(endpoint, fmt.Errorf("splitWorkload does not contain all endpoints")) + return + } + + rpcWorkload := convert.ToProtoWorkload(targetWorkload) + _, clientErr := c.client.Modify(ctx, &rpc.ModifyRequest{Workload: &rpcWorkload}) + multiErr.Add(endpoint, clientErr) + }) + + return multiErr.FinalError() +} + +func (m *m3nschCoordinator) Init( + token string, + workload m3nsch.Workload, + force bool, + targetZone string, + targetEnv string, +) error { + statuses, err := m.Status() + if err != nil { + return err + } + + // ensure all agents are not initialized + var multiErr syncClientMultiErr + for endpoint, status := range statuses { + if status.Status != m3nsch.StatusUninitialized { + multiErr.Add(endpoint, fmt.Errorf("agent already initialized")) + } + } + if err = multiErr.FinalError(); err != nil { + return err + } + + // split workload amongst the various agents + splitWorkload, err := m.splitWorkload(workload, statuses) + if err != nil { + return err + } + + // initialize each agent with targetWorkload + ctx := context.Background() + multiErr = syncClientMultiErr{} + m.forEachClient(func(c *m3nschClient) { + endpoint := c.endpoint + targetWorkload, ok := splitWorkload[endpoint] + if !ok { + multiErr.Add(endpoint, fmt.Errorf("splitWorkload does not contain all endpoints")) + return + } + + rpcWorkload := convert.ToProtoWorkload(targetWorkload) + _, clientErr := c.client.Init(ctx, &rpc.InitRequest{ + Token: token, + Workload: &rpcWorkload, + Force: force, + TargetZone: targetZone, + TargetEnv: targetEnv, + }) + multiErr.Add(endpoint, clientErr) + }) + + return multiErr.FinalError() +} + +func (m *m3nschCoordinator) Start() error { + statuses, err := m.Status() + if err != nil { + return err + } + + // ensure all agents are initialized + var multiErr syncClientMultiErr + for endpoint, status := range statuses { + if status.Status == m3nsch.StatusUninitialized { + multiErr.Add(endpoint, fmt.Errorf("agent not initialized")) + } + } + if err = multiErr.FinalError(); err != nil { + return err + } + + // start each agent + ctx := context.Background() + multiErr = syncClientMultiErr{} + m.forEachClient(func(c *m3nschClient) { + endpoint := c.endpoint + _, clientErr := c.client.Start(ctx, &rpc.StartRequest{}) + multiErr.Add(endpoint, clientErr) + }) + + return multiErr.FinalError() +} + +func (m *m3nschCoordinator) Stop() error { + // stop each agent + ctx := context.Background() + multiErr := syncClientMultiErr{} + m.forEachClient(func(c *m3nschClient) { + endpoint := c.endpoint + _, err := c.client.Stop(ctx, &rpc.StopRequest{}) + multiErr.Add(endpoint, err) + }) + + return multiErr.FinalError() +} + +func (m *m3nschCoordinator) splitWorkload( + aggWorkload m3nsch.Workload, + statuses map[string]m3nsch.AgentStatus, +) (map[string]m3nsch.Workload, error) { + // ensure we have enough aggregate capacity to satisfy workload + totalIngressCapacity := int64(0) + for _, status := range statuses { + totalIngressCapacity += status.MaxQPS + } + if totalIngressCapacity < int64(aggWorkload.IngressQPS) { + return nil, fmt.Errorf("insufficient capacity") + } + + // initialiaze return map + splitWorkload := make(map[string]m3nsch.Workload, len(statuses)) + + // split workload per worker proportional to capacity + metricStart := 0 + for endpoint, status := range statuses { + var ( + workerFrac = float64(status.MaxQPS) / float64(totalIngressCapacity) + numMetrics = int(float64(aggWorkload.Cardinality) * workerFrac) + qps = int(float64(aggWorkload.IngressQPS) * workerFrac) + workerWorkload = aggWorkload + ) + workerWorkload.MetricStartIdx = metricStart + workerWorkload.Cardinality = numMetrics + workerWorkload.IngressQPS = qps + splitWorkload[endpoint] = workerWorkload + + metricStart += numMetrics + } + return splitWorkload, nil +} + +type syncClientMultiErr struct { + sync.Mutex + multiErr xerrors.MultiError +} + +func (e *syncClientMultiErr) Add(endpoint string, err error) { + if err == nil { + return + } + cErr := fmt.Errorf("[%v] %v", endpoint, err) + e.Lock() + e.multiErr = e.multiErr.Add(cErr) + e.Unlock() +} + +func (e *syncClientMultiErr) FinalError() error { + return e.multiErr.FinalError() +} + +type forEachClientFn func(client *m3nschClient) + +func (m *m3nschCoordinator) forEachClient(fn forEachClientFn) { + var ( + parallel = m.opts.ParallelOperations() + numClients = len(m.clients) + wg sync.WaitGroup + ) + + if parallel { + wg.Add(numClients) + } + + for _, client := range m.clients { + if parallel { + go func(c *m3nschClient) { + defer wg.Done() + fn(c) + }(client) + } else { + fn(client) + } + } + + if parallel { + wg.Wait() + } +} + +type newM3nschClientFn func(string, instrument.Options, time.Duration) (*m3nschClient, error) + +type m3nschClient struct { + endpoint string + logger xlog.Logger + conn *grpc.ClientConn + client rpc.MenschClient +} + +func newM3nschClient( + endpoint string, + iopts instrument.Options, + timeout time.Duration, +) (*m3nschClient, error) { + var ( + logger = iopts.Logger().WithFields(xlog.NewLogField("client", endpoint)) + conn, err = grpc.Dial(endpoint, grpc.WithTimeout(timeout), grpc.WithInsecure()) + ) + if err != nil { + return nil, fmt.Errorf("could not connect: %v", err) + } + logger.Debug("connection established") + client := rpc.NewMenschClient(conn) + return &m3nschClient{ + endpoint: endpoint, + logger: logger, + conn: conn, + client: client, + }, nil +} + +func (mc *m3nschClient) close() error { + return mc.conn.Close() +} diff --git a/coordinator/coordinator_options.go b/coordinator/coordinator_options.go new file mode 100644 index 0000000..d38f211 --- /dev/null +++ b/coordinator/coordinator_options.go @@ -0,0 +1,77 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package coordinator + +import ( + "time" + + "github.com/m3db/m3nsch" + "github.com/m3db/m3x/instrument" +) + +var ( + defaultRPCTimeout = time.Minute + defaultParallelOps = true +) + +type coordinatorOpts struct { + iopts instrument.Options + timeout time.Duration + parallel bool +} + +// NewOptions creates a new options struct. +func NewOptions( + iopts instrument.Options, +) m3nsch.CoordinatorOptions { + return &coordinatorOpts{ + iopts: iopts, + timeout: defaultRPCTimeout, + parallel: defaultParallelOps, + } +} + +func (mo *coordinatorOpts) SetInstrumentOptions(iopts instrument.Options) m3nsch.CoordinatorOptions { + mo.iopts = iopts + return mo +} + +func (mo *coordinatorOpts) InstrumentOptions() instrument.Options { + return mo.iopts +} + +func (mo *coordinatorOpts) SetTimeout(d time.Duration) m3nsch.CoordinatorOptions { + mo.timeout = d + return mo +} + +func (mo *coordinatorOpts) Timeout() time.Duration { + return mo.timeout +} + +func (mo *coordinatorOpts) SetParallelOperations(f bool) m3nsch.CoordinatorOptions { + mo.parallel = f + return mo +} + +func (mo *coordinatorOpts) ParallelOperations() bool { + return mo.parallel +} diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go new file mode 100644 index 0000000..f601976 --- /dev/null +++ b/coordinator/coordinator_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package coordinator + +import ( + "sync" + "testing" + "time" + + "github.com/m3db/m3nsch" + + "github.com/m3db/m3x/instrument" + "github.com/stretchr/testify/require" +) + +var ( + testEndpoints = []string{ + "testEndpoint1", + "testEndpoint2", + } +) + +func newTestCoordinator() *m3nschCoordinator { + iopts := instrument.NewOptions() + opts := NewOptions(iopts) + return &m3nschCoordinator{ + opts: opts, + clients: make(map[string]*m3nschClient), + } +} + +func TestInitializeConnections(t *testing.T) { + var ( + lock sync.Mutex + initMap = make(map[string]bool) + coordinator = newTestCoordinator() + ) + coordinator.newClientFn = func(e string, _ instrument.Options, _ time.Duration) (*m3nschClient, error) { + lock.Lock() + initMap[e] = true + lock.Unlock() + return &m3nschClient{}, nil + } + + err := coordinator.initializeConnections(testEndpoints) + require.NoError(t, err) + + for _, endpoint := range testEndpoints { + flag, ok := initMap[endpoint] + require.True(t, ok, "endpoint not initialized: %v", endpoint) + require.True(t, flag, "endpoint not initialized: %v", endpoint) + } +} + +func TestForEachClient(t *testing.T) { + coordinator := newTestCoordinator() + coordinator.newClientFn = func(e string, _ instrument.Options, _ time.Duration) (*m3nschClient, error) { + return &m3nschClient{endpoint: e}, nil + } + err := coordinator.initializeConnections(testEndpoints) + require.NoError(t, err) + + // non-parallel version + coordinator.opts = coordinator.opts.SetParallelOperations(false) + clientMap := make(map[string]bool) + coordinator.forEachClient(func(c *m3nschClient) { + clientMap[c.endpoint] = true + }) + for _, endpoint := range testEndpoints { + flag, ok := clientMap[endpoint] + require.True(t, ok, "endpoint not initialized: %v", endpoint) + require.True(t, flag, "endpoint not initialized: %v", endpoint) + } + + // parallel version + var lock sync.Mutex + coordinator.opts = coordinator.opts.SetParallelOperations(true) + clientMap = make(map[string]bool) + coordinator.forEachClient(func(c *m3nschClient) { + lock.Lock() + clientMap[c.endpoint] = true + lock.Unlock() + }) + for _, endpoint := range testEndpoints { + flag, ok := clientMap[endpoint] + require.True(t, ok, "endpoint not initialized: %v", endpoint) + require.True(t, flag, "endpoint not initialized: %v", endpoint) + } +} + +func TestSplitWorkloadFail(t *testing.T) { + coordinator := newTestCoordinator() + aggregateWorkload := m3nsch.Workload{ + IngressQPS: 2, + } + statuses := map[string]m3nsch.AgentStatus{ + testEndpoints[0]: { + MaxQPS: 1, + }, + } + _, err := coordinator.splitWorkload(aggregateWorkload, statuses) + require.Error(t, err) +} + +func TestSplitWorkload(t *testing.T) { + coordinator := newTestCoordinator() + aggregateWorkload := m3nsch.Workload{ + Cardinality: 3000, + IngressQPS: 300, + } + statuses := map[string]m3nsch.AgentStatus{ + testEndpoints[0]: { + MaxQPS: 200, + }, + testEndpoints[1]: { + MaxQPS: 400, + }, + } + splitWorkloads, err := coordinator.splitWorkload(aggregateWorkload, statuses) + require.NoError(t, err) + require.Equal(t, 2, len(splitWorkloads)) + + workload1, ok := splitWorkloads[testEndpoints[0]] + require.True(t, ok) + require.Equal(t, 1000, workload1.Cardinality) + require.Equal(t, 100, workload1.IngressQPS) + + workload2, ok := splitWorkloads[testEndpoints[1]] + require.True(t, ok) + require.Equal(t, 2000, workload2.Cardinality) + require.Equal(t, 200, workload2.IngressQPS) +} diff --git a/datums/datum.go b/datums/datum.go new file mode 100644 index 0000000..694d182 --- /dev/null +++ b/datums/datum.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package datums + +import "fmt" + +var ( + errNumPointsNegative = fmt.Errorf("numPoints must be positive") +) + +type synTS struct { + id int + data []float64 + + lastIdx int +} + +func (ld *synTS) ID() int { + return ld.id +} + +func (ld *synTS) Size() int { + return len(ld.data) +} + +func (ld *synTS) Data() []float64 { + return ld.data +} + +func (ld *synTS) Get(idx int) float64 { + idx = idx % len(ld.data) + if idx < 0 { + idx += len(ld.data) + } + return ld.data[idx] +} + +func (ld *synTS) Next() float64 { + idx := (ld.lastIdx + 1) % len(ld.data) + if idx < 0 { + idx += len(ld.data) + } + ld.lastIdx = idx + return ld.data[idx] +} + +// NewSyntheticTimeSeris generates a new SyntheticTimeSeris using the provided parameters. +func NewSyntheticTimeSeris(id int, numPoints int, fn TSGenFn) (SyntheticTimeSeries, error) { + if numPoints < 0 { + return nil, errNumPointsNegative + } + data := make([]float64, numPoints) + for i := 0; i < numPoints; i++ { + data[i] = fn(i) + } + return &synTS{ + id: id, + data: data, + lastIdx: -1, + }, nil +} diff --git a/datums/registry.go b/datums/registry.go new file mode 100644 index 0000000..1a7a228 --- /dev/null +++ b/datums/registry.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package datums + +type tsRegistry struct { + currentIdx int + numPointsPerDatum int + tsGenMap map[int]TSGenFn +} + +func (reg *tsRegistry) Size() int { + return len(reg.tsGenMap) +} + +func (reg *tsRegistry) Get(i int) SyntheticTimeSeries { + sz := reg.Size() + idx := i % sz + if idx < 0 { + idx = idx + sz + } + datum, err := NewSyntheticTimeSeris(idx, reg.numPointsPerDatum, reg.tsGenMap[idx]) + if err != nil { + panic(err) + } + return datum +} + +// NewDefaultRegistry returns a Registry with default timeseries generators +func NewDefaultRegistry(numPointsPerDatum int) Registry { + reg := &tsRegistry{ + numPointsPerDatum: numPointsPerDatum, + tsGenMap: make(map[int]TSGenFn), + } + reg.init() + return reg +} + +func (reg *tsRegistry) init() { + // identity datum + reg.addGenFn(func(i int) float64 { + return float64(i) + }) + + // square datum + reg.addGenFn(func(i int) float64 { + return float64(i * i) + }) + + // TODO(prateek): make this bigger +} + +func (reg *tsRegistry) addGenFn(f TSGenFn) { + idx := reg.currentIdx + reg.tsGenMap[idx] = f + reg.currentIdx++ +} diff --git a/datums/types.go b/datums/types.go new file mode 100644 index 0000000..8ecc329 --- /dev/null +++ b/datums/types.go @@ -0,0 +1,55 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package datums + +// TSGenFn represents a pure function used to +// generate synthetic time series' +type TSGenFn func(idx int) float64 + +// SyntheticTimeSeries represents a synthetically generated +// time series +type SyntheticTimeSeries interface { + // ID returns the id of the SyntheticTimeSeries + ID() int + + // Size returns the number of points in the SyntheticTimeSeries + Size() int + + // Data returns data points comprising the SyntheticTimeSeries + Data() []float64 + + // Get(n) returns the nth (circularly wrapped) data point + Get(n int) float64 + + // Next simulates an infinite iterator on the SyntheticTimeSeries + Next() float64 +} + +// Registry is a collection of synthetic time series' +type Registry interface { + // Get(n) returns the nth (wrapped circularly) SyntheticTimeSeries + // known to the Registry. + Get(n int) SyntheticTimeSeries + + // Size returns the number of unique time series' + // the Registry is capable of generating. + Size() int +} diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..5b2455b --- /dev/null +++ b/glide.lock @@ -0,0 +1,173 @@ +hash: a539218a08730e83f130bf1ed415321b5137b6950b2f56150ab470fdb2832093 +updated: 2017-03-15T15:46:10.216421502-04:00 +imports: +- name: github.com/apache/thrift + version: 53dd39833a08ce33582e5ff31fa18bb4735d6731 + subpackages: + - lib/go/thrift +- name: github.com/davecgh/go-spew + version: 346938d642f2ec3594ed81d874461961cd0faa76 + subpackages: + - spew +- name: github.com/facebookgo/clock + version: 600d898af40aa09a7a93ecb9265d87b0504b6f03 +- name: github.com/fsnotify/fsnotify + version: 30411dbcefb7a1da7e84f75530ad3abe4011b4f8 +- name: github.com/gavv/monotime + version: 47d58efa69556a936a3c15eb2ed42706d968ab01 +- name: github.com/golang/mock + version: bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 + subpackages: + - gomock +- name: github.com/golang/protobuf + version: 3852dcfda249c2097355a6aabb199a28d97b30df + subpackages: + - proto + - ptypes/timestamp +- name: github.com/hashicorp/hcl + version: 630949a3c5fa3c613328e1b8256052cbc2327c9b + subpackages: + - hcl/ast + - hcl/parser + - hcl/scanner + - hcl/strconv + - hcl/token + - json/parser + - json/scanner + - json/token +- name: github.com/inconshreveable/mousetrap + version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 +- name: github.com/m3db/m3cluster + version: 1054ce5875f3f13519cfc1c4e816c06bab8587fe + subpackages: + - client + - kv + - services + - services/placement + - shard +- name: github.com/m3db/m3db + version: 4d41e2d211f35888c76894ab77fb3ffddee7e273 + subpackages: + - client + - clock + - context + - digest + - encoding + - encoding/m3tsz + - generated/thrift/rpc + - network/server/tchannelthrift/convert + - network/server/tchannelthrift/errors + - network/server/tchannelthrift/node/channel + - retention + - sharding + - storage/block + - storage/bootstrap/result + - topology + - ts + - x/io +- name: github.com/m3db/m3x + version: 2adc966acde14c9f0cc04c3eaeb338c5f9673a74 + subpackages: + - checked + - clock + - close + - errors + - instrument + - log + - pool + - retry + - sync + - time + - watch +- name: github.com/magiconair/properties + version: b3b15ef068fd0b17ddf408a23669f20811d194d2 +- name: github.com/mitchellh/mapstructure + version: 53818660ed4955e899c0bcafa97299a388bd7c8e +- name: github.com/opentracing/opentracing-go + version: 855519783f479520497c6b3445611b05fc42f009 + subpackages: + - ext +- name: github.com/pborman/getopt + version: ec82d864f599c39673eef89f91b93fa5576567a1 +- name: github.com/pelletier/go-buffruneio + version: c37440a7cf42ac63b919c752ca73a85067e05992 +- name: github.com/pelletier/go-toml + version: fee7787d3f811af92276f5ff10107092e95b7a1d +- name: github.com/pmezard/go-difflib + version: 792786c7400a136282c1664665ae0a8db921c6c2 + subpackages: + - difflib +- name: github.com/spaolacci/murmur3 + version: 0d12bf811670bf6a1a63828dfbd003eded177fce +- name: github.com/spf13/afero + version: 9be650865eab0c12963d8753212f4f9c66cdcf12 + subpackages: + - mem +- name: github.com/spf13/cast + version: ce135a4ebeee6cfe9a26c93ee0d37825f26113c7 +- name: github.com/spf13/cobra + version: 7c674d9e72017ed25f6d2b5e497a1368086b6a6f +- name: github.com/spf13/jwalterweatherman + version: fa7ca7e836cf3a8bb4ebf799f472c12d7e903d66 +- name: github.com/spf13/pflag + version: 4f9190456aed1c2113ca51ea9b89219747458dc1 +- name: github.com/spf13/viper + version: 7538d73b4eb9511d85a9f1dfef202eeb8ac260f4 +- name: github.com/stretchr/testify + version: 6fe211e493929a8aac0469b93f28b1d0688a9a3a + subpackages: + - assert + - require +- name: github.com/uber-go/atomic + version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 +- name: github.com/uber-go/tally + version: 43c1379c0577ac1eb74f9f3869cea07191c9992b + subpackages: + - m3 + - m3/customtransports + - m3/thrift + - m3/thriftudp +- name: github.com/uber/tchannel-go + version: 8d570ed87acb2661d390c096215260f3f9e9b931 + subpackages: + - relay + - thrift + - thrift/gen-go/meta + - tnet + - trand + - typed +- name: golang.org/x/net + version: 61557ac0112b576429a0df080e1c2cef5dfbb642 + subpackages: + - context + - http2 + - http2/hpack + - idna + - internal/timeseries + - lex/httplex + - trace +- name: golang.org/x/sys + version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 + subpackages: + - unix +- name: golang.org/x/text + version: ece019dcfd29abcf65d0d1dfe145e8faad097640 + subpackages: + - transform + - unicode/norm +- name: google.golang.org/grpc + version: 777daa17ff9b5daef1cfdf915088a2ada3332bf0 + subpackages: + - codes + - credentials + - grpclog + - internal + - metadata + - naming + - peer + - transport +- name: gopkg.in/validator.v2 + version: 3e4f037f12a1221a0864cf0dd2e81c452ab22448 +- name: gopkg.in/yaml.v2 + version: a83829b6f1293c91addabc89d0571c246397bbf4 +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..054cb03 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,51 @@ +package: github.com/m3db/m3nsch +import: +- package: github.com/gavv/monotime + version: 47d58efa69556a936a3c15eb2ed42706d968ab01 +- package: github.com/golang/protobuf + version: 3852dcfda249c2097355a6aabb199a28d97b30df + subpackages: + - proto + - ptypes/timestamp +- package: github.com/m3db/m3db + version: 4d41e2d211f35888c76894ab77fb3ffddee7e273 + subpackages: + - client +- package: github.com/m3db/m3x + version: 2adc966acde14c9f0cc04c3eaeb338c5f9673a74 + subpackages: + - errors + - instrument + - log + - time +- package: github.com/pborman/getopt + version: ec82d864f599c39673eef89f91b93fa5576567a1 +- package: github.com/spf13/cobra + version: 7c674d9e72017ed25f6d2b5e497a1368086b6a6f +- package: github.com/spf13/pflag + version: 4f9190456aed1c2113ca51ea9b89219747458dc1 +- package: github.com/spf13/viper + version: 7538d73b4eb9511d85a9f1dfef202eeb8ac260f4 +- package: github.com/uber-go/tally + version: 43c1379c0577ac1eb74f9f3869cea07191c9992b + subpackages: + - m3 +- package: golang.org/x/net + version: 61557ac0112b576429a0df080e1c2cef5dfbb642 + subpackages: + - context +- package: google.golang.org/grpc + version: 777daa17ff9b5daef1cfdf915088a2ada3332bf0 + subpackages: + - codes +- package: gopkg.in/validator.v2 + version: 3e4f037f12a1221a0864cf0dd2e81c452ab22448 +- package: github.com/stretchr/testify + version: 6fe211e493929a8aac0469b93f28b1d0688a9a3a + subpackages: + - require +- package: github.com/golang/mock + version: bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 + subpackages: + - gomock +testImport: diff --git a/m3nsch_client/cmd/init.go b/m3nsch_client/cmd/init.go new file mode 100644 index 0000000..e7bad56 --- /dev/null +++ b/m3nsch_client/cmd/init.go @@ -0,0 +1,129 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "fmt" + "log" + + "github.com/m3db/m3nsch/coordinator" + + xerrors "github.com/m3db/m3x/errors" + "github.com/m3db/m3x/instrument" + "github.com/spf13/cobra" +) + +var ( + // local flags + localInitFlags initFlags + + // InitCmd represents the base command when called without any subcommands + initCmd *cobra.Command + + defaultEnv = "test" + defaultZone = "sjc1" +) + +func init() { + initCmd = &cobra.Command{ + Use: "init", + Short: "Initialize agent processes", + Long: "Initialize agent processes with any resources required to execute workload", + Run: initExec, + Example: `# Initialize agents with default workload: +./m3nsch_client -e ",...,:" init -t any_string_breadcrumb + +# Initialize agents with explicit workload: +./m3nsch_client --endpoints ",...,:" init \ + --token any_string_breadcrumb \ + --target-zone sjc1 \ + --target-env prod \ + --metric-prefix m3nsch_metric_prefix \ + --namespace testmetrics \ + --cardinality 1000000 \ + --ingress-qps 200000 \`, + } + + flags := initCmd.Flags() + flags.StringVarP(&localInitFlags.token, "token", "t", "", + `[required] unique identifier required for all subsequent interactions on this workload`) + flags.BoolVarP(&localInitFlags.force, "force", "f", false, + `force initialization, stop any running workload`) + flags.StringVarP(&localInitFlags.targetZone, "target-zone", "z", defaultZone, + `target zone for load test`) + flags.StringVarP(&localInitFlags.targetEnv, "target-env", "v", defaultEnv, + `target env for load test`) + registerWorkloadFlags(flags, &localInitFlags.workload) +} + +type initFlags struct { + token string + force bool + workload cliWorkload + targetZone string + targetEnv string +} + +func (f initFlags) validate() error { + var multiErr xerrors.MultiError + if f.token == "" { + multiErr = multiErr.Add(fmt.Errorf("token is not set")) + } + if f.targetEnv == "" { + multiErr = multiErr.Add(fmt.Errorf("target-env is not set")) + } + if f.targetZone == "" { + multiErr = multiErr.Add(fmt.Errorf("target-zone is not set")) + } + if err := f.workload.validate(); err != nil { + multiErr = multiErr.Add(err) + } + return multiErr.FinalError() +} + +func initExec(cmd *cobra.Command, _ []string) { + if !gFlags.isValid() { + log.Fatalf("Invalid flags: %v", M3nschCmd.UsageString()) + } + if err := localInitFlags.validate(); err != nil { + log.Fatalf("Invalid flags: %v\n%s", err, cmd.UsageString()) + } + + var ( + workload = localInitFlags.workload.toM3nschWorkload() + iopts = instrument.NewOptions() + logger = iopts.Logger() + mOpts = coordinator.NewOptions(iopts) + coord, err = coordinator.New(mOpts, gFlags.endpoints) + ) + + if err != nil { + logger.Fatalf("unable to create coord: %v", err) + } + defer coord.Teardown() + + err = coord.Init(localInitFlags.token, workload, localInitFlags.force, + localInitFlags.targetZone, localInitFlags.targetEnv) + + if err != nil { + logger.Fatalf("unable to initialize: %v", err) + } +} diff --git a/m3nsch_client/cmd/m3nsch.go b/m3nsch_client/cmd/m3nsch.go new file mode 100644 index 0000000..b271e83 --- /dev/null +++ b/m3nsch_client/cmd/m3nsch.go @@ -0,0 +1,69 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" +) + +var ( + // globalFlags + gFlags globalFlags + + // M3nschCmd represents the base command when called without any subcommands + M3nschCmd = &cobra.Command{ + Use: "m3nsch_client", + Short: "CLI interface to m3nsch - M3DB load generation", + } +) + +type globalFlags struct { + endpoints []string +} + +func (f globalFlags) isValid() bool { + return len(f.endpoints) != 0 +} + +func init() { + flags := M3nschCmd.PersistentFlags() + flags.StringSliceVarP(&gFlags.endpoints, "endpoints", "e", []string{}, + `host:port for each of the agent process endpoints`) + + M3nschCmd.AddCommand( + statusCmd, + initCmd, + startCmd, + stopCmd, + modifyCmd, + ) +} + +// Run executes the m3nsch command. +func Run() { + if err := M3nschCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(-1) + } +} diff --git a/m3nsch_client/cmd/modify.go b/m3nsch_client/cmd/modify.go new file mode 100644 index 0000000..72bf96b --- /dev/null +++ b/m3nsch_client/cmd/modify.go @@ -0,0 +1,81 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "log" + + "github.com/m3db/m3nsch/coordinator" + + "github.com/m3db/m3x/instrument" + "github.com/spf13/cobra" +) + +var ( + modifyWorkload cliWorkload + + modifyCmd = &cobra.Command{ + Use: "modify", + Short: "modify the workload used in load generation", + Long: "Modify changes the worload used during load generation on each agent process", + Run: modifyExec, + Example: `# Modify agents with explicit workload: +./m3nsch_client --endpoints ",...,:" modify \ + --metric-prefix m3nsch_metric_prefix \ + --namespace metrics \ + --cardinality 1000000 \ + --ingress-qps 200000 \`, + } +) + +func init() { + flags := modifyCmd.Flags() + registerWorkloadFlags(flags, &modifyWorkload) +} + +func modifyExec(cmd *cobra.Command, _ []string) { + if !gFlags.isValid() { + log.Fatalf("Invalid flags\n%s", cmd.UsageString()) + } + if err := modifyWorkload.validate(); err != nil { + log.Fatalf("Invalid flags: %v\n%s", err, cmd.UsageString()) + } + + var ( + workload = modifyWorkload.toM3nschWorkload() + iopts = instrument.NewOptions() + logger = iopts.Logger() + mOpts = coordinator.NewOptions(iopts) + coord, err = coordinator.New(mOpts, gFlags.endpoints) + ) + + if err != nil { + logger.Fatalf("unable to create coord: %v", err) + } + defer coord.Teardown() + + err = coord.SetWorkload(workload) + if err != nil { + logger.Fatalf("unable to modify workload: %v", err) + } + + logger.Infof("workload modified successfully!") +} diff --git a/m3nsch_client/cmd/start.go b/m3nsch_client/cmd/start.go new file mode 100644 index 0000000..c00c99b --- /dev/null +++ b/m3nsch_client/cmd/start.go @@ -0,0 +1,66 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "log" + + "github.com/m3db/m3nsch/coordinator" + + "github.com/m3db/m3x/instrument" + "github.com/spf13/cobra" +) + +var ( + startCmd = &cobra.Command{ + Use: "start", + Short: "start the load generation", + Long: "Start kicks off the load generation on each agent process", + Run: startExec, + Example: `# Start the load generation process on various agents: +./m3nsch_client -e ",...,:" start`, + } +) + +func startExec(_ *cobra.Command, _ []string) { + if !gFlags.isValid() { + log.Fatalf("unable to execute, invalid flags\n%s", M3nschCmd.UsageString()) + } + + var ( + iopts = instrument.NewOptions() + logger = iopts.Logger() + mOpts = coordinator.NewOptions(iopts) + coord, err = coordinator.New(mOpts, gFlags.endpoints) + ) + + if err != nil { + logger.Fatalf("unable to create coordinator: %v", err) + } + defer coord.Teardown() + + err = coord.Start() + if err != nil { + logger.Fatalf("unable to start workload: %v", err) + } + + logger.Infof("workload started!") +} diff --git a/m3nsch_client/cmd/status.go b/m3nsch_client/cmd/status.go new file mode 100644 index 0000000..9366cac --- /dev/null +++ b/m3nsch_client/cmd/status.go @@ -0,0 +1,73 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "log" + + "github.com/m3db/m3nsch/coordinator" + + "github.com/m3db/m3x/instrument" + "github.com/spf13/cobra" +) + +var ( + statusCmd = &cobra.Command{ + Use: "status", + Short: "Retrieves the status of agent processes", + Long: "Status retrieves a status from each of the described agent processes", + Run: statusExec, + Example: `# Get status from the various agents: +./m3nsch_client -e ",...,:" status`, + } +) + +func statusExec(_ *cobra.Command, _ []string) { + if !gFlags.isValid() { + log.Fatalf("unable to execute, invalid flags\n%s", M3nschCmd.UsageString()) + } + + var ( + iopts = instrument.NewOptions() + logger = iopts.Logger() + mOpts = coordinator.NewOptions(iopts) + coord, err = coordinator.New(mOpts, gFlags.endpoints) + ) + + if err != nil { + logger.Fatalf("unable to create coordinator: %v", err) + } + defer coord.Teardown() + + statusMap, err := coord.Status() + if err != nil { + logger.Fatalf("unable to retrieve status: %v", err) + } + + for endpoint, status := range statusMap { + token := status.Token + if token == "" { + token = "" + } + logger.Infof("[%v] MaxQPS: %d, Status: %v, Token: %v, Workload: %+v", + endpoint, status.MaxQPS, status.Status, token, status.Workload) + } +} diff --git a/m3nsch_client/cmd/stop.go b/m3nsch_client/cmd/stop.go new file mode 100644 index 0000000..c0933f6 --- /dev/null +++ b/m3nsch_client/cmd/stop.go @@ -0,0 +1,66 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "log" + + "github.com/m3db/m3nsch/coordinator" + + "github.com/m3db/m3x/instrument" + "github.com/spf13/cobra" +) + +var ( + stopCmd = &cobra.Command{ + Use: "stop", + Short: "stop the load generation", + Long: "Stop stops the load generation on each agent process", + Run: stopExec, + Example: `# Stop the load generation process on various agents: +./m3nsch_client -e ",...,:" stop`, + } +) + +func stopExec(_ *cobra.Command, _ []string) { + if !gFlags.isValid() { + log.Fatalf("unable to execute, invalid flags\n%s", M3nschCmd.UsageString()) + } + + var ( + iopts = instrument.NewOptions() + logger = iopts.Logger() + mOpts = coordinator.NewOptions(iopts) + coordinator, err = coordinator.New(mOpts, gFlags.endpoints) + ) + + if err != nil { + logger.Fatalf("unable to create coordinator: %v", err) + } + defer coordinator.Teardown() + + err = coordinator.Stop() + if err != nil { + logger.Fatalf("unable to stop workload: %v", err) + } + + logger.Infof("workload stopped!") +} diff --git a/m3nsch_client/cmd/util.go b/m3nsch_client/cmd/util.go new file mode 100644 index 0000000..1f5fa64 --- /dev/null +++ b/m3nsch_client/cmd/util.go @@ -0,0 +1,71 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cmd + +import ( + "fmt" + "time" + + "github.com/m3db/m3nsch" + + xerrors "github.com/m3db/m3x/errors" + "github.com/spf13/pflag" +) + +type cliWorkload struct { + m3nsch.Workload + baseTimeOffset time.Duration +} + +func (w *cliWorkload) validate() error { + var multiErr xerrors.MultiError + if w.baseTimeOffset >= time.Duration(0*time.Second) { + multiErr = multiErr.Add(fmt.Errorf("basetime-offset must be negative")) + } + if w.Cardinality <= 0 { + multiErr = multiErr.Add(fmt.Errorf("cardinality must be a positive integer")) + } + if w.IngressQPS <= 0 { + multiErr = multiErr.Add(fmt.Errorf("ingress-qps must be a positive integer")) + } + if w.Namespace == "" { + multiErr = multiErr.Add(fmt.Errorf("namespace must be set")) + } + return multiErr.FinalError() +} + +func (w *cliWorkload) toM3nschWorkload() m3nsch.Workload { + w.BaseTime = time.Now().Add(w.baseTimeOffset) + return w.Workload +} + +func registerWorkloadFlags(flags *pflag.FlagSet, workload *cliWorkload) { + flags.DurationVarP(&workload.baseTimeOffset, "basetime-offset", "b", -2*time.Minute, + `offset from current time to use for load, e.g. -2m, -30s`) + flags.StringVarP(&workload.MetricPrefix, "metric-prefix", "p", "m3nsch_", + `prefix added to each metric`) + flags.StringVarP(&workload.Namespace, "namespace", "n", "testmetrics", + `target namespace`) + flags.IntVarP(&workload.Cardinality, "cardinality", "c", 10000, + `aggregate workload cardinality`) + flags.IntVarP(&workload.IngressQPS, "ingress-qps", "i", 1000, + `aggregate workload ingress qps`) +} diff --git a/m3nsch_client/main.go b/m3nsch_client/main.go new file mode 100644 index 0000000..7fb08c2 --- /dev/null +++ b/m3nsch_client/main.go @@ -0,0 +1,29 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + m3nsch "github.com/m3db/m3nsch/m3nsch_client/cmd" +) + +func main() { + m3nsch.Run() +} diff --git a/m3nsch_server/config/config.go b/m3nsch_server/config/config.go new file mode 100644 index 0000000..a0e9478 --- /dev/null +++ b/m3nsch_server/config/config.go @@ -0,0 +1,75 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package config + +import ( + "github.com/spf13/viper" + tallym3 "github.com/uber-go/tally/m3" + validator "gopkg.in/validator.v2" +) + +// Configuration represents the knobs available to configure a m3nsch_server +type Configuration struct { + Server ServerConfiguration `yaml:"server"` + M3nsch M3nschConfiguration `yaml:"m3nsch" validate:"nonzero"` + Metrics MetricsConfiguration `yaml:"metrics" validate:"nonzero"` +} + +// ServerConfiguration represents the knobs available to configure server properties +type ServerConfiguration struct { + ListenAddress string `yaml:"listenAddress" validate:"nonzero"` + DebugAddress string `yaml:"debugAddress" validate:"nonzero"` + CPUFactor float64 `yaml:"cpuFactor" validate:"min=0.5,max=3.0"` +} + +// MetricsConfiguration represents the knobs available to configure metrics collection +type MetricsConfiguration struct { + Prefix string `yaml:"prefix"` + SampleRate float64 `yaml:"sampleRate" validate:"min=0.01,max=1.0"` + M3 tallym3.Configuration `yaml:"metrics" validate:"nonzero"` +} + +// M3nschConfiguration represents the knobs available to configure m3nsch properties +type M3nschConfiguration struct { + Concurrency int `yaml:"concurrency" validate:"min=500,max=5000"` + NumPointsPerDatum int `yaml:"numPointsPerDatum" validate:"min=10,max=120"` +} + +// New returns a Configuration read from the specified path +func New(filename string) (Configuration, error) { + viper.SetConfigType("yaml") + viper.SetConfigFile(filename) + var conf Configuration + + if err := viper.ReadInConfig(); err != nil { + return conf, err + } + + if err := viper.Unmarshal(&conf); err != nil { + return conf, err + } + + if err := validator.Validate(conf); err != nil { + return conf, err + } + + return conf, nil +} diff --git a/m3nsch_server/main.go b/m3nsch_server/main.go new file mode 100644 index 0000000..185dce9 --- /dev/null +++ b/m3nsch_server/main.go @@ -0,0 +1,117 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "net" + "net/http" + _ "net/http/pprof" + "os" + "runtime" + "time" + + "github.com/m3db/m3nsch" + "github.com/m3db/m3nsch/agent" + "github.com/m3db/m3nsch/datums" + "github.com/m3db/m3nsch/m3nsch_server/config" + "github.com/m3db/m3nsch/m3nsch_server/services" + "github.com/m3db/m3nsch/m3nsch_server/tcp" + "github.com/m3db/m3nsch/rpc" + + "github.com/m3db/m3x/instrument" + xlog "github.com/m3db/m3x/log" + "github.com/pborman/getopt" + "github.com/uber-go/tally" + "google.golang.org/grpc" +) + +func main() { + var ( + configFile = getopt.StringLong("config-file", 'f', "", "Configuration file") + ) + getopt.Parse() + if len(*configFile) == 0 { + getopt.Usage() + return + } + + logger := xlog.NewLogger(os.Stdout) + conf, err := config.New(*configFile) + if err != nil { + logger.Fatalf("unable to read configuration file: %v", err.Error()) + } + + maxProcs := int(float64(runtime.NumCPU()) * conf.Server.CPUFactor) + logger.Infof("setting maxProcs = %d", maxProcs) + runtime.GOMAXPROCS(maxProcs) + + StartPProfServer(conf.Server.DebugAddress, logger) + + reporter, err := conf.Metrics.M3.NewReporter() + if err != nil { + logger.Fatalf("could not connect to metrics: %v", err) + } + scope, _ := tally.NewCachedRootScope(conf.Metrics.Prefix, nil, reporter, time.Second, tally.DefaultSeparator) + + listener, err := tcp.NewTCPListener(conf.Server.ListenAddress, 3*time.Minute) + if err != nil { + logger.Fatalf("could not create TCP Listener: %v", err) + } + + iopts := instrument. + NewOptions(). + SetLogger(logger). + SetMetricsScope(scope). + SetMetricsSamplingRate(conf.Metrics.SampleRate) + datumRegistry := datums.NewDefaultRegistry(conf.M3nsch.NumPointsPerDatum) + agentOpts := agent.NewOptions(iopts). + SetConcurrency(conf.M3nsch.Concurrency) // TODO: also need to SetNewSessionFn() + agent := agent.New(datumRegistry, agentOpts) + ServeGRPCService(listener, agent, scope, logger) +} + +// StartPProfServer starts a pprof server at specified address, or crashes +// the program on failure. +func StartPProfServer(debugAddress string, logger xlog.Logger) { + go func() { + if err := http.ListenAndServe(debugAddress, nil); err != nil { + logger.Fatalf("unable to serve debug server: %v", err) + } + }() + logger.Infof("serving pprof endpoints at: %v", debugAddress) +} + +// ServeGRPCService serves m3nsch_server GRPC service at the specified address. +func ServeGRPCService( + listener net.Listener, + agent m3nsch.Agent, + scope tally.Scope, + logger xlog.Logger, +) { + server := grpc.NewServer(grpc.MaxConcurrentStreams(16384)) + service, err := services.NewGRPCService(agent, scope, logger) + rpc.RegisterMenschServer(server, service) + logger.Infof("serving m3nsch endpoints at %v", listener.Addr().String()) + err = server.Serve(listener) + if err != nil { + logger.Fatalf("could not serve: %v", err) + } +} diff --git a/m3nsch_server/services/grpc_service.go b/m3nsch_server/services/grpc_service.go new file mode 100644 index 0000000..df669e7 --- /dev/null +++ b/m3nsch_server/services/grpc_service.go @@ -0,0 +1,114 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package services + +import ( + "github.com/m3db/m3nsch" + "github.com/m3db/m3nsch/rpc" + convert "github.com/m3db/m3nsch/rpc/convert" + + xlog "github.com/m3db/m3x/log" + + "github.com/uber-go/tally" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// NewGRPCService returns a new GRPCService wrapping a m3nsch.Agent +func NewGRPCService( + agent m3nsch.Agent, + metricsScope tally.Scope, + logger xlog.Logger, +) (rpc.MenschServer, error) { + return &menschServer{ + agent: agent, + scope: metricsScope.SubScope("grpc-server"), + logger: logger, + }, nil +} + +type menschServer struct { + scope tally.Scope + logger xlog.Logger + agent m3nsch.Agent +} + +func (ms *menschServer) Status(ctx context.Context, req *rpc.StatusRequest) (*rpc.StatusResponse, error) { + status := ms.agent.Status() + workload := convert.ToProtoWorkload(ms.agent.Workload()) + response := &rpc.StatusResponse{ + Token: status.Token, + Status: convert.ToProtoStatus(status.Status), + MaxQPS: ms.agent.MaxQPS(), + Workload: &workload, + } + return response, nil +} + +func (ms *menschServer) Init(ctx context.Context, req *rpc.InitRequest) (*rpc.InitResponse, error) { + if req == nil { + return nil, grpc.Errorf(codes.InvalidArgument, "nil request") + } + + ms.logger.Debugf("received init request: %v", req.String()) + workload, err := convert.ToM3nschWorkload(req.GetWorkload()) + if err != nil { + return nil, grpc.Errorf(codes.InvalidArgument, "unable to parse workload: %v", err) + } + + err = ms.agent.Init(req.GetToken(), workload, req.GetForce(), + req.GetTargetZone(), req.GetTargetEnv()) + if err != nil { + return nil, grpc.Errorf(codes.Unavailable, err.Error()) + } + + return &rpc.InitResponse{}, nil +} + +func (ms *menschServer) Start(context.Context, *rpc.StartRequest) (*rpc.StartResponse, error) { + ms.logger.Debugf("received Start() request") + if err := ms.agent.Start(); err != nil { + return nil, grpc.Errorf(codes.Unknown, err.Error()) + } + return &rpc.StartResponse{}, nil +} + +func (ms *menschServer) Stop(context.Context, *rpc.StopRequest) (*rpc.StopResponse, error) { + ms.logger.Debugf("received Stop() request") + if err := ms.agent.Stop(); err != nil { + return nil, grpc.Errorf(codes.Unknown, err.Error()) + } + return &rpc.StopResponse{}, nil +} + +func (ms *menschServer) Modify(_ context.Context, req *rpc.ModifyRequest) (*rpc.ModifyResponse, error) { + ms.logger.Debugf("received Modify() request: %v", req.String()) + if req == nil { + return nil, grpc.Errorf(codes.InvalidArgument, "nil request") + } + workload, err := convert.ToM3nschWorkload(req.GetWorkload()) + if err != nil { + return nil, grpc.Errorf(codes.InvalidArgument, "unable to parse workload: %v", err) + } + ms.agent.SetWorkload(workload) + return &rpc.ModifyResponse{}, nil +} diff --git a/m3nsch_server/tcp/tcp.go b/m3nsch_server/tcp/tcp.go new file mode 100644 index 0000000..b591218 --- /dev/null +++ b/m3nsch_server/tcp/tcp.go @@ -0,0 +1,54 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tcp + +import ( + "net" + "time" +) + +// NewTCPListener is Listener specifically for TCP +func NewTCPListener(listenAddress string, keepAlivePeriod time.Duration) (net.Listener, error) { + l, err := net.Listen("tcp", listenAddress) + if err != nil { + return nil, err + } + return tcpKeepAliveListener{l.(*net.TCPListener), keepAlivePeriod}, err +} + +// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by ListenAndServe and ListenAndServeTLS so +// dead TCP connections (e.g. closing laptop mid-download) eventually +// go away. This is cargo culted from http/server.go +type tcpKeepAliveListener struct { + *net.TCPListener + keepAlivePeriod time.Duration +} + +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(ln.keepAlivePeriod) + return tc, nil +} diff --git a/rpc/convert/to_api.go b/rpc/convert/to_api.go new file mode 100644 index 0000000..d51eb8f --- /dev/null +++ b/rpc/convert/to_api.go @@ -0,0 +1,63 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package convert + +import ( + "fmt" + "time" + + "github.com/m3db/m3nsch" + proto "github.com/m3db/m3nsch/rpc" + + google_protobuf "github.com/golang/protobuf/ptypes/timestamp" +) + +func toTimeFromProtoTimestamp(t *google_protobuf.Timestamp) time.Time { + return time.Unix(int64(t.Seconds), int64(t.Nanos)) +} + +// ToM3nschWorkload converts a rpc Workload into an equivalent API Workload. +func ToM3nschWorkload(workload *proto.Workload) (m3nsch.Workload, error) { + if workload == nil { + return m3nsch.Workload{}, fmt.Errorf("invalid workload") + } + + return m3nsch.Workload{ + BaseTime: toTimeFromProtoTimestamp(workload.BaseTime), + MetricPrefix: workload.MetricPrefix, + Namespace: workload.Namespace, + Cardinality: int(workload.Cardinality), + IngressQPS: int(workload.IngressQPS), + }, nil +} + +// ToM3nschStatus converts a rpc Status into an equivalent API Status. +func ToM3nschStatus(status proto.Status) (m3nsch.Status, error) { + switch status { + case proto.Status_UNINITIALIZED: + return m3nsch.StatusUninitialized, nil + case proto.Status_INITIALIZED: + return m3nsch.StatusInitialized, nil + case proto.Status_RUNNING: + return m3nsch.StatusRunning, nil + } + return m3nsch.StatusUninitialized, fmt.Errorf("invalid status: %s", status.String()) +} diff --git a/rpc/convert/to_proto.go b/rpc/convert/to_proto.go new file mode 100644 index 0000000..a1a5445 --- /dev/null +++ b/rpc/convert/to_proto.go @@ -0,0 +1,55 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package convert + +import ( + "github.com/m3db/m3nsch" + proto "github.com/m3db/m3nsch/rpc" + + google_protobuf "github.com/golang/protobuf/ptypes/timestamp" +) + +// ToProtoStatus converts an API status into a RPC status. +func ToProtoStatus(status m3nsch.Status) proto.Status { + switch status { + case m3nsch.StatusUninitialized: + return proto.Status_UNINITIALIZED + case m3nsch.StatusInitialized: + return proto.Status_INITIALIZED + case m3nsch.StatusRunning: + return proto.Status_RUNNING + } + return proto.Status_UNKNOWN +} + +// ToProtoWorkload converts an API Workload into a RPC Workload. +func ToProtoWorkload(mw m3nsch.Workload) proto.Workload { + var w proto.Workload + w.BaseTime = &google_protobuf.Timestamp{ + Seconds: mw.BaseTime.Unix(), + Nanos: int32(mw.BaseTime.UnixNano()), + } + w.Cardinality = int32(mw.Cardinality) + w.IngressQPS = int32(mw.IngressQPS) + w.MetricPrefix = mw.MetricPrefix + w.Namespace = mw.Namespace + return w +} diff --git a/rpc/generate.go b/rpc/generate.go new file mode 100644 index 0000000..32e5199 --- /dev/null +++ b/rpc/generate.go @@ -0,0 +1,24 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate protoc --go_out=plugins=grpc:. m3nsch.proto +//go:generate mockgen -source=m3nsch.pb.go -package=rpc -destination=mock_m3nsch.pb.go + +package rpc diff --git a/rpc/m3nsch.pb.go b/rpc/m3nsch.pb.go new file mode 100644 index 0000000..129994b --- /dev/null +++ b/rpc/m3nsch.pb.go @@ -0,0 +1,559 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by protoc-gen-go. +// source: m3nsch.proto +// DO NOT EDIT! + +/* +Package rpc is a generated protocol buffer package. + +It is generated from these files: + m3nsch.proto + +It has these top-level messages: + StatusRequest + StatusResponse + InitRequest + InitResponse + ModifyRequest + ModifyResponse + StartRequest + StartResponse + StopRequest + StopResponse + Workload +*/ +package rpc + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/timestamp" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Status int32 + +const ( + Status_UNKNOWN Status = 0 + Status_UNINITIALIZED Status = 1 + Status_INITIALIZED Status = 2 + Status_RUNNING Status = 3 +) + +var Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "UNINITIALIZED", + 2: "INITIALIZED", + 3: "RUNNING", +} +var Status_value = map[string]int32{ + "UNKNOWN": 0, + "UNINITIALIZED": 1, + "INITIALIZED": 2, + "RUNNING": 3, +} + +func (x Status) String() string { + return proto.EnumName(Status_name, int32(x)) +} +func (Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type StatusRequest struct { +} + +func (m *StatusRequest) Reset() { *m = StatusRequest{} } +func (m *StatusRequest) String() string { return proto.CompactTextString(m) } +func (*StatusRequest) ProtoMessage() {} +func (*StatusRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type StatusResponse struct { + Status Status `protobuf:"varint,1,opt,name=status,enum=rpc.Status" json:"status,omitempty"` + Token string `protobuf:"bytes,2,opt,name=token" json:"token,omitempty"` + MaxQPS int64 `protobuf:"varint,3,opt,name=maxQPS" json:"maxQPS,omitempty"` + Workload *Workload `protobuf:"bytes,4,opt,name=workload" json:"workload,omitempty"` +} + +func (m *StatusResponse) Reset() { *m = StatusResponse{} } +func (m *StatusResponse) String() string { return proto.CompactTextString(m) } +func (*StatusResponse) ProtoMessage() {} +func (*StatusResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *StatusResponse) GetStatus() Status { + if m != nil { + return m.Status + } + return Status_UNKNOWN +} + +func (m *StatusResponse) GetToken() string { + if m != nil { + return m.Token + } + return "" +} + +func (m *StatusResponse) GetMaxQPS() int64 { + if m != nil { + return m.MaxQPS + } + return 0 +} + +func (m *StatusResponse) GetWorkload() *Workload { + if m != nil { + return m.Workload + } + return nil +} + +type InitRequest struct { + Token string `protobuf:"bytes,1,opt,name=token" json:"token,omitempty"` + Workload *Workload `protobuf:"bytes,2,opt,name=workload" json:"workload,omitempty"` + Force bool `protobuf:"varint,3,opt,name=force" json:"force,omitempty"` + TargetZone string `protobuf:"bytes,4,opt,name=targetZone" json:"targetZone,omitempty"` + TargetEnv string `protobuf:"bytes,5,opt,name=targetEnv" json:"targetEnv,omitempty"` +} + +func (m *InitRequest) Reset() { *m = InitRequest{} } +func (m *InitRequest) String() string { return proto.CompactTextString(m) } +func (*InitRequest) ProtoMessage() {} +func (*InitRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *InitRequest) GetToken() string { + if m != nil { + return m.Token + } + return "" +} + +func (m *InitRequest) GetWorkload() *Workload { + if m != nil { + return m.Workload + } + return nil +} + +func (m *InitRequest) GetForce() bool { + if m != nil { + return m.Force + } + return false +} + +func (m *InitRequest) GetTargetZone() string { + if m != nil { + return m.TargetZone + } + return "" +} + +func (m *InitRequest) GetTargetEnv() string { + if m != nil { + return m.TargetEnv + } + return "" +} + +type InitResponse struct { +} + +func (m *InitResponse) Reset() { *m = InitResponse{} } +func (m *InitResponse) String() string { return proto.CompactTextString(m) } +func (*InitResponse) ProtoMessage() {} +func (*InitResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +type ModifyRequest struct { + Workload *Workload `protobuf:"bytes,1,opt,name=workload" json:"workload,omitempty"` +} + +func (m *ModifyRequest) Reset() { *m = ModifyRequest{} } +func (m *ModifyRequest) String() string { return proto.CompactTextString(m) } +func (*ModifyRequest) ProtoMessage() {} +func (*ModifyRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *ModifyRequest) GetWorkload() *Workload { + if m != nil { + return m.Workload + } + return nil +} + +type ModifyResponse struct { +} + +func (m *ModifyResponse) Reset() { *m = ModifyResponse{} } +func (m *ModifyResponse) String() string { return proto.CompactTextString(m) } +func (*ModifyResponse) ProtoMessage() {} +func (*ModifyResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +type StartRequest struct { +} + +func (m *StartRequest) Reset() { *m = StartRequest{} } +func (m *StartRequest) String() string { return proto.CompactTextString(m) } +func (*StartRequest) ProtoMessage() {} +func (*StartRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +type StartResponse struct { +} + +func (m *StartResponse) Reset() { *m = StartResponse{} } +func (m *StartResponse) String() string { return proto.CompactTextString(m) } +func (*StartResponse) ProtoMessage() {} +func (*StartResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +type StopRequest struct { +} + +func (m *StopRequest) Reset() { *m = StopRequest{} } +func (m *StopRequest) String() string { return proto.CompactTextString(m) } +func (*StopRequest) ProtoMessage() {} +func (*StopRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +type StopResponse struct { +} + +func (m *StopResponse) Reset() { *m = StopResponse{} } +func (m *StopResponse) String() string { return proto.CompactTextString(m) } +func (*StopResponse) ProtoMessage() {} +func (*StopResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } + +type Workload struct { + BaseTime *google_protobuf.Timestamp `protobuf:"bytes,1,opt,name=baseTime" json:"baseTime,omitempty"` + MetricPrefix string `protobuf:"bytes,2,opt,name=metricPrefix" json:"metricPrefix,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=namespace" json:"namespace,omitempty"` + Cardinality int32 `protobuf:"varint,4,opt,name=cardinality" json:"cardinality,omitempty"` + IngressQPS int32 `protobuf:"varint,5,opt,name=ingressQPS" json:"ingressQPS,omitempty"` +} + +func (m *Workload) Reset() { *m = Workload{} } +func (m *Workload) String() string { return proto.CompactTextString(m) } +func (*Workload) ProtoMessage() {} +func (*Workload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } + +func (m *Workload) GetBaseTime() *google_protobuf.Timestamp { + if m != nil { + return m.BaseTime + } + return nil +} + +func (m *Workload) GetMetricPrefix() string { + if m != nil { + return m.MetricPrefix + } + return "" +} + +func (m *Workload) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +func (m *Workload) GetCardinality() int32 { + if m != nil { + return m.Cardinality + } + return 0 +} + +func (m *Workload) GetIngressQPS() int32 { + if m != nil { + return m.IngressQPS + } + return 0 +} + +func init() { + proto.RegisterType((*StatusRequest)(nil), "rpc.StatusRequest") + proto.RegisterType((*StatusResponse)(nil), "rpc.StatusResponse") + proto.RegisterType((*InitRequest)(nil), "rpc.InitRequest") + proto.RegisterType((*InitResponse)(nil), "rpc.InitResponse") + proto.RegisterType((*ModifyRequest)(nil), "rpc.ModifyRequest") + proto.RegisterType((*ModifyResponse)(nil), "rpc.ModifyResponse") + proto.RegisterType((*StartRequest)(nil), "rpc.StartRequest") + proto.RegisterType((*StartResponse)(nil), "rpc.StartResponse") + proto.RegisterType((*StopRequest)(nil), "rpc.StopRequest") + proto.RegisterType((*StopResponse)(nil), "rpc.StopResponse") + proto.RegisterType((*Workload)(nil), "rpc.Workload") + proto.RegisterEnum("rpc.Status", Status_name, Status_value) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for Mensch service + +type MenschClient interface { + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error) + Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) + Stop(ctx context.Context, in *StopRequest, opts ...grpc.CallOption) (*StopResponse, error) + Modify(ctx context.Context, in *ModifyRequest, opts ...grpc.CallOption) (*ModifyResponse, error) +} + +type menschClient struct { + cc *grpc.ClientConn +} + +func NewMenschClient(cc *grpc.ClientConn) MenschClient { + return &menschClient{cc} +} + +func (c *menschClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + out := new(StatusResponse) + err := grpc.Invoke(ctx, "/rpc.Mensch/Status", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *menschClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error) { + out := new(InitResponse) + err := grpc.Invoke(ctx, "/rpc.Mensch/Init", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *menschClient) Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) { + out := new(StartResponse) + err := grpc.Invoke(ctx, "/rpc.Mensch/Start", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *menschClient) Stop(ctx context.Context, in *StopRequest, opts ...grpc.CallOption) (*StopResponse, error) { + out := new(StopResponse) + err := grpc.Invoke(ctx, "/rpc.Mensch/Stop", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *menschClient) Modify(ctx context.Context, in *ModifyRequest, opts ...grpc.CallOption) (*ModifyResponse, error) { + out := new(ModifyResponse) + err := grpc.Invoke(ctx, "/rpc.Mensch/Modify", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Mensch service + +type MenschServer interface { + Status(context.Context, *StatusRequest) (*StatusResponse, error) + Init(context.Context, *InitRequest) (*InitResponse, error) + Start(context.Context, *StartRequest) (*StartResponse, error) + Stop(context.Context, *StopRequest) (*StopResponse, error) + Modify(context.Context, *ModifyRequest) (*ModifyResponse, error) +} + +func RegisterMenschServer(s *grpc.Server, srv MenschServer) { + s.RegisterService(&_Mensch_serviceDesc, srv) +} + +func _Mensch_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MenschServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/rpc.Mensch/Status", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MenschServer).Status(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Mensch_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(InitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MenschServer).Init(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/rpc.Mensch/Init", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MenschServer).Init(ctx, req.(*InitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Mensch_Start_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StartRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MenschServer).Start(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/rpc.Mensch/Start", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MenschServer).Start(ctx, req.(*StartRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Mensch_Stop_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StopRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MenschServer).Stop(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/rpc.Mensch/Stop", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MenschServer).Stop(ctx, req.(*StopRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Mensch_Modify_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ModifyRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MenschServer).Modify(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/rpc.Mensch/Modify", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MenschServer).Modify(ctx, req.(*ModifyRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Mensch_serviceDesc = grpc.ServiceDesc{ + ServiceName: "rpc.Mensch", + HandlerType: (*MenschServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Status", + Handler: _Mensch_Status_Handler, + }, + { + MethodName: "Init", + Handler: _Mensch_Init_Handler, + }, + { + MethodName: "Start", + Handler: _Mensch_Start_Handler, + }, + { + MethodName: "Stop", + Handler: _Mensch_Stop_Handler, + }, + { + MethodName: "Modify", + Handler: _Mensch_Modify_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "m3nsch.proto", +} + +func init() { proto.RegisterFile("m3nsch.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 535 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x53, 0xd1, 0x6e, 0xd3, 0x30, + 0x14, 0xc5, 0xed, 0x1a, 0xda, 0x9b, 0xb6, 0x6b, 0x0d, 0x42, 0x55, 0x84, 0xa0, 0x0a, 0x2f, 0x05, + 0xa4, 0x4c, 0xeb, 0x24, 0x1e, 0x78, 0x43, 0x62, 0xa0, 0x08, 0x16, 0x46, 0xda, 0x6a, 0xd2, 0xde, + 0xdc, 0xd4, 0x2d, 0xd1, 0x9a, 0x38, 0xd8, 0x2e, 0x6c, 0x3f, 0xc1, 0x47, 0xf0, 0x31, 0x7c, 0x13, + 0x8f, 0x53, 0x6c, 0xa7, 0x71, 0x5f, 0xf6, 0x78, 0xcf, 0xbd, 0x39, 0xf7, 0xe4, 0x9e, 0x63, 0xe8, + 0x66, 0x67, 0xb9, 0x48, 0x7e, 0x04, 0x05, 0x67, 0x92, 0xe1, 0x26, 0x2f, 0x12, 0xef, 0xe5, 0x86, + 0xb1, 0xcd, 0x96, 0x9e, 0x28, 0x68, 0xb9, 0x5b, 0x9f, 0xc8, 0x34, 0xa3, 0x42, 0x92, 0xac, 0xd0, + 0x53, 0xfe, 0x31, 0xf4, 0x66, 0x92, 0xc8, 0x9d, 0x88, 0xe9, 0xcf, 0x1d, 0x15, 0xd2, 0xff, 0x83, + 0xa0, 0x5f, 0x21, 0xa2, 0x60, 0xb9, 0xa0, 0xf8, 0x15, 0x38, 0x42, 0x21, 0x23, 0x34, 0x46, 0x93, + 0xfe, 0xd4, 0x0d, 0x78, 0x91, 0x04, 0x66, 0xc8, 0xb4, 0xf0, 0x53, 0x68, 0x49, 0x76, 0x43, 0xf3, + 0x51, 0x63, 0x8c, 0x26, 0x9d, 0x58, 0x17, 0xf8, 0x19, 0x38, 0x19, 0xb9, 0xfd, 0x7e, 0x39, 0x1b, + 0x35, 0xc7, 0x68, 0xd2, 0x8c, 0x4d, 0x85, 0x5f, 0x43, 0xfb, 0x37, 0xe3, 0x37, 0x5b, 0x46, 0x56, + 0xa3, 0xa3, 0x31, 0x9a, 0xb8, 0xd3, 0x9e, 0x22, 0xbd, 0x32, 0x60, 0xbc, 0x6f, 0xfb, 0x7f, 0x11, + 0xb8, 0x61, 0x9e, 0x4a, 0x23, 0xb0, 0x5e, 0x84, 0xec, 0x45, 0x36, 0x61, 0xe3, 0x41, 0xc2, 0x92, + 0x60, 0xcd, 0x78, 0x42, 0x95, 0xa4, 0x76, 0xac, 0x0b, 0xfc, 0x02, 0x40, 0x12, 0xbe, 0xa1, 0xf2, + 0x9a, 0xe5, 0x54, 0x69, 0xea, 0xc4, 0x16, 0x82, 0x9f, 0x43, 0x47, 0x57, 0xe7, 0xf9, 0xaf, 0x51, + 0x4b, 0xb5, 0x6b, 0xc0, 0xef, 0x43, 0x57, 0x6b, 0xd4, 0x27, 0xf3, 0xdf, 0x43, 0xef, 0x82, 0xad, + 0xd2, 0xf5, 0x5d, 0xa5, 0xda, 0xd6, 0x87, 0x1e, 0xfe, 0xe1, 0x01, 0xf4, 0xab, 0x6f, 0x0d, 0x5b, + 0x1f, 0xba, 0x33, 0x49, 0x78, 0x75, 0x02, 0x63, 0x1a, 0xaf, 0xd7, 0xf5, 0xc0, 0x9d, 0x49, 0x56, + 0x54, 0x7d, 0x35, 0x5f, 0x96, 0xa6, 0xfd, 0x0f, 0x41, 0xbb, 0x5a, 0x84, 0xdf, 0x41, 0x7b, 0x49, + 0x04, 0x9d, 0xa7, 0x19, 0x35, 0x4a, 0xbc, 0x40, 0xa7, 0x24, 0xa8, 0x52, 0x12, 0xcc, 0xab, 0x94, + 0xc4, 0xfb, 0x59, 0xec, 0x43, 0x37, 0xa3, 0x92, 0xa7, 0xc9, 0x25, 0xa7, 0xeb, 0xf4, 0xd6, 0xf8, + 0x7c, 0x80, 0x95, 0x47, 0xca, 0x49, 0x46, 0x45, 0x41, 0xcc, 0x79, 0x3b, 0x71, 0x0d, 0xe0, 0x31, + 0xb8, 0x09, 0xe1, 0xab, 0x34, 0x27, 0xdb, 0x54, 0xde, 0xa9, 0x1b, 0xb7, 0x62, 0x1b, 0x2a, 0x4d, + 0x48, 0xf3, 0x0d, 0xa7, 0x42, 0x94, 0x91, 0x69, 0xa9, 0x01, 0x0b, 0x79, 0xf3, 0x09, 0x1c, 0x1d, + 0x3b, 0xec, 0xc2, 0xe3, 0x45, 0xf4, 0x25, 0xfa, 0x76, 0x15, 0x0d, 0x1e, 0xe1, 0x21, 0xf4, 0x16, + 0x51, 0x18, 0x85, 0xf3, 0xf0, 0xc3, 0xd7, 0xf0, 0xfa, 0xfc, 0xe3, 0x00, 0xe1, 0x63, 0x70, 0x6d, + 0xa0, 0x51, 0x7e, 0x10, 0x2f, 0xa2, 0x28, 0x8c, 0x3e, 0x0f, 0x9a, 0xd3, 0xff, 0x08, 0x9c, 0x0b, + 0x5a, 0x3e, 0x16, 0x7c, 0xba, 0xa7, 0xc4, 0x76, 0xac, 0xf5, 0x25, 0xbd, 0x27, 0x07, 0x98, 0x79, + 0x0f, 0x6f, 0xe1, 0xa8, 0x34, 0x1b, 0x0f, 0x54, 0xd3, 0xca, 0xa6, 0x37, 0xb4, 0x10, 0x33, 0x1c, + 0x40, 0x4b, 0x79, 0x85, 0x87, 0x15, 0xd5, 0xde, 0x47, 0x0f, 0xdb, 0x50, 0x4d, 0x5e, 0x7a, 0x67, + 0xc8, 0x2d, 0x57, 0xbd, 0xa1, 0x85, 0x98, 0xe1, 0x53, 0x70, 0x74, 0x54, 0x8c, 0xf8, 0x83, 0xcc, + 0x19, 0xf1, 0x87, 0x59, 0x5a, 0x3a, 0xca, 0xe4, 0xb3, 0xfb, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7c, + 0x35, 0xd2, 0x8e, 0x2d, 0x04, 0x00, 0x00, +} diff --git a/rpc/m3nsch.proto b/rpc/m3nsch.proto new file mode 100644 index 0000000..421a394 --- /dev/null +++ b/rpc/m3nsch.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +package rpc; + +service Mensch { + rpc Status(StatusRequest) returns (StatusResponse); + rpc Init(InitRequest) returns (InitResponse); + rpc Start(StartRequest) returns (StartResponse); + rpc Stop(StopRequest) returns (StopResponse); + rpc Modify(ModifyRequest) returns (ModifyResponse); +} + +enum Status { + UNKNOWN = 0; + UNINITIALIZED = 1; + INITIALIZED = 2; + RUNNING = 3; +} + +message StatusRequest {} + +message StatusResponse { + Status status = 1; + string token = 2; + int64 maxQPS = 3; + Workload workload = 4; +} + +message InitRequest { + string token = 1; + Workload workload = 2; + bool force = 3; + string targetZone = 4; + string targetEnv = 5; +} + +message InitResponse { +} + +message ModifyRequest { + Workload workload = 1; +} + +message ModifyResponse { +} + +message StartRequest { +} + +message StartResponse { +} + +message StopRequest { +} + +message StopResponse{ +} + +message Workload { + google.protobuf.Timestamp baseTime = 1; + string metricPrefix = 2; + string namespace = 3; + int32 cardinality = 4; + int32 ingressQPS = 5; +} + diff --git a/rpc/mock_m3nsch.pb.go b/rpc/mock_m3nsch.pb.go new file mode 100644 index 0000000..3bc0c44 --- /dev/null +++ b/rpc/mock_m3nsch.pb.go @@ -0,0 +1,207 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Automatically generated by MockGen. DO NOT EDIT! +// Source: m3nsch.pb.go + +package rpc + +import ( + gomock "github.com/golang/mock/gomock" + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Mock of MenschClient interface +type MockMenschClient struct { + ctrl *gomock.Controller + recorder *_MockMenschClientRecorder +} + +// Recorder for MockMenschClient (not exported) +type _MockMenschClientRecorder struct { + mock *MockMenschClient +} + +func NewMockMenschClient(ctrl *gomock.Controller) *MockMenschClient { + mock := &MockMenschClient{ctrl: ctrl} + mock.recorder = &_MockMenschClientRecorder{mock} + return mock +} + +func (_m *MockMenschClient) EXPECT() *_MockMenschClientRecorder { + return _m.recorder +} + +func (_m *MockMenschClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + _s := []interface{}{ctx, in} + for _, _x := range opts { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "Status", _s...) + ret0, _ := ret[0].(*StatusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschClientRecorder) Status(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "Status", _s...) +} + +func (_m *MockMenschClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error) { + _s := []interface{}{ctx, in} + for _, _x := range opts { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "Init", _s...) + ret0, _ := ret[0].(*InitResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschClientRecorder) Init(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "Init", _s...) +} + +func (_m *MockMenschClient) Start(ctx context.Context, in *StartRequest, opts ...grpc.CallOption) (*StartResponse, error) { + _s := []interface{}{ctx, in} + for _, _x := range opts { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "Start", _s...) + ret0, _ := ret[0].(*StartResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschClientRecorder) Start(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "Start", _s...) +} + +func (_m *MockMenschClient) Stop(ctx context.Context, in *StopRequest, opts ...grpc.CallOption) (*StopResponse, error) { + _s := []interface{}{ctx, in} + for _, _x := range opts { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "Stop", _s...) + ret0, _ := ret[0].(*StopResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschClientRecorder) Stop(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "Stop", _s...) +} + +func (_m *MockMenschClient) Modify(ctx context.Context, in *ModifyRequest, opts ...grpc.CallOption) (*ModifyResponse, error) { + _s := []interface{}{ctx, in} + for _, _x := range opts { + _s = append(_s, _x) + } + ret := _m.ctrl.Call(_m, "Modify", _s...) + ret0, _ := ret[0].(*ModifyResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschClientRecorder) Modify(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + _s := append([]interface{}{arg0, arg1}, arg2...) + return _mr.mock.ctrl.RecordCall(_mr.mock, "Modify", _s...) +} + +// Mock of MenschServer interface +type MockMenschServer struct { + ctrl *gomock.Controller + recorder *_MockMenschServerRecorder +} + +// Recorder for MockMenschServer (not exported) +type _MockMenschServerRecorder struct { + mock *MockMenschServer +} + +func NewMockMenschServer(ctrl *gomock.Controller) *MockMenschServer { + mock := &MockMenschServer{ctrl: ctrl} + mock.recorder = &_MockMenschServerRecorder{mock} + return mock +} + +func (_m *MockMenschServer) EXPECT() *_MockMenschServerRecorder { + return _m.recorder +} + +func (_m *MockMenschServer) Status(_param0 context.Context, _param1 *StatusRequest) (*StatusResponse, error) { + ret := _m.ctrl.Call(_m, "Status", _param0, _param1) + ret0, _ := ret[0].(*StatusResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschServerRecorder) Status(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Status", arg0, arg1) +} + +func (_m *MockMenschServer) Init(_param0 context.Context, _param1 *InitRequest) (*InitResponse, error) { + ret := _m.ctrl.Call(_m, "Init", _param0, _param1) + ret0, _ := ret[0].(*InitResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschServerRecorder) Init(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Init", arg0, arg1) +} + +func (_m *MockMenschServer) Start(_param0 context.Context, _param1 *StartRequest) (*StartResponse, error) { + ret := _m.ctrl.Call(_m, "Start", _param0, _param1) + ret0, _ := ret[0].(*StartResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschServerRecorder) Start(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Start", arg0, arg1) +} + +func (_m *MockMenschServer) Stop(_param0 context.Context, _param1 *StopRequest) (*StopResponse, error) { + ret := _m.ctrl.Call(_m, "Stop", _param0, _param1) + ret0, _ := ret[0].(*StopResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschServerRecorder) Stop(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Stop", arg0, arg1) +} + +func (_m *MockMenschServer) Modify(_param0 context.Context, _param1 *ModifyRequest) (*ModifyResponse, error) { + ret := _m.ctrl.Call(_m, "Modify", _param0, _param1) + ret0, _ := ret[0].(*ModifyResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockMenschServerRecorder) Modify(arg0, arg1 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Modify", arg0, arg1) +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..1b43c05 --- /dev/null +++ b/types.go @@ -0,0 +1,202 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3nsch + +import ( + "time" + + "github.com/m3db/m3db/client" + "github.com/m3db/m3x/instrument" + xtime "github.com/m3db/m3x/time" +) + +// Status represents the various states the load generation processes may exist in. +type Status int + +const ( + // StatusUninitialized refers to a load generation process yet to be initialized. + StatusUninitialized Status = iota + + // StatusInitialized refers to a load generation process which has been initialized. + StatusInitialized + + // StatusRunning refers to a load generation process which is running. + StatusRunning +) + +// Workload is a collection of attributes required to define a load generation workload. +// TODO(prateek): add support for duration +type Workload struct { + // BaseTime is the epoch value for time used during load generation, all timestamps are + // generated relative to it. + BaseTime time.Time + + // Namespace is the target namespace to perform the writes against. + Namespace string + + // MetricPrefix is the string prefixed to each metric used. + MetricPrefix string + + // Cardinality is the number of unique metrics used. + Cardinality int + + // IngressQPS is the number of metrics written per second. + IngressQPS int + + // MetricStartIdx is an offset to control metric numbering. Can be safely ignored + // by external callers. + MetricStartIdx int +} + +// Coordinator refers to the process responsible for synchronizing load generation. +type Coordinator interface { + // Status returns the status of the agents known to this process. + Status() (map[string]AgentStatus, error) + + // Init acquires resources required on known agent processes to be able to + // generate load. + // NB(prateek): Init takes ~30s to initialize m3db.Session objects + Init(token string, w Workload, force bool, targetZone string, targetEnv string) error + + // Workload returns the aggregate workload currently initialized on the + // agent processes known to the coordinator process. + Workload() (Workload, error) + + // SetWorkload splits the specified workload into smaller chunks, and distributes them + // across known agent processes. + SetWorkload(Workload) error + + // Start begins the load generation process on known agent processes. + Start() error + + // Stops ends the load generation process on known agent processes. + Stop() error + + // Teardown releases resources held by the Coordinator process (connections, state tracking structures). + Teardown() error +} + +// CoordinatorOptions is a collection of the various knobs to control Coordinator behavior. +type CoordinatorOptions interface { + // SetInstrumentOptions sets the InstrumentOptions + SetInstrumentOptions(instrument.Options) CoordinatorOptions + + // InstrumentOptions returns the set InstrumentOptions + InstrumentOptions() instrument.Options + + // SetTimeout sets the timeout for rpc interaction + SetTimeout(time.Duration) CoordinatorOptions + + // Timeout returns the timeout for rpc interaction + Timeout() time.Duration + + // SetParallelOperations sets a flag determining if the operations + // performed by the coordinator against various endpoints are to be + // parallelized or not + SetParallelOperations(bool) CoordinatorOptions + + // ParallelOperations returns a flag indicating if the operations + // performed by the coordinator against various endpoints are to be + // parallelized or not + ParallelOperations() bool +} + +// AgentStatus is a collection of attributes capturing the state +// of a agent process. +type AgentStatus struct { + // Status refers to the agent process' running status + Status Status + + // Token (if non-empty) is the breadcrumb used to Init the agent process' + Token string + + // MaxQPS is the maximum QPS attainable by the Agent process + MaxQPS int64 + + // Workload is the currently configured workload on the agent process + Workload Workload +} + +// Agent refers to the process responsible for executing load generation. +type Agent interface { + // Status returns the status of the agent process. + Status() AgentStatus + + // Workload returns Workload currently configured on the agent process. + Workload() Workload + + // SetWorkload sets the Workload on the agent process. + SetWorkload(Workload) + + // Init initializes resources required by the agent process. + Init(token string, w Workload, force bool, targetZone string, targetEnv string) error + + // Start begins the load generation process if the agent is Initialized, or errors if it is not. + Start() error + + // Stop ends the load generation process if the agent is Running. + Stop() error + + // TODO(prateek): stats + + // MaxQPS returns the maximum QPS this Agent is capable of driving. + // MaxQPS := `AgentOptions.MaxWorkerQPS() * AgentOptions.Concurrency()` + MaxQPS() int64 +} + +// NewSessionFn creates a new client.Session for the specified environment, zone. +type NewSessionFn func(targetZone string, targetEnv string) (client.Session, error) + +// AgentOptions is a collection of knobs to control Agent behavior. +type AgentOptions interface { + // SetInstrumentOptions sets the InstrumentOptions + SetInstrumentOptions(instrument.Options) AgentOptions + + // InstrumentOptions returns the set InstrumentOptions + InstrumentOptions() instrument.Options + + // SetMaxWorkerQPS sets the maximum QPS per 'worker' go-routine used in + // the load generation process. + SetMaxWorkerQPS(int64) AgentOptions + + // SetMaxWorkerQPS returns the maximum QPS per 'worker' go-routine used in + // the load generation process. + MaxWorkerQPS() int64 + + // SetConcurrency sets the number of concurrent go routines used. + SetConcurrency(int) AgentOptions + + // Concurrency returns the number of concurrent go routines usable during + // load generation. + Concurrency() int + + // SetNewSessionFn sets the new session function + SetNewSessionFn(fn NewSessionFn) AgentOptions + + // NewSessionFn returns the new session function + NewSessionFn() NewSessionFn + + // SetTimeUnit sets the time unit to use during load operations. + SetTimeUnit(xtime.Unit) AgentOptions + + // TimeUnit returns the time unit used during load operations. + TimeUnit() xtime.Unit +}