Skip to content

Commit

Permalink
feat: implement metrics (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
troian authored Jan 19, 2020
1 parent 5ffb27c commit e14fb8a
Show file tree
Hide file tree
Showing 46 changed files with 1,579 additions and 1,736 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
- stage: test
script:
- cd cmd/volantmq
- go build -race -v -gcflags "-N -l" .
- go build -v -gcflags "-N -l" .
- cd ../..
- ./go.test.sh
after_success:
Expand Down
32 changes: 26 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.13.4 as builder
FROM golang:1.13.6-alpine3.11 as builder
LABEL stage=intermediate

#compile linux only
Expand All @@ -13,6 +13,11 @@ RUN mkdir -p $VOLANTMQ_WORK_DIR/bin
RUN mkdir -p $VOLANTMQ_WORK_DIR/conf
RUN mkdir -p $VOLANTMQ_PLUGINS_DIR

# git required by govvv
# gcc&g++ for building plugins
RUN apk update && apk upgrade && \
apk add --no-cache bash git openssh gcc g++

# Create environment directory
ENV PATH $VOLANTMQ_WORK_DIR/bin:$PATH

Expand All @@ -21,32 +26,47 @@ RUN \
GO111MODULE=off go get -v github.com/VolantMQ/volantmq/cmd/volantmq \
&& cd $GOPATH/src/github.com/VolantMQ/volantmq/cmd/volantmq \
&& GO111MODULE=on go mod tidy \
&& go get github.com/ahmetb/govvv \
&& go get github.com/troian/govvv \
&& govvv build $VOLANTMQ_BUILD_FLAGS -o $VOLANTMQ_WORK_DIR/bin/volantmq

RUN cp $GOPATH/src/github.com/VolantMQ/volantmq/tools/print_version.sh /bin

# build debug plugins
RUN \
GO111MODULE=off go get gitlab.com/VolantMQ/vlplugin/debug \
&& cd $GOPATH/src/gitlab.com/VolantMQ/vlplugin/debug \
&& GO111MODULE=on go mod tidy \
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -o $VOLANTMQ_WORK_DIR/plugins/debug.so
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -ldflags "-X main.version=$(print_version.sh)" -o $VOLANTMQ_WORK_DIR/plugins/debug.so

# build health plugins
RUN \
GO111MODULE=off go get gitlab.com/VolantMQ/vlplugin/health \
&& cd $GOPATH/src/gitlab.com/VolantMQ/vlplugin/health \
&& GO111MODULE=on go mod tidy \
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -o $VOLANTMQ_WORK_DIR/plugins/health.so
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -ldflags "-X main.version=$(print_version.sh)" -o $VOLANTMQ_WORK_DIR/plugins/health.so

# build metrics plugins
RUN \
GO111MODULE=off go get gitlab.com/VolantMQ/vlplugin/monitoring/prometheus \
&& cd $GOPATH/src/gitlab.com/VolantMQ/monitoring/prometheus \
&& GO111MODULE=on go mod tidy \
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -ldflags "-X main.version=$(print_version.sh)" -o $VOLANTMQ_WORK_DIR/plugins/prometheus.so

RUN \
GO111MODULE=off go get gitlab.com/VolantMQ/vlplugin/monitoring/systree \
&& cd $GOPATH/src/gitlab.com/VolantMQ/monitoring/systree \
&& GO111MODULE=on go mod tidy \
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -ldflags "-X main.version=$(print_version.sh)" -o $VOLANTMQ_WORK_DIR/plugins/systree.so

#build persistence plugins
RUN \
GO111MODULE=off go get gitlab.com/VolantMQ/vlplugin/persistence/bbolt \
&& cd $GOPATH/src/gitlab.com/VolantMQ/vlplugin/persistence/bbolt \
&& GO111MODULE=on go mod tidy \
&& cd plugin \
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -o $VOLANTMQ_WORK_DIR/plugins/persistence_bbolt.so
&& go build $VOLANTMQ_BUILD_FLAGS -buildmode=plugin -ldflags "-X main.version=$(print_version.sh)" -o $VOLANTMQ_WORK_DIR/plugins/persistence_bbolt.so

FROM ubuntu
FROM alpine
ENV \
VOLANTMQ_WORK_DIR=/usr/lib/volantmq

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ VolantMQ

VolantMQ is a high performance MQTT broker that aims to be fully compliant with MQTT specs

### Features, Limitations, and Future
###Features, Limitations, and Future

**Features**
* [MQTT v3.1 - V3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
Expand All @@ -27,19 +27,19 @@ VolantMQ is a high performance MQTT broker that aims to be fully compliant with
* [BBolt](https://github.com/coreos/bbolt)
* In memory

### Compatibility

###Compatibility
Project has been tested with the following client libraries
* Paho MQTT Conformance/Interoperability Testing Suite (in Python)
* Pass with all of the test cases
* Paho C Client library (in C)
* Pass with all of the test cases

### How to use
###How to use
Best option is to run prebuilt docker image
```bash
docker run --rm -p 1883:1883 -p 8080:8080 -v $(pwd)/examples/config.yaml:/etc/volantmq/config.yaml --env VOLANTMQ_CONFIG=/etc/volantmq/config.yaml volantmq/volantmq
```
####Whats inside

- In example above port 1883 is mqtt listener with default user/password testuser/testpassword
- Port 8080 exposes healthcheck endpoints as well as pprof at http://localhost:8080/debug/pprof
Expand Down
1 change: 0 additions & 1 deletion VERSION

This file was deleted.

5 changes: 2 additions & 3 deletions clients/expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (

"github.com/VolantMQ/vlapi/mqttp"
"github.com/VolantMQ/vlapi/vlpersistence"

"github.com/VolantMQ/volantmq/types"
"github.com/VolantMQ/vlapi/vltypes"
)

type expiryEvent interface {
Expand All @@ -20,7 +19,7 @@ type expiryConfig struct {
id string
createdAt time.Time
expiringSince time.Time
messenger types.TopicMessenger
messenger vltypes.TopicMessenger
will *mqttp.Publish
expireIn *uint32
willIn uint32
Expand Down
37 changes: 28 additions & 9 deletions clients/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/VolantMQ/vlapi/vlauth"
"github.com/VolantMQ/vlapi/vlpersistence"
"github.com/VolantMQ/vlapi/vlsubscriber"
"github.com/VolantMQ/vlapi/vltypes"
"go.uber.org/zap"

"github.com/VolantMQ/volantmq/configuration"
Expand All @@ -16,15 +17,15 @@ import (
)

type sessionEvents interface {
sessionOffline(string, bool, *expiryConfig)
connectionClosed(string, mqttp.ReasonCode)
sessionOffline(string, sessionOfflineState)
connectionClosed(string, bool, mqttp.ReasonCode)
subscriberShutdown(string, vlsubscriber.IFace)
}

type sessionPreConfig struct {
id string
createdAt time.Time
messenger types.TopicMessenger
messenger vltypes.TopicMessenger
conn connection.Session
persistence vlpersistence.Packets
permissions vlauth.Permissions
Expand All @@ -41,6 +42,15 @@ type sessionConfig struct {
version mqttp.ProtocolVersion
}

type sessionOfflineState struct {
durable bool
keepContainer bool
qos0 int
qos12 int
unAck int
exp *expiryConfig
}

type session struct {
sessionPreConfig
log *zap.SugaredLogger
Expand Down Expand Up @@ -139,7 +149,7 @@ func (s *session) SignalSubscribe(pkt *mqttp.Subscribe) (mqttp.IFace, error) {
Ops: t.Ops(),
}

if retained, ee := s.subscriber.Subscribe(t.Filter(), &params); ee != nil {
if retained, ee := s.subscriber.Subscribe(t.Filter(), params); ee != nil {
reason = mqttp.QosFailure
} else {
reason = mqttp.ReasonCode(params.Granted)
Expand Down Expand Up @@ -288,6 +298,13 @@ func (s *session) SignalConnectionClose(params connection.DisconnectParams) {
// if session is clean send will regardless to will delay
willIn := uint32(0)

state := sessionOfflineState{
durable: s.durable,
qos0: len(params.Packets.QoS0),
qos12: len(params.Packets.QoS12),
unAck: len(params.Packets.UnAck),
}

if s.will != nil {
if val := s.will.PropertyGet(mqttp.PropertyWillDelayInterval); val != nil {
willIn, _ = val.AsInt()
Expand All @@ -301,11 +318,11 @@ func (s *session) SignalConnectionClose(params connection.DisconnectParams) {
}
}

s.connectionClosed(s.id, params.Reason)
state.keepContainer = (s.durable && s.subscriber.HasSubscriptions()) || (willIn > 0)

keepContainer := (s.durable && s.subscriber.HasSubscriptions()) || (willIn > 0)
s.connectionClosed(s.id, s.durable, params.Reason)

if !keepContainer {
if !state.keepContainer {
s.subscriberShutdown(s.id, s.subscriber)
s.subscriber = nil
}
Expand All @@ -331,11 +348,13 @@ func (s *session) SignalConnectionClose(params connection.DisconnectParams) {
willIn: willIn,
}

keepContainer = true
state.keepContainer = true
}
}

s.sessionOffline(s.id, keepContainer, exp)
state.exp = exp

s.sessionOffline(s.id, state)

s.stopReq.Do(func() {})
}
Loading

0 comments on commit e14fb8a

Please sign in to comment.