diff --git a/platform/view/core/manager/context.go b/platform/view/core/manager/context.go index 89a7f3b78..cfb487911 100644 --- a/platform/view/core/manager/context.go +++ b/platform/view/core/manager/context.go @@ -242,7 +242,12 @@ func (ctx *ctx) GetSession(f view.View, party view.Identity) (view.Session, erro if err != nil { return nil, err } - ctx.sessions[id.UniqueID()] = s + key := id.UniqueID() + ctx.sessions[key] = &disposableSession{ + Session: s, + ctx: ctx, + key: key, + } } else { if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("[%s] Reusing session [to:%s]", ctx.me, id) @@ -267,7 +272,11 @@ func (ctx *ctx) GetSessionByID(id string, party view.Identity) (view.Session, er if err != nil { return nil, err } - ctx.sessions[key] = s + ctx.sessions[key] = &disposableSession{ + Session: s, + ctx: ctx, + key: key, + } } else { if logger.IsEnabledFor(zapcore.DebugLevel) { logger.Debugf("[%s] Reusing session with given id [id:%s][to:%s]", id, ctx.me, party) @@ -367,7 +376,27 @@ func (ctx *ctx) safeInvoke(f func()) { f() } +func (ctx *ctx) disposeSession(key string) { + ctx.sessionsLock.Lock() + defer ctx.sessionsLock.Unlock() + + delete(ctx.sessions, key) +} + type localContext interface { disposableContext cleanup() } + +type disposableSession struct { + view.Session + ctx *ctx + key string +} + +// Close releases all the resources allocated by this session +func (s *disposableSession) Close() { + // remove from context + s.ctx.disposeSession(s.key) + s.Session.Close() +} diff --git a/platform/view/services/comm/session.go b/platform/view/services/comm/session.go index 70cd4f489..3711306d0 100644 --- a/platform/view/services/comm/session.go +++ b/platform/view/services/comm/session.go @@ -28,6 +28,7 @@ type NetworkStreamSession struct { incoming chan *view.Message streams map[*streamHandler]struct{} closed bool + whoClosed string mutex sync.Mutex } @@ -40,6 +41,7 @@ func (n *NetworkStreamSession) Info() view.SessionInfo { Endpoint: n.endpointAddress, EndpointPKID: n.endpointID, Closed: n.closed, + WhoClosed: n.whoClosed, } n.mutex.Unlock() return ret @@ -103,6 +105,7 @@ func (n *NetworkStreamSession) closeInternal() { } close(n.incoming) n.closed = true + n.whoClosed = string(debug.Stack()) n.streams = make(map[*streamHandler]struct{}) if logger.IsEnabledFor(zapcore.DebugLevel) { diff --git a/platform/view/services/session/bidi.go b/platform/view/services/session/bidi.go index c081ea357..12464a186 100644 --- a/platform/view/services/session/bidi.go +++ b/platform/view/services/session/bidi.go @@ -9,6 +9,7 @@ package session import ( "context" "encoding/base64" + "runtime/debug" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" "github.com/pkg/errors" @@ -134,4 +135,5 @@ func (s *localSession) Receive() <-chan *view.Message { func (s *localSession) Close() { s.info.Closed = true + s.info.WhoClosed = string(debug.Stack()) } diff --git a/platform/view/view/session.go b/platform/view/view/session.go index c4cb3b414..3c1c47ab4 100644 --- a/platform/view/view/session.go +++ b/platform/view/view/session.go @@ -42,6 +42,7 @@ type SessionInfo struct { Endpoint string EndpointPKID []byte Closed bool + WhoClosed string } func (i *SessionInfo) String() string {