Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add event-combine processor #252

Merged
merged 54 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
6f07369
Bump goreleaser/goreleaser-action from 4 to 5
dependabot[bot] Sep 18, 2023
9c1c5d6
allow the number of prometheus_write writers to be configurable
karimra Oct 10, 2023
75fc1a5
sort the timeseries by timestamp before writing to prometheus
karimra Oct 12, 2023
16baebc
update docs
karimra Oct 12, 2023
7b6548a
allow setting a subscription encoding per target
karimra Oct 13, 2023
945ae08
add event-combine processor
karimra Oct 13, 2023
a1a4dc8
autodetect if kafka input is receiving an event or a list of events
karimra Oct 17, 2023
c02f061
Add rate-limit processor
netixx Oct 19, 2023
2a8ce11
Improve slice allocation, use package var for const chars and fix doc…
netixx Oct 25, 2023
1c3d90f
Merge pull request #255 from nokia/single-event-kafka-input
karimra Oct 25, 2023
d62f24a
Revert to append for slice
netixx Oct 25, 2023
0436882
Merge pull request #262 from netixx/rate-limiter-processor
karimra Oct 25, 2023
0bfa527
add asciigraph output
karimra Oct 27, 2023
9444171
Bump google.golang.org/grpc from 1.58.2 to 1.59.0
dependabot[bot] Oct 30, 2023
8e6e6ea
Fix tls-ca key for yaml
netixx Oct 30, 2023
86dbd13
Bump github.com/docker/docker
dependabot[bot] Oct 30, 2023
49ba97c
Merge pull request #268 from netixx/netixx/fix-target-yaml
karimra Oct 31, 2023
106e34e
Add logs on starlark failed processing with debug enabled
netixx Oct 31, 2023
bd19bd9
Merge pull request #270 from netixx/netixx/add-starlark-logs
karimra Oct 31, 2023
e7c51c2
Bump github.com/nats-io/nkeys from 0.4.5 to 0.4.6
dependabot[bot] Oct 31, 2023
f2481c5
Implement WriteEvent on file output so that it can be used by 'inputs…
netixx Nov 2, 2023
b321c4a
Init redis locker
netixx Oct 19, 2023
f1bf4af
Fix nil pointer with resp.Body.Close on error
netixx Oct 23, 2023
7d978ad
Fix clustering metrics
netixx Oct 26, 2023
d3b9766
Fix bug corrupting prometheus timeseries
alibresco Nov 5, 2023
4ab677a
fix cache on-change query
karimra Nov 6, 2023
e248a60
Merge pull request #279 from nokia/cache-on-change
karimra Nov 6, 2023
f6273a8
Merge pull request #274 from netixx/netixx/implement-writeevent
karimra Nov 6, 2023
f4c7436
Merge pull request #277 from alibresco/main
karimra Nov 6, 2023
7afa0af
Merge pull request #264 from nokia/asciigraph-output
karimra Nov 6, 2023
3b96d1d
Merge pull request #275 from netixx/netixx/add-redis-locker
karimra Nov 7, 2023
45f48da
Merge pull request #250 from nokia/per-target-encoding
karimra Nov 7, 2023
13ffe5f
Merge branch 'main' into prom-write-multi-writers
karimra Nov 7, 2023
33a703a
Merge pull request #271 from openconfig/dependabot/go_modules/github.…
karimra Nov 7, 2023
546965c
Merge pull request #247 from nokia/prom-write-multi-writers
karimra Nov 7, 2023
5e05649
Merge pull request #228 from openconfig/dependabot/github_actions/gor…
karimra Nov 7, 2023
e16c6f6
Bump github.com/nats-io/nats-server/v2 from 2.9.20 to 2.10.4
dependabot[bot] Nov 7, 2023
d030d7f
Merge pull request #265 from openconfig/dependabot/go_modules/github.…
karimra Nov 7, 2023
f242948
Merge pull request #266 from openconfig/dependabot/go_modules/google.…
karimra Nov 7, 2023
a1a987b
Merge pull request #269 from openconfig/dependabot/go_modules/github.…
karimra Nov 7, 2023
d589dfa
file output: set write concurrency to 1 when writing to stdout or stderr
karimra Nov 7, 2023
a881b4b
consul loader: match service tags
karimra Nov 7, 2023
4e18a6a
Merge pull request #280 from nokia/fix273
karimra Nov 7, 2023
ba67e01
Merge pull request #281 from nokia/fix272
karimra Nov 7, 2023
dfc4188
rename processor and add test
karimra Nov 7, 2023
52459d2
add docs
karimra Nov 7, 2023
8c5f089
add event-combine processor
karimra Oct 13, 2023
58439f7
rename processor and add test
karimra Nov 7, 2023
31f5480
add docs
karimra Nov 7, 2023
6e490a3
rebase on main
karimra Nov 7, 2023
bc522ec
Merge remote-tracking branch 'refs/remotes/origin/processor-combine' …
karimra Nov 7, 2023
7dade95
rename struct
karimra Nov 7, 2023
cff92e7
update docs
karimra Nov 7, 2023
8b77f38
update mkdocs
karimra Nov 7, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
password: ${{ secrets.GITHUB_TOKEN }}

- name: Release with goreleaser
uses: goreleaser/goreleaser-action@v4
uses: goreleaser/goreleaser-action@v5
with:
version: ${{ env.GORELEASER_VER }}
args: release --rm-dist
Expand Down
2 changes: 1 addition & 1 deletion app/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (a *App) handleClusteringGet(w http.ResponseWriter, r *http.Request) {
}

func (a *App) handleHealthzGet(w http.ResponseWriter, r *http.Request) {
s := map[string]string{"status": "healthy",}
s := map[string]string{"status": "healthy"}
b, err := json.Marshal(s)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion app/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ func (a *App) unassignTarget(ctx context.Context, name string, serviceID string)
}
rsp, err := client.Do(req)
if err != nil {
rsp.Body.Close()
// don't close the body here since Body will be nil
a.Logger.Printf("failed HTTP request: %v", err)
continue
}
Expand Down
7 changes: 4 additions & 3 deletions app/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"strings"

"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmic/config"
"github.com/openconfig/gnmic/formatters"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/grpctunnel/tunnel"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/protobuf/proto"

"github.com/openconfig/gnmic/config"
"github.com/openconfig/gnmic/formatters"
"github.com/openconfig/gnmic/types"
)

type targetDiffResponse struct {
Expand Down
9 changes: 5 additions & 4 deletions app/gnmi_client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"time"

"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/grpctunnel/tunnel"
"google.golang.org/grpc"

"github.com/openconfig/gnmic/config"
"github.com/openconfig/gnmic/lockers"
"github.com/openconfig/gnmic/outputs"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/grpctunnel/tunnel"
"google.golang.org/grpc"
)

type subscriptionRequest struct {
Expand Down Expand Up @@ -174,7 +175,7 @@ func (a *App) clientSubscribe(ctx context.Context, tc *types.TargetConfig) error
}
subRequests := make([]subscriptionRequest, 0, len(subscriptionsConfigs))
for scName, sc := range subscriptionsConfigs {
req, err := a.Config.CreateSubscribeRequest(sc, tc.Name)
req, err := a.Config.CreateSubscribeRequest(sc, tc)
if err != nil {
if errors.Is(errors.Unwrap(err), config.ErrConfig) {
fmt.Fprintf(os.Stderr, "%v\n", err)
Expand Down Expand Up @@ -243,7 +244,7 @@ func (a *App) clientSubscribeOnce(ctx context.Context, tc *types.TargetConfig) e
}
subRequests := make([]subscriptionRequest, 0)
for _, sc := range subscriptionsConfigs {
req, err := a.Config.CreateSubscribeRequest(sc, tc.Name)
req, err := a.Config.CreateSubscribeRequest(sc, tc)
if err != nil {
if errors.Is(errors.Unwrap(err), config.ErrConfig) {
fmt.Fprintf(os.Stderr, "%v\n", err)
Expand Down
4 changes: 2 additions & 2 deletions app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (a *App) startClusterMetrics() {
if err != nil {
a.Logger.Printf("failed to get leader key: %v", err)
}
if leader[leaderKey] == a.Config.InstanceName {
if leader[leaderKey] == a.Config.Clustering.InstanceName {
clusterIsLeader.Set(1)
} else {
clusterIsLeader.Set(0)
Expand All @@ -84,7 +84,7 @@ func (a *App) startClusterMetrics() {
}
numLockedNodes := 0
for _, v := range lockedNodes {
if v == a.Config.InstanceName {
if v == a.Config.Clustering.InstanceName {
numLockedNodes++
}
}
Expand Down
8 changes: 6 additions & 2 deletions cache/oc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/openconfig/gnmi/path"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/subscribe"
"github.com/openconfig/gnmic/utils"
"google.golang.org/protobuf/proto"

"github.com/openconfig/gnmic/utils"
)

const (
Expand Down Expand Up @@ -298,13 +299,13 @@ func (gc *gnmiCache) handleOnChangeQuery(ctx context.Context, ro *ReadOpts, ch c
caches := gc.getCaches(ro.Subscription)
numCaches := len(caches)
gc.logger.Printf("on-change query got %d caches", numCaches)

wg := new(sync.WaitGroup)
wg.Add(numCaches)

for name, c := range caches {
go func(name string, c *subCache) {
defer wg.Done()

for _, p := range ro.Paths {
// handle updates only
if !ro.UpdatesOnly {
Expand All @@ -330,6 +331,9 @@ func (gc *gnmiCache) handleOnChangeQuery(ctx context.Context, ro *ReadOpts, ch c
}
// main on-change subscription
fp := path.ToStrings(p, true)
fp = append(fp, "")
copy(fp[1:], fp)
fp[0] = ro.Target
// set callback
mc := &matchClient{name: name, ch: ch}
remove := c.match.AddQuery(fp, mc)
Expand Down
19 changes: 13 additions & 6 deletions cmd/prompt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
homedir "github.com/mitchellh/go-homedir"
"github.com/nsf/termbox-go"
"github.com/olekukonko/tablewriter"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/goyang/pkg/yang"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/openconfig/gnmic/types"
)

var colorMapping = map[string]goprompt.Color{
Expand Down Expand Up @@ -298,13 +299,17 @@ func subscriptionTable(scs map[string]*types.SubscriptionConfig, list bool) [][]
if list {
tabData := make([][]string, 0, len(scs))
for _, sub := range scs {
enc := ""
if sub.Encoding != nil {
enc = *sub.Encoding
}
tabData = append(tabData, []string{
sub.Name,
sub.ModeString(),
sub.PrefixString(),
sub.PathsString(),
sub.SampleIntervalString(),
sub.Encoding,
enc,
})
}
sort.Slice(tabData, func(i, j int) bool {
Expand All @@ -323,7 +328,7 @@ func subscriptionTable(scs map[string]*types.SubscriptionConfig, list bool) [][]
tabData = append(tabData, []string{"Prefix", sub.PrefixString()})
tabData = append(tabData, []string{"Paths", sub.PathsString()})
tabData = append(tabData, []string{"Sample Interval", sub.SampleIntervalString()})
tabData = append(tabData, []string{"Encoding", sub.Encoding})
tabData = append(tabData, []string{"Encoding", *sub.Encoding})
tabData = append(tabData, []string{"Qos", sub.QosString()})
tabData = append(tabData, []string{"Heartbeat Interval", sub.HeartbeatIntervalString()})
return tabData
Expand Down Expand Up @@ -694,9 +699,11 @@ func subscriptionDescription(sub *types.SubscriptionConfig) string {
sb.WriteString(", ")
}
}
sb.WriteString("encoding=")
sb.WriteString(sub.Encoding)
sb.WriteString(", ")
if sub.Encoding != nil {
sb.WriteString("encoding=")
sb.WriteString(*sub.Encoding)
sb.WriteString(", ")
}
if sub.Prefix != "" {
sb.WriteString("prefix=")
sb.WriteString(sub.Prefix)
Expand Down
7 changes: 4 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"github.com/itchyny/gojq"
"github.com/mitchellh/go-homedir"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmic/api"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/gnmic/utils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"gopkg.in/natefinch/lumberjack.v2"
yaml "gopkg.in/yaml.v2"

"github.com/openconfig/gnmic/api"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/gnmic/utils"
)

const (
Expand Down
7 changes: 4 additions & 3 deletions config/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
"strings"

"github.com/openconfig/gnmi/proto/gnmi"
"github.com/spf13/cobra"

"github.com/openconfig/gnmic/api"
"github.com/openconfig/gnmic/types"
"github.com/spf13/cobra"
)

func (c *Config) CreateDiffSubscribeRequest(cmd *cobra.Command) (*gnmi.SubscribeRequest, error) {
Expand All @@ -26,12 +27,12 @@ func (c *Config) CreateDiffSubscribeRequest(cmd *cobra.Command) (*gnmi.Subscribe
Target: c.DiffTarget,
Paths: c.DiffPath,
Mode: "ONCE",
Encoding: c.Encoding,
Encoding: &c.Encoding,
}
if flagIsSet(cmd, "qos") {
sc.Qos = &c.DiffQos
}
return c.CreateSubscribeRequest(sc, "")
return c.CreateSubscribeRequest(sc, nil)
}

func (c *Config) CreateDiffGetRequest() (*gnmi.GetRequest, error) {
Expand Down
Loading