Skip to content

Commit

Permalink
Improved logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Utkarsh Shukla committed Dec 9, 2024
1 parent a954998 commit 7e46b9a
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 55 deletions.
14 changes: 7 additions & 7 deletions app/client/agent_receiver_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.D

stream, err := e.client.RunRequest(ctx, pbr)
if err != nil {
logger.Infow("unable to process request, agent won't close!", "error", err)
logger.Infow("unable to process request, agent won't close!", "error", err.Error())
w.WriteHeader(http.StatusBadGateway)
// return
}
Expand All @@ -109,15 +109,15 @@ func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.D
// read and process another message
msg, err := stream.Recv()
if err == io.EOF {
logger.Infow("stream EOF, agent won't close!", err)
logger.Infow("stream EOF, agent won't close!", err.Error())
if !headersSent {
w.WriteHeader(http.StatusBadGateway)
}
continue
// return
}
if err != nil {
logger.Infow("error on stream, agent won't close!", "error", err)
logger.Infow("error on stream, agent won't close!", "error", err.Error())
if !headersSent {
w.WriteHeader(http.StatusBadGateway)
}
Expand All @@ -130,7 +130,7 @@ func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.D
n, err := w.Write(data)
logger.Infow("Got data on stream", data, err)
if err != nil {
logger.Infow("send to client: %v, agent won't close!", err)
logger.Infow("send to client: %v, agent won't close!", err.Error())
continue
// return
}
Expand All @@ -143,7 +143,7 @@ func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.D
case *pb.StreamFlow_Headers:
headers := msg.GetHeaders()
headersSent = true
logger.Infow("Got Headers: %v", headers)
logger.Infow("Got Headers")
for name := range w.Header() {
w.Header().Del(name)
}
Expand All @@ -154,14 +154,14 @@ func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.D
}
w.WriteHeader(int(headers.StatusCode))
case *pb.StreamFlow_Cancel:
logger.Infow("stream canceled, agent wont close!", msg)
logger.Infow("stream canceled, agent wont close!", msg.String())
if !headersSent {
w.WriteHeader(http.StatusBadGateway)
}
continue
// return
case *pb.StreamFlow_Done:
logger.Infow("stream done, agent wont close!", msg)
logger.Infow("stream done, agent wont close!", msg.String())
if !headersSent {
w.WriteHeader(http.StatusBadGateway)
}
Expand Down
6 changes: 3 additions & 3 deletions app/client/agent_sender_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,20 @@ func (e *AgentSenderEcho) RunDataSender(ctx context.Context) {

stream, err := e.c.DataFlowAgentToController(ctx)
if err != nil {
logger.Infow("error e.c.DataFlowAgentToController(): %v", err)
logger.Infow("error e.c.DataFlowAgentToController()", err.Error())
return
}

defer func() {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
logger.Infow("stream.CloseAndRecv error: %v", err)
logger.Infow("stream.CloseAndRecv error", err.Error())
}
}()

err = stream.Send(pb.StreamflowWrapStreamID(e.streamID))
if err != nil {
logger.Infow("Cannot send stream id: %v", err)
logger.Infow("Cannot send stream id", err.Error())
return
}

Expand Down
46 changes: 21 additions & 25 deletions app/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ var (
func check(ctx context.Context, err error) {
_, logger := loggerFromContext(ctx)
if err != nil {
logger.Infow("Got an error", err)
logger.Infow("Got an error", err.Error())
}
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func sendHello(ctx context.Context, c pb.TunnelServiceClient, info *pb.AgentInfo
AgentInfo: info,
}

logger.Infow("Sending Hello request", req)
logger.Infow("Sending Hello request", req.String())
return c.Hello(ctx, req)
}

Expand Down Expand Up @@ -139,13 +139,13 @@ func waitForRequest(ctx context.Context, c pb.TunnelServiceClient) error {
stream, err := c.WaitForRequest(ctx, &pb.WaitForRequestArgs{})
logger.Info("WaitForRequest successfully returned")
if err != nil {
logger.Infow("Wait for request grpc action failed, agent wont close!", err)
logger.Infow("Wait for request grpc action failed, agent wont close!", err.Error())
// return err
}
for {
req, err := stream.Recv()
if err != nil {
logger.Infow("Recieved error on stream.Recv() , agent wont close!", err)
logger.Infow("Recieved error on stream.Recv() , agent wont close!", err.Error())
continue
// return err
}
Expand All @@ -164,10 +164,10 @@ func waitForRequest(ctx context.Context, c pb.TunnelServiceClient) error {
doneChan := make(chan bool)
echo := MakeAgentSenderEcho(ctx, c, req.StreamId, doneChan)
ep, found := findEndpoint(ctx, req.Name, req.Type)
logger.Infow("Endpoint found", ep, found)
logger.Infow("Endpoint found", ep.Name, found)
if !found {
if err := echo.Fail(ctx, http.StatusBadGateway, fmt.Errorf("no such service on agent")); err != nil {
logger.Infow("Could not find endpoint", err)
logger.Infow("Could not find endpoint", err.Error())
}
echo.Shutdown(ctx)
continue
Expand All @@ -177,7 +177,7 @@ func waitForRequest(ctx context.Context, c pb.TunnelServiceClient) error {
go pprof.Do(ctx, labels, func(ctx context.Context) {
defer echo.Shutdown(ctx)
if err := ep.Instance.ExecuteHTTPRequest(ctx, session.agentID, echo, req); err != nil {
logger.Infow("Error in executing request", err)
logger.Infow("Error in executing request", err.Error())
}
})
}
Expand Down Expand Up @@ -219,7 +219,6 @@ func connect(ctx context.Context, address string, ta credentials.TransportCreden
PermitWithoutStream: true,
}

logger.Infow("Keep alive parameters", kparams)
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(ta),
grpc.WithKeepaliveParams(kparams),
Expand All @@ -229,21 +228,19 @@ func connect(ctx context.Context, address string, ta credentials.TransportCreden
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
}

logger.Infow("gopts", gopts)
if config.InsecureControllerConnection {
gopts = append(gopts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
logger.Infow("config", config)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer func() {
logger.Info("connect cancel() called")
cancel()
}()
conn, err := grpc.DialContext(ctx, address, gopts...)

logger.Infow("grpc connection", conn)
logger.Infow("grpc connection", conn.GetState().String())

logger.Infow("grpc error", err)
logger.Infow("grpc error", err.Error())
check(ctx, err)

return conn
Expand Down Expand Up @@ -426,7 +423,7 @@ func main() {
defer tracerProvider.Shutdown(ctx)

if c, err := loadConfig(*configFile); err != nil {
logger.Infow("loading config error: %v", err)
logger.Infow("loading config error:", err.Error())
} else {
config = c
logger.Infow("Set config successfully")
Expand All @@ -435,7 +432,7 @@ func main() {

agentServiceConfig, err := serviceconfig.LoadServiceConfig(config.ServicesConfigFile)
if err != nil {
logger.Infow("Error in loading services config:", err)
logger.Infow("Error in loading services config:", err.Error())
}

secretsLoader = makeSecretsLoader(ctx)
Expand All @@ -456,14 +453,14 @@ func main() {
logger.Infow("Set default deafult http client")
authToken, err := getAuthToken(config.AuthTokenFile)
if err != nil {
logger.Infow("Error in gettign authconfig", err)
logger.Infow("Error in gettign authconfig", err.Error())
}
session.authorization = authToken

logger.Infow("Session information initialized", session)
logger.Infow("Session information initialized ", session.agentID)
agentInfo, err := loadAgentInfo(config.ServicesConfigFile)
if err != nil {
logger.Infow("Error in getting agent Info", err)
logger.Infow("Error in getting agent Info", err.Error())
}

logger.Infow("Agent information initialized", agentInfo)
Expand All @@ -474,7 +471,7 @@ func main() {
if found {
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(cacert); !ok {
logger.Infow("append certificate to pool: %v", err)
logger.Infow("append certificate to pool", err.Error())
}
tlsConfig.RootCAs = caCertPool
}
Expand All @@ -487,23 +484,22 @@ func main() {
defer conn.Close()
c := pb.NewTunnelServiceClient(conn)

logger.Infow("Tunnel service client", c)
hello, err := sendHello(ctx, c, agentInfo, endpoints, hostname, version.VersionString())

logger.Infow("Hello response", hello)
logger.Infow("Hello response", hello.String())
check(ctx, err)

session.sessionID = hello.InstanceId
session.agentID = hello.AgentId
logger.Infow("controller services", "endpoints", hello.Endpoints)

logger.Infow("Session set", session)
logger.Infow("Session set", session.agentID)
destinations := makeControllerDestination(ctx, endpoints)
echoManager := &AgentReceiverEchoManager{
client: c,
}
logger.Infow("destinations set", destinations)
logger.Infow("echo manager set", echoManager)
logger.Infow("destinations set")
logger.Infow("echo manager set")
for _, service := range agentServiceConfig.IncomingServices {
logger.Infow("Running HTTP server for service", service)

Expand All @@ -512,13 +508,13 @@ func main() {

go func() {
err := waitForRequest(ctx, c)
logger.Infow("waitForRequest failed: %v", err)
logger.Infow("waitForRequest failed", err.Error())
session.done <- struct{}{}
}()

go func() {
err := pinger(ctx, c, *tickTime)
logger.Infow("pinger failed: %v", err)
logger.Infow("pinger failed", err.Error())
session.done <- struct{}{}
}()

Expand Down
30 changes: 15 additions & 15 deletions app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func (s *server) WaitForRequest(in *pb.WaitForRequestArgs, stream pb.TunnelServi
ctx, logger := loggerFromContext(stream.Context())
logger.Infow("Entered WaitForRequest stub callback")
session, err := agents.findSession(stream.Context())
logger.Infow("Found session", session)
logger.Infow("Found session", session.AgentID)
if err != nil {
logger.Infow("Error in finding session, first call hello", err)
logger.Infow("Error in finding session, first call hello", err.Error())
return status.Error(codes.FailedPrecondition, "Hello must be called first")
}
// defer agents.removeSession(session)
Expand All @@ -112,19 +112,19 @@ func (s *server) WaitForRequest(in *pb.WaitForRequestArgs, stream pb.TunnelServi
}
if err := stream.Send(keepalive); err != nil {
s.closeAgentSession(ctx, session)
logger.Infow("WaitForRequest stream.Send() failed, dropping agent", "error", err)
logger.Infow("WaitForRequest stream.Send() failed, dropping agent error", err.Error())
return status.Error(codes.Canceled, "send failed")
}
case <-ctx.Done():
logger.Infow("closed connection with error: ", ctx.Err())
s.closeAgentSession(ctx, session)
return status.Error(codes.Canceled, "client closed connection")
case sr := <-session.requestChan:
logger.Infow("got request from request channel: ", sr)
logger.Infow("got request from request channel: ", sr.req.String())
s.streamManager.Register(ctx, session, sr.req.StreamId, sr.closechan, sr.echo)
if err := stream.Send(sr.req); err != nil {
s.closeAgentSession(ctx, session)
logger.Infow("WaitForRequest stream.Send() failed, dropping agent", "error", err)
logger.Infow("WaitForRequest stream.Send() failed, dropping agent error", err.Error())
return status.Error(codes.Canceled, "send failed")
}
}
Expand Down Expand Up @@ -158,12 +158,12 @@ func (s *server) DataFlowAgentToController(rpcstream pb.TunnelService_DataFlowAg
sugar := zap.Must(zap.NewProduction()).Sugar()
sugar.Infow("Entered DataFlowAgentToController")
if err != nil {
sugar.Infow("Error recieving rpc recv()", err)
sugar.Infow("Error recieving rpc recv()", err.Error())
return status.Error(codes.InvalidArgument, "unable to read streamID")
}
streamID, stream, err := s.getStreamAndID(ctx, event)
if err != nil {
sugar.Infow("Error getting stream", err)
sugar.Infow("Error getting stream", err.Error())
return err
}
ctx, logger := loggerFromContext(ctx, zap.String("streamID", streamID))
Expand All @@ -172,12 +172,12 @@ func (s *server) DataFlowAgentToController(rpcstream pb.TunnelService_DataFlowAg
for {
event, err := rpcstream.Recv()
if err == io.EOF {
logger.Infow("received error from stream", err)
logger.Infow("received error from stream", err.Error())
s.done(ctx, stream)
return nil
}
if err != nil {
logger.Infof("stream error: %v, sending Fail() on echo", err)
logger.Infof("stream error: %v, sending Fail() on echo", err.Error())
if serr, ok := status.FromError(err); ok {
if serr.Code() == codes.Canceled {
return nil
Expand Down Expand Up @@ -233,15 +233,15 @@ func (s *server) RunRequest(in *pb.TunnelRequest, stream pb.TunnelService_RunReq
)
logger := logging.WithContext(ctx).Sugar()
endpoint, found := findEndpoint(stream.Context(), in.Name, in.Type)
logger.Infow("Found endpoint", endpoint, found)
logger.Infow("Found endpoint", endpoint.Name)
if !found {

err := stream.Send(pb.StreamflowWrapHeaderMsg(
&pb.TunnelHeaders{
StatusCode: http.StatusBadGateway,
}))
if err != nil {
logger.Infow("unable to send: %v", err)
logger.Infow("unable to send", err.Error())
}
return err
}
Expand All @@ -253,7 +253,7 @@ func (s *server) RunRequest(in *pb.TunnelRequest, stream pb.TunnelService_RunReq
go func() {
err := endpoint.Instance.ExecuteHTTPRequest(ctx, "controller", echo, in)
if err != nil {
logger.Infow("ExecuteHTTPRequest", "error", err)
logger.Infow("ExecuteHTTPRequest", err.Error())
echo.Shutdown(ctx)
}
select {
Expand All @@ -270,15 +270,15 @@ func (s *server) RunRequest(in *pb.TunnelRequest, stream pb.TunnelService_RunReq
}
err := stream.Send(msg)
if err != nil {
logger.Infow("Send()", "error", err)
logger.Infow("Send() error", err.Error())
return err
}
case <-ctx.Done():
logger.Infow("Context done() called in Run request", ctx.Err())
return ctx.Err()
case err := <-donechan:
if err != nil {
logger.Infow("HTTP request ended with error: ", err)
logger.Infow("HTTP request ended with error: ", err.Error())
}
return err
}
Expand All @@ -302,7 +302,7 @@ func loadTLSCredentials(tlsPath string) (credentials.TransportCredentials, error

func runAgentGRPCServer(ctx context.Context, tlsPath string) {
ctx, logger := loggerFromContext(ctx, zap.String("component", "grpcServer"))
logger.Infow("starting agent GRPC server", "port", config.AgentListenPort)
logger.Infow("starting agent GRPC server on port", config.AgentListenPort)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.AgentListenPort))
if err != nil {
logger.Fatalw("failed to listen on agent port", "error", err)
Expand Down
Loading

0 comments on commit 7e46b9a

Please sign in to comment.