From 42816f1b77cd2c248c82b63b71982c7b09c21897 Mon Sep 17 00:00:00 2001 From: RazorBach Date: Tue, 20 Feb 2024 14:59:42 -0800 Subject: [PATCH] No public description PiperOrigin-RevId: 608751064 --- .bazelrc | 3 + ...ci-cpp-build-gnmi.yml => ci-cpp-build.yml} | 4 +- BUILD.bazel | 2 +- LICENSE | 2 +- WORKSPACE.bazel | 2 +- compile_protos.sh | 11 +- gnpsi_deps.bzl | 58 +- proto/gnpsi/BUILD.bazel | 19 +- proto/gnpsi/gnpsi.pb.go | 640 ------------------ proto/gnpsi/gnpsi_grpc.pb.go | 134 ---- server/BUILD.bazel | 67 ++ server/gnpsi_relay_server.cc | 110 +++ server/gnpsi_relay_server.h | 67 ++ server/gnpsi_relay_server_test.cc | 114 ++++ server/gnpsi_service_impl.cc | 125 ++++ server/gnpsi_service_impl.h | 112 +++ server/mock_gnpsi_service_impl.h | 19 + 17 files changed, 687 insertions(+), 802 deletions(-) create mode 100644 .bazelrc rename .github/workflows/{ci-cpp-build-gnmi.yml => ci-cpp-build.yml} (95%) delete mode 100644 proto/gnpsi/gnpsi.pb.go delete mode 100644 proto/gnpsi/gnpsi_grpc.pb.go create mode 100644 server/BUILD.bazel create mode 100644 server/gnpsi_relay_server.cc create mode 100644 server/gnpsi_relay_server.h create mode 100644 server/gnpsi_relay_server_test.cc create mode 100644 server/gnpsi_service_impl.cc create mode 100644 server/gnpsi_service_impl.h create mode 100644 server/mock_gnpsi_service_impl.h diff --git a/.bazelrc b/.bazelrc new file mode 100644 index 0000000..4818c21 --- /dev/null +++ b/.bazelrc @@ -0,0 +1,3 @@ +# absl supports only c++14 +build --cxxopt=-std=c++14 +build --host_cxxopt=-std=c++14 diff --git a/.github/workflows/ci-cpp-build-gnmi.yml b/.github/workflows/ci-cpp-build.yml similarity index 95% rename from .github/workflows/ci-cpp-build-gnmi.yml rename to .github/workflows/ci-cpp-build.yml index fb76974..ad32063 100644 --- a/.github/workflows/ci-cpp-build-gnmi.yml +++ b/.github/workflows/ci-cpp-build.yml @@ -31,8 +31,8 @@ jobs: bazel-${{ runner.os }}-build- - name: Install bazelisk run: | - curl -LO "https://github.com/bazelbuild/bazelisk/releases/download/v1.8.1/$BAZEL" + curl -LO "https://github.com/bazelbuild/bazelisk/releases/download/v1.17.0/$BAZEL" chmod +x $BAZEL sudo mv $BAZEL /usr/local/bin/bazel - name: Build - run: bazel build //... + run: bazel build //... \ No newline at end of file diff --git a/BUILD.bazel b/BUILD.bazel index ca5484e..9e9f1b2 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/LICENSE b/LICENSE index 7a4a3ea..d645695 100644 --- a/LICENSE +++ b/LICENSE @@ -199,4 +199,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/WORKSPACE.bazel b/WORKSPACE.bazel index 2f010d5..9ee991f 100644 --- a/WORKSPACE.bazel +++ b/WORKSPACE.bazel @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/compile_protos.sh b/compile_protos.sh index 6bf6260..ee6cf08 100755 --- a/compile_protos.sh +++ b/compile_protos.sh @@ -5,7 +5,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http:#www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -15,11 +15,10 @@ # #!/bin/bash -set -euo pipefail -# Go -for p in gnpsi; do - protoc --go-grpc_out=. --go-grpc_opt=paths=source_relative --go_out=. --go_opt=paths=source_relative proto/$p/$p.proto -done +# Cpp +PROTO_DIR="proto/gnpsi" +protoc -I $PROTO_DIR --grpc_out=$PROTO_DIR --plugin=protoc-gen-grpc=`which grpc_cpp_plugin` $PROTO_DIR/gnpsi.proto +protoc -I $PROTO_DIR --cpp_out=$PROTO_DIR $PROTO_DIR/gnpsi.proto diff --git a/gnpsi_deps.bzl b/gnpsi_deps.bzl index 7e3c89b..cde8303 100644 --- a/gnpsi_deps.bzl +++ b/gnpsi_deps.bzl @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,31 +21,61 @@ def gnpsi_deps(): if not native.existing_rule("com_github_grpc_grpc"): http_archive( name = "com_github_grpc_grpc", - url = "https://github.com/grpc/grpc/archive/refs/tags/v1.43.2.tar.gz", - strip_prefix = "grpc-1.43.2", - sha256 = "b74ce7d26fe187970d1d8e2c06a5d3391122f7bc1fdce569aff5e435fb8fe780", + url = "https://github.com/grpc/grpc/archive/v1.56.1.zip", + strip_prefix = "grpc-1.56.1", + sha256 = "04a52a313926f0f6ec2ed489ac7552aa5949693b071eaf45ae13b66e5910c32f", + ) + if not native.existing_rule("com_google_absl"): + http_archive( + name = "com_google_absl", + url = "https://github.com/abseil/abseil-cpp/archive/20230125.3.tar.gz", + strip_prefix = "abseil-cpp-20230125.3", + sha256 = "5366d7e7fa7ba0d915014d387b66d0d002c03236448e1ba9ef98122c13b35c36", + ) + if not native.existing_rule("com_google_googletest"): + http_archive( + name = "com_google_googletest", + urls = ["https://github.com/google/googletest/archive/release-1.11.0.tar.gz"], + strip_prefix = "googletest-release-1.11.0", + sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5", ) if not native.existing_rule("com_google_protobuf"): http_archive( name = "com_google_protobuf", - url = "https://github.com/protocolbuffers/protobuf/releases/download/v3.19.4/protobuf-all-3.19.4.tar.gz", - strip_prefix = "protobuf-3.19.4", - sha256 = "ba0650be1b169d24908eeddbe6107f011d8df0da5b1a5a4449a913b10e578faf", + url = "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v23.1.zip", + strip_prefix = "protobuf-23.1", + sha256 = "c0ea9f4d75b37ea8e9d78ce4c670d066bcb7cebdba190fa5fc8c57b1f00c0c2c", ) if not native.existing_rule("com_google_googleapis"): http_archive( name = "com_google_googleapis", - url = "https://github.com/googleapis/googleapis/archive/ccb9d245ddac58b8d4ad918e6a914e841a64cc28.zip", - strip_prefix = "googleapis-ccb9d245ddac58b8d4ad918e6a914e841a64cc28", - sha256 = "feca5804fa0af2bc48d041a8b6e0356fb9e4848b3dd6ee74ab847022e90c69ff", + url = "https://github.com/googleapis/googleapis/archive/2f9af297c84c55c8b871ba4495e01ade42476c92.tar.gz", + sha256 = "5bb6b0253ccf64b53d6c7249625a7e3f6c3bc6402abd52d3778bfa48258703a0", + strip_prefix = "googleapis-2f9af297c84c55c8b871ba4495e01ade42476c92", + ) + if not native.existing_rule("com_github_google_glog"): + http_archive( + name = "com_github_google_glog", + url = "https://github.com/google/glog/archive/v0.6.0.tar.gz", + strip_prefix = "glog-0.6.0", + sha256 = "8a83bf982f37bb70825df71a9709fa90ea9f4447fb3c099e1d720a439d88bad6", + ) + + # Needed to make glog happy. + if not native.existing_rule("com_github_gflags_gflags"): + http_archive( + name = "com_github_gflags_gflags", + url = "https://github.com/gflags/gflags/archive/v2.2.2.tar.gz", + strip_prefix = "gflags-2.2.2", + sha256 = "34af2f15cf7367513b352bdcd2493ab14ce43692d2dcd9dfc499492966c64dcf", ) if not native.existing_rule("rules_proto"): http_archive( name = "rules_proto", - sha256 = "66bfdf8782796239d3875d37e7de19b1d94301e8972b3cbd2446b332429b4df1", - strip_prefix = "rules_proto-4.0.0", urls = [ - "https://mirror.bazel.build/github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0.tar.gz", - "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0.tar.gz", + "https://mirror.bazel.build/github.com/bazelbuild/rules_proto/archive/97d8af4dc474595af3900dd85cb3a29ad28cc313.tar.gz", + "https://github.com/bazelbuild/rules_proto/archive/97d8af4dc474595af3900dd85cb3a29ad28cc313.tar.gz", ], + strip_prefix = "rules_proto-97d8af4dc474595af3900dd85cb3a29ad28cc313", + sha256 = "602e7161d9195e50246177e7c55b2f39950a9cf7366f74ed5f22fd45750cd208", ) diff --git a/proto/gnpsi/BUILD.bazel b/proto/gnpsi/BUILD.bazel index c99b01f..2979d29 100644 --- a/proto/gnpsi/BUILD.bazel +++ b/proto/gnpsi/BUILD.bazel @@ -1,4 +1,4 @@ -# Copyright 2021 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,10 +13,12 @@ # limitations under the License. # # Supporting infrastructure for implementing and testing PINS. -# load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library") +# gnpsi defines a gRPC-based network packet sampling interface implemented on +# vendor network elements. It provides mechanisms for streaming packet +# samples/updates from the network element. package( default_visibility = ["//visibility:public"], licenses = ["notice"], @@ -25,10 +27,21 @@ package( proto_library( name = "gnpsi_proto", srcs = ["gnpsi.proto"], - import_prefix = "github.com/openconfig/gnpsi", + deps = [ + "@com_google_protobuf//:any_proto", + "@com_google_protobuf//:descriptor_proto", + ], ) cc_proto_library( name = "gnpsi_cc_proto", deps = [":gnpsi_proto"], ) + +cc_grpc_library( + name = "gnpsi_grpc_proto", + srcs = [":gnpsi_proto"], + generate_mocks = True, + grpc_only = ["True"], + deps = [":gnpsi_cc_proto"], +) diff --git a/proto/gnpsi/gnpsi.pb.go b/proto/gnpsi/gnpsi.pb.go deleted file mode 100644 index ee0d179..0000000 --- a/proto/gnpsi/gnpsi.pb.go +++ /dev/null @@ -1,640 +0,0 @@ -// -// Copyright 2021 Google LLC All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.27.1 -// protoc v3.12.4 -// source: proto/gnpsi/gnpsi.proto - -// Package gNPSI defines a service specification for the gRPC Network Packet -// Sampling Interface. This interface is defined to be a standard interface via -// which a telemetry system ("client") can subscribe to sampling updates from a -// device -// ("target"). -// - -package gnpsi - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type SFlowMetadata_Version int32 - -const ( - SFlowMetadata_UNSPECIFIED SFlowMetadata_Version = 0 - SFlowMetadata_V2 SFlowMetadata_Version = 1 - SFlowMetadata_V5 SFlowMetadata_Version = 2 -) - -// Enum value maps for SFlowMetadata_Version. -var ( - SFlowMetadata_Version_name = map[int32]string{ - 0: "UNSPECIFIED", - 1: "V2", - 2: "V5", - } - SFlowMetadata_Version_value = map[string]int32{ - "UNSPECIFIED": 0, - "V2": 1, - "V5": 2, - } -) - -func (x SFlowMetadata_Version) Enum() *SFlowMetadata_Version { - p := new(SFlowMetadata_Version) - *p = x - return p -} - -func (x SFlowMetadata_Version) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (SFlowMetadata_Version) Descriptor() protoreflect.EnumDescriptor { - return file_proto_gnpsi_gnpsi_proto_enumTypes[0].Descriptor() -} - -func (SFlowMetadata_Version) Type() protoreflect.EnumType { - return &file_proto_gnpsi_gnpsi_proto_enumTypes[0] -} - -func (x SFlowMetadata_Version) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use SFlowMetadata_Version.Descriptor instead. -func (SFlowMetadata_Version) EnumDescriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{0, 0} -} - -type NetFlowMetadata_Version int32 - -const ( - NetFlowMetadata_UNSPECIFIED NetFlowMetadata_Version = 0 - NetFlowMetadata_V1 NetFlowMetadata_Version = 1 - NetFlowMetadata_V5 NetFlowMetadata_Version = 2 - NetFlowMetadata_V7 NetFlowMetadata_Version = 3 - NetFlowMetadata_V9 NetFlowMetadata_Version = 4 -) - -// Enum value maps for NetFlowMetadata_Version. -var ( - NetFlowMetadata_Version_name = map[int32]string{ - 0: "UNSPECIFIED", - 1: "V1", - 2: "V5", - 3: "V7", - 4: "V9", - } - NetFlowMetadata_Version_value = map[string]int32{ - "UNSPECIFIED": 0, - "V1": 1, - "V5": 2, - "V7": 3, - "V9": 4, - } -) - -func (x NetFlowMetadata_Version) Enum() *NetFlowMetadata_Version { - p := new(NetFlowMetadata_Version) - *p = x - return p -} - -func (x NetFlowMetadata_Version) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (NetFlowMetadata_Version) Descriptor() protoreflect.EnumDescriptor { - return file_proto_gnpsi_gnpsi_proto_enumTypes[1].Descriptor() -} - -func (NetFlowMetadata_Version) Type() protoreflect.EnumType { - return &file_proto_gnpsi_gnpsi_proto_enumTypes[1] -} - -func (x NetFlowMetadata_Version) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use NetFlowMetadata_Version.Descriptor instead. -func (NetFlowMetadata_Version) EnumDescriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{1, 0} -} - -type IPFIXMetadata_Version int32 - -const ( - IPFIXMetadata_UNSPECIFIED IPFIXMetadata_Version = 0 - IPFIXMetadata_V10 IPFIXMetadata_Version = 1 -) - -// Enum value maps for IPFIXMetadata_Version. -var ( - IPFIXMetadata_Version_name = map[int32]string{ - 0: "UNSPECIFIED", - 1: "V10", - } - IPFIXMetadata_Version_value = map[string]int32{ - "UNSPECIFIED": 0, - "V10": 1, - } -) - -func (x IPFIXMetadata_Version) Enum() *IPFIXMetadata_Version { - p := new(IPFIXMetadata_Version) - *p = x - return p -} - -func (x IPFIXMetadata_Version) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (IPFIXMetadata_Version) Descriptor() protoreflect.EnumDescriptor { - return file_proto_gnpsi_gnpsi_proto_enumTypes[2].Descriptor() -} - -func (IPFIXMetadata_Version) Type() protoreflect.EnumType { - return &file_proto_gnpsi_gnpsi_proto_enumTypes[2] -} - -func (x IPFIXMetadata_Version) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use IPFIXMetadata_Version.Descriptor instead. -func (IPFIXMetadata_Version) EnumDescriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{2, 0} -} - -type SFlowMetadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Version SFlowMetadata_Version `protobuf:"varint,1,opt,name=version,proto3,enum=gnpsi.SFlowMetadata_Version" json:"version,omitempty"` -} - -func (x *SFlowMetadata) Reset() { - *x = SFlowMetadata{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SFlowMetadata) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SFlowMetadata) ProtoMessage() {} - -func (x *SFlowMetadata) ProtoReflect() protoreflect.Message { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SFlowMetadata.ProtoReflect.Descriptor instead. -func (*SFlowMetadata) Descriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{0} -} - -func (x *SFlowMetadata) GetVersion() SFlowMetadata_Version { - if x != nil { - return x.Version - } - return SFlowMetadata_UNSPECIFIED -} - -type NetFlowMetadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Version NetFlowMetadata_Version `protobuf:"varint,1,opt,name=version,proto3,enum=gnpsi.NetFlowMetadata_Version" json:"version,omitempty"` -} - -func (x *NetFlowMetadata) Reset() { - *x = NetFlowMetadata{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *NetFlowMetadata) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*NetFlowMetadata) ProtoMessage() {} - -func (x *NetFlowMetadata) ProtoReflect() protoreflect.Message { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use NetFlowMetadata.ProtoReflect.Descriptor instead. -func (*NetFlowMetadata) Descriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{1} -} - -func (x *NetFlowMetadata) GetVersion() NetFlowMetadata_Version { - if x != nil { - return x.Version - } - return NetFlowMetadata_UNSPECIFIED -} - -type IPFIXMetadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Version IPFIXMetadata_Version `protobuf:"varint,1,opt,name=version,proto3,enum=gnpsi.IPFIXMetadata_Version" json:"version,omitempty"` -} - -func (x *IPFIXMetadata) Reset() { - *x = IPFIXMetadata{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *IPFIXMetadata) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*IPFIXMetadata) ProtoMessage() {} - -func (x *IPFIXMetadata) ProtoReflect() protoreflect.Message { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use IPFIXMetadata.ProtoReflect.Descriptor instead. -func (*IPFIXMetadata) Descriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{2} -} - -func (x *IPFIXMetadata) GetVersion() IPFIXMetadata_Version { - if x != nil { - return x.Version - } - return IPFIXMetadata_UNSPECIFIED -} - -type Request struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields -} - -func (x *Request) Reset() { - *x = Request{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Request) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Request) ProtoMessage() {} - -func (x *Request) ProtoReflect() protoreflect.Message { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Request.ProtoReflect.Descriptor instead. -func (*Request) Descriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{3} -} - -type Sample struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Payload of the sample. - Packet []byte `protobuf:"bytes,1,opt,name=packet,proto3" json:"packet,omitempty"` - // Last timestamp of sample payload (ns since epoch) - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // Only one of these metadata will be populated to correspond to the sample - // returned. - // - // The metadata fields applies to all messages on this stream, and would only - // be present in the first message on the stream. - SflowMetadata *SFlowMetadata `protobuf:"bytes,101,opt,name=sflow_metadata,json=sflowMetadata,proto3" json:"sflow_metadata,omitempty"` - NetflowMetadata *NetFlowMetadata `protobuf:"bytes,102,opt,name=netflow_metadata,json=netflowMetadata,proto3" json:"netflow_metadata,omitempty"` - IpfixMetadata *IPFIXMetadata `protobuf:"bytes,103,opt,name=ipfix_metadata,json=ipfixMetadata,proto3" json:"ipfix_metadata,omitempty"` -} - -func (x *Sample) Reset() { - *x = Sample{} - if protoimpl.UnsafeEnabled { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Sample) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Sample) ProtoMessage() {} - -func (x *Sample) ProtoReflect() protoreflect.Message { - mi := &file_proto_gnpsi_gnpsi_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Sample.ProtoReflect.Descriptor instead. -func (*Sample) Descriptor() ([]byte, []int) { - return file_proto_gnpsi_gnpsi_proto_rawDescGZIP(), []int{4} -} - -func (x *Sample) GetPacket() []byte { - if x != nil { - return x.Packet - } - return nil -} - -func (x *Sample) GetTimestamp() int64 { - if x != nil { - return x.Timestamp - } - return 0 -} - -func (x *Sample) GetSflowMetadata() *SFlowMetadata { - if x != nil { - return x.SflowMetadata - } - return nil -} - -func (x *Sample) GetNetflowMetadata() *NetFlowMetadata { - if x != nil { - return x.NetflowMetadata - } - return nil -} - -func (x *Sample) GetIpfixMetadata() *IPFIXMetadata { - if x != nil { - return x.IpfixMetadata - } - return nil -} - -var File_proto_gnpsi_gnpsi_proto protoreflect.FileDescriptor - -var file_proto_gnpsi_gnpsi_proto_rawDesc = []byte{ - 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2f, 0x67, 0x6e, - 0x70, 0x73, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x67, 0x6e, 0x70, 0x73, 0x69, - 0x22, 0x73, 0x0a, 0x0d, 0x53, 0x46, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x12, 0x36, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x1c, 0x2e, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x53, 0x46, 0x6c, 0x6f, 0x77, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x2a, 0x0a, 0x07, 0x56, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x06, 0x0a, 0x02, 0x56, 0x32, 0x10, 0x01, 0x12, 0x06, 0x0a, - 0x02, 0x56, 0x35, 0x10, 0x02, 0x22, 0x87, 0x01, 0x0a, 0x0f, 0x4e, 0x65, 0x74, 0x46, 0x6c, 0x6f, - 0x77, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x38, 0x0a, 0x07, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x67, 0x6e, 0x70, - 0x73, 0x69, 0x2e, 0x4e, 0x65, 0x74, 0x46, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0f, - 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, - 0x06, 0x0a, 0x02, 0x56, 0x31, 0x10, 0x01, 0x12, 0x06, 0x0a, 0x02, 0x56, 0x35, 0x10, 0x02, 0x12, - 0x06, 0x0a, 0x02, 0x56, 0x37, 0x10, 0x03, 0x12, 0x06, 0x0a, 0x02, 0x56, 0x39, 0x10, 0x04, 0x22, - 0x6c, 0x0a, 0x0d, 0x49, 0x50, 0x46, 0x49, 0x58, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x12, 0x36, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x1c, 0x2e, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x49, 0x50, 0x46, 0x49, 0x58, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, - 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x23, 0x0a, 0x07, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, - 0x45, 0x44, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x56, 0x31, 0x30, 0x10, 0x01, 0x22, 0x09, 0x0a, - 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0xfb, 0x01, 0x0a, 0x06, 0x53, 0x61, 0x6d, - 0x70, 0x6c, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x06, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x3b, 0x0a, 0x0e, 0x73, 0x66, 0x6c, - 0x6f, 0x77, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x65, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x53, 0x46, 0x6c, 0x6f, 0x77, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x0d, 0x73, 0x66, 0x6c, 0x6f, 0x77, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x41, 0x0a, 0x10, 0x6e, 0x65, 0x74, 0x66, 0x6c, 0x6f, - 0x77, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x66, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x16, 0x2e, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x4e, 0x65, 0x74, 0x46, 0x6c, 0x6f, 0x77, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x0f, 0x6e, 0x65, 0x74, 0x66, 0x6c, 0x6f, - 0x77, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3b, 0x0a, 0x0e, 0x69, 0x70, 0x66, - 0x69, 0x78, 0x5f, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x67, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x49, 0x50, 0x46, 0x49, 0x58, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x0d, 0x69, 0x70, 0x66, 0x69, 0x78, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x32, 0x35, 0x0a, 0x05, 0x67, 0x4e, 0x50, 0x53, 0x49, 0x12, - 0x2c, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x0e, 0x2e, 0x67, - 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x67, - 0x6e, 0x70, 0x73, 0x69, 0x2e, 0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x30, 0x01, 0x42, 0x29, 0x5a, - 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x70, 0x65, 0x6e, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2f, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x67, 0x6e, 0x70, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_proto_gnpsi_gnpsi_proto_rawDescOnce sync.Once - file_proto_gnpsi_gnpsi_proto_rawDescData = file_proto_gnpsi_gnpsi_proto_rawDesc -) - -func file_proto_gnpsi_gnpsi_proto_rawDescGZIP() []byte { - file_proto_gnpsi_gnpsi_proto_rawDescOnce.Do(func() { - file_proto_gnpsi_gnpsi_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_gnpsi_gnpsi_proto_rawDescData) - }) - return file_proto_gnpsi_gnpsi_proto_rawDescData -} - -var file_proto_gnpsi_gnpsi_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_proto_gnpsi_gnpsi_proto_msgTypes = make([]protoimpl.MessageInfo, 5) -var file_proto_gnpsi_gnpsi_proto_goTypes = []interface{}{ - (SFlowMetadata_Version)(0), // 0: gnpsi.SFlowMetadata.Version - (NetFlowMetadata_Version)(0), // 1: gnpsi.NetFlowMetadata.Version - (IPFIXMetadata_Version)(0), // 2: gnpsi.IPFIXMetadata.Version - (*SFlowMetadata)(nil), // 3: gnpsi.SFlowMetadata - (*NetFlowMetadata)(nil), // 4: gnpsi.NetFlowMetadata - (*IPFIXMetadata)(nil), // 5: gnpsi.IPFIXMetadata - (*Request)(nil), // 6: gnpsi.Request - (*Sample)(nil), // 7: gnpsi.Sample -} -var file_proto_gnpsi_gnpsi_proto_depIdxs = []int32{ - 0, // 0: gnpsi.SFlowMetadata.version:type_name -> gnpsi.SFlowMetadata.Version - 1, // 1: gnpsi.NetFlowMetadata.version:type_name -> gnpsi.NetFlowMetadata.Version - 2, // 2: gnpsi.IPFIXMetadata.version:type_name -> gnpsi.IPFIXMetadata.Version - 3, // 3: gnpsi.Sample.sflow_metadata:type_name -> gnpsi.SFlowMetadata - 4, // 4: gnpsi.Sample.netflow_metadata:type_name -> gnpsi.NetFlowMetadata - 5, // 5: gnpsi.Sample.ipfix_metadata:type_name -> gnpsi.IPFIXMetadata - 6, // 6: gnpsi.gNPSI.Subscribe:input_type -> gnpsi.Request - 7, // 7: gnpsi.gNPSI.Subscribe:output_type -> gnpsi.Sample - 7, // [7:8] is the sub-list for method output_type - 6, // [6:7] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name -} - -func init() { file_proto_gnpsi_gnpsi_proto_init() } -func file_proto_gnpsi_gnpsi_proto_init() { - if File_proto_gnpsi_gnpsi_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_proto_gnpsi_gnpsi_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SFlowMetadata); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_gnpsi_gnpsi_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NetFlowMetadata); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_gnpsi_gnpsi_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*IPFIXMetadata); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_gnpsi_gnpsi_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Request); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_proto_gnpsi_gnpsi_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Sample); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_proto_gnpsi_gnpsi_proto_rawDesc, - NumEnums: 3, - NumMessages: 5, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_proto_gnpsi_gnpsi_proto_goTypes, - DependencyIndexes: file_proto_gnpsi_gnpsi_proto_depIdxs, - EnumInfos: file_proto_gnpsi_gnpsi_proto_enumTypes, - MessageInfos: file_proto_gnpsi_gnpsi_proto_msgTypes, - }.Build() - File_proto_gnpsi_gnpsi_proto = out.File - file_proto_gnpsi_gnpsi_proto_rawDesc = nil - file_proto_gnpsi_gnpsi_proto_goTypes = nil - file_proto_gnpsi_gnpsi_proto_depIdxs = nil -} diff --git a/proto/gnpsi/gnpsi_grpc.pb.go b/proto/gnpsi/gnpsi_grpc.pb.go deleted file mode 100644 index c863db5..0000000 --- a/proto/gnpsi/gnpsi_grpc.pb.go +++ /dev/null @@ -1,134 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. - -package gnpsi - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// GNPSIClient is the client API for GNPSI service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type GNPSIClient interface { - // gNPSI subscription allows client to subscribe to SFlow/NetFlow/IPFIX - // updates from the device. Past updates, i.e., updates before the - // subscription is received, will not be presented to the subscribing client. - Subscribe(ctx context.Context, in *Request, opts ...grpc.CallOption) (GNPSI_SubscribeClient, error) -} - -type gNPSIClient struct { - cc grpc.ClientConnInterface -} - -func NewGNPSIClient(cc grpc.ClientConnInterface) GNPSIClient { - return &gNPSIClient{cc} -} - -func (c *gNPSIClient) Subscribe(ctx context.Context, in *Request, opts ...grpc.CallOption) (GNPSI_SubscribeClient, error) { - stream, err := c.cc.NewStream(ctx, &GNPSI_ServiceDesc.Streams[0], "/gnpsi.gNPSI/Subscribe", opts...) - if err != nil { - return nil, err - } - x := &gNPSISubscribeClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type GNPSI_SubscribeClient interface { - Recv() (*Sample, error) - grpc.ClientStream -} - -type gNPSISubscribeClient struct { - grpc.ClientStream -} - -func (x *gNPSISubscribeClient) Recv() (*Sample, error) { - m := new(Sample) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// GNPSIServer is the server API for GNPSI service. -// All implementations must embed UnimplementedGNPSIServer -// for forward compatibility -type GNPSIServer interface { - // gNPSI subscription allows client to subscribe to SFlow/NetFlow/IPFIX - // updates from the device. Past updates, i.e., updates before the - // subscription is received, will not be presented to the subscribing client. - Subscribe(*Request, GNPSI_SubscribeServer) error - mustEmbedUnimplementedGNPSIServer() -} - -// UnimplementedGNPSIServer must be embedded to have forward compatible implementations. -type UnimplementedGNPSIServer struct { -} - -func (UnimplementedGNPSIServer) Subscribe(*Request, GNPSI_SubscribeServer) error { - return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") -} -func (UnimplementedGNPSIServer) mustEmbedUnimplementedGNPSIServer() {} - -// UnsafeGNPSIServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to GNPSIServer will -// result in compilation errors. -type UnsafeGNPSIServer interface { - mustEmbedUnimplementedGNPSIServer() -} - -func RegisterGNPSIServer(s grpc.ServiceRegistrar, srv GNPSIServer) { - s.RegisterService(&GNPSI_ServiceDesc, srv) -} - -func _GNPSI_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(Request) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(GNPSIServer).Subscribe(m, &gNPSISubscribeServer{stream}) -} - -type GNPSI_SubscribeServer interface { - Send(*Sample) error - grpc.ServerStream -} - -type gNPSISubscribeServer struct { - grpc.ServerStream -} - -func (x *gNPSISubscribeServer) Send(m *Sample) error { - return x.ServerStream.SendMsg(m) -} - -// GNPSI_ServiceDesc is the grpc.ServiceDesc for GNPSI service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var GNPSI_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "gnpsi.gNPSI", - HandlerType: (*GNPSIServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "Subscribe", - Handler: _GNPSI_Subscribe_Handler, - ServerStreams: true, - }, - }, - Metadata: "proto/gnpsi/gnpsi.proto", -} diff --git a/server/BUILD.bazel b/server/BUILD.bazel new file mode 100644 index 0000000..56b3d81 --- /dev/null +++ b/server/BUILD.bazel @@ -0,0 +1,67 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Supporting infrastructure for implementing and testing PINS. + +package( + default_visibility = ["//visibility:public"], + licenses = ["notice"], +) + +cc_library( + name = "gnpsi_service_impl", + srcs = ["gnpsi_service_impl.cc"], + hdrs = ["gnpsi_service_impl.h"], + deps = [ + "//proto/gnpsi:gnpsi_cc_proto", + "//proto/gnpsi:gnpsi_grpc_proto", + "@com_github_google_glog//:glog", + "@com_github_grpc_grpc//:grpc++", + "@com_google_absl//absl/base:core_headers", + "@com_google_absl//absl/status", + "@com_google_absl//absl/strings", + "@com_google_absl//absl/synchronization", + "@com_google_absl//absl/time", + ], +) + +cc_library( + name = "gnpsi_relay_server", + srcs = ["gnpsi_relay_server.cc"], + hdrs = ["gnpsi_relay_server.h"], + deps = [ + ":gnpsi_service_impl", + "@com_github_google_glog//:glog", + ], +) + +cc_library( + name = "mock_gnpsi_service", + testonly = True, + hdrs = ["mock_gnpsi_service_impl.h"], + deps = [ + ":gnpsi_service_impl", + "@com_google_googletest//:gtest", + ], +) + +cc_test( + name = "gnpsi_relay_server_test", + srcs = ["gnpsi_relay_server_test.cc"], + deps = [ + ":gnpsi_relay_server", + ":mock_gnpsi_service", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/server/gnpsi_relay_server.cc b/server/gnpsi_relay_server.cc new file mode 100644 index 0000000..739c960 --- /dev/null +++ b/server/gnpsi_relay_server.cc @@ -0,0 +1,110 @@ +#include "server/gnpsi_relay_server.h" + +#include +#include +#include +#include +#include + +#include "glog/logging.h" +#include "server/gnpsi_service_impl.h" + +namespace { +enum ReadError { + // No Error in the current read + NoError = 0, + // Non Fatal Error can retry reading from the socket + NonFatalError = 1, + // Fatal Error should stop reading from the socket + FatalError = 2, +}; + +int SetUpReadSocket(int port, int addr_family, + gnpsi::SocketInterface* socket_provider) { + int socket_fd = socket_provider->Socket(addr_family, SOCK_DGRAM, IPPROTO_UDP); + if (socket_fd < 0) { + LOG(ERROR) << "Socket creation failed: " << strerror(errno); + return -1; + } + struct sockaddr_storage addr; + socklen_t sock_len = 0; + memset(&addr, 0, sizeof(addr)); + if (addr_family == AF_INET6) { + struct sockaddr_in6* address = (struct sockaddr_in6*)&addr; + sock_len = sizeof(struct sockaddr_in6); + address->sin6_family = AF_INET6; + address->sin6_addr = in6addr_loopback; + address->sin6_port = htons(port); + } else if (addr_family == AF_INET) { + struct sockaddr_in* address = (struct sockaddr_in*)&addr; + sock_len = sizeof(struct sockaddr_in); + address->sin_family = AF_INET; + address->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + address->sin_port = htons(port); + } else { + LOG(ERROR) << "Invalid address family"; + socket_provider->Close(socket_fd); + return -1; + } + if (socket_provider->Bind(socket_fd, (struct sockaddr*)&addr, sock_len) < 0) { + LOG(ERROR) << "Failed to Bind socket: " << strerror(errno); + socket_provider->Close(socket_fd); + return -1; + } + return socket_fd; +} + +ReadError ErrorNoToReadError(int error_number) { + switch (error_number) { + case EINTR: + case EAGAIN: + return ReadError::NonFatalError; + default: + return ReadError::FatalError; + } +} + +ReadError Read(int fd, void* buf, size_t count, int& len, + gnpsi::SocketInterface* socket_provider) { + ReadError err = ReadError::NoError; + len = socket_provider->Read(fd, buf, count); + if (len < 0) { + err = ErrorNoToReadError(errno); + } + return err; +} +} // namespace + +namespace gnpsi { + +const int kMaxBufferSize = 4096; + +void GnpsiRelayServer::StartRelayAndWait(GnpsiSenderInterface& service) { + LOG(INFO) << "Setting up socket to read packets."; + int fd = SetUpReadSocket(udp_port_, addr_family_, socket_provider_.get()); + if (fd < 0) { + LOG(ERROR) << "Socket creation failed, cannot relay samples."; + return; + } + char buf[kMaxBufferSize]; + LOG(INFO) << "Start reading sample packets."; + while (true) { + int len = 0; + ReadError err = Read(fd, buf, kMaxBufferSize, len, socket_provider_.get()); + if (err == ReadError::FatalError) { + LOG(ERROR) << "Read from socket failed with a fatal error: " + << strerror(errno); + // Stop reading and relaying samples if error is fatal + break; + } + if (err == ReadError::NonFatalError) { + VLOG(1) << "Read from socket failed with a non fatal error: " + << strerror(errno); + // Continue relaying samples and ignore current read if error is non fatal + continue; + } + VLOG(1) << "Received sample with size: " << len; + service.SendSamplePacket(std::string(buf, len)); + } +} +} // namespace gnpsi diff --git a/server/gnpsi_relay_server.h b/server/gnpsi_relay_server.h new file mode 100644 index 0000000..72c04e6 --- /dev/null +++ b/server/gnpsi_relay_server.h @@ -0,0 +1,67 @@ +#ifndef OPENCONFIG_GNPSI_SERVER_GNPSI_RELAY_SERVER_H_ +#define OPENCONFIG_GNPSI_SERVER_GNPSI_RELAY_SERVER_H_ + +#include +#include +#include + +#include + +#include "server/gnpsi_service_impl.h" + +namespace gnpsi { +class SocketInterface { + public: + virtual ~SocketInterface() {} + virtual int Socket(int domain, int type, int protocol) = 0; + virtual ssize_t Read(int fd, void* buf, size_t count) = 0; + virtual int Bind(int sockfd, const struct sockaddr* addr, + socklen_t addrlen) = 0; + virtual int Close(int fd) = 0; +}; + +class SocketProvider : public SocketInterface { + public: + // Sets up a socket using the socket system call + int Socket(int domain, int type, int protocol) override { + return socket(domain, type, protocol); + } + // Reads bytes using the read system call + ssize_t Read(int fd, void* buf, size_t count) override { + return read(fd, buf, count); + } + // Binds a socket to a address using the bind system call + int Bind(int sockfd, const struct sockaddr* addr, + socklen_t addrlen) override { + return bind(sockfd, addr, addrlen); + } + // Closes a socket using the close system call + int Close(int fd) override { return close(fd); } +}; + +class GnpsiRelayServer { + public: + GnpsiRelayServer(int udp_port, int addr_family) + : udp_port_(udp_port), + addr_family_(addr_family), + socket_provider_(new SocketProvider) {} + + // Start Relaying Samples by reading from the udp port. This is a blocking + // call and will keep on reading samples until a critical error is encountered + void StartRelayAndWait(GnpsiSenderInterface& service); + + // Mutator + void set_socket_interface(SocketInterface* new_interface) { + socket_provider_.reset(new_interface); + } + + private: + int udp_port_; + int addr_family_; + // The socket_ provides access calls to setup and read from sockets. Unit + // tests can replace the normally constructed interface with a mock interface. + std::unique_ptr socket_provider_; +}; + +} // namespace gnpsi +#endif // OPENCONFIG_GNPSI_SERVER_GNPSI_RELAY_SERVER_H_ diff --git a/server/gnpsi_relay_server_test.cc b/server/gnpsi_relay_server_test.cc new file mode 100644 index 0000000..bb9c3d2 --- /dev/null +++ b/server/gnpsi_relay_server_test.cc @@ -0,0 +1,114 @@ +#include "server/gnpsi_relay_server.h" + +#include +#include +#include + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "server/mock_gnpsi_service_impl.h" + +namespace gnpsi { +namespace { +using testing::_; +using testing::Invoke; +using testing::Return; + +const int kUdpPort = 0; +} // namespace + +// This class replaces the normal socket interface for test use +class MockSocket : public SocketInterface { + public: + MOCK_METHOD(int, Socket, (int domain, int type, int protocol), (override)); + MOCK_METHOD(ssize_t, Read, (int fd, void* buf, size_t count), (override)); + MOCK_METHOD(int, Bind, + (int sockfd, const struct sockaddr* addr, socklen_t addrlen), + (override)); + MOCK_METHOD(int, Close, (int fd), (override)); +}; + +class GnpsiRelayServerTest : public ::testing::Test { + protected: + GnpsiRelayServerTest() + : mock_socket_(new MockSocket), relay_server_(kUdpPort, AF_INET6) { + relay_server_.set_socket_interface(mock_socket_); + } + + MockSocket* mock_socket_; + MockGnpsiServiceImpl gnpsi_service_impl_; + GnpsiRelayServer relay_server_; +}; + +TEST_F(GnpsiRelayServerTest, RelaySuccess) { + EXPECT_CALL(*mock_socket_, Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) + .WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Bind(0, _, _)).WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Read(0, _, 4096)) + .Times(2) + .WillOnce(Invoke([&]() { + errno = 0; + return 0; + })) + .WillOnce(Invoke([&]() { + errno = EFAULT; + return -1; + })); + // Assert a send call made in case of a successful read + EXPECT_CALL(gnpsi_service_impl_, SendSamplePacket(_, _)).Times(1); + relay_server_.StartRelayAndWait(gnpsi_service_impl_); +} + +TEST_F(GnpsiRelayServerTest, NoDeathOnNonCriticalReadError) { + EXPECT_CALL(*mock_socket_, Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) + .WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Bind(0, _, _)).WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Read(0, _, 4096)) + .Times(2) + .WillOnce(Invoke([&]() { + errno = EINTR; + return -1; + })) + .WillOnce(Invoke([&]() { + errno = EFAULT; + return -1; + })); + // Assert no send call is made in case non fatal error is encountered + EXPECT_CALL(gnpsi_service_impl_, SendSamplePacket(_, _)).Times(0); + relay_server_.StartRelayAndWait(gnpsi_service_impl_); +} + +TEST_F(GnpsiRelayServerTest, DeathOnSocketCreationError) { + EXPECT_CALL(*mock_socket_, Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) + .WillOnce(Return(-1)); + // Assert no read calls are made in case of socket setup failure + EXPECT_CALL(*mock_socket_, Read(0, _, 4096)).Times(0); + relay_server_.StartRelayAndWait(gnpsi_service_impl_); +} + +TEST_F(GnpsiRelayServerTest, DeathOnSocketBindError) { + EXPECT_CALL(*mock_socket_, Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) + .WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Bind(0, _, _)).WillOnce(Return(-1)); + EXPECT_CALL(*mock_socket_, Close(_)).WillOnce(Return(0)); + // Assert no read calls are made in case of socket setup failure + EXPECT_CALL(*mock_socket_, Read(0, _, 4096)).Times(0); + relay_server_.StartRelayAndWait(gnpsi_service_impl_); +} + +TEST_F(GnpsiRelayServerTest, DeathOnCriticalReadError) { + EXPECT_CALL(*mock_socket_, Socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP)) + .WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Bind(0, _, _)).WillOnce(Return(0)); + EXPECT_CALL(*mock_socket_, Read(0, _, 4096)).Times(1).WillOnce(Invoke([&]() { + errno = EFAULT; + return -1; + })); + // Assert no send call is made in case fatal error is encountered + EXPECT_CALL(gnpsi_service_impl_, SendSamplePacket(_, _)).Times(0); + relay_server_.StartRelayAndWait(gnpsi_service_impl_); +} +} // namespace gnpsi diff --git a/server/gnpsi_service_impl.cc b/server/gnpsi_service_impl.cc new file mode 100644 index 0000000..1542ada --- /dev/null +++ b/server/gnpsi_service_impl.cc @@ -0,0 +1,125 @@ +#include "server/gnpsi_service_impl.h" + +#include +#include + +#include "absl/base/thread_annotations.h" +#include "glog/logging.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "absl/synchronization/mutex.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "proto/gnpsi/gnpsi.pb.h" + +namespace gnpsi { + +void GnpsiConnection::WaitUntilClosed() { + absl::MutexLock l(&mu_); + auto stream_disconnected = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return is_stream_closed_; + }; + mu_.Await(absl::Condition(&stream_disconnected)); +} + +Status GnpsiServiceImpl::Subscribe(ServerContext* context, + const Request* request, + ServerWriter* writer) { + if (context == nullptr) { + LOG(ERROR) << "StreamSflowSample context is a nullptr."; + return Status(StatusCode::INVALID_ARGUMENT, "Context cannot be nullptr."); + } + auto connection = std::make_unique(context, writer); + absl::Status status = AddConnection(connection.get()); + if (!status.ok()) { + return Status(StatusCode(status.code()), std::string(status.message())); + } + // Send Initial Metadata after adding connection indicating that the client + // should be receiving any new samples from this point on. + writer->SendInitialMetadata(); + // Blocks until it is closed by client and removes it from internal vector. + connection->WaitUntilClosed(); + DropConnection(connection.get()); + return Status::OK; +} + +int GnpsiServiceImpl::GetAliveConnections() { + // Check if any connection is stale. + int alive_connections = gnpsi_connections_.size(); + for (GnpsiConnection* connection : gnpsi_connections_) { + if (connection->IsContextCancelled()) { + alive_connections--; + connection->CloseStream(); + } + } + return alive_connections; +} + +absl::Status GnpsiServiceImpl::AddConnection(GnpsiConnection* connection) { + absl::MutexLock l(&mu_); + if (service_drained_) { + LOG(ERROR) << "Cannot add connections since the service has been drained."; + return absl::FailedPreconditionError( + "No more conections can be added after the service is drained."); + } + if (GetAliveConnections() >= client_max_number_) { + LOG(ERROR) << "GnpsiService only supports at most " << client_max_number_ + << " clients."; + return absl::FailedPreconditionError( + absl::StrCat("Number of clients exceeds maximum number. " + "GnpsiService supports at most ", + client_max_number_, " clients.")); + } + LOG(INFO) << "Add " << connection->GetPeerName() << " to client list."; + gnpsi_connections_.push_back(connection); + return absl::OkStatus(); +} + +// Iterate through the connections vector and remove `connection` if it exists. +void GnpsiServiceImpl::DropConnection(GnpsiConnection* connection) { + absl::MutexLock l(&mu_); + for (auto iter = gnpsi_connections_.begin(); iter != gnpsi_connections_.end(); + ++iter) { + if (*iter == connection) { + LOG(INFO) << "Dropping gNPSI connection to " << connection->GetPeerName(); + gnpsi_connections_.erase(iter); + break; + } + } +} + +void GnpsiServiceImpl::DrainConnections() { + absl::MutexLock lock(&mu_); + service_drained_ = true; + // Close all active connections. + for (auto it = gnpsi_connections_.begin(), end = gnpsi_connections_.end(); + it != end; it++) { + auto connection = *it; + connection->CloseStream(); + } +} + +void GnpsiServiceImpl::SendSamplePacket( + const std::string& sample_packet, ::gnpsi::SFlowMetadata::Version version) { + absl::MutexLock l(&mu_); + Sample response; + response.set_packet(sample_packet); + response.set_timestamp(absl::ToUnixNanos(absl::Now())); + response.mutable_sflow_metadata()->set_version(version); + for (auto it = gnpsi_connections_.begin(), end = gnpsi_connections_.end(); + it != end; it++) { + auto connection = *it; + if (!connection->IsContextCancelled() && + connection->SendResponse(response)) { + VLOG(1) << "Successfully sent sample packet to " + << connection->GetPeerName() << "."; + } else { + // If it fails to send response to any client, close this connection. + LOG(ERROR) << "Failed to send sample packet to " + << connection->GetPeerName() << "."; + connection->CloseStream(); + } + } +} + +} // namespace gnpsi diff --git a/server/gnpsi_service_impl.h b/server/gnpsi_service_impl.h new file mode 100644 index 0000000..9cc4836 --- /dev/null +++ b/server/gnpsi_service_impl.h @@ -0,0 +1,112 @@ +#ifndef OPENCONFIG_GNPSI_SERVER_GNPSI_SERVICE_IMPL_H_ +#define OPENCONFIG_GNPSI_SERVER_GNPSI_SERVICE_IMPL_H_ + +#include +#include + +#include "absl/synchronization/mutex.h" +#include "grpcpp/server_context.h" +#include "grpcpp/support/status.h" +#include "proto/gnpsi/gnpsi.grpc.pb.h" +#include "proto/gnpsi/gnpsi.pb.h" + +namespace gnpsi { + +using ::grpc::ServerContext; +using ::grpc::ServerWriter; +using ::grpc::ServerWriterInterface; +using ::grpc::Status; +using ::grpc::StatusCode; + +// Interface to gNPSI sender method +class GnpsiSenderInterface { + public: + virtual ~GnpsiSenderInterface() = default; + virtual void SendSamplePacket( + const std::string& sample_packet, + SFlowMetadata::Version version = SFlowMetadata::V5) = 0; + virtual void DrainConnections() = 0; +}; + +// A connection between a client and gNPSI server. This class is thread-safe. +class GnpsiConnection { + public: + explicit GnpsiConnection(ServerContext* context, + ServerWriterInterface* writer) + : context_(context), writer_(writer), is_stream_closed_(false) {} + + std::string GetPeerName() const { return context_->peer(); } + + bool IsContextCancelled() const { return context_->IsCancelled(); } + + bool SendResponse(const ::gnpsi::Sample& response) { + return writer_->Write(response); + } + + // Sets is_stream_closed_ to true. + void CloseStream() ABSL_LOCKS_EXCLUDED(mu_) { + absl::MutexLock l(&mu_); + is_stream_closed_ = true; + } + // Blocks until connection is closed. This can either be the stream is broken + // or context is cancelled. + void WaitUntilClosed() ABSL_LOCKS_EXCLUDED(mu_); + + private: + ServerContext* context_; + ServerWriterInterface<::gnpsi::Sample>* writer_; + // Lock for protecting is_stream_closed_. + absl::Mutex mu_; + // When set to true, it means stream is broken. + bool is_stream_closed_ ABSL_GUARDED_BY(mu_); +}; + +// Implementation of gNPSI server. +// 1. Supports Subscribe from clients. +// 2. Exposes an interface for server to send Sample response. +class GnpsiServiceImpl : public ::gnpsi::gNPSI::Service, + public GnpsiSenderInterface { + public: + explicit GnpsiServiceImpl(int client_max_number) + : client_max_number_(client_max_number) {} + + // Creates a new GnpsiConnection and adds it into gnpsi_connections_ vector. + // Returns a FAILED_PRECONDITION error if gnpsi_connections_ size has reached + // client_max_number_. + Status Subscribe(ServerContext* context, const Request* request, + ServerWriter* writer) + ABSL_LOCKS_EXCLUDED(mu_) override; + + // Sends Sample response to each client. + void SendSamplePacket(const std::string& sample_packet, + SFlowMetadata::Version version = SFlowMetadata::V5) + ABSL_LOCKS_EXCLUDED(mu_) override; + + // Closes all current conections and blocks any new incoming connections. + void DrainConnections() ABSL_LOCKS_EXCLUDED(mu_) override; + + private: + int client_max_number_; + // Lock for protecting member fields. + absl::Mutex mu_; + // Adds `connection` to internal vector. + // Returns true if `connection` is successfully added. + // Returns false and does nothing if number of alive connections reaches + // client_max_number_. + absl::Status AddConnection(GnpsiConnection* connection) + ABSL_LOCKS_EXCLUDED(mu_); + // Removes `connection` from gnpsi_connections_ if it exists. Otherwise, does + // nothing. + void DropConnection(GnpsiConnection* connection) ABSL_LOCKS_EXCLUDED(mu_); + // Returns the number of alive connections and marks stale connections as + // closed. + int GetAliveConnections() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); + // Maintains a vector of GnpsiConnection. + std::vector gnpsi_connections_ ABSL_GUARDED_BY(mu_); + // Indicates whether service drain has been initiated. + bool service_drained_ ABSL_GUARDED_BY(mu_) = false; +}; + +} // namespace gnpsi + +#endif // OPENCONFIG_GNPSI_SERVER_GNPSI_SERVICE_IMPL_H_ diff --git a/server/mock_gnpsi_service_impl.h b/server/mock_gnpsi_service_impl.h new file mode 100644 index 0000000..2553bcd --- /dev/null +++ b/server/mock_gnpsi_service_impl.h @@ -0,0 +1,19 @@ +#ifndef OPENCONFIG_GNPSI_SERVER_MOCK_GNPSI_SERVICE_IMPL_H_ +#define OPENCONFIG_GNPSI_SERVER_MOCK_GNPSI_SERVICE_IMPL_H_ + +#include "gmock/gmock.h" +#include "server/gnpsi_service_impl.h" + +namespace gnpsi { +class MockGnpsiServiceImpl : public GnpsiSenderInterface { + public: + virtual ~MockGnpsiServiceImpl() = default; + MOCK_METHOD(void, SendSamplePacket, + (const std::string& sample_packet, + SFlowMetadata::Version version), + (override)); + MOCK_METHOD(void, DrainConnections, (), (override)); +}; +} // namespace gnpsi + +#endif // OPENCONFIG_GNPSI_SERVER_MOCK_GNPSI_SERVICE_IMPL_H_