Skip to content

Commit

Permalink
Add example
Browse files Browse the repository at this point in the history
Michael Andersen committed Sep 21, 2018
1 parent 34ca0d1 commit 7031495
Showing 3 changed files with 117 additions and 0 deletions.
112 changes: 112 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package main

import (
"context"
"encoding/base64"
"fmt"
"io/ioutil"
"os"
"time"

"github.com/immesys/wavemq/mqpb"
"github.com/pborman/uuid"
"google.golang.org/grpc"
)

// This is an example that shows how to publish and subscribe to a WAVEMQ site router
// Fill these fields in:
const EntityFile = "entity.ent"
const Namespace = "GyAlyQyfJuai4MCyg6Rx9KkxnZZXWyDaIo0EXGY9-WEq6w=="
const SiteRouter = "127.0.0.1:4516"

var namespaceBytes []byte

func main() {
var err error
namespaceBytes, err = base64.URLEncoding.DecodeString(Namespace)
if err != nil {
fmt.Printf("failed to decode namespace: %v\n", err)
os.Exit(1)
}

// Load the WAVE3 entity that will be used
perspective, err := ioutil.ReadFile(EntityFile)
if err != nil {
fmt.Printf("could not load entity %q, you might need to create one and grant it permissions\n", EntityFile)
os.Exit(1)
}

// Establish a GRPC connection to the site router.
conn, err := grpc.Dial(SiteRouter, grpc.WithInsecure(), grpc.FailOnNonTempDialError(true), grpc.WithBlock())
if err != nil {
fmt.Printf("could not connect to the site router: %v\n", err)
os.Exit(1)
}

// Create the WAVEMQ client
client := mqpb.NewWAVEMQClient(conn)

go subscribe(client, perspective)

//Publish five messages, then exit
for i := 0; i < 5; i++ {
time.Sleep(1 * time.Second)
presp, err := client.Publish(context.Background(), &mqpb.PublishParams{
Perspective: &mqpb.Perspective{
EntitySecret: &mqpb.EntitySecret{
DER: perspective,
},
},
Namespace: namespaceBytes,
Uri: "example/topic",
Content: []*mqpb.PayloadObject{{Schema: "text", Content: []byte("world")}},
})
if err != nil {
fmt.Printf("publish error: %v\n", err)
os.Exit(1)
}
if presp.Error != nil {
fmt.Printf("publish error: %v\n", presp.Error.Message)
os.Exit(1)
}
}
}

func subscribe(client mqpb.WAVEMQClient, perspective []byte) {

sub, err := client.Subscribe(context.Background(), &mqpb.SubscribeParams{
Perspective: &mqpb.Perspective{
EntitySecret: &mqpb.EntitySecret{
DER: perspective,
},
},
Namespace: namespaceBytes,
Uri: "example/*",
//If you want a persistent subscription between different runs of this program,
//specify this to be something constant (but unique)
Identifier: uuid.NewRandom().String(),
//This subscription will automatically unsubscribe one minute after this
//program ends
Expiry: 60,
})
if err != nil {
fmt.Printf("subscribe error: %v\n", err)
os.Exit(1)
}
for {
m, err := sub.Recv()
if err != nil {
fmt.Printf("subscribe error: %v\n", err)
os.Exit(1)
}
if m.Error != nil {
fmt.Printf("subscribe error: %v\n", m.Error.Message)
os.Exit(1)
}
fmt.Printf("received message on URI: %s\n", m.Message.Tbs.Uri)
fmt.Printf(" contents:\n")
for _, po := range m.Message.Tbs.Payload {
fmt.Printf(" schema=%q content=%q\n", po.Schema, po.Content)
}
}
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ func main() {
os.Exit(1)
}
fmt.Printf("configuration loaded\n")

consts.DefaultToUnrevoked = conf.WaveConfig.DefaultToUnrevoked

qm, err := core.NewQManager(&conf.QueueConfig)
4 changes: 4 additions & 0 deletions server/local.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"github.com/immesys/wavemq/core"
pb "github.com/immesys/wavemq/mqpb"
logging "github.com/op/go-logging"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
@@ -185,6 +186,9 @@ func (s *srv) Subscribe(p *pb.SubscribeParams, r pb.WAVEMQ_SubscribeServer) erro
if p.Expiry < 60 {
p.Expiry = 60
}
if p.Identifier == "" {
p.Identifier = uuid.NewRandom().String()
}
sub, err := s.am.FormSubRequest(p, s.tm.RouterID())
if err != nil {
pmFailedFormSubscribe.Add(1)

0 comments on commit 7031495

Please sign in to comment.