Skip to content

Commit

Permalink
Add memq server
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeda committed Jan 29, 2017
1 parent ccc701e commit 800d0d1
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 0 deletions.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,33 @@ Have Docker installed.
make push REGISTRY=<my-gcr-registry>
```

### KeyGen Workload

To help simulate batch workers, we have a synthetic workload of generating 4096 bit RSA keys. This can be configured through the UI or the command line.

```
--keygen-enable Enable KeyGen workload
--keygen-exit-code int Exit code when workload complete
--keygen-exit-on-complete Exit after workload is complete
--keygen-num-to-gen int The number of keys to generate. Set to 0 for infinite
--keygen-time-to-run int The target run time in seconds. Set to 0 for infinite
```

### MemQ server

We also have a simple in memory queue with REST API. This is based heavily on https://github.com/kelseyhightower/memq.

The API is as follows with Urls being relative to `<server addr>/memq/server`. See `pkg/memq/types.go` for the data structures returned.

| Method | Url | Desc
| --- | --- | ---
| `GET` | `/stats` | Get stats on all queues
| `PUT` | `/queues/:queue` | Create a queue
| `DELETE` | `/queue/:queue` | Delete a queue
| `POST` | `/queue/:queue/drain` | Discard all items in queue
| `POST` | `/queue/:queue/enqueue` | Add item to queue. Body is plain text. Response is message object.
| `POST` | `/queue/:queue/dequeue` | Grab an item off the queue and return it. Returns a 204 "No Content" if queue is empty.

### Versions

Images built will automatically have the git verison (based on tag) applied. In addition, there is an idea of a "fake version". This is used so that we can use the same basic server to demonstrate upgrade scenarios.
Expand Down
4 changes: 4 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/jbeda/kuard/pkg/env"
"github.com/jbeda/kuard/pkg/htmlutils"
"github.com/jbeda/kuard/pkg/keygen"
"github.com/jbeda/kuard/pkg/memq/server"
"github.com/jbeda/kuard/pkg/sitedata"
"github.com/jbeda/kuard/pkg/version"

"github.com/julienschmidt/httprouter"
)

Expand Down Expand Up @@ -124,5 +126,7 @@ func NewApp() *App {
k.kg = keygen.New("/keygen")
k.kg.AddRoutes(router)

memqserver.NewServer("/memq/server").AddRoutes(router)

return k
}
128 changes: 128 additions & 0 deletions pkg/memq/server/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2017 The KUAR Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package memqserver

import (
"io/ioutil"
"net/http"

"github.com/jbeda/kuard/pkg/apiutils"
"github.com/julienschmidt/httprouter"
)

type Server struct {
path string
broker *Broker
}

func NewServer(path string) *Server {
return &Server{
path: path,
broker: NewBroker(),
}
}

func (s *Server) AddRoutes(router *httprouter.Router) {
router.GET(s.path+"/stats", s.GetStats)
router.PUT(s.path+"/queues/:queue", s.CreateQueue)
router.DELETE(s.path+"/queues/:queue", s.DeleteQueue)
router.POST(s.path+"/queues/:queue/drain", s.DrainQueue)
router.POST(s.path+"/queues/:queue/dequeue", s.Dequeue)
router.POST(s.path+"/queues/:queue/enqueue", s.Enqueue)
}

func (s *Server) CreateQueue(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
qName := p.ByName("queue")
if len(qName) == 0 {
http.Error(w, ErrEmptyName.Error(), http.StatusBadRequest)
return
}
err := s.broker.CreateQueue(qName)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}

func (s *Server) DeleteQueue(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
qName := p.ByName("queue")
if len(qName) == 0 {
http.Error(w, ErrEmptyName.Error(), http.StatusBadRequest)
return
}
err := s.broker.DeleteQueue(qName)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}

func (s *Server) DrainQueue(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
qName := p.ByName("queue")
if len(qName) == 0 {
http.Error(w, ErrEmptyName.Error(), http.StatusBadRequest)
return
}
err := s.broker.DrainQueue(qName)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}

func (s *Server) Enqueue(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
w.WriteHeader(http.StatusInternalServerError)
return
}
qName := p.ByName("queue")
if len(qName) == 0 {
http.Error(w, ErrEmptyName.Error(), http.StatusBadRequest)
return
}

msg, err := s.broker.PutMessage(qName, string(body))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

apiutils.ServeJSON(w, msg)
}

func (s *Server) Dequeue(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
qName := p.ByName("queue")
if len(qName) == 0 {
http.Error(w, ErrEmptyName.Error(), http.StatusBadRequest)
return
}

m, err := s.broker.GetMessage(qName)
if err == ErrEmptyQueue {
w.WriteHeader(http.StatusNoContent)
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

apiutils.ServeJSON(w, &m)
}

func (s *Server) GetStats(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
stats := s.broker.Stats()
apiutils.ServeJSON(w, &stats)
}
204 changes: 204 additions & 0 deletions pkg/memq/server/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
Copyright 2017 The KUAR Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package memqserver

import (
"crypto/rand"
"encoding/hex"
"errors"
"sync"
"time"

"github.com/jbeda/kuard/pkg/memq"
)

var ErrEmptyQueue = errors.New("empty queue")
var ErrNotExist = errors.New("does not exist")
var ErrAlreadyExist = errors.New("already exists")
var ErrEmptyName = errors.New("empty name")

type Queue struct {
Depth int64
Enqueued int64
Dequeued int64
Drained int64
Messages []*memq.Message
mu *sync.RWMutex
}

type Broker struct {
Queues map[string]*Queue
mu *sync.RWMutex
}

func newStats() *memq.Stats {
return &memq.Stats{
Kind: "stats",
Queues: make([]memq.Stat, 0),
}
}

func newMessage(body string) (*memq.Message, error) {
id, err := uuid()
if err != nil {
return nil, err
}
m := &memq.Message{
Kind: "message",
ID: id,
Body: body,
Created: time.Now()}
return m, nil
}

func newQueue(name string) *Queue {
return &Queue{
Depth: 0,
Messages: make([]*memq.Message, 0),
mu: &sync.RWMutex{},
}
}

func NewBroker() *Broker {
return &Broker{
Queues: make(map[string]*Queue),
mu: &sync.RWMutex{},
}
}

func (b *Broker) CreateQueue(name string) error {
b.mu.Lock()
defer b.mu.Unlock()

if _, ok := b.Queues[name]; ok {
return ErrAlreadyExist
}

b.Queues[name] = newQueue(name)

return nil
}

func (b *Broker) DeleteQueue(name string) error {
b.mu.Lock()
defer b.mu.Unlock()

if _, ok := b.Queues[name]; !ok {
return ErrNotExist
}
delete(b.Queues, name)
return nil
}

func (b *Broker) DrainQueue(name string) error {
b.mu.Lock()
defer b.mu.Unlock()

q, ok := b.Queues[name]
if !ok {
return ErrNotExist
}

q.mu.Lock()
defer q.mu.Unlock()

q.Messages = make([]*memq.Message, 0)
q.Drained += q.Depth
q.Depth = 0

return nil
}

// getQueue safely gets a queue. There is no guarantee that the queue won't be
// thown away (via DrainQueue or DeleteQueue) before it can be used.
func (b *Broker) getQueue(queue string) (*Queue, error) {
b.mu.RLock()
defer b.mu.RUnlock()

q, ok := b.Queues[queue]
if !ok {
return nil, ErrNotExist
}
return q, nil
}

func (b *Broker) PutMessage(queue, body string) (*memq.Message, error) {
q, err := b.getQueue(queue)
if err != nil {
return nil, err
}

message, err := newMessage(body)
if err != nil {
return nil, err
}

q.mu.Lock()
defer q.mu.Unlock()
q.Messages = append(q.Messages, message)
q.Depth++
q.Enqueued++
return message, nil
}

func (b *Broker) GetMessage(queue string) (*memq.Message, error) {
q, err := b.getQueue(queue)
if err != nil {
return nil, err
}

q.mu.Lock()
defer q.mu.Unlock()
if len(q.Messages) < 1 {
return nil, ErrEmptyQueue
}
var m *memq.Message
m, q.Messages = q.Messages[0], q.Messages[1:]
q.Depth--
q.Dequeued++
return m, nil
}

func (b *Broker) Stats() *memq.Stats {
s := newStats()

b.mu.RLock()
defer b.mu.RUnlock()

for name, q := range b.Queues {
q.mu.RLock()
stat := memq.Stat{
Name: name,
Depth: q.Depth,
Enqueued: q.Enqueued,
Dequeued: q.Dequeued,
Drained: q.Drained,
}
s.Queues = append(s.Queues, stat)
q.mu.RUnlock()
}
return s
}

func uuid() (string, error) {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return "", err
}
return hex.EncodeToString(b), nil
}
Loading

0 comments on commit 800d0d1

Please sign in to comment.