diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index 26c5d09a..a96b2d97 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -109,7 +109,7 @@ func (s *MessageQueueEventServer) startSubscription(ctx context.Context) { return fmt.Errorf("failed to handle resource status update %s: %s", resource.ID, err.Error()) } default: - return fmt.Errorf("unsupported action %s", action) + return fmt.Errorf("failed to handle resource status update %s: unsupported action %s", resource.ID, action) } return nil @@ -277,7 +277,7 @@ func broadcastStatusEvent(ctx context.Context, } // broadcast the resource status to subscribers - log.V(4).Infof("Broadcast the resource status %s", resource.ID) + log.V(4).Infof("Broadcast the resource status, id=%s, statusEventType=%s", resource.ID, statusEvent.StatusEventType) eventBroadcaster.Broadcast(resource) // add the event instance record diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index 32e9fc40..88a3b0da 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -177,7 +177,8 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest) return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err) } - klog.V(4).Infof("receive the event with grpc server, %s", evt) + klog.V(4).Infof("receive the event from client, %s", evt.Context) + klog.V(10).Infof("receive the event from client, evt=%s", evt) // handler resync request if eventType.Action == types.ResyncRequestAction { @@ -247,7 +248,8 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err) } - klog.V(4).Infof("send the event to status subscribers, %s", evt) + klog.V(4).Infof("send the event to status subscribers, %s", evt.Context) + klog.V(10).Infof("send the event to status subscribers, evt=%s", evt) // WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf pbEvt := &pbv1.CloudEvent{} diff --git a/cmd/maestro/server/healthcheck_server.go b/cmd/maestro/server/healthcheck_server.go index a418cba6..0081d54e 100755 --- a/cmd/maestro/server/healthcheck_server.go +++ b/cmd/maestro/server/healthcheck_server.go @@ -182,7 +182,7 @@ func (s *HealthCheckServer) healthCheckHandler(w http.ResponseWriter, r *http.Re return } if instance.Ready { - klog.Infof("Instance is ready") + klog.V(10).Infof("Instance is ready") w.WriteHeader(http.StatusOK) _, err := w.Write([]byte(`{"status": "ok"}`)) if err != nil { diff --git a/cmd/maestro/server/logging/logging.go b/cmd/maestro/server/logging/logging.go index cbacd444..a0e7c8f7 100755 --- a/cmd/maestro/server/logging/logging.go +++ b/cmd/maestro/server/logging/logging.go @@ -1,3 +1,4 @@ package logging -const LoggingThreshold int32 = 1 +// Using trace level for http request/response +const LoggingThreshold int32 = 10 diff --git a/docs/images/maestro-mqtt-pub-dataflow.drawio b/docs/images/maestro-mqtt-pub-dataflow.drawio new file mode 100644 index 00000000..c902aaa6 --- /dev/null +++ b/docs/images/maestro-mqtt-pub-dataflow.drawio @@ -0,0 +1,134 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/images/maestro-mqtt-pub-dataflow.png b/docs/images/maestro-mqtt-pub-dataflow.png new file mode 100644 index 00000000..6ac36686 Binary files /dev/null and b/docs/images/maestro-mqtt-pub-dataflow.png differ diff --git a/docs/images/maestro-mqtt-sub-dataflow.drawio b/docs/images/maestro-mqtt-sub-dataflow.drawio new file mode 100644 index 00000000..a2666ae8 --- /dev/null +++ b/docs/images/maestro-mqtt-sub-dataflow.drawio @@ -0,0 +1,173 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/images/maestro-mqtt-sub-dataflow.png b/docs/images/maestro-mqtt-sub-dataflow.png new file mode 100644 index 00000000..2eeb0fb9 Binary files /dev/null and b/docs/images/maestro-mqtt-sub-dataflow.png differ diff --git a/docs/maestro.md b/docs/maestro.md index 092d1e5c..5f211b36 100644 --- a/docs/maestro.md +++ b/docs/maestro.md @@ -42,3 +42,13 @@ The Maestro Server includes various components to fulfill its functions, as illu [maestro-resource-status-flow](https://swimlanes.io/#lVTLUuNADLzPV+gDeNw5bBXYBraKRxbCeWtia+MphhmvpCG/v5rYJinHhMU3S61WS2pbnHi8gHuLLBThCTkmqhGexUpiuPZxY4yp3jEIXFG0TW1ZkOD0BxwEL3IVCUiE1RgFGhl5y2jM2OrmaVHAM9J7zzYT3uPjHrfOScK/SbHGDBqLGLTQ+57nIDiybCK9KkTJWhsaPwr6jXmMnaxF8owzuvbjA+XnRYOI0nFnpW53Ig4SxjxE0QNcR4L7X8slbJy0wK0lbIDTimtynbgY9KXrIskJJG0lLUKIp7HjYQxoPijP4FHTtHGMO/ABqm9Ux8BOb6eHbC23LqzzghQl5FZJPupS11hBzklKIWTc2zC7C4oJNfLZkX30Fii80066i36uFWa+qeWUJe9BffU6WzzFFz6mpjfjtw5Solr2zQUE9weKRJSX8HMYRi8Qk2/gtrfKpOd33fKy3d55iV577siUG2pCTcHLorxcVudldVctq4kz/9fmt3O2ni2eV3n19Tc7HXfmF3CEZfYvMlKv97/7T0myYbZIHi1ESjx5/gE=) ![maestro-resource-status-flow](./images/maestro-resource-status-flow.png) + +## Maestro Resource Data Flow + +### Maestro Publish Resource with MQTT + +![maestro-mqtt-pub-dataflow](./images/maestro-mqtt-pub-dataflow.png) + +### Maestro Subscribe Resource Status with MQTT + +![maestro-mqtt-sub-dataflow](./images/maestro-mqtt-sub-dataflow.png) diff --git a/pkg/client/grpcauthorizer/kube_authorizer.go b/pkg/client/grpcauthorizer/kube_authorizer.go index 87da1040..1b06e8d3 100644 --- a/pkg/client/grpcauthorizer/kube_authorizer.go +++ b/pkg/client/grpcauthorizer/kube_authorizer.go @@ -26,8 +26,6 @@ var _ GRPCAuthorizer = &KubeGRPCAuthorizer{} // TokenReview validates the given token and returns the user and groups associated with it. func (k *KubeGRPCAuthorizer) TokenReview(ctx context.Context, token string) (user string, groups []string, err error) { - klog.V(4).Infof("TokenReview: token=%s", token) - tr, err := k.kubeClient.AuthenticationV1().TokenReviews().Create(ctx, &authenticationv1.TokenReview{ Spec: authenticationv1.TokenReviewSpec{ Token: token, diff --git a/pkg/db/db_session/default.go b/pkg/db/db_session/default.go index 89aa3d2a..59809a1f 100755 --- a/pkg/db/db_session/default.go +++ b/pkg/db/db_session/default.go @@ -147,7 +147,7 @@ func waitForNotification(ctx context.Context, l *pq.Listener, dbConfig *config.D return case n := <-l.Notify: if n != nil { - logger.V(4).Infof("Received event from channel [%s] : %s", n.Channel, n.Extra) + logger.V(10).Infof("Received event from channel [%s] : %s", n.Channel, n.Extra) callback(n.Extra) } else { // nil notification means the connection was closed diff --git a/pkg/services/resource.go b/pkg/services/resource.go index 1adb8a96..40e2ee8e 100755 --- a/pkg/services/resource.go +++ b/pkg/services/resource.go @@ -193,7 +193,8 @@ func (s *sqlResourceService) UpdateStatus(ctx context.Context, resource *api.Res return nil, false, errors.GeneralError("Unable to convert resource status to cloudevent: %s", err) } - logger.V(4).Info(fmt.Sprintf("Updating resource status with event %s", resourceStatusEvent)) + logger.V(4).Info(fmt.Sprintf("Updating resource status, id=%s", resource.ID)) + logger.V(10).Info(fmt.Sprintf("Updating resource status, evt=%s", resourceStatusEvent)) sequenceID, err := cloudeventstypes.ToString(resourceStatusEvent.Context.GetExtensions()[cetypes.ExtensionStatusUpdateSequenceID]) if err != nil {