Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare channels for parallelism #4642

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/internal/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Task struct {
}

// DataUint32 returns the Data field as a uint32. The value is only valid after
// setting it through SetDataUint32.
// setting it through SetDataUint32 or by storing to it using DataAtomicUint32.
func (t *Task) DataUint32() uint32 {
return *(*uint32)(unsafe.Pointer(&t.Data))
}
Expand All @@ -38,6 +38,11 @@ func (t *Task) SetDataUint32(val uint32) {
*(*uint32)(unsafe.Pointer(&t.Data)) = val
}

// DataAtomicUint32 returns the Data field as an atomic-if-needed Uint32 value.
func (t *Task) DataAtomicUint32() *Uint32 {
return (*Uint32)(unsafe.Pointer(&t.Data))
}

// getGoroutineStackSize is a compiler intrinsic that returns the stack size for
// the given function and falls back to the default stack size. It is replaced
// with a load from a special section just before codegen.
Expand Down
120 changes: 93 additions & 27 deletions src/runtime/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ package runtime
// non-select operations) so that the select operation knows which case did
// proceed.
// The value is at the same time also a way that goroutines can be the first
// (and only) goroutine to 'take' a channel operation to change it from
// 'waiting' to any other value. This is important for the select statement
// because multiple goroutines could try to let different channels in the
// select statement proceed at the same time. By using Task.Data, only a
// single channel operation in the select statement can proceed.
// (and only) goroutine to 'take' a channel operation using an atomic CAS
// operation to change it from 'waiting' to any other value. This is important
// for the select statement because multiple goroutines could try to let
// different channels in the select statement proceed at the same time. By
// using Task.Data, only a single channel operation in the select statement
// can proceed.
// - It is possible for the channel queues to contain already-processed senders
// or receivers. This can happen when the select statement managed to proceed
// but the goroutine doing the select has not yet cleaned up the stale queue
Expand All @@ -49,15 +50,17 @@ import (

// The runtime implementation of the Go 'chan' type.
type channel struct {
closed bool
elementSize uintptr
bufCap uintptr // 'cap'
bufLen uintptr // 'len'
bufHead uintptr
bufTail uintptr
senders chanQueue
receivers chanQueue
buf unsafe.Pointer
closed bool
selectLocked bool
elementSize uintptr
bufCap uintptr // 'cap'
bufLen uintptr // 'len'
bufHead uintptr
bufTail uintptr
senders chanQueue
receivers chanQueue
lock task.PMutex
buf unsafe.Pointer
}

const (
Expand All @@ -73,7 +76,8 @@ type chanQueue struct {

// Pus the next channel operation to the queue. All appropriate fields must have
// been initialized already.
// This function must be called with interrupts disabled.
// This function must be called with interrupts disabled and the channel lock
// held.
func (q *chanQueue) push(node *channelOp) {
node.next = q.first
q.first = node
Expand All @@ -99,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
newDataValue := chanOp | popped.index<<2

// Try to be the first to proceed with this goroutine.
if popped.task.DataUint32() == chanOperationWaiting {
popped.task.SetDataUint32(newDataValue)
swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
if swapped {
return popped
}
}
}

// Remove the given to-be-removed node from the queue if it is part of the
// queue. If there are multiple, only one will be removed.
// This function must be called with interrupts disabled.
// This function must be called with interrupts disabled and the channel lock
// held.
func (q *chanQueue) remove(remove *channelOp) {
n := &q.first
for *n != nil {
Expand Down Expand Up @@ -159,8 +164,8 @@ func chanCap(c *channel) int {
}

// Push the value to the channel buffer array, for a send operation.
// This function may only be called when interrupts are disabled and it is known
// there is space available in the buffer.
// This function may only be called when interrupts are disabled, the channel is
// locked and it is known there is space available in the buffer.
func (ch *channel) bufferPush(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
ch.bufLen++
Expand All @@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {

// Pop a value from the channel buffer and store it in the 'value' pointer, for
// a receive operation.
// This function may only be called when interrupts are disabled and it is known
// there is at least one value available in the buffer.
// This function may only be called when interrupts are disabled, the channel is
// locked and it is known there is at least one value available in the buffer.
func (ch *channel) bufferPop(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
ch.bufLen--
Expand All @@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
}

// Try to proceed with this send operation without blocking, and return whether
// the send succeeded. Interrupts must be disabled when calling this function.
// the send succeeded. Interrupts must be disabled and the lock must be held
// when calling this function.
func (ch *channel) trySend(value unsafe.Pointer) bool {
// To make sure we send values in the correct order, we can only send
// directly to a receiver when there are no values in the buffer.
Expand Down Expand Up @@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}

mask := interrupt.Disable()
ch.lock.Lock()

// See whether we can proceed immediately, and if so, return early.
if ch.trySend(value) {
ch.lock.Unlock()
interrupt.Restore(mask)
return
}
Expand All @@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
op.index = 0
op.value = value
ch.senders.push(op)
ch.lock.Unlock()
interrupt.Restore(mask)

// Wait until this goroutine is resumed.
// It might be resumed after Unlock() and before Pause(). In that case,
// because we use semaphores, the Pause() will continue immediately.
task.Pause()

// Check whether the sent happened normally (not because the channel was
Expand All @@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}

// Try to proceed with this receive operation without blocking, and return
// whether the receive operation succeeded. Interrupts must be disabled when
// calling this function.
// whether the receive operation succeeded. Interrupts must be disabled and the
// lock must be held when calling this function.
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
// To make sure we keep the values in the channel in the correct order, we
// first have to read values from the buffer before we can look at the
Expand Down Expand Up @@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
}

mask := interrupt.Disable()
ch.lock.Lock()

if received, ok := ch.tryRecv(value); received {
ch.lock.Unlock()
interrupt.Restore(mask)
return ok
}
Expand All @@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
op.task = t
op.index = 0
ch.receivers.push(op)
ch.lock.Unlock()
interrupt.Restore(mask)

// Wait until the goroutine is resumed.
Expand All @@ -335,9 +349,11 @@ func chanClose(ch *channel) {
}

mask := interrupt.Disable()
ch.lock.Lock()

if ch.closed {
// Not allowed by the language spec.
ch.lock.Unlock()
interrupt.Restore(mask)
runtimePanic("close of closed channel")
}
Expand Down Expand Up @@ -370,14 +386,56 @@ func chanClose(ch *channel) {

ch.closed = true

ch.lock.Unlock()
interrupt.Restore(mask)
}

// We currently use a global select lock to avoid deadlocks while locking each
// individual channel in the select. Without this global lock, two select
// operations that have a different order of the same channels could end up in a
// deadlock. This global lock is inefficient if there are many select operations
// happening in parallel, but gets the job done.
//
// If this becomes a performance issue, we can see how the Go runtime does this.
// I think it does this by sorting all states by channel address and then
// locking them in that order to avoid this deadlock.
var chanSelectLock task.PMutex

// Lock all channels (taking care to skip duplicate channels).
func lockAllStates(states []chanSelectState) {
if !hasParallelism {
return
}
for _, state := range states {
if state.ch != nil && !state.ch.selectLocked {
state.ch.lock.Lock()
state.ch.selectLocked = true
}
}
}

// Unlock all channels (taking care to skip duplicate channels).
func unlockAllStates(states []chanSelectState) {
if !hasParallelism {
return
}
for _, state := range states {
if state.ch != nil && state.ch.selectLocked {
state.ch.lock.Unlock()
state.ch.selectLocked = false
}
}
}

// chanSelect implements blocking or non-blocking select operations.
// The 'ops' slice must be set if (and only if) this is a blocking select.
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
mask := interrupt.Disable()

// Lock everything.
chanSelectLock.Lock()
lockAllStates(states)

const selectNoIndex = ^uint32(0)
selectIndex := selectNoIndex
selectOk := true
Expand Down Expand Up @@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// return early.
blocking := len(ops) != 0
if selectIndex != selectNoIndex || !blocking {
unlockAllStates(states)
chanSelectLock.Unlock()
interrupt.Restore(mask)
return selectIndex, selectOk
}
Expand All @@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// become more complicated.
// We add ourselves as a sender/receiver to every channel, and wait for the
// first one to complete. Only one will successfully complete, because
// senders and receivers will check t.Data for the state so that only one
// will be able to "take" this select operation.
// senders and receivers use a compare-and-exchange atomic operation on
// t.Data so that only one will be able to "take" this select operation.
t := task.Current()
t.Ptr = recvbuf
t.SetDataUint32(chanOperationWaiting)
Expand All @@ -438,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}

// Now we wait until one of the send/receive operations can proceed.
unlockAllStates(states)
chanSelectLock.Unlock()
interrupt.Restore(mask)
task.Pause()

// Resumed, so one channel operation must have progressed.

// Make sure all channel ops are removed from the senders/receivers
// queue before we return and the memory of them becomes invalid.
chanSelectLock.Lock()
lockAllStates(states)
for i, state := range states {
if state.ch == nil {
continue
Expand All @@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}
interrupt.Restore(mask)
}
unlockAllStates(states)
chanSelectLock.Unlock()

// Pull the return values out of t.Data (which contains two bitfields).
selectIndex = t.DataUint32() >> 2
Expand Down
8 changes: 6 additions & 2 deletions src/runtime/scheduler_cooperative.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
// queue a new scheduler invocation using setTimeout.
const asyncScheduler = GOOS == "js"

const hasScheduler = true

// Concurrency is not parallelism. While the cooperative scheduler has
// concurrency, it does not have parallelism.
const hasParallelism = false

// Queues used by the scheduler.
var (
runqueue task.Queue
Expand Down Expand Up @@ -248,5 +254,3 @@ func run() {
}()
scheduler(false)
}

const hasScheduler = true
3 changes: 3 additions & 0 deletions src/runtime/scheduler_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import "internal/task"

const hasScheduler = false

// No goroutines are allowed, so there's no parallelism anywhere.
const hasParallelism = false

// run is called by the program entry point to execute the go program.
// With the "none" scheduler, init and the main function are invoked directly.
func run() {
Expand Down
Loading