diff --git a/compileopts/options.go b/compileopts/options.go
index 30e0e4dbed..78eb17cde9 100644
--- a/compileopts/options.go
+++ b/compileopts/options.go
@@ -10,7 +10,7 @@ import (
 var (
 	validBuildModeOptions     = []string{"default", "c-shared"}
 	validGCOptions            = []string{"none", "leaking", "conservative", "custom", "precise"}
-	validSchedulerOptions     = []string{"none", "tasks", "asyncify"}
+	validSchedulerOptions     = []string{"none", "tasks", "asyncify", "threads"}
 	validSerialOptions        = []string{"none", "uart", "usb", "rtt"}
 	validPrintSizeOptions     = []string{"none", "short", "full", "html"}
 	validPanicStrategyOptions = []string{"print", "trap"}
diff --git a/compileopts/options_test.go b/compileopts/options_test.go
index ee63c4c46d..d52ef5690e 100644
--- a/compileopts/options_test.go
+++ b/compileopts/options_test.go
@@ -10,7 +10,7 @@ import (
 
 func TestVerifyOptions(t *testing.T) {
 	expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`)
-	expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`)
+	expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`)
 	expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full, html`)
 	expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`)
diff --git a/compileopts/target.go b/compileopts/target.go
index 7893e58290..a28bf774fa 100644
--- a/compileopts/target.go
+++ b/compileopts/target.go
@@ -248,7 +248,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) {
 		GOARCH:           options.GOARCH,
 		BuildTags:        []string{options.GOOS, options.GOARCH},
 		GC:               "precise",
-		Scheduler:        "tasks",
 		Linker:           "cc",
 		DefaultStackSize: 1024 * 64, // 64kB
 		GDB:              []string{"gdb"},
@@ -381,6 +380,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) {
 			platformVersion = "11.0.0" // first macosx platform with arm64 support
 		}
 		llvmvendor = "apple"
+		spec.Scheduler = "tasks"
 		spec.Linker = "ld.lld"
 		spec.Libc = "darwin-libSystem"
 		// Use macosx* instead of darwin, otherwise darwin/arm64 will refer to
@@ -398,6 +398,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) {
 			"src/runtime/runtime_unix.c",
 			"src/runtime/signal.c")
 	case "linux":
+		spec.Scheduler = "threads"
 		spec.Linker = "ld.lld"
 		spec.RTLib = "compiler-rt"
 		spec.Libc = "musl"
@@ -418,9 +419,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) {
 		}
 		spec.ExtraFiles = append(spec.ExtraFiles,
 			"src/internal/futex/futex_linux.c",
+			"src/internal/task/task_threads.c",
 			"src/runtime/runtime_unix.c",
 			"src/runtime/signal.c")
 	case "windows":
+		spec.Scheduler = "tasks"
 		spec.Linker = "ld.lld"
 		spec.Libc = "mingw-w64"
 		// Note: using a medium code model, low image base and no ASLR
diff --git a/src/internal/task/atomic-cooperative.go b/src/internal/task/atomic-cooperative.go
index 60eb917a8e..bd4cba8956 100644
--- a/src/internal/task/atomic-cooperative.go
+++ b/src/internal/task/atomic-cooperative.go
@@ -1,3 +1,5 @@
+//go:build !scheduler.threads
+
 package task
 
 // Atomics implementation for cooperative systems. The atomic types here aren't
diff --git a/src/internal/task/atomic-preemptive.go b/src/internal/task/atomic-preemptive.go
new file mode 100644
index 0000000000..275f36dce4
--- /dev/null
+++ b/src/internal/task/atomic-preemptive.go
@@ -0,0 +1,14 @@
+//go:build scheduler.threads
+
+package task
+
+// Atomics implementation for non-cooperative systems (multithreaded, etc).
+// These atomic types use real atomic instructions.
+
+import "sync/atomic"
+
+type (
+	Uintptr = atomic.Uintptr
+	Uint32  = atomic.Uint32
+	Uint64  = atomic.Uint64
+)
diff --git a/src/internal/task/futex-cooperative.go b/src/internal/task/futex-cooperative.go
index 8351f88774..2a42c28d43 100644
--- a/src/internal/task/futex-cooperative.go
+++ b/src/internal/task/futex-cooperative.go
@@ -1,3 +1,5 @@
+//go:build !scheduler.threads
+
 package task
 
 // A futex is a way for userspace to wait with the pointer as the key, and for
diff --git a/src/internal/task/futex-preemptive.go b/src/internal/task/futex-preemptive.go
new file mode 100644
index 0000000000..7f9e89580c
--- /dev/null
+++ b/src/internal/task/futex-preemptive.go
@@ -0,0 +1,7 @@
+//go:build scheduler.threads
+
+package task
+
+import "internal/futex"
+
+type Futex = futex.Futex
diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go
new file mode 100644
index 0000000000..7d28f708c4
--- /dev/null
+++ b/src/internal/task/linux.go
@@ -0,0 +1,9 @@
+//go:build linux && !baremetal
+
+package task
+
+import "unsafe"
+
+// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be
+// fine.
+type threadID unsafe.Pointer
diff --git a/src/internal/task/mutex-cooperative.go b/src/internal/task/mutex-cooperative.go
index e40966bed4..f1205eea25 100644
--- a/src/internal/task/mutex-cooperative.go
+++ b/src/internal/task/mutex-cooperative.go
@@ -1,3 +1,5 @@
+//go:build !scheduler.threads
+
 package task
 
 type Mutex struct {
diff --git a/src/internal/task/mutex-preemptive.go b/src/internal/task/mutex-preemptive.go
new file mode 100644
index 0000000000..27f4646698
--- /dev/null
+++ b/src/internal/task/mutex-preemptive.go
@@ -0,0 +1,71 @@
+//go:build scheduler.threads
+
+package task
+
+// Futex-based mutex.
+// This is largely based on the paper "Futexes are Tricky" by Ulrich Drepper.
+// It describes a few ways to implement mutexes using a futex, and how some
+// seemingly-obvious implementations don't exactly work as intended.
+// Unfortunately, Go atomic operations work slightly differently so we can't
+// copy the algorithm verbatim.
+//
+// The implementation works like this. The futex can have 3 different values,
+// depending on the state:
+//
+// - 0: the futex is currently unlocked.
+// - 1: the futex is locked, but is uncontended. There is one special case: if
+//   a contended futex is unlocked, it is set to 0. It is possible for another
+//   thread to lock the futex before the next waiter is woken. But because a
+//   waiter will be woken (if there is one), it will always change to 2
+//   regardless. So this is not a problem.
+// - 2: the futex is locked, and is contended. At least one thread is trying
+//   to obtain the lock (and is in the contended loop, see below).
+//
+// For the paper, see:
+// https://dept-info.labri.fr/~denis/Enseignement/2008-IR/Articles/01-futex.pdf
+
+type Mutex struct {
+	futex Futex
+}
+
+func (m *Mutex) Lock() {
+	// Fast path: try to take an uncontended lock.
+	if m.futex.CompareAndSwap(0, 1) {
+		// We obtained the mutex.
+		return
+	}
+
+	// The futex is contended, so we enter the contended loop.
+	// If we manage to change the futex from 0 to 2, we managed to take the
+	// lock. Else, we have to wait until a call to Unlock unlocks this mutex.
+	// (Unlock will wake one waiter when it finds the futex is set to 2 when
+	// unlocking).
+	for m.futex.Swap(2) != 0 {
+		// Wait until we get resumed in Unlock.
+		m.futex.Wait(2)
+	}
+}
+
+func (m *Mutex) Unlock() {
+	if old := m.futex.Swap(0); old == 0 {
+		// Mutex wasn't locked before.
+		panic("sync: unlock of unlocked Mutex")
+	} else if old == 2 {
+		// Mutex was a contended lock, so we need to wake the next waiter.
+		m.futex.Wake()
+	}
+}
+
+// TryLock tries to lock m and reports whether it succeeded.
+//
+// Note that while correct uses of TryLock do exist, they are rare,
+// and use of TryLock is often a sign of a deeper problem
+// in a particular use of mutexes.
+func (m *Mutex) TryLock() bool {
+	// Fast path: try to take an uncontended lock.
+	if m.futex.CompareAndSwap(0, 1) {
+		// We obtained the mutex.
+		return true
+	}
+	return false
+}
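To make the three-state protocol above concrete, the state transitions can be traced with plain sync/atomic in ordinary Go. A minimal standalone sketch (atomic.Uint32 stands in for the internal futex word; the Wait/Wake calls are elided, so this traces only the arithmetic, not the blocking):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var futex atomic.Uint32 // 0=unlocked, 1=locked, 2=locked+contended

	// Fast path: an uncontended Lock is a single CAS 0 -> 1.
	fmt.Println(futex.CompareAndSwap(0, 1)) // true: lock acquired

	// A second locker fails the CAS and enters the contended loop.
	fmt.Println(futex.CompareAndSwap(0, 1)) // false: already locked

	// The contended locker swaps in 2. The old value is 1 (not 0),
	// so it would now Wait() on the futex word.
	fmt.Println(futex.Swap(2)) // 1

	// Unlock swaps in 0 and sees the old value 2, so it would Wake()
	// one waiter. The woken waiter retries Swap(2): an old value of 0
	// means it took the lock (conservatively marking it contended).
	fmt.Println(futex.Swap(0)) // 2
	fmt.Println(futex.Swap(2)) // 0: woken waiter now owns the lock
}

Note how the woken waiter always sets the word to 2 even if it is the only waiter: a spurious Wake on the next Unlock is cheap, while a lost wakeup would deadlock.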
diff --git a/src/internal/task/pmutex-cooperative.go b/src/internal/task/pmutex-cooperative.go
index ae2aa4bad8..0e6c4f828b 100644
--- a/src/internal/task/pmutex-cooperative.go
+++ b/src/internal/task/pmutex-cooperative.go
@@ -1,3 +1,5 @@
+//go:build !scheduler.threads
+
 package task
 
 // PMutex is a real mutex on systems that can be either preemptive or threaded,
diff --git a/src/internal/task/pmutex-preemptive.go b/src/internal/task/pmutex-preemptive.go
new file mode 100644
index 0000000000..10f0a63561
--- /dev/null
+++ b/src/internal/task/pmutex-preemptive.go
@@ -0,0 +1,11 @@
+//go:build scheduler.threads
+
+package task
+
+// PMutex is a real mutex on systems that can be either preemptive or threaded,
+// and a dummy lock on other (purely cooperative) systems.
+//
+// It is mainly useful for short operations that need a lock when threading may
+// be involved, but which do not need a lock with a purely cooperative
+// scheduler.
+type PMutex = Mutex
diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go
new file mode 100644
index 0000000000..914f09bc5e
--- /dev/null
+++ b/src/internal/task/semaphore.go
@@ -0,0 +1,32 @@
+package task
+
+// Barebones semaphore implementation.
+// The main limitation is that if there are multiple waiters, a single Post()
+// call won't wake any of them. Only once Post() has been called enough times
+// to cover all waiters will the waiters proceed.
+// This limitation is not a problem when there will only be a single waiter.
+type Semaphore struct {
+	futex Futex
+}
+
+// Post (unlock) the semaphore, incrementing the value in the semaphore.
+func (s *Semaphore) Post() {
+	newValue := s.futex.Add(1)
+	if newValue == 0 {
+		s.futex.WakeAll()
+	}
+}
+
+// Wait (lock) the semaphore, decrementing the value in the semaphore.
+func (s *Semaphore) Wait() {
+	delta := int32(-1)
+	value := s.futex.Add(uint32(delta))
+	for {
+		if int32(value) >= 0 {
+			// Semaphore unlocked!
+			return
+		}
+		s.futex.Wait(value)
+		value = s.futex.Load()
+	}
+}
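The single-waiter caveat in the comment above falls directly out of the counter arithmetic: Post only calls WakeAll when the counter comes back to exactly 0. A small trace, with a plain atomic.Int32 standing in for the futex value (no real blocking here):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var sem atomic.Int32 // stand-in for the Semaphore's futex value

	sem.Add(-1) // first Wait: counter -1, waiter 1 blocks
	sem.Add(-1) // second Wait: counter -2, waiter 2 blocks

	// A single Post only brings the counter to -1: still negative,
	// so WakeAll is not called and neither waiter is woken.
	fmt.Println(sem.Add(1)) // -1

	// Only the Post that returns the counter to exactly 0 triggers
	// WakeAll, after which both waiters observe a non-negative value
	// and proceed.
	fmt.Println(sem.Add(1)) // 0
}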
diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c
new file mode 100644
index 0000000000..a14844f2ef
--- /dev/null
+++ b/src/internal/task/task_threads.c
@@ -0,0 +1,104 @@
+//go:build none
+
+#define _GNU_SOURCE
+#include <pthread.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice.
+#ifdef __linux__
+#define taskPauseSignal (SIGRTMIN + 6)
+#endif
+
+// Pointer to the current task.Task structure.
+// Ideally the entire task.Task structure would be a thread-local variable but
+// this also works.
+static __thread void *current_task;
+
+struct state_pass {
+	void *(*start)(void*);
+	void *args;
+	void *task;
+	uintptr_t *stackTop;
+	sem_t startlock;
+};
+
+// Handle the GC pause in Go.
+void tinygo_task_gc_pause(int sig);
+
+// Initialize the main thread.
+void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) {
+	// Make sure the current task pointer is set correctly for the main
+	// goroutine as well.
+	current_task = mainTask;
+
+	// Store the thread ID of the main thread.
+	*thread = pthread_self();
+
+	// Register the "GC pause" signal for the entire process.
+	// Using pthread_kill, we can still send the signal to a specific thread.
+	struct sigaction act = { 0 };
+	act.sa_flags = 0;
+	act.sa_handler = &tinygo_task_gc_pause;
+	sigaction(taskPauseSignal, &act, NULL);
+}
+
+void tinygo_task_exited(void*);
+
+// Helper to start a goroutine while also storing the 'task' structure.
+static void* start_wrapper(void *arg) {
+	struct state_pass *state = arg;
+	void *(*start)(void*) = state->start;
+	void *args = state->args;
+	current_task = state->task;
+
+	// Save the current stack pointer in the goroutine state, for the GC.
+	int stackAddr;
+	*(state->stackTop) = (uintptr_t)(&stackAddr);
+
+	// Notify the caller that the thread has successfully started and
+	// initialized.
+	sem_post(&state->startlock);
+
+	// Run the goroutine function.
+	start(args);
+
+	// Notify the Go side this thread will exit.
+	tinygo_task_exited(current_task);
+
+	return NULL;
+}
+
+// Start a new goroutine in an OS thread.
+int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uintptr_t *stackTop, void *context) {
+	// Sanity check. Should get optimized away.
+	if (sizeof(pthread_t) != sizeof(void*)) {
+		__builtin_trap();
+	}
+
+	struct state_pass state = {
+		.start    = (void*)fn,
+		.args     = args,
+		.task     = task,
+		.stackTop = stackTop,
+	};
+	sem_init(&state.startlock, 0, 0);
+	int result = pthread_create(thread, NULL, &start_wrapper, &state);
+
+	// Wait until the thread has been created and read all state_pass variables.
+	sem_wait(&state.startlock);
+
+	return result;
+}
+
+// Return the current task (for task.Current()).
+void* tinygo_task_current(void) {
+	return current_task;
+}
+
+// Send a signal to cause the task to pause for the GC mark phase.
+void tinygo_task_send_gc_signal(pthread_t thread) {
+	pthread_kill(thread, taskPauseSignal);
+}
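The sem_wait at the end of tinygo_task_start is load-bearing: state_pass lives on the starting thread's stack, so the caller must not return before start_wrapper has copied its fields out. The same handshake sketched in ordinary Go, with a channel playing the role of startlock (names invented for illustration):

package main

import "fmt"

type statePass struct {
	start func(arg any)
	arg   any
}

func startThread(fn func(any), arg any) {
	state := statePass{start: fn, arg: arg} // lives in this stack frame
	started := make(chan struct{})          // plays the role of startlock

	go func() {
		// Copy everything out of state before signalling the caller.
		start, a := state.start, state.arg
		started <- struct{}{} // sem_post: the caller may now return
		start(a)
	}()

	<-started // sem_wait: keep this frame alive until the copy is done
}

func main() {
	done := make(chan struct{})
	startThread(func(arg any) {
		fmt.Println("hello from", arg)
		close(done)
	}, "new thread")
	<-done
}

In Go a heap allocation would make this dance unnecessary; in the C glue the stack-allocated struct plus semaphore avoids a malloc/free pair per goroutine start.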
diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go
new file mode 100644
index 0000000000..93204fb9ba
--- /dev/null
+++ b/src/internal/task/task_threads.go
@@ -0,0 +1,265 @@
+//go:build scheduler.threads
+
+package task
+
+import (
+	"sync/atomic"
+	"unsafe"
+)
+
+// If true, print verbose debug logs.
+const verbose = false
+
+// Scheduler-specific state.
+type state struct {
+	// Goroutine ID. The number here is not really significant and after a while
+	// it could wrap around. But it is useful for debugging.
+	id uintptr
+
+	// Thread ID, pthread_t or similar (typically implemented as a pointer).
+	thread threadID
+
+	// Highest address of the stack. It is stored when the goroutine starts, and
+	// is needed to be able to scan the stack.
+	stackTop uintptr
+
+	// Next task in the activeTasks queue.
+	QueueNext *Task
+
+	// Semaphore to pause/resume the thread atomically.
+	pauseSem Semaphore
+
+	// Semaphore used for stack scanning.
+	// We can't reuse pauseSem here since the thread might have been paused for
+	// other reasons (for example, because it was waiting on a channel).
+	gcSem Semaphore
+}
+
+// Goroutine counter, starting at 0 for the main goroutine.
+var goroutineID uintptr
+
+var mainTask Task
+
+// Queue of tasks (see QueueNext) that currently exist in the program.
+var activeTasks = &mainTask
+var activeTaskLock PMutex
+
+func OnSystemStack() bool {
+	runtimePanic("todo: task.OnSystemStack")
+	return false
+}
+
+// Initialize the main goroutine state. Must be called by the runtime on
+// startup, before starting any other goroutines.
+func Init(sp uintptr) {
+	mainTask.state.stackTop = sp
+	tinygo_task_init(&mainTask, &mainTask.state.thread)
+}
+
+// Return the task struct for the current thread.
+func Current() *Task {
+	t := (*Task)(tinygo_task_current())
+	if t == nil {
+		runtimePanic("unknown current task")
+	}
+	return t
+}
+
+// Pause pauses the current task, until it is resumed by another task.
+// It is possible that another task has called Resume() on the task before it
+// hits Pause(), in which case the task won't pause but will continue
+// immediately.
+func Pause() {
+	// Wait until resumed
+	t := Current()
+	if verbose {
+		println("*** pause: ", t.state.id)
+	}
+	t.state.pauseSem.Wait()
+}
+
+// Resume the given task.
+// It is legal to resume a task before it gets paused; it means that the next
+// call to Pause() won't pause but will continue immediately. This happens in
+// practice sometimes in channel operations, where the Resume() might get called
+// between the channel unlock and the call to Pause().
+func (t *Task) Resume() {
+	if verbose {
+		println("*** resume: ", t.state.id)
+	}
+	// Increment the semaphore counter.
+	// If the task is currently paused in Wait(), it will resume.
+	// If the task is not yet paused, the next call to Wait() will continue
+	// immediately.
+	t.state.pauseSem.Post()
+}
+
+// Start a new OS thread.
+func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) {
+	t := &Task{}
+	t.state.id = atomic.AddUintptr(&goroutineID, 1)
+	if verbose {
+		println("*** start: ", t.state.id, "from", Current().state.id)
+	}
+
+	// Start the new thread, and add it to the list of threads.
+	// Do this with a lock so that only started threads are part of the queue
+	// and the stop-the-world GC won't see threads that haven't started yet or
+	// are not fully started yet.
+	activeTaskLock.Lock()
+	errCode := tinygo_task_start(fn, args, t, &t.state.thread, &t.state.stackTop)
+	if errCode != 0 {
+		runtimePanic("could not start thread")
+	}
+	t.state.QueueNext = activeTasks
+	activeTasks = t
+	activeTaskLock.Unlock()
+}
+
+//export tinygo_task_exited
+func taskExited(t *Task) {
+	if verbose {
+		println("*** exit:", t.state.id)
+	}
+
+	// Remove from the queue.
+	// TODO: this can be made more efficient by using a doubly linked list.
+	activeTaskLock.Lock()
+	found := false
+	for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext {
+		if *q == t {
+			*q = t.state.QueueNext
+			found = true
+			break
+		}
+	}
+	activeTaskLock.Unlock()
+
+	// Sanity check.
+	if !found {
+		runtimePanic("taskExited failed")
+	}
+}
+
+// Futex to wait on until all tasks have finished scanning the stack.
+// This is basically a sync.WaitGroup.
+var scanDoneFutex Futex
+
+// GC scan phase. Because we need to stop the world while scanning, this kinda
+// needs to be done in the tasks package.
+func GCScan() {
+	current := Current()
+
+	// Don't allow new goroutines to be started while pausing/resuming threads
+	// in the stop-the-world phase.
+	activeTaskLock.Lock()
+
+	// Pause all other threads.
+	numOtherThreads := uint32(0)
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			numOtherThreads++
+			tinygo_task_send_gc_signal(t.state.thread)
+		}
+	}
+
+	// Store the number of threads to wait for in the futex.
+	// This is the equivalent of doing an initial wg.Add(numOtherThreads).
+	scanDoneFutex.Store(numOtherThreads)
+
+	// Scan the current stack, and all current registers.
+	scanCurrentStack()
+
+	// Wake each paused thread for the first time so it will scan the stack.
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			t.state.gcSem.Post()
+		}
+	}
+
+	// Wait until all threads have finished scanning their stack.
+	// This is the equivalent of wg.Wait().
+	for {
+		val := scanDoneFutex.Load()
+		if val == 0 {
+			break
+		}
+		scanDoneFutex.Wait(val)
+	}
+
+	// Scan all globals (implemented in the runtime).
+	gcScanGlobals()
+
+	// Wake each paused thread for the second time, so they will resume normal
+	// operation.
+	for t := activeTasks; t != nil; t = t.state.QueueNext {
+		if t != current {
+			t.state.gcSem.Post()
+		}
+	}
+
+	// Allow goroutines to start and exit again.
+	activeTaskLock.Unlock()
+}
+
+// Scan globals, implemented in the runtime package.
+func gcScanGlobals()
+
+var stackScanLock PMutex
+
+//export tinygo_task_gc_pause
+func tinygo_task_gc_pause() {
+	// Wait until we get the signal to start scanning the stack.
+	Current().state.gcSem.Wait()
+
+	// Scan the thread stack.
+	// Only scan a single thread stack at a time, because the GC marking phase
+	// doesn't support parallelism.
+	// TODO: it may be possible to call markRoots directly (without saving
+	// registers) since we are in a signal handler that already saved a bunch of
+	// registers. This is an optimization left for a future time.
+	stackScanLock.Lock()
+	scanCurrentStack()
+	stackScanLock.Unlock()
+
+	// Equivalent of wg.Done(): subtract one from the futex and if the result is
+	// 0 (meaning we were the last in the waitgroup), wake the waiting thread.
+	n := uint32(1)
+	if scanDoneFutex.Add(-n) == 0 {
+		scanDoneFutex.Wake()
+	}
+
+	// Wait until we get the signal we can resume normally (after the mark phase
+	// has finished).
+	Current().state.gcSem.Wait()
+}
+
+//go:export tinygo_scanCurrentStack
+func scanCurrentStack()
+
+// Return the highest address of the current stack.
+func StackTop() uintptr {
+	return Current().state.stackTop
+}
+
+//go:linkname runtimePanic runtime.runtimePanic
+func runtimePanic(msg string)
+
+// Using //go:linkname instead of //export so that we don't tell the compiler
+// that the 't' parameter won't escape (because it will).
+//
+//go:linkname tinygo_task_init tinygo_task_init
+func tinygo_task_init(t *Task, thread *threadID)
+
+// Here same as for tinygo_task_init.
+//
+//go:linkname tinygo_task_start tinygo_task_start
+func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, stackTop *uintptr) int32
+
+// Pause the thread by sending it a signal.
+//
+//export tinygo_task_send_gc_signal
+func tinygo_task_send_gc_signal(threadID)
+
+//export tinygo_task_current
+func tinygo_task_current() unsafe.Pointer
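As the comments above note, scanDoneFutex is a hand-rolled sync.WaitGroup: Store(n) plays Add(n), the Add(-1)-then-Wake in the signal handler plays Done, and the Load/Wait loop plays Wait. The same control flow in portable Go, using the real sync.WaitGroup it imitates (a sketch of the shape only, not the runtime's actual code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	numOtherThreads := 3

	wg.Add(numOtherThreads) // scanDoneFutex.Store(numOtherThreads)

	for i := 0; i < numOtherThreads; i++ {
		go func(id int) {
			// ... scan this thread's stack; in the real code
			// stackScanLock serialises the scans ...
			fmt.Println("stack scanned for thread", id)
			wg.Done() // scanDoneFutex.Add(-1), Wake when it hits 0
		}(i)
	}

	wg.Wait() // the Load/Wait loop on scanDoneFutex
	fmt.Println("all stacks scanned; proceed to mark globals")
}

The futex version exists because the mark phase runs with the world stopped, where a full sync.WaitGroup (which itself may allocate or schedule) is off limits.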
diff --git a/src/runtime/gc_blocks.go b/src/runtime/gc_blocks.go
index d58bfd92a2..bce87ef271 100644
--- a/src/runtime/gc_blocks.go
+++ b/src/runtime/gc_blocks.go
@@ -57,6 +57,7 @@ var (
 	gcMallocs     uint64 // total number of allocations
 	gcFrees       uint64 // total number of objects freed
 	gcFreedBlocks uint64 // total number of freed blocks
+	gcLock        task.PMutex // lock to avoid race conditions on multicore systems
 )
 
 // zeroSizedAlloc is just a sentinel that gets returned when allocating 0 bytes.
@@ -317,6 +318,10 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer {
 		runtimePanicAt(returnAddress(0), "heap alloc in interrupt")
 	}
 
+	// Make sure there are no concurrent allocations. The heap is not currently
+	// designed for concurrent alloc/GC.
+	gcLock.Lock()
+
 	gcTotalAlloc += uint64(size)
 	gcMallocs++
 
@@ -399,6 +404,9 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer {
 			i.setState(blockStateTail)
 		}
 
+		// We've claimed this allocation, now we can unlock the heap.
+		gcLock.Unlock()
+
 		// Return a pointer to this allocation.
 		pointer := thisAlloc.pointer()
 		if preciseHeap {
@@ -444,7 +452,9 @@ func free(ptr unsafe.Pointer) {
 
 // GC performs a garbage collection cycle.
 func GC() {
+	gcLock.Lock()
 	runGC()
+	gcLock.Unlock()
 }
 
 // runGC performs a garbage collection cycle. It is the internal implementation
@@ -456,8 +466,7 @@ func runGC() (freeBytes uintptr) {
 	}
 
 	// Mark phase: mark all reachable objects, recursively.
-	markStack()
-	findGlobals(markRoots)
+	gcMarkReachable()
 
 	if baremetal && hasScheduler {
 		// Channel operations in interrupts may move task pointers around while we are marking.
@@ -714,6 +723,7 @@ func dumpHeap() {
 // The returned memory statistics are up to date as of the
 // call to ReadMemStats. This would not do GC implicitly for you.
 func ReadMemStats(m *MemStats) {
+	gcLock.Lock()
 	m.HeapIdle = 0
 	m.HeapInuse = 0
 	for block := gcBlock(0); block < endBlock; block++ {
@@ -733,6 +743,7 @@ func ReadMemStats(m *MemStats) {
 	m.Sys = uint64(heapEnd - heapStart)
 	m.HeapAlloc = (gcTotalBlocks - gcFreedBlocks) * uint64(bytesPerBlock)
 	m.Alloc = m.HeapAlloc
+	gcLock.Unlock()
 }
 
 func SetFinalizer(obj interface{}, finalizer interface{}) {
diff --git a/src/runtime/gc_stack_portable.go b/src/runtime/gc_stack_portable.go
index d35e16e30c..750a34ec2c 100644
--- a/src/runtime/gc_stack_portable.go
+++ b/src/runtime/gc_stack_portable.go
@@ -8,6 +8,11 @@ import (
 	"unsafe"
 )
 
+func gcMarkReachable() {
+	markStack()
+	findGlobals(markRoots)
+}
+
 //go:extern runtime.stackChainStart
 var stackChainStart *stackChainObject
diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go
index 5ee18622db..bdc3154fa5 100644
--- a/src/runtime/gc_stack_raw.go
+++ b/src/runtime/gc_stack_raw.go
@@ -1,9 +1,14 @@
-//go:build (gc.conservative || gc.precise) && !tinygo.wasm
+//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads
 
 package runtime
 
 import "internal/task"
 
+func gcMarkReachable() {
+	markStack()
+	findGlobals(markRoots)
+}
+
 // markStack marks all root pointers found on the stack.
 //
 // This implementation is conservative and relies on the stack top (provided by
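The gcLock added to gc_blocks.go above is the classic single big lock around an allocator that was designed for one thread: every mutation of the block state table happens between Lock and Unlock, so two threads can never claim the same blocks. A toy bump allocator with the same shape (entirely hypothetical, nothing like TinyGo's real block allocator):

package main

import (
	"fmt"
	"sync"
)

type heap struct {
	lock sync.Mutex // plays the role of gcLock
	next uintptr    // bump pointer into a notional arena
	end  uintptr
}

func (h *heap) alloc(size uintptr) uintptr {
	h.lock.Lock()
	defer h.lock.Unlock() // TinyGo unlocks once the blocks are claimed

	if h.next+size > h.end {
		panic("out of memory (a real heap would run the GC here)")
	}
	p := h.next
	h.next += size // without the lock, two threads could get the same p
	return p
}

func main() {
	h := &heap{next: 0x1000, end: 0x9000}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); fmt.Printf("%#x\n", h.alloc(64)) }()
	}
	wg.Wait() // all four pointers are distinct thanks to the lock
}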
diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go
new file mode 100644
index 0000000000..9c77fa0c7b
--- /dev/null
+++ b/src/runtime/gc_stack_threads.go
@@ -0,0 +1,25 @@
+//go:build scheduler.threads
+
+package runtime
+
+import "internal/task"
+
+func gcMarkReachable() {
+	task.GCScan()
+}
+
+// Scan globals inside the stop-the-world phase. Called from the STW
+// implementation in the internal/task package.
+//
+//go:linkname gcScanGlobals internal/task.gcScanGlobals
+func gcScanGlobals() {
+	findGlobals(markRoots)
+}
+
+// Function called from assembly with all registers pushed, to actually scan the
+// stack.
+//
+//go:export tinygo_scanstack
+func scanstack(sp uintptr) {
+	markRoots(sp, task.StackTop())
+}
diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go
index 08e3e74269..17e004b2e8 100644
--- a/src/runtime/runtime_unix.go
+++ b/src/runtime/runtime_unix.go
@@ -73,6 +73,7 @@ type timespec struct {
 	tv_nsec int64 // unsigned 64-bit integer on all time64 platforms
 }
 
+// Highest address of the stack of the main thread.
 var stackTop uintptr
 
 // Entry point for Go. Initialize all packages and call main.main().
diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go
index 727c7f5f2c..7461c966ed 100644
--- a/src/runtime/scheduler.go
+++ b/src/runtime/scheduler.go
@@ -6,6 +6,8 @@ const schedulerDebug = false
 
 var mainExited bool
 
+var timerQueue *timerNode
+
 // Simple logging, for debugging.
 func scheduleLog(msg string) {
 	if schedulerDebug {
@@ -27,6 +29,34 @@ func scheduleLogChan(msg string, ch *channel, t *task.Task) {
 	}
 }
 
+func timerQueueAdd(tn *timerNode) {
+	q := &timerQueue
+	for ; *q != nil; q = &(*q).next {
+		if tn.whenTicks() < (*q).whenTicks() {
+			// this will finish earlier than the next - insert here
+			break
+		}
+	}
+	tn.next = *q
+	*q = tn
+}
+
+func timerQueueRemove(t *timer) bool {
+	removedTimer := false
+	for q := &timerQueue; *q != nil; q = &(*q).next {
+		if (*q).timer == t {
+			scheduleLog("removed timer")
+			*q = (*q).next
+			removedTimer = true
+			break
+		}
+	}
+	if !removedTimer {
+		scheduleLog("did not remove timer")
+	}
+	return removedTimer
+}
+
 // Goexit terminates the currently running goroutine. No other goroutines are affected.
 func Goexit() {
 	panicOrGoexit(nil, panicGoexit)
diff --git a/src/runtime/scheduler_cooperative.go b/src/runtime/scheduler_cooperative.go
index 91ba86409f..85c8f56f09 100644
--- a/src/runtime/scheduler_cooperative.go
+++ b/src/runtime/scheduler_cooperative.go
@@ -32,7 +32,6 @@ var (
 	runqueue           task.Queue
 	sleepQueue         *task.Task
 	sleepQueueBaseTime timeUnit
-	timerQueue         *timerNode
 )
 
 // deadlock is called when a goroutine cannot proceed any more, but is in theory
@@ -100,36 +99,15 @@ func addSleepTask(t *task.Task, duration timeUnit) {
 // sleepQueue.
 func addTimer(tim *timerNode) {
 	mask := interrupt.Disable()
-
-	// Add to timer queue.
-	q := &timerQueue
-	for ; *q != nil; q = &(*q).next {
-		if tim.whenTicks() < (*q).whenTicks() {
-			// this will finish earlier than the next - insert here
-			break
-		}
-	}
-	tim.next = *q
-	*q = tim
+	timerQueueAdd(tim)
 	interrupt.Restore(mask)
 }
 
 // removeTimer is the implementation of time.stopTimer. It removes a timer from
 // the timer queue, returning true if the timer is present in the timer queue.
 func removeTimer(tim *timer) bool {
-	removedTimer := false
 	mask := interrupt.Disable()
-	for t := &timerQueue; *t != nil; t = &(*t).next {
-		if (*t).timer == tim {
-			scheduleLog("removed timer")
-			*t = (*t).next
-			removedTimer = true
-			break
-		}
-	}
-	if !removedTimer {
-		scheduleLog("did not remove timer")
-	}
+	removedTimer := timerQueueRemove(tim)
 	interrupt.Restore(mask)
 	return removedTimer
 }
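timerQueueAdd keeps the queue sorted by expiry time with a pointer-to-pointer walk, so the head is always the next timer to fire and both schedulers can share the code. The same insertion technique on a self-contained node type (illustrative names, not the runtime's):

package main

import "fmt"

type node struct {
	when int64 // expiry tick, stands in for whenTicks()
	next *node
}

var queue *node

func queueAdd(n *node) {
	q := &queue
	for ; *q != nil; q = &(*q).next {
		if n.when < (*q).when {
			break // n expires earlier than *q: insert here
		}
	}
	n.next = *q
	*q = n
}

func main() {
	queueAdd(&node{when: 30})
	queueAdd(&node{when: 10})
	queueAdd(&node{when: 20})
	for n := queue; n != nil; n = n.next {
		fmt.Println(n.when) // 10, 20, 30: head is always the soonest
	}
}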
diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go
new file mode 100644
index 0000000000..e553a5b9c7
--- /dev/null
+++ b/src/runtime/scheduler_threads.go
@@ -0,0 +1,124 @@
+//go:build scheduler.threads
+
+package runtime
+
+import "internal/task"
+
+const hasScheduler = false // not using the cooperative scheduler
+
+// We use threads, so yes there is parallelism.
+const hasParallelism = true
+
+var (
+	timerQueueLock    task.PMutex
+	timerQueueStarted bool
+	timerFutex        task.Futex
+)
+
+// Because we just use OS threads, we don't need to do anything special here. We
+// can just initialize everything and run main.main on the main thread.
+func run() {
+	initHeap()
+	task.Init(stackTop)
+	initAll()
+	callMain()
+}
+
+// Pause the current task for a given time.
+//
+//go:linkname sleep time.Sleep
+func sleep(duration int64) {
+	if duration <= 0 {
+		return
+	}
+
+	sleepTicks(nanosecondsToTicks(duration))
+}
+
+func deadlock() {
+	// TODO: exit the thread via pthread_exit.
+	task.Pause()
+}
+
+func scheduleTask(t *task.Task) {
+	t.Resume()
+}
+
+func Gosched() {
+	// Each goroutine runs in a thread, so there's not much we can do here.
+	// There is sched_yield, but it is only really intended for realtime
+	// operation, so it's probably best not to use it.
+}
+
+// Separate goroutine (thread) that runs timer callbacks when they expire.
+func timerRunner() {
+	for {
+		timerQueueLock.Lock()
+
+		if timerQueue == nil {
+			// No timer in the queue, so wait until one becomes available.
+			val := timerFutex.Load()
+			timerQueueLock.Unlock()
+			timerFutex.Wait(val)
+			continue
+		}
+
+		now := ticks()
+		if now < timerQueue.whenTicks() {
+			// There is a timer in the queue, but we need to wait until it
+			// expires.
+			// A futex is used so that the wait exits early when a new
+			// (sooner-to-expire) timer is added.
+			val := timerFutex.Load()
+			timerQueueLock.Unlock()
+			timeout := ticksToNanoseconds(timerQueue.whenTicks() - now)
+			timerFutex.WaitUntil(val, uint64(timeout))
+			continue
+		}
+
+		// Pop timer from queue.
+		tn := timerQueue
+		timerQueue = tn.next
+		tn.next = nil
+
+		timerQueueLock.Unlock()
+
+		// Run the callback stored in this timer node.
+		delay := ticksToNanoseconds(now - tn.whenTicks())
+		tn.callback(tn, delay)
+	}
+}
+
+func addTimer(tim *timerNode) {
+	timerQueueLock.Lock()
+
+	if !timerQueueStarted {
+		timerQueueStarted = true
+		go timerRunner()
+	}
+
+	timerQueueAdd(tim)
+
+	timerFutex.Add(1)
+	timerFutex.Wake()
+
+	timerQueueLock.Unlock()
+}
+
+func removeTimer(tim *timer) bool {
+	timerQueueLock.Lock()
+	removed := timerQueueRemove(tim)
+	timerQueueLock.Unlock()
+	return removed
+}
+
+func schedulerRunQueue() *task.Queue {
+	// This function is never actually called, because it is only used when
+	// hasScheduler is true. So we can just return nil here.
+	return nil
+}
+
+func runqueueForGC() *task.Queue {
+	// There is only a runqueue when using the cooperative scheduler.
+	return nil
+}
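The timerFutex.Add(1) plus Wake in addTimer is what lets timerRunner sleep until the earliest deadline yet still react immediately when an even earlier timer arrives: the futex value changes, so WaitUntil returns before its timeout and the queue head is re-checked. A portable sketch of this wake-or-timeout wait, with a channel select standing in for the futex (assumed shapes, not the real futex API):

package main

import (
	"fmt"
	"time"
)

func main() {
	wake := make(chan struct{}, 1) // stands in for timerFutex

	addTimer := func() {
		select {
		case wake <- struct{}{}: // futex.Add(1) + Wake()
		default: // a wakeup is already pending
		}
	}

	// Another goroutine adds a sooner-to-expire timer shortly.
	go func() {
		time.Sleep(10 * time.Millisecond)
		addTimer()
	}()

	// timerRunner: sleep until the current earliest timer expires,
	// but return early if addTimer signals in the meantime.
	select {
	case <-wake:
		fmt.Println("woken early: re-check the queue head")
	case <-time.After(time.Second): // futex.WaitUntil(val, timeout)
		fmt.Println("timeout: run the expired timer")
	}
}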
diff --git a/src/sync/mutex.go b/src/sync/mutex.go
index 08c674d7ea..890af78606 100644
--- a/src/sync/mutex.go
+++ b/src/sync/mutex.go
@@ -6,131 +6,96 @@ import (
 
 type Mutex = task.Mutex
 
-type RWMutex struct {
-	// waitingWriters are all of the tasks waiting for write locks.
-	waitingWriters task.Stack
-
-	// waitingReaders are all of the tasks waiting for a read lock.
-	waitingReaders task.Stack
+//go:linkname runtimePanic runtime.runtimePanic
+func runtimePanic(msg string)
 
-	// state is the current state of the RWMutex.
-	// Iff the mutex is completely unlocked, it contains rwMutexStateUnlocked (aka 0).
-	// Iff the mutex is write-locked, it contains rwMutexStateWLocked.
-	// While the mutex is read-locked, it contains the current number of readers.
-	state uint32
+type RWMutex struct {
+	// Reader count, with the number of readers that currently have read-locked
+	// this mutex.
+	// The value works from one of two bases: a base of 0, which is normal
+	// uncontended operation, and a base of -rwMutexMaxReaders, which means a
+	// writer has the lock or is trying to get the lock. In the second case,
+	// readers should wait until the reader count becomes non-negative again,
+	// to give the writer a chance to obtain the lock.
+	readers task.Futex
+
+	// Writer futex, normally 0. If there is a writer waiting until all readers
+	// have unlocked, this value is 1. It will be changed to a 2 (and get a
+	// wake) when the last reader unlocks.
+	writer task.Futex
+
+	// Writer lock. Held between Lock() and Unlock().
+	writerLock Mutex
 }
 
-const (
-	rwMutexStateUnlocked = uint32(0)
-	rwMutexStateWLocked  = ^uint32(0)
-	rwMutexMaxReaders    = rwMutexStateWLocked - 1
-)
+const rwMutexMaxReaders = 1 << 30
 
 func (rw *RWMutex) Lock() {
-	if rw.state == 0 {
-		// The mutex is completely unlocked.
-		// Lock without waiting.
-		rw.state = rwMutexStateWLocked
+	// Exclusive lock for writers.
+	rw.writerLock.Lock()
+
+	// Flag that we need to be awakened after the last read-lock unlocks.
+	rw.writer.Store(1)
+
+	// Signal to readers that they can't lock this mutex anymore.
+	n := uint32(rwMutexMaxReaders)
+	waiting := rw.readers.Add(-n)
+	if int32(waiting) == -rwMutexMaxReaders {
+		// All readers were already unlocked, so we don't need to wait for them.
+		rw.writer.Store(0)
 		return
 	}
 
-	// Wait for the lock to be released.
-	rw.waitingWriters.Push(task.Current())
-	task.Pause()
+	// There is at least one reader.
+	// Wait until all readers are unlocked. The last reader to unlock will set
+	// rw.writer to 2 and awaken us.
+	for rw.writer.Load() == 1 {
+		rw.writer.Wait(1)
+	}
+	rw.writer.Store(0)
 }
 
 func (rw *RWMutex) Unlock() {
-	switch rw.state {
-	case rwMutexStateWLocked:
-		// This is correct.
-
-	case rwMutexStateUnlocked:
-		// The mutex is already unlocked.
-		panic("sync: unlock of unlocked RWMutex")
-
-	default:
-		// The mutex is read-locked instead of write-locked.
-		panic("sync: write-unlock of read-locked RWMutex")
+	// Signal that new readers can lock this mutex.
+	waiting := rw.readers.Add(rwMutexMaxReaders)
+	if waiting != 0 {
+		// Awaken all waiting readers.
+		rw.readers.WakeAll()
 	}
 
-	switch {
-	case rw.maybeUnblockReaders():
-		// Switched over to read mode.
-
-	case rw.maybeUnblockWriter():
-		// Transferred to another writer.
-
-	default:
-		// Nothing is waiting for the lock.
-		rw.state = rwMutexStateUnlocked
-	}
+	// Done with this lock (next writer can try to get a lock).
+	rw.writerLock.Unlock()
 }
 
 func (rw *RWMutex) RLock() {
-	if rw.state == rwMutexStateWLocked {
-		// Wait for the write lock to be released.
-		rw.waitingReaders.Push(task.Current())
-		task.Pause()
-		return
-	}
+	// Add us as a reader.
+	newVal := rw.readers.Add(1)
 
-	if rw.state == rwMutexMaxReaders {
-		panic("sync: too many readers on RWMutex")
+	// Wait until the RWMutex is available for readers.
+	for int32(newVal) <= 0 {
+		rw.readers.Wait(newVal)
+		newVal = rw.readers.Load()
 	}
-
-	// Increase the reader count.
-	rw.state++
 }
 
 func (rw *RWMutex) RUnlock() {
-	switch rw.state {
-	case rwMutexStateUnlocked:
-		// The mutex is already unlocked.
-		panic("sync: unlock of unlocked RWMutex")
-
-	case rwMutexStateWLocked:
-		// The mutex is write-locked instead of read-locked.
-		panic("sync: read-unlock of write-locked RWMutex")
-	}
-
-	rw.state--
+	// Remove us as a reader.
+	one := uint32(1)
+	readers := int32(rw.readers.Add(-one))
 
-	if rw.state == rwMutexStateUnlocked {
-		// This was the last reader.
-		// Try to unblock a writer.
-		rw.maybeUnblockWriter()
+	// Check whether RUnlock was called too often.
+	if readers == -1 || readers == (-rwMutexMaxReaders)-1 {
+		runtimePanic("sync: RUnlock of unlocked RWMutex")
 	}
-}
 
-func (rw *RWMutex) maybeUnblockReaders() bool {
-	var n uint32
-	for {
-		t := rw.waitingReaders.Pop()
-		if t == nil {
-			break
-		}
-
-		n++
-		scheduleTask(t)
-	}
-	if n == 0 {
-		return false
-	}
-
-	rw.state = n
-	return true
-}
-
-func (rw *RWMutex) maybeUnblockWriter() bool {
-	t := rw.waitingWriters.Pop()
-	if t == nil {
-		return false
-	}
-
-	rw.state = rwMutexStateWLocked
-	scheduleTask(t)
-
-	return true
-}
+	if readers == -rwMutexMaxReaders {
+		// This was the last read lock. Check whether we need to wake up a write
+		// lock.
+		if rw.writer.CompareAndSwap(1, 2) {
+			rw.writer.Wake()
+		}
+	}
+}
 
 type Locker interface {
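The readers futex above packs two facts into one word: the reader count and, via a subtracted base of rwMutexMaxReaders, whether a writer has claimed the lock. A short trace of that arithmetic with a plain atomic counter (no blocking, just the values Lock/RLock/RUnlock would observe):

package main

import (
	"fmt"
	"sync/atomic"
)

const maxReaders = 1 << 30

func main() {
	var readers atomic.Int32 // stand-in for rw.readers

	readers.Add(2) // two RLocks: count 2, base 0

	// A writer arrives: Lock() subtracts maxReaders. The result is
	// 2-maxReaders, not exactly -maxReaders, so the writer must wait.
	fmt.Println(readers.Add(-maxReaders) == -maxReaders) // false

	// Each RUnlock adds -1. The reader that brings the value to
	// exactly -maxReaders is the last one, and wakes the writer.
	readers.Add(-1)
	fmt.Println(readers.Add(-1) == -maxReaders) // true: wake writer

	// New RLocks meanwhile saw a negative value and wait until
	// Unlock() adds maxReaders back, making the count >= 0 again.
	fmt.Println(readers.Add(maxReaders)) // 0
}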