Skip to content
This repository has been archived by the owner on Aug 31, 2022. It is now read-only.

Pfcwd path fix #44

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
17 changes: 17 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
TildenWinston marked this conversation as resolved.
Show resolved Hide resolved
"configurations": [
{
"name": "Launch",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${fileDirname}",
"env": {},
"args": []
}
]
}
95 changes: 95 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",

"tasks": [
{
"label": "Make",
TildenWinston marked this conversation as resolved.
Show resolved Hide resolved
"type": "shell",
"command": "make",
"group": "build",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false
},
"problemMatcher": [
"$go"
],

},
{
"label": "export",
"type": "shell",
"command": "export",
"args": ["CVL_SCHEMA_PATH=/usr/sbin/schema"],
"group": "build",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false
},
"problemMatcher": [
"$go"
],

},
{
"label": "run",
"type": "shell",
"command": "${workspaceFolder}/build/bin/dialout_client_cli",
"args": ["insecure -logtostderr -v 7"],
"group": "build",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false
},
"problemMatcher": [
"$go"
],

},
{
"label": "install",
"type": "shell",
"command": "echo 'devbox' | sudo -S install testdata/database_config.json -t /var/run/redis/sonic-db",
"args": [""],
"group": "build",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared",
"showReuseMessage": true,
"clear": false
},
"problemMatcher": [
"$go"
],

},
{
"label": "buildrun",
"type": "shell",
"command": "echo Done",
"problemMatcher": [
"$go"
],
"group": "build",
"dependsOrder": "sequence",
"dependsOn":["Make", "export", "install", "run" ]
}

]
}
19 changes: 17 additions & 2 deletions dialout/dialout_client/dialout_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import (
"crypto/tls"
"errors"
"fmt"
"net"

spb "github.com/Azure/sonic-telemetry/proto"
sdc "github.com/Azure/sonic-telemetry/sonic_data_client"
sdcfg "github.com/Azure/sonic-telemetry/sonic_db_config"
"github.com/Workiva/go-datastructures/queue"
"github.com/go-redis/redis"
log "github.com/golang/glog"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/Workiva/go-datastructures/queue"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"net"

//"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -185,6 +187,8 @@ func (cs *clientSubscription) NewInstance(ctx context.Context) error {
}

target := cs.prefix.GetTarget()
log.V(7).Infof("Target %v", target)

if target == "" {
return fmt.Errorf("Empty target data not supported yet")
}
Expand All @@ -195,6 +199,7 @@ func (cs *clientSubscription) NewInstance(ctx context.Context) error {
if target == "OTHERS" {
dc, err = sdc.NewNonDbClient(cs.paths, cs.prefix)
} else {
log.V(7).Infof("Paths %v, Prefix %v", cs.paths, cs.prefix)
dc, err = sdc.NewDbClient(cs.paths, cs.prefix)
}
if err != nil {
Expand Down Expand Up @@ -264,9 +269,14 @@ func newClient(ctx context.Context, dest Destination) (*Client, error) {
opts := []grpc.DialOption{
grpc.WithBlock(),
}

if clientCfg.TLS != nil {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(clientCfg.TLS)))
} else {
opts = append(opts, grpc.WithInsecure())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not belong to the PR

log.V(2).Infof("gRPC without TLS")
}

conn, err := grpc.DialContext(ctx, dest.Addrs, opts...)
if err != nil {
return nil, fmt.Errorf("Dial to (%s, timeout %v): %v", dest, timeout, err)
Expand Down Expand Up @@ -317,6 +327,7 @@ restart: //Remote server might go down, in that case we restart with next destin
}

log.V(1).Infof("Dialout service connected to %v successfully for %v", dest, cs.name)
log.V(7).Infof("CTX is %v", ctx)
pub, err := c.client.Publish(ctx)
if err != nil {
log.V(1).Infof("Publish to %v for %v failed: %v, retrying", dest, cs.name, err)
Expand All @@ -335,6 +346,7 @@ restart: //Remote server might go down, in that case we restart with next destin
return
}
cs.cMu.Unlock()
log.V(7).Infof("Before switch 346 %v", cs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debuging line, remove


switch cs.reportType {
case Periodic:
Expand Down Expand Up @@ -388,13 +400,16 @@ restart: //Remote server might go down, in that case we restart with next destin
select {
default:
cs.w.Add(1)
log.V(7).Infof("Before stream run %v", cs.w)
go cs.dc.StreamRun(cs.q, cs.stop, &cs.w, nil)
log.V(7).Infof("After stream run %v", cs)
time.Sleep(100 * time.Millisecond)
err = cs.send(pub)
if err != nil {
log.V(1).Infof("Client %v pub Send error:%v, cs.conTryCnt %v", cs.name, err, cs.conTryCnt)
}
cs.Close()
log.V(7).Infof("Line 409 %v", cs)
cs.w.Wait()
// Don't restart immediatly
time.Sleep(clientCfg.RetryInterval)
Expand Down
22 changes: 16 additions & 6 deletions dialout/dialout_client_cli/dialout_client_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ package main
import (
"crypto/tls"
"flag"
"os"
"os/signal"
"time"

dc "github.com/Azure/sonic-telemetry/dialout/dialout_client"
log "github.com/golang/glog"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"golang.org/x/net/context"
"os"
"os/signal"
"time"
)

var (
Expand All @@ -19,13 +20,16 @@ var (
RetryInterval: 30 * time.Second,
Encoding: gpb.Encoding_JSON_IETF,
Unidirectional: true,
TLS: &tls.Config{},
}
tlsCfg = tls.Config{}

tlsDisable bool
)

func init() {
flag.StringVar(&clientCfg.TLS.ServerName, "server_name", "", "When set, use this hostname to verify server certificate during TLS handshake.")
flag.BoolVar(&clientCfg.TLS.InsecureSkipVerify, "insecure", false, "When set, client will not verify the server certificate during TLS handshake.")
flag.StringVar(&tlsCfg.ServerName, "server_name", "", "When set, use this hostname to verify server certificate during TLS handshake.")
flag.BoolVar(&tlsCfg.InsecureSkipVerify, "skip_verify", false, "When set, client will not verify the server certificate during TLS handshake.")
flag.BoolVar(&tlsDisable, "insecure", false, "Without TLS, only for testing")
flag.DurationVar(&clientCfg.RetryInterval, "retry_interval", 30*time.Second, "Interval at which client tries to reconnect to destination servers")
flag.BoolVar(&clientCfg.Unidirectional, "unidirectional", true, "No repesponse from server is expected")
}
Expand All @@ -41,6 +45,12 @@ func main() {
cancel()
}()
log.V(1).Infof("Starting telemetry publish client")

if !tlsDisable {
clientCfg.TLS = &tlsCfg
log.V(1).Infof("TLS enable")
}

err := dc.DialOutRun(ctx, &clientCfg)
log.V(1).Infof("Exiting telemetry publish client: %v", err)
log.Flush()
Expand Down
49 changes: 22 additions & 27 deletions dialout/dialout_server_cli/dialout_server_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"google.golang.org/grpc/credentials"

ds "github.com/Azure/sonic-telemetry/dialout/dialout_server"
testcert "github.com/Azure/sonic-telemetry/testdata/tls"
)

var (
Expand All @@ -20,7 +19,7 @@ var (
caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.")
serverCert = flag.String("server_crt", "", "TLS server certificate")
serverKey = flag.String("server_key", "", "TLS server private key")
insecure = flag.Bool("insecure", false, "Skip providing TLS cert and key, for testing only!")
insecure = flag.Bool("insecure", false, "Without TLS, for testing only!")
allowNoClientCert = flag.Bool("allow_no_client_auth", false, "When set, telemetry server will request but not require a client certificate.")
)

Expand All @@ -36,11 +35,6 @@ func main() {
var err error

if *insecure {
certificate, err = testcert.NewCert()
if err != nil {
log.Exitf("could not load server key pair: %s", err)
}
} else {
switch {
case *serverCert == "":
log.Errorf("serverCert must be set.")
Expand All @@ -53,32 +47,33 @@ func main() {
if err != nil {
log.Exitf("could not load server key pair: %s", err)
}
}

tlsCfg := &tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
}
if *allowNoClientCert {
// RequestClientCert will ask client for a certificate but won't
// require it to proceed. If certificate is provided, it will be
// verified.
tlsCfg.ClientAuth = tls.RequestClientCert
}
tlsCfg := &tls.Config{
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certificate},
}

if *caCert != "" {
ca, err := ioutil.ReadFile(*caCert)
if err != nil {
log.Exitf("could not read CA certificate: %s", err)
if *allowNoClientCert {
// RequestClientCert will ask client for a certificate but won't
// require it to proceed. If certificate is provided, it will be
// verified.
tlsCfg.ClientAuth = tls.RequestClientCert
}
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Exit("failed to append CA certificate")

if *caCert != "" {
ca, err := ioutil.ReadFile(*caCert)
if err != nil {
log.Exitf("could not read CA certificate: %s", err)
}
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(ca); !ok {
log.Exit("failed to append CA certificate")
}
tlsCfg.ClientCAs = certPool
}
tlsCfg.ClientCAs = certPool
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
cfg := &ds.Config{}
cfg.Port = int64(*port)
s, err := ds.NewServer(cfg, opts)
Expand Down
Loading