From a798ca23d97a0a6b7212131ea89f7eac69082df3 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Mon, 7 Nov 2022 17:24:58 +0200 Subject: [PATCH 01/11] fix(ScheduleSend): disable scheduleSend when onMessage callback is happening --- client/internal/httpsender.go | 2 +- client/internal/httpsender_test.go | 23 +++++++++++++++++ client/internal/receivedprocessor.go | 11 ++++++++- client/internal/sender.go | 37 ++++++++++++++++++++++++++-- client/internal/wsreceiver.go | 3 ++- client/internal/wsreceiver_test.go | 4 +-- client/wsclient.go | 1 + 7 files changed, 74 insertions(+), 7 deletions(-) diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index b2a9938b..cfa5e9ca 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -76,7 +76,7 @@ func (h *HTTPSender) Run( ) { h.url = url h.callbacks = callbacks - h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities) + h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, &h.SenderCommon, clientSyncedState, packagesStateProvider, capabilities) for { pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs))) diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index 31f1aabb..cf0e3cae 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -13,6 +13,29 @@ import ( "github.com/stretchr/testify/assert" ) +func TestDelaySchedule(t *testing.T) { + sender := NewHTTPSender(&sharedinternal.NopLogger{}) + pendingMessageChan := sender.hasPendingMessage + scheduleSendDelayChan := sender.registerScheduleSend + sender.DisableScheduleSend() + + // Verify ScheduleSend is not writing to message channel when disabled + sender.ScheduleSend() + assert.Equal(t, 0, len(pendingMessageChan)) + assert.Equal(t, 1, len(scheduleSendDelayChan)) + + // Verify ScheduleSend is writing to message channel when enabled + sender.EnableScheduleSend() + assert.Equal(t, 1, len(pendingMessageChan)) + assert.Equal(t, 0, len(scheduleSendDelayChan)) + + // ScheduleSend sanity check after enabling + sender.ScheduleSend() + assert.Equal(t, 1, len(pendingMessageChan)) + assert.Equal(t, 0, len(scheduleSendDelayChan)) + +} + func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) { var connectionAttempts int64 diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 942505a0..685f9dcb 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -19,6 +19,9 @@ type receivedProcessor struct { // what will be sent later. sender Sender + // A senderCommon to handle the scheduling of sending. + senderCommon *SenderCommon + // Client state storage. This is needed if the Server asks to report the state. clientSyncedState *ClientSyncedState @@ -32,6 +35,7 @@ func newReceivedProcessor( logger types.Logger, callbacks types.Callbacks, sender Sender, + senderCommon *SenderCommon, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, @@ -40,6 +44,7 @@ func newReceivedProcessor( logger: logger, callbacks: callbacks, sender: sender, + senderCommon: senderCommon, clientSyncedState: clientSyncedState, packagesStateProvider: packagesStateProvider, capabilities: capabilities, @@ -127,7 +132,6 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro msgData.AgentIdentification = msg.AgentIdentification } } - r.callbacks.OnMessage(ctx, msgData) r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) @@ -143,6 +147,11 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } } +func (r *receivedProcessor) onMessage(ctx context.Context, msgData *types.MessageData) { + r.senderCommon.DisableScheduleSend() + r.callbacks.OnMessage(ctx, msgData) + r.senderCommon.EnableScheduleSend() +} func (r *receivedProcessor) hasCapability(capability protobufs.AgentCapabilities) bool { return r.capabilities&capability != 0 } diff --git a/client/internal/sender.go b/client/internal/sender.go index f418b4c2..1f976885 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -31,6 +31,12 @@ type SenderCommon struct { // Indicates that there is a pending message to send. hasPendingMessage chan struct{} + // Indicates onMessage callback is running + onMessageRunning bool + + // Indicates ScheduleSend() was called during onMessage callback run + registerScheduleSend chan struct{} + // The next message to send. nextMessage NextMessage } @@ -39,8 +45,9 @@ type SenderCommon struct { // the WebSocket and HTTP Sender implementations. func NewSenderCommon() SenderCommon { return SenderCommon{ - hasPendingMessage: make(chan struct{}, 1), - nextMessage: NewNextMessage(), + hasPendingMessage: make(chan struct{}, 1), + registerScheduleSend: make(chan struct{}, 1), + nextMessage: NewNextMessage(), } } @@ -48,6 +55,16 @@ func NewSenderCommon() SenderCommon { // is now ready to be sent. If there is no pending message (e.g. the NextMessage was // already sent and "pending" flag is reset) then no message will be sent. func (h *SenderCommon) ScheduleSend() { + if h.onMessageRunning { + // onMessage callback is running, ScheduleSend() will rerun after it is done + select { + case h.registerScheduleSend <- struct{}{}: + default: + break + } + return + } + // Set pending flag. Don't block on writing to channel. select { case h.hasPendingMessage <- struct{}{}: @@ -62,6 +79,22 @@ func (h *SenderCommon) NextMessage() *NextMessage { return &h.nextMessage } +// DisableScheduleSend temporary preventing ScheduleSend from writing to channel +func (h *SenderCommon) DisableScheduleSend() { + h.onMessageRunning = true +} + +// EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback +func (h *SenderCommon) EnableScheduleSend() { + h.onMessageRunning = false + select { + case <-h.registerScheduleSend: + h.ScheduleSend() + default: + break + } +} + // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. // Can be called concurrently, normally is called when a message is received from the // Server that instructs us to change our instance UID. diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index 75461a2c..d94233b1 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -27,6 +27,7 @@ func NewWSReceiver( callbacks types.Callbacks, conn *websocket.Conn, sender *WSSender, + senderCommon *SenderCommon, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, @@ -36,7 +37,7 @@ func NewWSReceiver( logger: logger, sender: sender, callbacks: callbacks, - processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities), + processor: newReceivedProcessor(logger, callbacks, sender, senderCommon, clientSyncedState, packagesStateProvider, capabilities), } return w diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index f1595a0c..d82db4c1 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -74,7 +74,7 @@ func TestServerToAgentCommand(t *testing.T) { remoteConfigStatus: &protobufs.RemoteConfigStatus{}, } sender := WSSender{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &sender.SenderCommon, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: test.command, }) @@ -97,7 +97,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) { }, } clientSyncedState := ClientSyncedState{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, nil, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: &protobufs.ServerToAgentCommand{ Type: protobufs.CommandType_CommandType_Restart, diff --git a/client/wsclient.go b/client/wsclient.go index d07f7bda..02be4d4c 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -236,6 +236,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Callbacks, c.conn, c.sender, + &c.sender.SenderCommon, &c.common.ClientSyncedState, c.common.PackagesStateProvider, c.common.Capabilities, From 122c3f30ca02f5506fafddd77612c0f05d6af277 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Tue, 8 Nov 2022 13:45:40 +0200 Subject: [PATCH 02/11] fix(ScheduleSend): fix data race with atomic wrapper --- client/internal/httpsender_test.go | 10 ++++++++++ client/internal/receivedprocessor.go | 2 +- client/internal/sender.go | 15 +++++++++++---- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/client/internal/httpsender_test.go b/client/internal/httpsender_test.go index cf0e3cae..377b7f40 100644 --- a/client/internal/httpsender_test.go +++ b/client/internal/httpsender_test.go @@ -24,11 +24,21 @@ func TestDelaySchedule(t *testing.T) { assert.Equal(t, 0, len(pendingMessageChan)) assert.Equal(t, 1, len(scheduleSendDelayChan)) + // Repeat process to verify non-blocking and no change in channel length + sender.ScheduleSend() + assert.Equal(t, 0, len(pendingMessageChan)) + assert.Equal(t, 1, len(scheduleSendDelayChan)) + // Verify ScheduleSend is writing to message channel when enabled sender.EnableScheduleSend() assert.Equal(t, 1, len(pendingMessageChan)) assert.Equal(t, 0, len(scheduleSendDelayChan)) + // Repeat process to verify non-blocking and no change in channel length + sender.EnableScheduleSend() + assert.Equal(t, 1, len(pendingMessageChan)) + assert.Equal(t, 0, len(scheduleSendDelayChan)) + // ScheduleSend sanity check after enabling sender.ScheduleSend() assert.Equal(t, 1, len(pendingMessageChan)) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 685f9dcb..e34ff740 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -132,7 +132,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro msgData.AgentIdentification = msg.AgentIdentification } } - r.callbacks.OnMessage(ctx, msgData) + r.onMessage(ctx, msgData) r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) diff --git a/client/internal/sender.go b/client/internal/sender.go index 1f976885..08d33a8b 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -2,6 +2,7 @@ package internal import ( "errors" + "sync/atomic" "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/protobufs" @@ -32,7 +33,7 @@ type SenderCommon struct { hasPendingMessage chan struct{} // Indicates onMessage callback is running - onMessageRunning bool + onMessageRunning int32 // Indicates ScheduleSend() was called during onMessage callback run registerScheduleSend chan struct{} @@ -48,6 +49,7 @@ func NewSenderCommon() SenderCommon { hasPendingMessage: make(chan struct{}, 1), registerScheduleSend: make(chan struct{}, 1), nextMessage: NewNextMessage(), + onMessageRunning: 0, } } @@ -55,7 +57,7 @@ func NewSenderCommon() SenderCommon { // is now ready to be sent. If there is no pending message (e.g. the NextMessage was // already sent and "pending" flag is reset) then no message will be sent. func (h *SenderCommon) ScheduleSend() { - if h.onMessageRunning { + if h.IsOnMessageRunning() { // onMessage callback is running, ScheduleSend() will rerun after it is done select { case h.registerScheduleSend <- struct{}{}: @@ -79,14 +81,19 @@ func (h *SenderCommon) NextMessage() *NextMessage { return &h.nextMessage } +// IsOnMessageRunning returns true if onMessage callback is running +func (h *SenderCommon) IsOnMessageRunning() bool { + return atomic.LoadInt32(&h.onMessageRunning) != 0 +} + // DisableScheduleSend temporary preventing ScheduleSend from writing to channel func (h *SenderCommon) DisableScheduleSend() { - h.onMessageRunning = true + atomic.StoreInt32(&h.onMessageRunning, 1) } // EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback func (h *SenderCommon) EnableScheduleSend() { - h.onMessageRunning = false + atomic.StoreInt32(&h.onMessageRunning, 0) select { case <-h.registerScheduleSend: h.ScheduleSend() From 51179e644ebea6ae3a3c1ff248ffd457a19d9f1a Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Tue, 15 Nov 2022 11:34:25 +0200 Subject: [PATCH 03/11] cr fix: use defer to EnableScheduleSend() --- client/internal/receivedprocessor.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index e34ff740..d27e4e99 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -55,6 +55,16 @@ func newReceivedProcessor( // the received message and performs any processing necessary based on what fields are set. // This function will call any relevant callbacks. func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) { + if r.senderCommon != nil { + r.senderCommon.DisableScheduleSend() + } + + defer func() { + if r.senderCommon != nil { + r.senderCommon.EnableScheduleSend() + } + }() + if r.callbacks != nil { if msg.Command != nil { r.rcvCommand(msg.Command) @@ -132,7 +142,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro msgData.AgentIdentification = msg.AgentIdentification } } - r.onMessage(ctx, msgData) + r.callbacks.OnMessage(ctx, msgData) r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) @@ -147,11 +157,6 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } } -func (r *receivedProcessor) onMessage(ctx context.Context, msgData *types.MessageData) { - r.senderCommon.DisableScheduleSend() - r.callbacks.OnMessage(ctx, msgData) - r.senderCommon.EnableScheduleSend() -} func (r *receivedProcessor) hasCapability(capability protobufs.AgentCapabilities) bool { return r.capabilities&capability != 0 } From 063f7e4549a64fb97f4926bb01ed0900f530654e Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Sun, 20 Nov 2022 16:33:33 +0200 Subject: [PATCH 04/11] cr fix: expose enable/disableScheduleSend via sender --- client/internal/httpsender.go | 2 +- client/internal/receivedprocessor.go | 11 ++++------- client/internal/sender.go | 6 ++++++ client/internal/wsreceiver.go | 3 +-- client/internal/wsreceiver_test.go | 4 ++-- client/wsclient.go | 8 ++++---- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index cfa5e9ca..b2a9938b 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -76,7 +76,7 @@ func (h *HTTPSender) Run( ) { h.url = url h.callbacks = callbacks - h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, &h.SenderCommon, clientSyncedState, packagesStateProvider, capabilities) + h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities) for { pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs))) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index d27e4e99..f43843d8 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -35,7 +35,6 @@ func newReceivedProcessor( logger types.Logger, callbacks types.Callbacks, sender Sender, - senderCommon *SenderCommon, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, @@ -44,7 +43,6 @@ func newReceivedProcessor( logger: logger, callbacks: callbacks, sender: sender, - senderCommon: senderCommon, clientSyncedState: clientSyncedState, packagesStateProvider: packagesStateProvider, capabilities: capabilities, @@ -55,16 +53,15 @@ func newReceivedProcessor( // the received message and performs any processing necessary based on what fields are set. // This function will call any relevant callbacks. func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) { - if r.senderCommon != nil { - r.senderCommon.DisableScheduleSend() + if r.sender != nil { + r.sender.DisableScheduleSend() } defer func() { - if r.senderCommon != nil { - r.senderCommon.EnableScheduleSend() + if r.sender != nil { + r.sender.EnableScheduleSend() } }() - if r.callbacks != nil { if msg.Command != nil { r.rcvCommand(msg.Command) diff --git a/client/internal/sender.go b/client/internal/sender.go index 08d33a8b..fd6ef7ee 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -21,6 +21,12 @@ type Sender interface { // "pending" flag is reset) then no message will be sent. ScheduleSend() + // DisableScheduleSend temporary preventing ScheduleSend from writing to channel + DisableScheduleSend() + + // EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback + EnableScheduleSend() + // SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. SetInstanceUid(instanceUid string) error } diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index d94233b1..75461a2c 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -27,7 +27,6 @@ func NewWSReceiver( callbacks types.Callbacks, conn *websocket.Conn, sender *WSSender, - senderCommon *SenderCommon, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, capabilities protobufs.AgentCapabilities, @@ -37,7 +36,7 @@ func NewWSReceiver( logger: logger, sender: sender, callbacks: callbacks, - processor: newReceivedProcessor(logger, callbacks, sender, senderCommon, clientSyncedState, packagesStateProvider, capabilities), + processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities), } return w diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index d82db4c1..02348beb 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -74,7 +74,7 @@ func TestServerToAgentCommand(t *testing.T) { remoteConfigStatus: &protobufs.RemoteConfigStatus{}, } sender := WSSender{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &sender.SenderCommon, &clientSyncedState, nil, 0) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: test.command, }) @@ -97,7 +97,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) { }, } clientSyncedState := ClientSyncedState{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, nil, &clientSyncedState, nil, 0) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &WSSender{}, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: &protobufs.ServerToAgentCommand{ Type: protobufs.CommandType_CommandType_Restart, diff --git a/client/wsclient.go b/client/wsclient.go index 02be4d4c..b1f2bdc3 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -193,9 +193,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { } // runOneCycle performs the following actions: -// 1. connect (try until succeeds). -// 2. send first status report. -// 3. receive and process messages until error happens. +// 1. connect (try until succeeds). +// 2. send first status report. +// 3. receive and process messages until error happens. +// // If it encounters an error it closes the connection and returns. // Will stop and return if Stop() is called (ctx is cancelled, isStopping is set). func (c *wsClient) runOneCycle(ctx context.Context) { @@ -236,7 +237,6 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.common.Callbacks, c.conn, c.sender, - &c.sender.SenderCommon, &c.common.ClientSyncedState, c.common.PackagesStateProvider, c.common.Capabilities, From 58f00ebff19da7d4f2de28276f8829c168479016 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Mon, 21 Nov 2022 17:08:00 +0200 Subject: [PATCH 05/11] rename onMessageRunning to isSendingDisabled --- client/internal/receivedprocessor.go | 14 +++----------- client/internal/sender.go | 17 +++++++++-------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index f43843d8..3898b637 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -19,9 +19,6 @@ type receivedProcessor struct { // what will be sent later. sender Sender - // A senderCommon to handle the scheduling of sending. - senderCommon *SenderCommon - // Client state storage. This is needed if the Server asks to report the state. clientSyncedState *ClientSyncedState @@ -53,15 +50,9 @@ func newReceivedProcessor( // the received message and performs any processing necessary based on what fields are set. // This function will call any relevant callbacks. func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) { - if r.sender != nil { - r.sender.DisableScheduleSend() - } + r.sender.DisableScheduleSend() + defer r.sender.EnableScheduleSend() - defer func() { - if r.sender != nil { - r.sender.EnableScheduleSend() - } - }() if r.callbacks != nil { if msg.Command != nil { r.rcvCommand(msg.Command) @@ -141,6 +132,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } r.callbacks.OnMessage(ctx, msgData) + r.sender.EnableScheduleSend() r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) if scheduled { diff --git a/client/internal/sender.go b/client/internal/sender.go index fd6ef7ee..36b6102d 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -39,7 +39,7 @@ type SenderCommon struct { hasPendingMessage chan struct{} // Indicates onMessage callback is running - onMessageRunning int32 + isSendingDisabled int32 // Indicates ScheduleSend() was called during onMessage callback run registerScheduleSend chan struct{} @@ -55,7 +55,7 @@ func NewSenderCommon() SenderCommon { hasPendingMessage: make(chan struct{}, 1), registerScheduleSend: make(chan struct{}, 1), nextMessage: NewNextMessage(), - onMessageRunning: 0, + isSendingDisabled: 0, } } @@ -63,7 +63,7 @@ func NewSenderCommon() SenderCommon { // is now ready to be sent. If there is no pending message (e.g. the NextMessage was // already sent and "pending" flag is reset) then no message will be sent. func (h *SenderCommon) ScheduleSend() { - if h.IsOnMessageRunning() { + if h.IsSendingDisabled() { // onMessage callback is running, ScheduleSend() will rerun after it is done select { case h.registerScheduleSend <- struct{}{}: @@ -87,19 +87,20 @@ func (h *SenderCommon) NextMessage() *NextMessage { return &h.nextMessage } -// IsOnMessageRunning returns true if onMessage callback is running -func (h *SenderCommon) IsOnMessageRunning() bool { - return atomic.LoadInt32(&h.onMessageRunning) != 0 +// IsSendingDisabled returns true when onMessage callback is running +func (h *SenderCommon) IsSendingDisabled() bool { + return atomic.LoadInt32(&h.isSendingDisabled) != 0 } // DisableScheduleSend temporary preventing ScheduleSend from writing to channel func (h *SenderCommon) DisableScheduleSend() { - atomic.StoreInt32(&h.onMessageRunning, 1) + + atomic.StoreInt32(&h.isSendingDisabled, 1) } // EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback func (h *SenderCommon) EnableScheduleSend() { - atomic.StoreInt32(&h.onMessageRunning, 0) + atomic.StoreInt32(&h.isSendingDisabled, 0) select { case <-h.registerScheduleSend: h.ScheduleSend() From aa84a31d65ea230f68e9f229d49906bd443bc539 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Thu, 24 Nov 2022 12:02:41 +0200 Subject: [PATCH 06/11] comments clarity fix --- client/internal/receivedprocessor.go | 2 ++ client/internal/sender.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 3898b637..001b12a9 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -51,6 +51,8 @@ func newReceivedProcessor( // This function will call any relevant callbacks. func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *protobufs.ServerToAgent) { r.sender.DisableScheduleSend() + + // Verify message sending is enabled. Can be called several times since process is non-blocking defer r.sender.EnableScheduleSend() if r.callbacks != nil { diff --git a/client/internal/sender.go b/client/internal/sender.go index 36b6102d..90c05ac4 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -38,10 +38,10 @@ type SenderCommon struct { // Indicates that there is a pending message to send. hasPendingMessage chan struct{} - // Indicates onMessage callback is running + // When set to non-zero indicates message sending is disabled isSendingDisabled int32 - // Indicates ScheduleSend() was called during onMessage callback run + // Indicates ScheduleSend() was called when message sending was disabled registerScheduleSend chan struct{} // The next message to send. @@ -64,7 +64,7 @@ func NewSenderCommon() SenderCommon { // already sent and "pending" flag is reset) then no message will be sent. func (h *SenderCommon) ScheduleSend() { if h.IsSendingDisabled() { - // onMessage callback is running, ScheduleSend() will rerun after it is done + // Register message sending to when message sending is enabled, won't block on writing to channel. select { case h.registerScheduleSend <- struct{}{}: default: @@ -98,7 +98,7 @@ func (h *SenderCommon) DisableScheduleSend() { atomic.StoreInt32(&h.isSendingDisabled, 1) } -// EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback +// EnableScheduleSend re-enables message sending, won't block on reading from channel. func (h *SenderCommon) EnableScheduleSend() { atomic.StoreInt32(&h.isSendingDisabled, 0) select { From 2a3893d836c4cc92801e880998129eb8d82c4d55 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Sun, 27 Nov 2022 17:10:55 +0200 Subject: [PATCH 07/11] fix test race --- client/internal/sender.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/internal/sender.go b/client/internal/sender.go index 90c05ac4..c9b9ea7b 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -2,10 +2,10 @@ package internal import ( "errors" - "sync/atomic" - "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/protobufs" + "sync/atomic" + "time" ) // Sender is an interface of the sending portion of OpAMP protocol that stores @@ -104,6 +104,8 @@ func (h *SenderCommon) EnableScheduleSend() { select { case <-h.registerScheduleSend: h.ScheduleSend() + case <-time.Tick(100 * time.Millisecond): + break default: break } From 3ba2e9919a42069001fb63bde86b11be06b42382 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Mon, 12 Dec 2022 17:08:40 +0200 Subject: [PATCH 08/11] fix channel block bug --- client/internal/sender.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/client/internal/sender.go b/client/internal/sender.go index c9b9ea7b..21b325f6 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -5,7 +5,6 @@ import ( "github.com/oklog/ulid/v2" "github.com/open-telemetry/opamp-go/protobufs" "sync/atomic" - "time" ) // Sender is an interface of the sending portion of OpAMP protocol that stores @@ -68,9 +67,8 @@ func (h *SenderCommon) ScheduleSend() { select { case h.registerScheduleSend <- struct{}{}: default: - break + return } - return } // Set pending flag. Don't block on writing to channel. @@ -104,8 +102,6 @@ func (h *SenderCommon) EnableScheduleSend() { select { case <-h.registerScheduleSend: h.ScheduleSend() - case <-time.Tick(100 * time.Millisecond): - break default: break } From 28f7656648465e3d4de03e829c8d8069e6ff64ce Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Wed, 14 Dec 2022 15:49:14 +0200 Subject: [PATCH 09/11] research sync issue --- client/internal/receivedprocessor.go | 5 ++++- client/internal/sender.go | 5 +++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 001b12a9..def5ba7f 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -3,9 +3,9 @@ package internal import ( "context" "errors" - "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "time" ) // receivedProcessor handles the processing of messages received from the Server. @@ -133,8 +133,11 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } } r.callbacks.OnMessage(ctx, msgData) + //time.Sleep(time.Microsecond * 100) r.sender.EnableScheduleSend() + time.Sleep(time.Microsecond * 100) + r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) if scheduled { diff --git a/client/internal/sender.go b/client/internal/sender.go index 21b325f6..23737247 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -67,8 +67,9 @@ func (h *SenderCommon) ScheduleSend() { select { case h.registerScheduleSend <- struct{}{}: default: - return + break } + return } // Set pending flag. Don't block on writing to channel. @@ -85,7 +86,7 @@ func (h *SenderCommon) NextMessage() *NextMessage { return &h.nextMessage } -// IsSendingDisabled returns true when onMessage callback is running +// IsSendingDisabled returns true when isSendingDisabled is set to non-zero value. func (h *SenderCommon) IsSendingDisabled() bool { return atomic.LoadInt32(&h.isSendingDisabled) != 0 } From 63cb45742acd5343ac86aeb88206f7cbbbff8743 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Wed, 14 Dec 2022 15:58:41 +0200 Subject: [PATCH 10/11] research sync issue --- client/internal/receivedprocessor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index def5ba7f..c41d3a44 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" - "time" ) // receivedProcessor handles the processing of messages received from the Server. @@ -136,7 +135,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro //time.Sleep(time.Microsecond * 100) r.sender.EnableScheduleSend() - time.Sleep(time.Microsecond * 100) + //time.Sleep(time.Microsecond * 100) r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings) From 62dae795b588faede575ac48d950bd4cdcac1923 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Wed, 14 Dec 2022 16:03:04 +0200 Subject: [PATCH 11/11] cleanup --- client/internal/receivedprocessor.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index c41d3a44..24bfe613 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -132,10 +132,8 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } } r.callbacks.OnMessage(ctx, msgData) - //time.Sleep(time.Microsecond * 100) r.sender.EnableScheduleSend() - //time.Sleep(time.Microsecond * 100) r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings)