From 2599db29904a8b966991003522a6417c46e11997 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sun, 27 Oct 2024 10:26:23 +0100 Subject: [PATCH 1/6] internal/task: implement atomic primitives for preemptive scheduling --- src/internal/task/atomic-cooperative.go | 2 + src/internal/task/atomic-preemptive.go | 14 +++++ src/internal/task/futex-cooperative.go | 2 + src/internal/task/futex-preemptive.go | 7 +++ src/internal/task/mutex-cooperative.go | 2 + src/internal/task/mutex-preemptive.go | 71 +++++++++++++++++++++++++ src/internal/task/pmutex-cooperative.go | 2 + src/internal/task/pmutex-preemptive.go | 11 ++++ 8 files changed, 111 insertions(+) create mode 100644 src/internal/task/atomic-preemptive.go create mode 100644 src/internal/task/futex-preemptive.go create mode 100644 src/internal/task/mutex-preemptive.go create mode 100644 src/internal/task/pmutex-preemptive.go diff --git a/src/internal/task/atomic-cooperative.go b/src/internal/task/atomic-cooperative.go index 60eb917a8e..bd4cba8956 100644 --- a/src/internal/task/atomic-cooperative.go +++ b/src/internal/task/atomic-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // Atomics implementation for cooperative systems. The atomic types here aren't diff --git a/src/internal/task/atomic-preemptive.go b/src/internal/task/atomic-preemptive.go new file mode 100644 index 0000000000..275f36dce4 --- /dev/null +++ b/src/internal/task/atomic-preemptive.go @@ -0,0 +1,14 @@ +//go:build scheduler.threads + +package task + +// Atomics implementation for non-cooperative systems (multithreaded, etc). +// These atomic types use real atomic instructions. + +import "sync/atomic" + +type ( + Uintptr = atomic.Uintptr + Uint32 = atomic.Uint32 + Uint64 = atomic.Uint64 +) diff --git a/src/internal/task/futex-cooperative.go b/src/internal/task/futex-cooperative.go index 8351f88774..2a42c28d43 100644 --- a/src/internal/task/futex-cooperative.go +++ b/src/internal/task/futex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // A futex is a way for userspace to wait with the pointer as the key, and for diff --git a/src/internal/task/futex-preemptive.go b/src/internal/task/futex-preemptive.go new file mode 100644 index 0000000000..7f9e89580c --- /dev/null +++ b/src/internal/task/futex-preemptive.go @@ -0,0 +1,7 @@ +//go:build scheduler.threads + +package task + +import "internal/futex" + +type Futex = futex.Futex diff --git a/src/internal/task/mutex-cooperative.go b/src/internal/task/mutex-cooperative.go index e40966bed4..f1205eea25 100644 --- a/src/internal/task/mutex-cooperative.go +++ b/src/internal/task/mutex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task type Mutex struct { diff --git a/src/internal/task/mutex-preemptive.go b/src/internal/task/mutex-preemptive.go new file mode 100644 index 0000000000..27f4646698 --- /dev/null +++ b/src/internal/task/mutex-preemptive.go @@ -0,0 +1,71 @@ +//go:build scheduler.threads + +package task + +// Futex-based mutex. +// This is largely based on the paper "Futexes are Tricky" by Ulrich Drepper. +// It describes a few ways to implement mutexes using a futex, and how some +// seemingly-obvious implementations don't exactly work as intended. +// Unfortunately, Go atomic operations work slightly differently so we can't +// copy the algorithm verbatim. +// +// The implementation works like this. The futex can have 3 different values, +// depending on the state: +// +// - 0: the futex is currently unlocked. +// - 1: the futex is locked, but is uncontended. There is one special case: if +// a contended futex is unlocked, it is set to 0. It is possible for another +// thread to lock the futex before the next waiter is woken. But because a +// waiter will be woken (if there is one), it will always change to 2 +// regardless. So this is not a problem. +// - 2: the futex is locked, and is contended. At least one thread is trying +// to obtain the lock (and is in the contended loop, see below). +// +// For the paper, see: +// https://dept-info.labri.fr/~denis/Enseignement/2008-IR/Articles/01-futex.pdf) + +type Mutex struct { + futex Futex +} + +func (m *Mutex) Lock() { + // Fast path: try to take an uncontended lock. + if m.futex.CompareAndSwap(0, 1) { + // We obtained the mutex. + return + } + + // The futex is contended, so we enter the contended loop. + // If we manage to change the futex from 0 to 2, we managed to take the + // lock. Else, we have to wait until a call to Unlock unlocks this mutex. + // (Unlock will wake one waiter when it finds the futex is set to 2 when + // unlocking). + for m.futex.Swap(2) != 0 { + // Wait until we get resumed in Unlock. + m.futex.Wait(2) + } +} + +func (m *Mutex) Unlock() { + if old := m.futex.Swap(0); old == 0 { + // Mutex wasn't locked before. + panic("sync: unlock of unlocked Mutex") + } else if old == 2 { + // Mutex was a contended lock, so we need to wake the next waiter. + m.futex.Wake() + } +} + +// TryLock tries to lock m and reports whether it succeeded. +// +// Note that while correct uses of TryLock do exist, they are rare, +// and use of TryLock is often a sign of a deeper problem +// in a particular use of mutexes. +func (m *Mutex) TryLock() bool { + // Fast path: try to take an uncontended lock. + if m.futex.CompareAndSwap(0, 1) { + // We obtained the mutex. + return true + } + return false +} diff --git a/src/internal/task/pmutex-cooperative.go b/src/internal/task/pmutex-cooperative.go index ae2aa4bad8..0e6c4f828b 100644 --- a/src/internal/task/pmutex-cooperative.go +++ b/src/internal/task/pmutex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // PMutex is a real mutex on systems that can be either preemptive or threaded, diff --git a/src/internal/task/pmutex-preemptive.go b/src/internal/task/pmutex-preemptive.go new file mode 100644 index 0000000000..10f0a63561 --- /dev/null +++ b/src/internal/task/pmutex-preemptive.go @@ -0,0 +1,11 @@ +//go:build scheduler.threads + +package task + +// PMutex is a real mutex on systems that can be either preemptive or threaded, +// and a dummy lock on other (purely cooperative) systems. +// +// It is mainly useful for short operations that need a lock when threading may +// be involved, but which do not need a lock with a purely cooperative +// scheduler. +type PMutex = Mutex From 8659b18d6f487e438959bf901c67bdf6b0862d18 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 10:47:28 +0100 Subject: [PATCH 2/6] runtime: refactor GC mark phase into gcMarkReachable This is a small refactor to prepare GC marking for multithreaded stop-the-world. --- src/runtime/gc_blocks.go | 3 +-- src/runtime/gc_stack_portable.go | 5 +++++ src/runtime/gc_stack_raw.go | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/runtime/gc_blocks.go b/src/runtime/gc_blocks.go index d58bfd92a2..5b0453bb2e 100644 --- a/src/runtime/gc_blocks.go +++ b/src/runtime/gc_blocks.go @@ -456,8 +456,7 @@ func runGC() (freeBytes uintptr) { } // Mark phase: mark all reachable objects, recursively. - markStack() - findGlobals(markRoots) + gcMarkReachable() if baremetal && hasScheduler { // Channel operations in interrupts may move task pointers around while we are marking. diff --git a/src/runtime/gc_stack_portable.go b/src/runtime/gc_stack_portable.go index d35e16e30c..750a34ec2c 100644 --- a/src/runtime/gc_stack_portable.go +++ b/src/runtime/gc_stack_portable.go @@ -8,6 +8,11 @@ import ( "unsafe" ) +func gcMarkReachable() { + markStack() + findGlobals(markRoots) +} + //go:extern runtime.stackChainStart var stackChainStart *stackChainObject diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index 5ee18622db..d55522a9f6 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -4,6 +4,11 @@ package runtime import "internal/task" +func gcMarkReachable() { + markStack() + findGlobals(markRoots) +} + // markStack marks all root pointers found on the stack. // // This implementation is conservative and relies on the stack top (provided by From c9bb33a73ce77eb8223c043640a8313741c8c0c7 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 11:08:02 +0100 Subject: [PATCH 3/6] runtime: make conservative and precise GC MT-safe Using a global lock may be slow, but it is certainly simple and safe. If this global lock becomes a bottleneck, we can of course look into making the GC truly support multithreading. --- src/runtime/gc_blocks.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/runtime/gc_blocks.go b/src/runtime/gc_blocks.go index 5b0453bb2e..bce87ef271 100644 --- a/src/runtime/gc_blocks.go +++ b/src/runtime/gc_blocks.go @@ -57,6 +57,7 @@ var ( gcMallocs uint64 // total number of allocations gcFrees uint64 // total number of objects freed gcFreedBlocks uint64 // total number of freed blocks + gcLock task.PMutex // lock to avoid race conditions on multicore systems ) // zeroSizedAlloc is just a sentinel that gets returned when allocating 0 bytes. @@ -317,6 +318,10 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer { runtimePanicAt(returnAddress(0), "heap alloc in interrupt") } + // Make sure there are no concurrent allocations. The heap is not currently + // designed for concurrent alloc/GC. + gcLock.Lock() + gcTotalAlloc += uint64(size) gcMallocs++ @@ -399,6 +404,9 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer { i.setState(blockStateTail) } + // We've claimed this allocation, now we can unlock the heap. + gcLock.Unlock() + // Return a pointer to this allocation. pointer := thisAlloc.pointer() if preciseHeap { @@ -444,7 +452,9 @@ func free(ptr unsafe.Pointer) { // GC performs a garbage collection cycle. func GC() { + gcLock.Lock() runGC() + gcLock.Unlock() } // runGC performs a garbage collection cycle. It is the internal implementation @@ -713,6 +723,7 @@ func dumpHeap() { // The returned memory statistics are up to date as of the // call to ReadMemStats. This would not do GC implicitly for you. func ReadMemStats(m *MemStats) { + gcLock.Lock() m.HeapIdle = 0 m.HeapInuse = 0 for block := gcBlock(0); block < endBlock; block++ { @@ -732,6 +743,7 @@ func ReadMemStats(m *MemStats) { m.Sys = uint64(heapEnd - heapStart) m.HeapAlloc = (gcTotalBlocks - gcFreedBlocks) * uint64(bytesPerBlock) m.Alloc = m.HeapAlloc + gcLock.Unlock() } func SetFinalizer(obj interface{}, finalizer interface{}) { From 3cc9c44c268905c0a5965698267acfc61a034ee5 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 13:45:17 +0100 Subject: [PATCH 4/6] runtime: refactor timerQueue Move common functions to scheduler.go. They will be used both from the cooperative and from the threads scheduler. --- src/runtime/scheduler.go | 30 ++++++++++++++++++++++++++++ src/runtime/scheduler_cooperative.go | 26 ++---------------------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go index 727c7f5f2c..7461c966ed 100644 --- a/src/runtime/scheduler.go +++ b/src/runtime/scheduler.go @@ -6,6 +6,8 @@ const schedulerDebug = false var mainExited bool +var timerQueue *timerNode + // Simple logging, for debugging. func scheduleLog(msg string) { if schedulerDebug { @@ -27,6 +29,34 @@ func scheduleLogChan(msg string, ch *channel, t *task.Task) { } } +func timerQueueAdd(tn *timerNode) { + q := &timerQueue + for ; *q != nil; q = &(*q).next { + if tn.whenTicks() < (*q).whenTicks() { + // this will finish earlier than the next - insert here + break + } + } + tn.next = *q + *q = tn +} + +func timerQueueRemove(t *timer) bool { + removedTimer := false + for q := &timerQueue; *q != nil; q = &(*q).next { + if (*q).timer == t { + scheduleLog("removed timer") + *q = (*q).next + removedTimer = true + break + } + } + if !removedTimer { + scheduleLog("did not remove timer") + } + return removedTimer +} + // Goexit terminates the currently running goroutine. No other goroutines are affected. func Goexit() { panicOrGoexit(nil, panicGoexit) diff --git a/src/runtime/scheduler_cooperative.go b/src/runtime/scheduler_cooperative.go index 91ba86409f..85c8f56f09 100644 --- a/src/runtime/scheduler_cooperative.go +++ b/src/runtime/scheduler_cooperative.go @@ -32,7 +32,6 @@ var ( runqueue task.Queue sleepQueue *task.Task sleepQueueBaseTime timeUnit - timerQueue *timerNode ) // deadlock is called when a goroutine cannot proceed any more, but is in theory @@ -100,36 +99,15 @@ func addSleepTask(t *task.Task, duration timeUnit) { // sleepQueue. func addTimer(tim *timerNode) { mask := interrupt.Disable() - - // Add to timer queue. - q := &timerQueue - for ; *q != nil; q = &(*q).next { - if tim.whenTicks() < (*q).whenTicks() { - // this will finish earlier than the next - insert here - break - } - } - tim.next = *q - *q = tim + timerQueueAdd(tim) interrupt.Restore(mask) } // removeTimer is the implementation of time.stopTimer. It removes a timer from // the timer queue, returning true if the timer is present in the timer queue. func removeTimer(tim *timer) bool { - removedTimer := false mask := interrupt.Disable() - for t := &timerQueue; *t != nil; t = &(*t).next { - if (*t).timer == tim { - scheduleLog("removed timer") - *t = (*t).next - removedTimer = true - break - } - } - if !removedTimer { - scheduleLog("did not remove timer") - } + removedTimer := timerQueueRemove(tim) interrupt.Restore(mask) return removedTimer } From 8d6e16019ab915ace09ab0f2237c6b2e5f1d1df2 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 15:33:06 +0100 Subject: [PATCH 5/6] sync: implement RWMutex using futexes Somewhat surprisingly, this results in smaller code than the old code with the cooperative (tasks) scheduler. Probably because the new RWMutex is also simpler. --- src/sync/mutex.go | 163 ++++++++++++++++++---------------------------- 1 file changed, 64 insertions(+), 99 deletions(-) diff --git a/src/sync/mutex.go b/src/sync/mutex.go index 08c674d7ea..890af78606 100644 --- a/src/sync/mutex.go +++ b/src/sync/mutex.go @@ -6,131 +6,96 @@ import ( type Mutex = task.Mutex -type RWMutex struct { - // waitingWriters are all of the tasks waiting for write locks. - waitingWriters task.Stack - - // waitingReaders are all of the tasks waiting for a read lock. - waitingReaders task.Stack +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) - // state is the current state of the RWMutex. - // Iff the mutex is completely unlocked, it contains rwMutexStateUnlocked (aka 0). - // Iff the mutex is write-locked, it contains rwMutexStateWLocked. - // While the mutex is read-locked, it contains the current number of readers. - state uint32 +type RWMutex struct { + // Reader count, with the number of readers that currently have read-locked + // this mutex. + // The value can be in two states: one where 0 means no readers and another + // where -rwMutexMaxReaders means no readers. A base of 0 is normal + // uncontended operation, a base of -rwMutexMaxReaders means a writer has + // the lock or is trying to get the lock. In the second case, readers should + // wait until the reader count becomes non-negative again to give the writer + // a chance to obtain the lock. + readers task.Futex + + // Writer futex, normally 0. If there is a writer waiting until all readers + // have unlocked, this value is 1. It will be changed to a 2 (and get a + // wake) when the last reader unlocks. + writer task.Futex + + // Writer lock. Held between Lock() and Unlock(). + writerLock Mutex } -const ( - rwMutexStateUnlocked = uint32(0) - rwMutexStateWLocked = ^uint32(0) - rwMutexMaxReaders = rwMutexStateWLocked - 1 -) +const rwMutexMaxReaders = 1 << 30 func (rw *RWMutex) Lock() { - if rw.state == 0 { - // The mutex is completely unlocked. - // Lock without waiting. - rw.state = rwMutexStateWLocked + // Exclusive lock for writers. + rw.writerLock.Lock() + + // Flag that we need to be awakened after the last read-lock unlocks. + rw.writer.Store(1) + + // Signal to readers that they can't lock this mutex anymore. + n := uint32(rwMutexMaxReaders) + waiting := rw.readers.Add(-n) + if int32(waiting) == -rwMutexMaxReaders { + // All readers were already unlocked, so we don't need to wait for them. + rw.writer.Store(0) return } - // Wait for the lock to be released. - rw.waitingWriters.Push(task.Current()) - task.Pause() + // There is at least one reader. + // Wait until all readers are unlocked. The last reader to unlock will set + // rw.writer to 2 and awaken us. + for rw.writer.Load() == 1 { + rw.writer.Wait(1) + } + rw.writer.Store(0) } func (rw *RWMutex) Unlock() { - switch rw.state { - case rwMutexStateWLocked: - // This is correct. - - case rwMutexStateUnlocked: - // The mutex is already unlocked. - panic("sync: unlock of unlocked RWMutex") - - default: - // The mutex is read-locked instead of write-locked. - panic("sync: write-unlock of read-locked RWMutex") + // Signal that new readers can lock this mutex. + waiting := rw.readers.Add(rwMutexMaxReaders) + if waiting != 0 { + // Awaken all waiting readers. + rw.readers.WakeAll() } - switch { - case rw.maybeUnblockReaders(): - // Switched over to read mode. - - case rw.maybeUnblockWriter(): - // Transferred to another writer. - - default: - // Nothing is waiting for the lock. - rw.state = rwMutexStateUnlocked - } + // Done with this lock (next writer can try to get a lock). + rw.writerLock.Unlock() } func (rw *RWMutex) RLock() { - if rw.state == rwMutexStateWLocked { - // Wait for the write lock to be released. - rw.waitingReaders.Push(task.Current()) - task.Pause() - return - } + // Add us as a reader. + newVal := rw.readers.Add(1) - if rw.state == rwMutexMaxReaders { - panic("sync: too many readers on RWMutex") + // Wait until the RWMutex is available for readers. + for int32(newVal) <= 0 { + rw.readers.Wait(newVal) + newVal = rw.readers.Load() } - - // Increase the reader count. - rw.state++ } func (rw *RWMutex) RUnlock() { - switch rw.state { - case rwMutexStateUnlocked: - // The mutex is already unlocked. - panic("sync: unlock of unlocked RWMutex") - - case rwMutexStateWLocked: - // The mutex is write-locked instead of read-locked. - panic("sync: read-unlock of write-locked RWMutex") - } - - rw.state-- + // Remove us as a reader. + one := uint32(1) + readers := int32(rw.readers.Add(-one)) - if rw.state == rwMutexStateUnlocked { - // This was the last reader. - // Try to unblock a writer. - rw.maybeUnblockWriter() + // Check whether RUnlock was called too often. + if readers == -1 || readers == (-rwMutexMaxReaders)-1 { + runtimePanic("sync: RUnlock of unlocked RWMutex") } -} -func (rw *RWMutex) maybeUnblockReaders() bool { - var n uint32 - for { - t := rw.waitingReaders.Pop() - if t == nil { - break + if readers == -rwMutexMaxReaders { + // This was the last read lock. Check whether we need to wake up a write + // lock. + if rw.writer.CompareAndSwap(1, 2) { + rw.writer.Wake() } - - n++ - scheduleTask(t) - } - if n == 0 { - return false } - - rw.state = n - return true -} - -func (rw *RWMutex) maybeUnblockWriter() bool { - t := rw.waitingWriters.Pop() - if t == nil { - return false - } - - rw.state = rwMutexStateWLocked - scheduleTask(t) - - return true } type Locker interface { From 3767decf9e272cd34ab3b7c4807758df33af11a5 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Thu, 24 Oct 2024 10:26:17 +0200 Subject: [PATCH 6/6] runtime: map every goroutine to a new OS thread This is not a scheduler in the runtime, instead every goroutine is mapped to a single OS thread - meaning 1:1 scheduling. While this may not perform well (or at all) for large numbers of threads, it greatly simplifies many things in the runtime. For example, blocking syscalls can be called directly instead of having to use epoll or similar. Also, we don't need to do anything special to call C code - the default stack is all we need. --- compileopts/options.go | 2 +- compileopts/options_test.go | 2 +- compileopts/target.go | 5 +- src/internal/task/linux.go | 9 + src/internal/task/semaphore.go | 32 ++++ src/internal/task/task_threads.c | 104 ++++++++++++ src/internal/task/task_threads.go | 265 ++++++++++++++++++++++++++++++ src/runtime/gc_stack_raw.go | 2 +- src/runtime/gc_stack_threads.go | 25 +++ src/runtime/runtime_unix.go | 1 + src/runtime/scheduler_threads.go | 124 ++++++++++++++ 11 files changed, 567 insertions(+), 4 deletions(-) create mode 100644 src/internal/task/linux.go create mode 100644 src/internal/task/semaphore.go create mode 100644 src/internal/task/task_threads.c create mode 100644 src/internal/task/task_threads.go create mode 100644 src/runtime/gc_stack_threads.go create mode 100644 src/runtime/scheduler_threads.go diff --git a/compileopts/options.go b/compileopts/options.go index 30e0e4dbed..78eb17cde9 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full", "html"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/options_test.go b/compileopts/options_test.go index ee63c4c46d..d52ef5690e 100644 --- a/compileopts/options_test.go +++ b/compileopts/options_test.go @@ -10,7 +10,7 @@ import ( func TestVerifyOptions(t *testing.T) { expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`) - expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`) + expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`) expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full, html`) expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`) diff --git a/compileopts/target.go b/compileopts/target.go index 7893e58290..a28bf774fa 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -248,7 +248,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) { GOARCH: options.GOARCH, BuildTags: []string{options.GOOS, options.GOARCH}, GC: "precise", - Scheduler: "tasks", Linker: "cc", DefaultStackSize: 1024 * 64, // 64kB GDB: []string{"gdb"}, @@ -381,6 +380,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { platformVersion = "11.0.0" // first macosx platform with arm64 support } llvmvendor = "apple" + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "darwin-libSystem" // Use macosx* instead of darwin, otherwise darwin/arm64 will refer to @@ -398,6 +398,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "linux": + spec.Scheduler = "threads" spec.Linker = "ld.lld" spec.RTLib = "compiler-rt" spec.Libc = "musl" @@ -418,9 +419,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/futex/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "mingw-w64" // Note: using a medium code model, low image base and no ASLR diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 100644 index 0000000000..7d28f708c4 --- /dev/null +++ b/src/internal/task/linux.go @@ -0,0 +1,9 @@ +//go:build linux && !baremetal + +package task + +import "unsafe" + +// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be +// fine. +type threadID unsafe.Pointer diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go new file mode 100644 index 0000000000..914f09bc5e --- /dev/null +++ b/src/internal/task/semaphore.go @@ -0,0 +1,32 @@ +package task + +// Barebones semaphore implementation. +// The main limitation is that if there are multiple waiters, a single Post() +// call won't do anything. Only when Post() has been called to awaken all +// waiters will the waiters proceed. +// This limitation is not a problem when there will only be a single waiter. +type Semaphore struct { + futex Futex +} + +// Post (unlock) the semaphore, incrementing the value in the semaphore. +func (s *Semaphore) Post() { + newValue := s.futex.Add(1) + if newValue == 0 { + s.futex.WakeAll() + } +} + +// Wait (lock) the semaphore, decrementing the value in the semaphore. +func (s *Semaphore) Wait() { + delta := int32(-1) + value := s.futex.Add(uint32(delta)) + for { + if int32(value) >= 0 { + // Semaphore unlocked! + return + } + s.futex.Wait(value) + value = s.futex.Load() + } +} diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c new file mode 100644 index 0000000000..a14844f2ef --- /dev/null +++ b/src/internal/task/task_threads.c @@ -0,0 +1,104 @@ +//go:build none + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. +#ifdef __linux__ +#define taskPauseSignal (SIGRTMIN + 6) +#endif + +// Pointer to the current task.Task structure. +// Ideally the entire task.Task structure would be a thread-local variable but +// this also works. +static __thread void *current_task; + +struct state_pass { + void *(*start)(void*); + void *args; + void *task; + uintptr_t *stackTop; + sem_t startlock; +}; + +// Handle the GC pause in Go. +void tinygo_task_gc_pause(int sig); + +// Initialize the main thread. +void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { + // Make sure the current task pointer is set correctly for the main + // goroutine as well. + current_task = mainTask; + + // Store the thread ID of the main thread. + *thread = pthread_self(); + + // Register the "GC pause" signal for the entire process. + // Using pthread_kill, we can still send the signal to a specific thread. + struct sigaction act = { 0 }; + act.sa_flags = SA_SIGINFO; + act.sa_handler = &tinygo_task_gc_pause; + sigaction(taskPauseSignal, &act, NULL); +} + +void tinygo_task_exited(void*); + +// Helper to start a goroutine while also storing the 'task' structure. +static void* start_wrapper(void *arg) { + struct state_pass *state = arg; + void *(*start)(void*) = state->start; + void *args = state->args; + current_task = state->task; + + // Save the current stack pointer in the goroutine state, for the GC. + int stackAddr; + *(state->stackTop) = (uintptr_t)(&stackAddr); + + // Notify the caller that the thread has successfully started and + // initialized. + sem_post(&state->startlock); + + // Run the goroutine function. + start(args); + + // Notify the Go side this thread will exit. + tinygo_task_exited(current_task); + + return NULL; +}; + +// Start a new goroutine in an OS thread. +int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uintptr_t *stackTop, void *context) { + // Sanity check. Should get optimized away. + if (sizeof(pthread_t) != sizeof(void*)) { + __builtin_trap(); + } + + struct state_pass state = { + .start = (void*)fn, + .args = args, + .task = task, + .stackTop = stackTop, + }; + sem_init(&state.startlock, 0, 0); + int result = pthread_create(thread, NULL, &start_wrapper, &state); + + // Wait until the thread has been created and read all state_pass variables. + sem_wait(&state.startlock); + + return result; +} + +// Return the current task (for task.Current()). +void* tinygo_task_current(void) { + return current_task; +} + +// Send a signal to cause the task to pause for the GC mark phase. +void tinygo_task_send_gc_signal(pthread_t thread) { + pthread_kill(thread, taskPauseSignal); +} diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..93204fb9ba --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,265 @@ +//go:build scheduler.threads + +package task + +import ( + "sync/atomic" + "unsafe" +) + +// If true, print verbose debug logs. +const verbose = false + +// Scheduler-specific state. +type state struct { + // Goroutine ID. The number here is not really significant and after a while + // it could wrap around. But it is useful for debugging. + id uintptr + + // Thread ID, pthread_t or similar (typically implemented as a pointer). + thread threadID + + // Highest address of the stack. It is stored when the goroutine starts, and + // is needed to be able to scan the stack. + stackTop uintptr + + // Next task in the activeTasks queue. + QueueNext *Task + + // Semaphore to pause/resume the thread atomically. + pauseSem Semaphore + + // Semaphore used for stack scanning. + // We can't reuse pauseSem here since the thread might have been paused for + // other reasons (for example, because it was waiting on a channel). + gcSem Semaphore +} + +// Goroutine counter, starting at 0 for the main goroutine. +var goroutineID uintptr + +var mainTask Task + +// Queue of tasks (see QueueNext) that currently exist in the program. +var activeTasks = &mainTask +var activeTaskLock PMutex + +func OnSystemStack() bool { + runtimePanic("todo: task.OnSystemStack") + return false +} + +// Initialize the main goroutine state. Must be called by the runtime on +// startup, before starting any other goroutines. +func Init(sp uintptr) { + mainTask.state.stackTop = sp + tinygo_task_init(&mainTask, &mainTask.state.thread) +} + +// Return the task struct for the current thread. +func Current() *Task { + t := (*Task)(tinygo_task_current()) + if t == nil { + runtimePanic("unknown current task") + } + return t +} + +// Pause pauses the current task, until it is resumed by another task. +// It is possible that another task has called Resume() on the task before it +// hits Pause(), in which case the task won't be paused but continues +// immediately. +func Pause() { + // Wait until resumed + t := Current() + if verbose { + println("*** pause: ", t.state.id) + } + t.state.pauseSem.Wait() +} + +// Resume the given task. +// It is legal to resume a task before it gets paused, it means that the next +// call to Pause() won't pause but will continue immediately. This happens in +// practice sometimes in channel operations, where the Resume() might get called +// between the channel unlock and the call to Pause(). +func (t *Task) Resume() { + if verbose { + println("*** resume: ", t.state.id) + } + // Increment the semaphore counter. + // If the task is currently paused in Wait(), it will resume. + // If the task is not yet paused, the next call to Wait() will continue + // immediately. + t.state.pauseSem.Post() +} + +// Start a new OS thread. +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + t.state.id = atomic.AddUintptr(&goroutineID, 1) + if verbose { + println("*** start: ", t.state.id, "from", Current().state.id) + } + + // Start the new thread, and add it to the list of threads. + // Do this with a lock so that only started threads are part of the queue + // and the stop-the-world GC won't see threads that haven't started yet or + // are not fully started yet. + activeTaskLock.Lock() + errCode := tinygo_task_start(fn, args, t, &t.state.thread, &t.state.stackTop) + if errCode != 0 { + runtimePanic("could not start thread") + } + t.state.QueueNext = activeTasks + activeTasks = t + activeTaskLock.Unlock() +} + +//export tinygo_task_exited +func taskExited(t *Task) { + if verbose { + println("*** exit:", t.state.id) + } + + // Remove from the queue. + // TODO: this can be made more efficient by using a doubly linked list. + activeTaskLock.Lock() + found := false + for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext { + if *q == t { + *q = t.state.QueueNext + found = true + break + } + } + activeTaskLock.Unlock() + + // Sanity check. + if !found { + runtimePanic("taskExited failed") + } +} + +// Futex to wait on until all tasks have finished scanning the stack. +// This is basically a sync.WaitGroup. +var scanDoneFutex Futex + +// GC scan phase. Because we need to stop the world while scanning, this kinda +// needs to be done in the tasks package. +func GCScan() { + current := Current() + + // Don't allow new goroutines to be started while pausing/resuming threads + // in the stop-the-world phase. + activeTaskLock.Lock() + + // Pause all other threads. + numOtherThreads := uint32(0) + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + numOtherThreads++ + tinygo_task_send_gc_signal(t.state.thread) + } + } + + // Store the number of threads to wait for in the futex. + // This is the equivalent of doing an initial wg.Add(numOtherThreads). + scanDoneFutex.Store(numOtherThreads) + + // Scan the current stack, and all current registers. + scanCurrentStack() + + // Wake each paused thread for the first time so it will scan the stack. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Wait until all threads have finished scanning their stack. + // This is the equivalent of wg.Wait() + for { + val := scanDoneFutex.Load() + if val == 0 { + break + } + scanDoneFutex.Wait(val) + } + + // Scan all globals (implemented in the runtime). + gcScanGlobals() + + // Wake each paused thread for the second time, so they will resume normal + // operation. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Allow goroutines to start and exit again. + activeTaskLock.Unlock() +} + +// Scan globals, implemented in the runtime package. +func gcScanGlobals() + +var stackScanLock PMutex + +//export tinygo_task_gc_pause +func tingyo_task_gc_pause() { + // Wait until we get the signal to start scanning the stack. + Current().state.gcSem.Wait() + + // Scan the thread stack. + // Only scan a single thread stack at a time, because the GC marking phase + // doesn't support parallelism. + // TODO: it may be possible to call markRoots directly (without saving + // registers) since we are in a signal handler that already saved a bunch of + // registers. This is an optimization left for a future time. + stackScanLock.Lock() + scanCurrentStack() + stackScanLock.Unlock() + + // Equivalent of wg.Done(): subtract one from the futex and if the result is + // 0 (meaning we were the last in the waitgroup), wake the waiting thread. + n := uint32(1) + if scanDoneFutex.Add(-n) == 0 { + scanDoneFutex.Wake() + } + + // Wait until we get the signal we can resume normally (after the mark phase + // has finished). + Current().state.gcSem.Wait() +} + +//go:export tinygo_scanCurrentStack +func scanCurrentStack() + +// Return the highest address of the current stack. +func StackTop() uintptr { + return Current().state.stackTop +} + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) + +// Using //go:linkname instead of //export so that we don't tell the compiler +// that the 't' parameter won't escape (because it will). +// +//go:linkname tinygo_task_init tinygo_task_init +func tinygo_task_init(t *Task, thread *threadID) + +// Here same as for tinygo_task_init. +// +//go:linkname tinygo_task_start tinygo_task_start +func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, stackTop *uintptr) int32 + +// Pause the thread by sending it a signal. +// +//export tinygo_task_send_gc_signal +func tinygo_task_send_gc_signal(threadID) + +//export tinygo_task_current +func tinygo_task_current() unsafe.Pointer diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index d55522a9f6..bdc3154fa5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads package runtime diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go new file mode 100644 index 0000000000..9c77fa0c7b --- /dev/null +++ b/src/runtime/gc_stack_threads.go @@ -0,0 +1,25 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +func gcMarkReachable() { + task.GCScan() +} + +// Scan globals inside the stop-the-world phase. Called from the STW +// implementation in the internal/task package. +// +//go:linkname gcScanGlobals internal/task.gcScanGlobals +func gcScanGlobals() { + findGlobals(markRoots) +} + +// Function called from assembly with all registers pushed, to actually scan the +// stack. +// +//go:export tinygo_scanstack +func scanstack(sp uintptr) { + markRoots(sp, task.StackTop()) +} diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index 08e3e74269..17e004b2e8 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -73,6 +73,7 @@ type timespec struct { tv_nsec int64 // unsigned 64-bit integer on all time64 platforms } +// Highest address of the stack of the main thread. var stackTop uintptr // Entry point for Go. Initialize all packages and call main.main(). diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..e553a5b9c7 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,124 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +const hasScheduler = false // not using the cooperative scheduler + +// We use threads, so yes there is parallelism. +const hasParallelism = true + +var ( + timerQueueLock task.PMutex + timerQueueStarted bool + timerFutex task.Futex +) + +// Because we just use OS threads, we don't need to do anything special here. We +// can just initialize everything and run main.main on the main thread. +func run() { + initHeap() + task.Init(stackTop) + initAll() + callMain() +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + sleepTicks(nanosecondsToTicks(duration)) +} + +func deadlock() { + // TODO: exit the thread via pthread_exit. + task.Pause() +} + +func scheduleTask(t *task.Task) { + t.Resume() +} + +func Gosched() { + // Each goroutine runs in a thread, so there's not much we can do here. + // There is sched_yield but it's only really intended for realtime + // operation, so is probably best not to use. +} + +// Separate goroutine (thread) that runs timer callbacks when they expire. +func timerRunner() { + for { + timerQueueLock.Lock() + + if timerQueue == nil { + // No timer in the queue, so wait until one becomes available. + val := timerFutex.Load() + timerQueueLock.Unlock() + timerFutex.Wait(val) + continue + } + + now := ticks() + if now < timerQueue.whenTicks() { + // There is a timer in the queue, but we need to wait until it + // expires. + // Using a futex, so that the wait is exited early when adding a new + // (sooner-to-expire) timer. + val := timerFutex.Load() + timerQueueLock.Unlock() + timeout := ticksToNanoseconds(timerQueue.whenTicks() - now) + timerFutex.WaitUntil(val, uint64(timeout)) + continue + } + + // Pop timer from queue. + tn := timerQueue + timerQueue = tn.next + tn.next = nil + + timerQueueLock.Unlock() + + // Run the callback stored in this timer node. + delay := ticksToNanoseconds(now - tn.whenTicks()) + tn.callback(tn, delay) + } +} + +func addTimer(tim *timerNode) { + timerQueueLock.Lock() + + if !timerQueueStarted { + timerQueueStarted = true + go timerRunner() + } + + timerQueueAdd(tim) + + timerFutex.Add(1) + timerFutex.Wake() + + timerQueueLock.Unlock() +} + +func removeTimer(tim *timer) bool { + timerQueueLock.Lock() + removed := timerQueueRemove(tim) + timerQueueLock.Unlock() + return removed +} + +func schedulerRunQueue() *task.Queue { + // This function is not actually used, it is only called when hasScheduler + // is true. So we can just return nil here. + return nil +} + +func runqueueForGC() *task.Queue { + // There is only a runqueue when using the cooperative scheduler. + return nil +}