From 2e7189b813c2a53e5ed8bfee303b77d049a7da08 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Sun, 16 Oct 2022 09:41:16 -0400 Subject: [PATCH 1/2] Add JetStream request-reply and provisioner examples Signed-off-by: Byron Ruth --- docker/docker-compose.yaml | 2 +- examples/jetstream/request-reply/go/go.mod | 14 + examples/jetstream/request-reply/go/go.sum | 30 ++ examples/jetstream/request-reply/go/main.go | 56 ++++ examples/use-cases/auto-scaler/go/go.mod | 14 + examples/use-cases/auto-scaler/go/go.sum | 32 ++ examples/use-cases/auto-scaler/go/main.go | 309 ++++++++++++++++++++ 7 files changed, 456 insertions(+), 1 deletion(-) create mode 100644 examples/jetstream/request-reply/go/go.mod create mode 100644 examples/jetstream/request-reply/go/go.sum create mode 100644 examples/jetstream/request-reply/go/main.go create mode 100644 examples/use-cases/auto-scaler/go/go.mod create mode 100644 examples/use-cases/auto-scaler/go/go.sum create mode 100644 examples/use-cases/auto-scaler/go/main.go diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 01ffc775..53acbf89 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -1,7 +1,7 @@ version: '3.9' services: nats: - image: docker.io/nats:2.9.0 + image: docker.io/nats:2.9.3 command: - "--debug" - "--http_port=8222" diff --git a/examples/jetstream/request-reply/go/go.mod b/examples/jetstream/request-reply/go/go.mod new file mode 100644 index 00000000..86906333 --- /dev/null +++ b/examples/jetstream/request-reply/go/go.mod @@ -0,0 +1,14 @@ +module github.com/bruth/nats-by-example/examples/jetstream/request-reply/go + +go 1.19 + +require github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.9.3 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/examples/jetstream/request-reply/go/go.sum b/examples/jetstream/request-reply/go/go.sum new file mode 100644 index 00000000..7de3b1ed --- /dev/null +++ b/examples/jetstream/request-reply/go/go.sum @@ -0,0 +1,30 @@ +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/nats-server/v2 v2.9.3 h1:HrfzA7G9LNetKkm1z+jU/e9kuAe+E6uaBuuq9EB5sQQ= +github.com/nats-io/nats-server/v2 v2.9.3/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= +github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8 h1:2qzLrWyP1GFlcIegd5RJlGV0NQyB/yZDhrRL3M+GNG8= +github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/examples/jetstream/request-reply/go/main.go b/examples/jetstream/request-reply/go/main.go new file mode 100644 index 00000000..37d06790 --- /dev/null +++ b/examples/jetstream/request-reply/go/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "os" + "time" + + "github.com/nats-io/nats.go" +) + +func main() { + natsURL := os.Getenv("NATS_URL") + + nc, _ := nats.Connect(natsURL) + defer nc.Drain() + + js, _ := nc.JetStream() + + // Create a [work-queue][wq] stream that will act as a buffer for requests. + js.AddStream(&nats.StreamConfig{ + Name: "REQUESTS", + Subjects: []string{"requests.*"}, + Retention: nats.WorkQueuePolicy, + }) + + // Create an ephemeral consumer + subscription responsible for replying. + sub, _ := js.Subscribe("requests.*", func(msg *nats.Msg) { + var r string + switch msg.Subject { + case "requests.order-sandwich": + r = "🥪" + case "requests.order-bagel": + r = "🥯" + case "requests.order-flatbread": + r = "🥙" + default: + return + } + msg.Respond([]byte(r)) + }) + defer sub.Drain() + + // Send some requests. + rep, _ := js.Request("requests.order-sandwich", nil, time.Second) + fmt.Println(string(rep.Data)) + + rep, _ = js.Request("requests.order-flatbread", nil, time.Second) + fmt.Println(string(rep.Data)) + + // If a request cannot be fulfilled, the message is terminated. + _, err := js.Request("requests.order-drink", nil, time.Second) + fmt.Printf("timeout? %v\n", err == nats.ErrTimeout) + + info, _ := js.StreamInfo("REQUESTS") + fmt.Printf("%d remaining in the stream\n", info.State.Msgs) +} diff --git a/examples/use-cases/auto-scaler/go/go.mod b/examples/use-cases/auto-scaler/go/go.mod new file mode 100644 index 00000000..86906333 --- /dev/null +++ b/examples/use-cases/auto-scaler/go/go.mod @@ -0,0 +1,14 @@ +module github.com/bruth/nats-by-example/examples/jetstream/request-reply/go + +go 1.19 + +require github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/nats-io/nats-server/v2 v2.9.3 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/examples/use-cases/auto-scaler/go/go.sum b/examples/use-cases/auto-scaler/go/go.sum new file mode 100644 index 00000000..a3ce3035 --- /dev/null +++ b/examples/use-cases/auto-scaler/go/go.sum @@ -0,0 +1,32 @@ +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/nats-server/v2 v2.9.3 h1:HrfzA7G9LNetKkm1z+jU/e9kuAe+E6uaBuuq9EB5sQQ= +github.com/nats-io/nats-server/v2 v2.9.3/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= +github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0= +github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8 h1:2qzLrWyP1GFlcIegd5RJlGV0NQyB/yZDhrRL3M+GNG8= +github.com/nats-io/nats.go v1.18.1-0.20221015130652-e09f13da2bd8/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/examples/use-cases/auto-scaler/go/main.go b/examples/use-cases/auto-scaler/go/main.go new file mode 100644 index 00000000..017f12a7 --- /dev/null +++ b/examples/use-cases/auto-scaler/go/main.go @@ -0,0 +1,309 @@ +package main + +import ( + "context" + "log" + "math" + "os" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +func main() { + natsURL := os.Getenv("NATS_URL") + + nc, _ := nats.Connect(natsURL) + defer nc.Drain() + + js, _ := nc.JetStream() + + // Create an [interest-based stream][interest] which will hold + // a message only until all bound consumers have ack'ed the message. + // [interest]: /examples/jetstream/interest-stream/go + js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.*"}, + Retention: nats.InterestPolicy, + }) + + // Create pull consumer that will track state for the subscribers + // that actually process the messages. + js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "PROCESSOR", + AckPolicy: nats.AckExplicitPolicy, + }) + + // Create pull consumer that will monitor the stream and auto-provision + // subscribers bound to the PROCESSOR consumer to actually do the work. + js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "PROVISIONER", + AckPolicy: nats.AckAllPolicy, + HeadersOnly: true, + }) + + // Setup a base context. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize a basic goroutine provisioner that implements `Provisioner`. + provisioner := newGoroutineProvision(js, "TEST", "PROCESSOR") + + // Start the provisioner which actually monitors the stream and invokes + // the provisioner methods. + sub, _ := StartProvisioner(ctx, js, provisioner, "TEST", "PROVISIONER") + defer sub.Unsubscribe() + + // Send a batch of requests which will scale up + log.Print("starting batch 1...") + t0 := time.Now() + for i := 0; i < 10_000; i++ { + id := nuid.Next() + for { + _, err := js.Request("test.foo", nil, time.Second, nats.MsgId(id)) + if err == nil { + break + } else { + log.Printf("batch 1: %d: %s, retrying...", i, err) + } + } + } + log.Printf("first batch done in %s", time.Since(t0)) + + log.Print("sleeping 5 seconds..") + time.Sleep(3 * time.Second) + + log.Print("starting batch 2...") + t0 = time.Now() + for i := 0; i < 10_000; i++ { + id := nuid.Next() + for { + _, err := js.Request("test.foo", nil, time.Second, nats.MsgId(id)) + if err == nil { + break + } else { + log.Printf("batch 2: %d: %s, retrying...", i, err) + } + } + } + log.Printf("second batch done in %s", time.Since(t0)) + + cancel() + time.Sleep(time.Second) +} + +func StartProvisioner(ctx context.Context, js nats.JetStreamContext, p Provisioner, stream, consumer string) (*nats.Subscription, error) { + // Bind a subscriber for PROVISIONER consumer. + sub, err := js.PullSubscribe("", "", nats.Bind(stream, consumer)) + if err != nil { + return nil, err + } + + go startProvisioner(ctx, sub, p) + + return sub, nil +} + +func startProvisioner(ctx context.Context, sub *nats.Subscription, p Provisioner) { + var ( + count int + ratePerSec float64 + lastReceive time.Time + lastScaleTime time.Time + ) + + startTime := time.Now() + instances := make(map[string]struct{}) + + for { + fctx, cancel := context.WithTimeout(ctx, time.Second) + + // Timeout getting the next message after a second.. this will ensure + // when there are no new messages, the scaling check will run to + // scale down. + msgs, err := sub.Fetch(10, nats.Context(fctx)) + cancel() + + stopping := false + switch err { + case nil: + case nats.ErrTimeout, context.DeadlineExceeded: + log.Print("provisioner: no new messages") + default: + stopping = true + log.Printf("provisioner: stopping: %s", err) + } + + now := time.Now() + + // Update stats when messages are present. + if l := len(msgs); l > 0 { + msgs[l-1].Ack() + + count += l + lastReceive = now + dur := lastReceive.Sub(startTime) + ratePerSec = float64(count) / (float64(dur) / float64(time.Second)) + if len(instances) == 0 { + log.Printf("provisioner: new message observed") + } + } + + if stopping || now.Sub(lastScaleTime) >= time.Second { + lastScaleTime = now + // Determine desired scale of instances. + currentNum := len(instances) + newNum := 0 + if !stopping { + newNum = p.NumInstances(lastReceive, currentNum, ratePerSec) + } + scaleDiff := newNum - currentNum + if scaleDiff != 0 { + log.Printf("provisioner: scaling: %d -> %d", currentNum, newNum) + } + + if scaleDiff < 0 { + var n int + for id := range instances { + if n == -scaleDiff { + break + } + n++ + err := p.StopInstance(ctx, id) + if err != nil { + log.Printf("provisioner: instance %s error: %s", id, err) + } else { + log.Printf("provisioner: instance %s stopped", id) + delete(instances, id) + } + } + } + + if scaleDiff > 0 { + for i := 0; i < scaleDiff; i++ { + id, err := p.StartInstance(ctx) + if err != nil { + log.Printf("provisioner: instance %s error: %s", id, err) + break + } + instances[id] = struct{}{} + log.Printf("provisioner: instance %s started", id) + } + } + + if stopping { + return + } + } + } +} + +type Provisioner interface { + // NumInstances returns the number of instances that should be provisioned + // given the last message receive time, the current number, and the current + // message rate per second. + NumInstances(lastReceive time.Time, num int, ratePerSec float64) int + + // StartInstance starts a new instance and returns and ID representing the + // instance. + StartInstance(ctx context.Context) (string, error) + + // StopInstance stops an instance given an ID. + StopInstance(ctx context.Context, id string) error +} + +func newGoroutineProvision(js nats.JetStreamContext, stream, consumer string) *goroutineProvisioner { + return &goroutineProvisioner{ + js: js, + inactiveThreshold: 2 * time.Second, + fetchSize: 5, + maxRatePerHandler: 500, + streamName: stream, + consumerName: consumer, + handlers: make(map[string]context.CancelFunc), + } +} + +type goroutineProvisioner struct { + js nats.JetStreamContext + inactiveThreshold time.Duration + maxRatePerHandler int + fetchSize int + streamName string + consumerName string + handlers map[string]context.CancelFunc +} + +// Function to calculate the appropriate number of handlers +// given the message rate. +func (p *goroutineProvisioner) NumInstances(lastReceive time.Time, current int, ratePerSec float64) int { + // Detect inactivity, gradually scale down. + if time.Since(lastReceive) >= p.inactiveThreshold { + return current - 1 + } + + // Start with 1 + if current == 0 { + return 1 + } + + // Scale up one at a time until the max is reached based on handling 100 + current++ + max := int(math.Ceil(ratePerSec / float64(p.maxRatePerHandler))) + + if current > max { + current = max + } + + return current +} + +func (d *goroutineProvisioner) startInstance(ctx context.Context, id string) { + sub, _ := d.js.PullSubscribe("", "", nats.Bind(d.streamName, d.consumerName)) + defer sub.Drain() + + count := 0 + + for { + fctx, cancel := context.WithTimeout(ctx, time.Second) + msgs, err := sub.Fetch(d.fetchSize, nats.Context(fctx)) + cancel() + + switch err { + // Received messages, or no new messages so just continue. + case nil, context.DeadlineExceeded: + // Clean exit. + case context.Canceled: + log.Printf("instance %s: processed %d messages", id, count) + return + default: + log.Printf("instance %s: error: %s", id, err) + return + } + + // Loop and do work.. + for _, msg := range msgs { + msg.Respond(nil) + msg.Ack() + count++ + } + } +} + +func (d *goroutineProvisioner) StartInstance(ctx context.Context) (string, error) { + id := nuid.Next() + hctx, cancel := context.WithCancel(ctx) + d.handlers[id] = cancel + go d.startInstance(hctx, id) + return id, nil +} + +func (d *goroutineProvisioner) StopInstance(ctx context.Context, id string) error { + cancel, ok := d.handlers[id] + if ok { + cancel() + delete(d.handlers, id) + } + return nil +} From 2efb06d31f7a6fad9385792aa794453208986ce7 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Mon, 17 Oct 2022 17:16:13 -0400 Subject: [PATCH 2/2] Add description Signed-off-by: Byron Ruth --- examples/jetstream/request-reply/meta.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 examples/jetstream/request-reply/meta.yaml diff --git a/examples/jetstream/request-reply/meta.yaml b/examples/jetstream/request-reply/meta.yaml new file mode 100644 index 00000000..d055be45 --- /dev/null +++ b/examples/jetstream/request-reply/meta.yaml @@ -0,0 +1,18 @@ +title: Request-Reply +description: | + This is an experimental preview of a possible request-reply API + for JetStream. With core NATS, if a given client is not showing + interest in a subject at the time a message is published on that + subject, the message will not be received. + + In general, this behavior is expected of request-reply since these + are expected to behave like a *phone call* where someone needs to + *pick up* in order to talk. The alternative is relying on a mailbox + which is essentially what a peristent stream enables. + + The question is what happens with the reply if a message can be + sent, queued, and the responder comes online later when the original + requester is now offline? + + These are some active questions the NATS team are evaluating if you + want to show your interest and use cases!