(choria-io#1502) Upgrade KV, introduce formal Leader Election
This upgrades KV to the nats.go implementation and adds a
standard CHORIA_LEADER_ELECTION KV bucket that is to be used
for leader election.

We add a KV-based Leader Election provider that treats keys
in the bucket as elections, and we create this bucket as
standard.

When the special provisioning account is enabled, we import
one specific key into that account to facilitate Leader
Election for the Choria Provisioner without exposing other
elections to that account.

Signed-off-by: R.I.Pienaar <[email protected]>
ripienaar committed Nov 2, 2021
1 parent 21b0811 commit b7567a3
Showing 26 changed files with 1,254 additions and 209 deletions.
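As background for the diff below: a KV-based election has every candidate race to create the same key in a TTL-limited bucket; whoever succeeds is the leader and must keep refreshing the key before the TTL lapses, after which the key vanishes and a new campaign can start. A minimal sketch of that pattern against the new bucket, using plain nats.go — the key name and candidate identity are illustrative only, not part of this commit:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, _ := nc.JetStream()
	kv, err := js.KeyValue("CHORIA_LEADER_ELECTION")
	if err != nil {
		log.Fatal(err)
	}

	id := []byte("candidate-1") // illustrative identity

	// campaign: Create succeeds only while the key is absent, i.e. nobody leads
	rev, err := kv.Create("demo_election", id)
	if err != nil {
		log.Println("did not win, campaign again after the TTL expires")
		return
	}

	// leader: refresh the key well inside the bucket's default 1m TTL to stay leader
	for {
		time.Sleep(20 * time.Second)
		if rev, err = kv.Update("demo_election", id, rev); err != nil {
			log.Println("lost leadership")
			return
		}
	}
}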
27 changes: 21 additions & 6 deletions CONFIGURATION.md
@@ -50,6 +50,7 @@ A few special types are defined, the rest map to standard Go types
|[plugin.choria.network.provisioning.signer_cert](#pluginchorianetworkprovisioningsigner_cert)|[plugin.choria.network.public_url](#pluginchorianetworkpublic_url)|
|[plugin.choria.network.stream.advisory_replicas](#pluginchorianetworkstreamadvisory_replicas)|[plugin.choria.network.stream.advisory_retention](#pluginchorianetworkstreamadvisory_retention)|
|[plugin.choria.network.stream.event_replicas](#pluginchorianetworkstreamevent_replicas)|[plugin.choria.network.stream.event_retention](#pluginchorianetworkstreamevent_retention)|
+|[plugin.choria.network.stream.leader_election_replicas](#pluginchorianetworkstreamleader_election_replicas)|[plugin.choria.network.stream.leader_election_ttl](#pluginchorianetworkstreamleader_election_ttl)|
|[plugin.choria.network.stream.machine_replicas](#pluginchorianetworkstreammachine_replicas)|[plugin.choria.network.stream.machine_retention](#pluginchorianetworkstreammachine_retention)|
|[plugin.choria.network.stream.store](#pluginchorianetworkstreamstore)|[plugin.choria.network.system.password](#pluginchorianetworksystempassword)|
|[plugin.choria.network.system.user](#pluginchorianetworksystemuser)|[plugin.choria.network.tls_timeout](#pluginchorianetworktls_timeout)|
@@ -488,9 +489,9 @@ Name:Port to advertise to clients, useful when fronted by a proxy
## plugin.choria.network.stream.advisory_replicas

* **Type:** integer
-* **Default Value:** 1
+* **Default Value:** -1

-When configuring Stream advisories storage ensure data is replicated in the cluster over this many servers
+When configuring Stream advisories storage ensure data is replicated in the cluster over this many servers; -1 means the count of cluster peers

## plugin.choria.network.stream.advisory_retention

@@ -502,9 +503,9 @@ When not zero enables retaining Stream advisories in the Stream Store
## plugin.choria.network.stream.event_replicas

* **Type:** integer
-* **Default Value:** 1
+* **Default Value:** -1

-When configuring LifeCycle events ensure data is replicated in the cluster over this many servers
+When configuring LifeCycle events ensure data is replicated in the cluster over this many servers; -1 means the count of cluster peers

## plugin.choria.network.stream.event_retention

@@ -513,12 +514,26 @@ When configuring LifeCycle events ensure data is replicated in the cluster over

When not zero enables retaining Lifecycle events in the Stream Store

+## plugin.choria.network.stream.leader_election_replicas
+
+* **Type:** integer
+* **Default Value:** -1
+
+When configuring Stream-based Leader Election storage ensure data is replicated in the cluster over this many servers; -1 means the count of peers
+
+## plugin.choria.network.stream.leader_election_ttl
+
+* **Type:** duration
+* **Default Value:** 1m
+
+The TTL for leader election; leaders must vote at least this frequently to remain leader
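For illustration, a broker pinning election storage to three replicas with a faster vote interval would carry lines like these in its configuration file (example values, not defaults):

# assumed example values, not recommendations
plugin.choria.network.stream.leader_election_replicas = 3
plugin.choria.network.stream.leader_election_ttl = 30s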

## plugin.choria.network.stream.machine_replicas

* **Type:** integer
-* **Default Value:** 1
+* **Default Value:** -1

-When configuring Autonomous Agent event storage ensure data is replicated in the cluster over this many servers
+When configuring Autonomous Agent event storage ensure data is replicated in the cluster over this many servers; -1 means the count of cluster peers

## plugin.choria.network.stream.machine_retention

55 changes: 37 additions & 18 deletions aagent/watchers/kvwatcher/kv.go
@@ -17,8 +17,9 @@ import (
"github.com/choria-io/go-choria/aagent/watchers/event"
"github.com/choria-io/go-choria/aagent/watchers/watcher"
iu "github.com/choria-io/go-choria/internal/util"
"github.com/choria-io/go-choria/providers/kv"
"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go/kv"
"github.com/nats-io/nats.go"
)

type State int
@@ -57,7 +58,7 @@ type Watcher struct {

name string
machine model.Machine
-	kv       kv.RoKV
+	kv       nats.KeyValue
interval time.Duration

previousVal interface{}
@@ -95,16 +96,6 @@ func New(machine model.Machine, name string, states []string, failEvent string,
return nil, fmt.Errorf("could not set properties: %s", err)
}

-	mgr, err := machine.JetStreamConnection()
-	if err != nil {
-		return nil, err
-	}
-
-	tw.kv, err = kv.NewRoClient(mgr.NatsConn(), tw.properties.Bucket)
-	if err != nil {
-		return nil, err
-	}

return tw, nil
}

@@ -157,6 +148,24 @@ func (w *Watcher) stopPolling() {
w.mu.Unlock()
}

+func (w *Watcher) connectKV() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	var err error
+	mgr, err := w.machine.JetStreamConnection()
+	if err != nil {
+		return err
+	}
+
+	w.kv, err = kv.NewKV(mgr.NatsConn(), w.properties.Bucket, false)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}

func (w *Watcher) poll() (State, error) {
if !w.ShouldWatch() {
return Skipped, nil
@@ -168,10 +177,20 @@ func (w *Watcher) poll() (State, error) {
return Skipped, nil
}
w.polling = true
+	store := w.kv
w.mu.Unlock()

defer w.stopPolling()

+	// we try to bind to the store here on every poll so that if the store does not yet exist
+	// at startup we will keep trying until it does
+	if store == nil {
+		err := w.connectKV()
+		if err != nil {
+			return Error, err
+		}
+	}

lp := w.lastPoll
since := time.Since(lp).Round(time.Second)
if since < w.interval {
@@ -215,11 +234,11 @@

switch {
	// key isn't there, nothing was previously found, so it's unchanged
-	case err == kv.ErrUnknownKey && w.previousVal == nil:
+	case err == nats.ErrKeyNotFound && w.previousVal == nil:
return Unchanged, nil

	// key isn't there, we had a value before, so it's a change due to a delete
-	case err == kv.ErrUnknownKey && w.previousVal != nil:
+	case err == nats.ErrKeyNotFound && w.previousVal != nil:
w.Debugf("Removing data from %s", dk)
err = w.machine.DataDelete(dk)
if err != nil {
@@ -243,18 +262,18 @@ func (w *Watcher) poll() (State, error) {
return Error, err
}

-	w.previousSeq = val.Sequence()
+	w.previousSeq = val.Revision()
w.previousVal = parsedValue
return Changed, nil

	// a put that didn't update, but we are asked to transition anyway;
	// we do not trigger this on first start of the machine, only once it's running (previousSeq is 0 at start)
-	case cmp.Equal(w.previousVal, parsedValue) && w.properties.TransitionOnMatch && w.previousSeq > 0 && val.Sequence() > w.previousSeq:
-		w.previousSeq = val.Sequence()
+	case cmp.Equal(w.previousVal, parsedValue) && w.properties.TransitionOnMatch && w.previousSeq > 0 && val.Revision() > w.previousSeq:
+		w.previousSeq = val.Revision()
return Changed, nil

default:
-		w.previousSeq = val.Sequence()
+		w.previousSeq = val.Revision()
if w.properties.TransitionOnSuccessfulGet {
return Changed, nil
}
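The new github.com/choria-io/go-choria/providers/kv package used above hands back a standard nats.go KeyValue, so watcher code can use stock nats.go calls. A small read-side sketch under assumed names (EXAMPLE_BUCKET and example.key are hypothetical; the false argument mirrors the watcher's choice not to create missing buckets):

nc, err := nats.Connect("nats://broker.example.net:4222")
if err != nil {
	log.Fatal(err)
}

bucket, err := kv.NewKV(nc, "EXAMPLE_BUCKET", false) // false: do not create the bucket
if err != nil {
	log.Fatal(err) // the watcher instead retries this on every poll
}

entry, err := bucket.Get("example.key")
switch {
case err == nats.ErrKeyNotFound:
	log.Println("key absent, handled above as unchanged or deleted")
case err == nil:
	log.Printf("value %q at revision %d", entry.Value(), entry.Revision())
}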
26 changes: 26 additions & 0 deletions broker/network/network_accounts.go
@@ -38,6 +38,32 @@ func (s *Server) setupAccounts() (err error) {
if err != nil {
s.log.Warnf("Could not import lifecycle events from Provisioning account")
}

+	// ensure the leader election KV bucket functions in the provisioner account;
+	// the only key that's accessible by the provisioner account is `provisioner`
+	// and it can only do info on the bucket, this way a rogue entity in there
+	// cannot disrupt other leader elections. This is a second layer of protection
+	// since nodes in there also lack access to the `choria.streams.>` and `$KV.>`
+	// prefixes for access to anything
+	err = s.choriaAccount.AddServiceExportWithResponse("$JS.API.STREAM.INFO.KV_CHORIA_LEADER_ELECTION", server.Singleton, []*server.Account{s.provisioningAccount})
+	if err == nil {
+		err = s.provisioningAccount.AddServiceImport(s.choriaAccount, "choria.streams.STREAM.INFO.KV_CHORIA_LEADER_ELECTION", "$JS.API.STREAM.INFO.KV_CHORIA_LEADER_ELECTION")
+		if err != nil {
+			s.log.Warnf("Could not import KV_CHORIA_LEADER_ELECTION stream info API: %s", err)
+		}
+	} else {
+		s.log.Warnf("Could not export KV_CHORIA_LEADER_ELECTION Info API to Provisioning: %s", err)
+	}
+
+	err = s.choriaAccount.AddServiceExportWithResponse("$KV.CHORIA_LEADER_ELECTION.provisioner", server.Singleton, []*server.Account{s.provisioningAccount})
+	if err == nil {
+		err = s.provisioningAccount.AddServiceImport(s.choriaAccount, "$KV.CHORIA_LEADER_ELECTION.provisioner", "$KV.CHORIA_LEADER_ELECTION.provisioner")
+		if err != nil {
+			s.log.Warnf("Could not import CHORIA_LEADER_ELECTION message subjects: %s", err)
+		}
+	} else {
+		s.log.Warnf("Could not export CHORIA_LEADER_ELECTION message subjects to Provisioning: %s", err)
+	}
}

return nil
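With the two exports above in place, a client in the provisioning account reaches Choria Streams only through the choria.streams.> API prefix, and only the provisioner key is usable. A hedged sketch of how the Choria Provisioner side could campaign on that key, assuming an established *nats.Conn nc (nats.APIPrefix is the stock nats.go option):

js, err := nc.JetStream(nats.APIPrefix("choria.streams"))
if err != nil {
	log.Fatal(err)
}

kv, err := js.KeyValue("CHORIA_LEADER_ELECTION") // resolved via the imported STREAM.INFO subject
if err != nil {
	log.Fatal(err)
}

// only $KV.CHORIA_LEADER_ELECTION.provisioner is imported, so only this key is reachable
rev, err := kv.Create("provisioner", []byte("provisioner-1"))
if err == nil {
	log.Printf("won the provisioner election at revision %d", rev)
}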
116 changes: 90 additions & 26 deletions broker/network/network_jetstream.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"

@@ -26,23 +27,22 @@ func (s *Server) setupStreaming() error {
return fmt.Errorf("system Account is required for Choria Streams")
}

s.log.Infof("Configuring Choria Stream Processing in %v", s.config.Choria.NetworkStreamStore)
s.log.Infof("Configuring Choria Streams in %v", s.config.Choria.NetworkStreamStore)

-	s.gnatsd.EnableJetStream(&server.JetStreamConfig{StoreDir: s.config.Choria.NetworkStreamStore})
+	err := s.gnatsd.EnableJetStream(&server.JetStreamConfig{StoreDir: s.config.Choria.NetworkStreamStore})
+	if err != nil {
+		return err
+	}

-	for _, acct := range []*server.Account{s.choriaAccount, s.provisioningAccount} {
-		if acct == nil {
-			continue
-		}
+	s.log.Infof("Enabling Choria Streams for account %s", s.choriaAccount)

-		err := acct.EnableJetStream(nil)
-		if err != nil {
-			s.log.Errorf("Could not enable Choria Streams for the %s account: %s", acct.Name, err)
-		}
+	err = s.choriaAccount.EnableJetStream(nil)
+	if err != nil {
+		s.log.Errorf("Could not enable Choria Streams for the %s account: %s", s.choriaAccount.Name, err)
+	}

-		if !acct.JetStreamEnabled() {
-			s.log.Errorf("Choria Streams enabled for account %q but it's not reporting as enabled", acct.Name)
-		}
+	if !s.choriaAccount.JetStreamEnabled() {
+		s.log.Errorf("Choria Streams enabled for account %q but it's not reporting as enabled", s.choriaAccount.Name)
+	}

return nil
@@ -88,17 +88,50 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
return err
}

err = s.createOrUpdateStream("CHORIA_EVENTS", []string{"choria.lifecycle.>"}, s.config.Choria.NetworkEventStoreDuration, s.config.Choria.NetworkEventStoreReplicas, mgr)
cfg := s.config.Choria
if cfg.NetworkEventStoreReplicas == -1 || cfg.NetworkMachineStoreReplicas == -1 || cfg.NetworkStreamAdvisoryReplicas == -1 || cfg.NetworkLeaderElectionReplicas == -1 {
peers, err := s.choria.NetworkBrokerPeers()
if err != nil {
s.log.Warnf("Cannot determine network peers to calculate dynamic replica sizes: %s", err)
}

count := peers.Count()
if count == 0 {
count = 1 // avoid replica=0
}

if cfg.NetworkEventStoreReplicas == -1 {
s.log.Infof("Setting Lifecycle Event Store Replicas to %d", count)
cfg.NetworkEventStoreReplicas = count
}

if cfg.NetworkMachineStoreReplicas == -1 {
s.log.Infof("Setting Autonomous Agent Event Store Replicas to %d", count)
cfg.NetworkMachineStoreReplicas = count
}

if cfg.NetworkStreamAdvisoryReplicas == -1 {
s.log.Infof("Setting Choria Streams Advisory Store Replicas to %d", count)
cfg.NetworkStreamAdvisoryReplicas = count
}

if cfg.NetworkLeaderElectionReplicas == -1 {
s.log.Infof("Setting Choria Streams Leader election Replicas to %d", count)
cfg.NetworkLeaderElectionReplicas = count
}
}

err = s.createOrUpdateStream("CHORIA_EVENTS", []string{"choria.lifecycle.>"}, cfg.NetworkEventStoreDuration, cfg.NetworkEventStoreReplicas, mgr)
if err != nil {
return err
}

err = s.createOrUpdateStream("CHORIA_MACHINE", []string{"choria.machine.>"}, s.config.Choria.NetworkMachineStoreDuration, s.config.Choria.NetworkMachineStoreReplicas, mgr)
err = s.createOrUpdateStream("CHORIA_MACHINE", []string{"choria.machine.>"}, cfg.NetworkMachineStoreDuration, cfg.NetworkMachineStoreReplicas, mgr)
if err != nil {
return err
}

err = s.createOrUpdateStream("CHORIA_STREAM_ADVISORIES", []string{"$JS.EVENT.ADVISORY.>"}, s.config.Choria.NetworkStreamAdvisoryDuration, s.config.Choria.NetworkStreamAdvisoryReplicas, mgr)
err = s.createOrUpdateStream("CHORIA_STREAM_ADVISORIES", []string{"$JS.EVENT.ADVISORY.>"}, cfg.NetworkStreamAdvisoryDuration, cfg.NetworkStreamAdvisoryReplicas, mgr)
if err != nil {
return err
}
@@ -108,6 +141,23 @@ func (s *Server) configureSystemStreams(ctx context.Context) error {
return err
}

+	eCfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream,
+		jsm.Replicas(cfg.NetworkLeaderElectionReplicas),
+		jsm.MaxAge(cfg.NetworkLeaderElectionTTL),
+		jsm.AllowRollup(),
+		jsm.Subjects("$KV.CHORIA_LEADER_ELECTION.>"),
+		jsm.StreamDescription("Choria Leader Election Bucket"),
+		jsm.MaxMessageSize(1024),
+		jsm.FileStorage(),
+		jsm.MaxMessagesPerSubject(1))
+	if err != nil {
+		return err
+	}
+	err = s.createOrUpdateStreamWithConfig("KV_CHORIA_LEADER_ELECTION", *eCfg, mgr)
+	if err != nil {
+		return err
+	}

return nil
}
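The KV_CHORIA_LEADER_ELECTION stream configured above is just the server-side shape of a KV bucket; for comparison, roughly the same bucket could be declared from a client with stock nats.go, assuming an established *nats.Conn nc (the replica count of 3 is an assumed cluster size, not a value from this commit):

js, _ := nc.JetStream()
_, err := js.CreateKeyValue(&nats.KeyValueConfig{
	Bucket:       "CHORIA_LEADER_ELECTION",
	Description:  "Choria Leader Election Bucket",
	TTL:          time.Minute, // plugin.choria.network.stream.leader_election_ttl
	Replicas:     3,           // assumed cluster size
	MaxValueSize: 1024,
	Storage:      nats.FileStorage,
})
if err != nil {
	log.Fatal(err)
}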

@@ -116,22 +166,36 @@ func (s *Server) createOrUpdateStream(name string, subjects []string, maxAge tim
return nil
}

-	str, err := mgr.LoadOrNewStream(name, jsm.FileStorage(), jsm.Subjects(subjects...), jsm.MaxAge(maxAge), jsm.Replicas(replicas))
+	cfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream, jsm.FileStorage(), jsm.Subjects(subjects...), jsm.MaxAge(maxAge), jsm.Replicas(replicas))
	if err != nil {
-		return fmt.Errorf("could not load or create %s: %s", name, err)
+		return fmt.Errorf("could not create configuration: %s", err)
	}

-	cfg := str.Configuration()
-	if cfg.MaxAge != maxAge {
-		s.log.Infof("Updating %s retention from %s to %s", str.Name(), cfg.MaxAge, maxAge)
-		cfg.MaxAge = maxAge
-		err = str.UpdateConfiguration(cfg)
-		if err != nil {
-			return fmt.Errorf("could not update retention period for %s Stream: %s", name, err)
+	err = s.createOrUpdateStreamWithConfig(name, *cfg, mgr)
+	if err != nil {
+		return fmt.Errorf("could not create stream %s: %s", name, err)
}

return nil
}

+func (s *Server) createOrUpdateStreamWithConfig(name string, cfg api.StreamConfig, mgr *jsm.Manager) error {
+	cfg.Name = name
+	str, err := mgr.LoadStream(name)
+	if err != nil {
+		_, err := mgr.NewStreamFromDefault(name, cfg)
+		if err == nil {
+			s.log.Infof("Created stream %s with %d replicas and %s retention", cfg.Name, cfg.Replicas, cfg.MaxAge)
+		}
+		return err
+	}
+
+	err = str.UpdateConfiguration(cfg)
+	if err != nil {
+		return err
+	}

s.log.Infof("Configured stream %q with %d replicas and %s retention", name, replicas, maxAge)
s.log.Infof("Configured stream %s with %d replicas and %s retention", cfg.Name, cfg.Replicas, cfg.MaxAge)

return nil
}
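As a usage sketch, other system streams can now flow through the same generic helper; here a hypothetical DEMO stream with an hour of retention (the stream name and settings are illustrative, not from this commit):

cfg, err := jsm.NewStreamConfiguration(jsm.DefaultStream,
	jsm.Subjects("demo.>"),
	jsm.FileStorage(),
	jsm.MaxAge(time.Hour),
	jsm.Replicas(1))
if err != nil {
	return err
}

err = s.createOrUpdateStreamWithConfig("DEMO", *cfg, mgr)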
