Skip to content

Commit

Permalink
Added cancel() logs to track where context is being cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
Utkarsh Shukla committed Dec 4, 2024
1 parent 3797ad6 commit a9ff3a5
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 13 deletions.
8 changes: 7 additions & 1 deletion app/client/agent_receiver_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/opsmx/oes-birger/internal/logging"
"github.com/opsmx/oes-birger/internal/serviceconfig"
pb "github.com/opsmx/oes-birger/internal/tunnel"
"go.uber.org/zap"
)

// AgentReceiverEcho takes an agent-side connection and sends the request to the
Expand Down Expand Up @@ -67,10 +68,15 @@ func (e *AgentReceiverEcho) Cancel(ctx context.Context) error {
return nil
}

func (e *AgentReceiverEcho) RunRequestCancel(ctx context.Context, cl context.CancelFunc, logger *zap.SugaredLogger) {
logger.Info("ReunRequestCancel cancel() called")
cl()
}

func (e *AgentReceiverEcho) RunRequest(ctx context.Context, dest serviceconfig.Destination, body []byte, w http.ResponseWriter, r *http.Request) {
ctx, cancel := getHeaderContext(ctx, 0)
defer cancel()
logger := logging.WithContext(ctx).Sugar()
defer e.RunRequestCancel(ctx, cancel, logger)
headersSent := false
flusher := w.(http.Flusher)

Expand Down
8 changes: 7 additions & 1 deletion app/client/agent_sender_echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/opsmx/oes-birger/internal/serviceconfig"
pb "github.com/opsmx/oes-birger/internal/tunnel"
"go.uber.org/zap"
)

type AgentSenderEcho struct {
Expand Down Expand Up @@ -54,11 +55,16 @@ func (e *AgentSenderEcho) Shutdown(ctx context.Context) {
}
}

func RunDataSenderCancel(ctx context.Context, cl context.CancelFunc, logger *zap.SugaredLogger) {
logger.Info("RunDataSenderCancel cancel() called")
cl()
}

// TODO: return any errors to "caller"
func (e *AgentSenderEcho) RunDataSender(ctx context.Context) {
ctx, logger := loggerFromContext(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer RunDataSenderCancel(ctx, cancel, logger)

stream, err := e.c.DataFlowAgentToController(ctx)
if err != nil {
Expand Down
32 changes: 24 additions & 8 deletions app/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ var session = AgentSession{

func sendHello(ctx context.Context, c pb.TunnelServiceClient, info *pb.AgentInfo, endpoints []serviceconfig.ConfiguredEndpoint, hostname string, version string) (*pb.HelloResponse, error) {
ctx, cancel := getHeaderContext(ctx, session.rpcTimeout)
defer cancel()
ctx, logger := loggerFromContext(ctx)
defer func() {
logger.Info("sendHello cancel() called")
cancel()
}()

req := &pb.HelloRequest{
Hostname: hostname,
Expand Down Expand Up @@ -128,7 +132,10 @@ func waitForRequest(ctx context.Context, c pb.TunnelServiceClient) error {
logger.Info("Entered waitForRequest")
logger.Infof("Entered waitForRequest")
logger.Infow("Entered waitForRequest")
defer cancel()
defer func() {
logger.Info("sendWaitForRequest cancel() called")
cancel()
}()
stream, err := c.WaitForRequest(ctx, &pb.WaitForRequestArgs{})
logger.Info("WaitForRequest successfully returned")
if err != nil {
Expand All @@ -138,9 +145,8 @@ func waitForRequest(ctx context.Context, c pb.TunnelServiceClient) error {
for {
req, err := stream.Recv()
if err != nil {
// return err
logger.Infow("Recieved error on stream.Recv()", err)
continue
return err
}
if req.IsKeepalive {
logger.Info("Keepalive request received")
Expand Down Expand Up @@ -196,7 +202,10 @@ func pinger(ctx context.Context, c pb.TunnelServiceClient, tickTime int) error {
for {
time.Sleep(time.Duration(tickTime) * time.Second)
ctx, cancel := getHeaderContext(ctx, session.rpcTimeout)
defer cancel()
defer func() {
logger.Info("pinger cancel() called")
cancel()
}()
r, err := c.Ping(ctx, &pb.PingRequest{
Ts: uint64(time.Now().UnixNano()),
})
Expand All @@ -208,6 +217,7 @@ func pinger(ctx context.Context, c pb.TunnelServiceClient, tickTime int) error {
}

func connect(ctx context.Context, address string, ta credentials.TransportCredentials) *grpc.ClientConn {
ctx, logger := loggerFromContext(ctx)
kparams := keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 5 * time.Second,
Expand All @@ -225,7 +235,10 @@ func connect(ctx context.Context, address string, ta credentials.TransportCreden
gopts = append(gopts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
defer func() {
logger.Info("connect cancel() called")
cancel()
}()
conn, err := grpc.DialContext(ctx, address, gopts...)
check(ctx, err)

Expand Down Expand Up @@ -382,10 +395,13 @@ func makeSecretsLoader(ctx context.Context) secrets.SecretLoader {

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctx, logger := loggerFromContext(ctx)

defer func() {
logger.Info("main cancel() called")
cancel()
}()

logger.Infof("%s", version.VersionString())
flag.Parse()
if *showversion {
Expand Down
5 changes: 4 additions & 1 deletion app/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ func loadAgentAuthKeyset(ctx context.Context) {

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx, logger := loggerFromContext(ctx)
defer func() {
logger.Info("main cancel() called")
cancel()
}()

logger.Infof("%s", version.VersionString())
flag.Parse()
Expand Down
5 changes: 4 additions & 1 deletion app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,10 @@ func runAgentGRPCServer(ctx context.Context, tlsPath string) {
}

cleanerCtx, cleanerCancel := context.WithCancel(ctx)
defer cleanerCancel()
defer func() {
logger.Info("runAgentGRPSServer cleanerCancel() called")
cleanerCancel()
}()
go agents.checkSessionTimeouts(cleanerCtx, s.agentIdleTimeout)

jwtInterceptor := NewJWTInterceptor()
Expand Down
5 changes: 4 additions & 1 deletion internal/serviceconfig/generic_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ func makeResponse(id string, response *http.Response) (*pb.TunnelHeaders, error)

func RunHTTPRequest(ctx context.Context, cancel context.CancelFunc, client *http.Client, req *pb.TunnelRequest, httpRequest *http.Request, echo Echo, baseURL string) {
logger := logging.WithContext(ctx).Sugar()
defer cancel()
defer func() {
logger.Info("RunHTTPRequest cancel() called")
cancel()
}()

requestURI := baseURL + req.URI
logger.Debugf("Sending HTTP request: %s to %s", req.Method, requestURI)
Expand Down

0 comments on commit a9ff3a5

Please sign in to comment.