diff --git a/pkg/target/subscribe.go b/pkg/target/subscribe.go index 7fbe0615..0342113c 100644 --- a/pkg/target/subscribe.go +++ b/pkg/target/subscribe.go @@ -18,6 +18,7 @@ import ( "github.com/jhump/protoreflect/dynamic" "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/gnmic/pkg/types" ) // Subscribe sends a gnmi.SubscribeRequest to the target *t, responses and error are sent to the target channels @@ -53,6 +54,7 @@ SUBSC: t.subscribeCancelFn[subscriptionName] = cancel subConfig := t.Subscriptions[subscriptionName] t.m.Unlock() + err = subscribeClient.Send(req) if err != nil { t.errors <- &TargetError{ @@ -64,93 +66,52 @@ SUBSC: goto SUBSC } - switch req.GetSubscribe().Mode { + switch req.GetSubscribe().GetMode() { case gnmi.SubscriptionList_STREAM: - for { - if nctx.Err() != nil { - return - } - response, err := subscribeClient.Recv() - if err != nil { - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: err, - } - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: fmt.Errorf("retrying in %s", t.Config.RetryTimer), - } - cancel() - time.Sleep(t.Config.RetryTimer) - goto SUBSC + err = t.handleStreamSubscriptionRcv(nctx, subscribeClient, subConfig) + if err != nil { + t.errors <- &TargetError{ + SubscriptionName: subscriptionName, + Err: err, } - t.subscribeResponses <- &SubscribeResponse{ - SubscriptionName: subscriptionName, - SubscriptionConfig: subConfig, - Response: response, + t.errors <- &TargetError{ + SubscriptionName: subscriptionName, + Err: fmt.Errorf("retrying in %s", t.Config.RetryTimer), } + cancel() + time.Sleep(t.Config.RetryTimer) + goto SUBSC } case gnmi.SubscriptionList_ONCE: - for { - response, err := subscribeClient.Recv() - if err != nil { - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: err, - } - if errors.Is(err, io.EOF) { - return - } - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: fmt.Errorf("retrying in %d", t.Config.RetryTimer), - } - cancel() - time.Sleep(t.Config.RetryTimer) - goto SUBSC - } - t.subscribeResponses <- &SubscribeResponse{ - SubscriptionName: subscriptionName, - SubscriptionConfig: subConfig, - Response: response, + err = t.handleONCESubscriptionRcv(nctx, subscribeClient, subConfig) + if err != nil { + t.errors <- &TargetError{ + SubscriptionName: subscriptionName, + Err: err, } - switch response.Response.(type) { - case *gnmi.SubscribeResponse_SyncResponse: + if errors.Is(err, io.EOF) { return } + t.errors <- &TargetError{ + SubscriptionName: subscriptionName, + Err: fmt.Errorf("retrying in %d", t.Config.RetryTimer), + } + cancel() + time.Sleep(t.Config.RetryTimer) + goto SUBSC } + return case gnmi.SubscriptionList_POLL: - for { - select { - case subName := <-t.pollChan: - err = t.SubscribeClients[subName].Send(&gnmi.SubscribeRequest{ - Request: &gnmi.SubscribeRequest_Poll{ - Poll: &gnmi.Poll{}, - }, - }) - if err != nil { - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: fmt.Errorf("failed to send PollRequest: %v", err), - } - continue - } - response, err := subscribeClient.Recv() - if err != nil { - t.errors <- &TargetError{ - SubscriptionName: subscriptionName, - Err: err, - } - continue - } - t.subscribeResponses <- &SubscribeResponse{ - SubscriptionName: subscriptionName, - SubscriptionConfig: subConfig, - Response: response, - } - case <-nctx.Done(): - return + go t.listenPolls(nctx) + err = t.handlePollSubscriptionRcv(nctx, subscribeClient, subConfig) + if err != nil { + t.errors <- &TargetError{ + SubscriptionName: subscriptionName, + Err: err, } + cancel() + time.Sleep(t.Config.RetryTimer) + goto SUBSC } } } @@ -209,6 +170,20 @@ LOOP: return responses, nil } +func (t *Target) SubscribePoll(ctx context.Context, subName string) error { + t.m.Lock() + stream, ok := t.SubscribeClients[subName] + t.m.Unlock() + if !ok { + return fmt.Errorf("unknown subscription name %q", subName) + } + return stream.Send(&gnmi.SubscribeRequest{ + Request: &gnmi.SubscribeRequest_Poll{ + Poll: new(gnmi.Poll), + }, + }) +} + func (t *Target) ReadSubscriptions() (chan *SubscribeResponse, chan *TargetError) { return t.subscribeResponses, t.errors } @@ -264,3 +239,77 @@ func (t *Target) StopSubscription(name string) { delete(t.subscribeCancelFn, name) delete(t.SubscribeClients, name) } + +func (t *Target) listenPolls(ctx context.Context) { + for { + select { + case subName := <-t.pollChan: + err := t.SubscribePoll(ctx, subName) + if err != nil { + t.errors <- &TargetError{ + SubscriptionName: subName, + Err: fmt.Errorf("failed to send PollRequest to subscription %s: %v", subName, err), + } + } + case <-ctx.Done(): + return + } + } +} + +func (t *Target) handleStreamSubscriptionRcv(ctx context.Context, stream gnmi.GNMI_SubscribeClient, subConfig *types.SubscriptionConfig) error { + for { + if ctx.Err() != nil { + return nil + } + response, err := stream.Recv() + if err != nil { + return err + } + t.subscribeResponses <- &SubscribeResponse{ + SubscriptionName: subConfig.Name, + SubscriptionConfig: subConfig, + Response: response, + } + } +} + +func (t *Target) handleONCESubscriptionRcv(ctx context.Context, stream gnmi.GNMI_SubscribeClient, subConfig *types.SubscriptionConfig) error { + for { + if ctx.Err() != nil { + return nil + } + response, err := stream.Recv() + if err != nil { + return err + } + t.subscribeResponses <- &SubscribeResponse{ + SubscriptionName: subConfig.Name, + SubscriptionConfig: subConfig, + Response: response, + } + switch response.Response.(type) { + case *gnmi.SubscribeResponse_SyncResponse: + return nil + } + } +} + +func (t *Target) handlePollSubscriptionRcv(ctx context.Context, stream gnmi.GNMI_SubscribeClient, subConfig *types.SubscriptionConfig) error { + for { + select { + case <-ctx.Done(): + return nil + default: + response, err := stream.Recv() + if err != nil { + return err + } + t.subscribeResponses <- &SubscribeResponse{ + SubscriptionName: subConfig.Name, + SubscriptionConfig: subConfig, + Response: response, + } + } + } +} diff --git a/pkg/target/target.go b/pkg/target/target.go index 9a9437d9..4ce7bf5a 100644 --- a/pkg/target/target.go +++ b/pkg/target/target.go @@ -16,17 +16,15 @@ import ( "strings" "sync" + "github.com/jhump/protoreflect/desc" + "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/gnmi/proto/gnmi_ext" + "github.com/openconfig/gnmic/pkg/types" "golang.org/x/net/proxy" "golang.org/x/oauth2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/oauth" "google.golang.org/grpc/metadata" - - "github.com/jhump/protoreflect/desc" - "github.com/openconfig/gnmi/proto/gnmi" - "github.com/openconfig/gnmi/proto/gnmi_ext" - - "github.com/openconfig/gnmic/pkg/types" ) type TargetError struct {