-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Disable scheduleSend when onMessage callback is running #144
base: main
Are you sure you want to change the base?
Changes from 6 commits
a798ca2
122c3f3
f9558fe
51179e6
063f7e4
58f00eb
aa84a31
2a3893d
0bacb27
3ba2e99
28f7656
63cb457
62dae79
348739e
ad3a057
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package internal | |
|
||
import ( | ||
"errors" | ||
"sync/atomic" | ||
|
||
"github.com/oklog/ulid/v2" | ||
"github.com/open-telemetry/opamp-go/protobufs" | ||
|
@@ -20,6 +21,12 @@ type Sender interface { | |
// "pending" flag is reset) then no message will be sent. | ||
ScheduleSend() | ||
|
||
// DisableScheduleSend temporary preventing ScheduleSend from writing to channel | ||
DisableScheduleSend() | ||
|
||
// EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback | ||
EnableScheduleSend() | ||
|
||
// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. | ||
SetInstanceUid(instanceUid string) error | ||
} | ||
|
@@ -31,6 +38,12 @@ type SenderCommon struct { | |
// Indicates that there is a pending message to send. | ||
hasPendingMessage chan struct{} | ||
|
||
// Indicates onMessage callback is running | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: the comment doesn't match what the field name implies. It would be best to reword it to something like "Set to non-zero to indicate that the message sending is disabled" |
||
isSendingDisabled int32 | ||
|
||
// Indicates ScheduleSend() was called during onMessage callback run | ||
registerScheduleSend chan struct{} | ||
|
||
// The next message to send. | ||
nextMessage NextMessage | ||
} | ||
|
@@ -39,15 +52,27 @@ type SenderCommon struct { | |
// the WebSocket and HTTP Sender implementations. | ||
func NewSenderCommon() SenderCommon { | ||
return SenderCommon{ | ||
hasPendingMessage: make(chan struct{}, 1), | ||
nextMessage: NewNextMessage(), | ||
hasPendingMessage: make(chan struct{}, 1), | ||
registerScheduleSend: make(chan struct{}, 1), | ||
nextMessage: NewNextMessage(), | ||
isSendingDisabled: 0, | ||
} | ||
} | ||
|
||
// ScheduleSend signals to HTTPSender that the message in NextMessage struct | ||
// is now ready to be sent. If there is no pending message (e.g. the NextMessage was | ||
// already sent and "pending" flag is reset) then no message will be sent. | ||
func (h *SenderCommon) ScheduleSend() { | ||
if h.IsSendingDisabled() { | ||
// onMessage callback is running, ScheduleSend() will rerun after it is done | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, comment doesn't match what the code and func names imply. |
||
select { | ||
case h.registerScheduleSend <- struct{}{}: | ||
default: | ||
break | ||
} | ||
return | ||
} | ||
|
||
// Set pending flag. Don't block on writing to channel. | ||
select { | ||
case h.hasPendingMessage <- struct{}{}: | ||
|
@@ -62,6 +87,28 @@ func (h *SenderCommon) NextMessage() *NextMessage { | |
return &h.nextMessage | ||
} | ||
|
||
// IsSendingDisabled returns true when onMessage callback is running | ||
func (h *SenderCommon) IsSendingDisabled() bool { | ||
return atomic.LoadInt32(&h.isSendingDisabled) != 0 | ||
} | ||
|
||
// DisableScheduleSend temporary preventing ScheduleSend from writing to channel | ||
func (h *SenderCommon) DisableScheduleSend() { | ||
|
||
atomic.StoreInt32(&h.isSendingDisabled, 1) | ||
} | ||
|
||
// EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I would refrain from mentioning onMessage here. It is not a concern of SenderCommon. |
||
func (h *SenderCommon) EnableScheduleSend() { | ||
atomic.StoreInt32(&h.isSendingDisabled, 0) | ||
select { | ||
case <-h.registerScheduleSend: | ||
h.ScheduleSend() | ||
default: | ||
break | ||
} | ||
} | ||
|
||
// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent. | ||
// Can be called concurrently, normally is called when a message is received from the | ||
// Server that instructs us to change our instance UID. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment to EnableScheduleSend definition that calling EnableScheduleSend when it is already enabled is allowed? (since we do it twice here).