Skip to content

Commit

Permalink
Corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
tyranron committed Dec 30, 2024
1 parent 1d6dc13 commit b7783ee
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ All user visible changes to this project will be documented in this file. This p

### Fixed

- Futures executor being called from non-main thread of Dart platforms. ([#197])
- Futures executor being called from non-main thread on Dart platforms. ([#197])

### Upgraded

Expand Down
5 changes: 5 additions & 0 deletions flutter/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ All user visible changes to this project will be documented in this file. This p

See also [`medea-jason` crate `master` changes](/../../tree/master/CHANGELOG.md).

### Fixed

- Rust futures executor being called from non-main thread. ([#197])

### Upgraded

- Dependencies:
- [`flutter_rust_bridge`] to 2.7.0 version. ([#195])

[#195]: /../../pull/195
[#197]: /../../pull/197



Expand Down
11 changes: 5 additions & 6 deletions src/platform/dart/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ pub unsafe extern "C" fn rust_executor_init(wake_port: Dart_Port) {
///
/// # Safety
///
/// - Valid [`Task`] pointer must be provided;
/// - Must be called on the same thread where [`Task`] was originally created.
/// Valid [`Task`] pointer must be provided.
///
/// # Panics
///
/// If called not on the same thread where [`Task`] was originally created.
/// If called not on the same thread where the [`Task`] was originally created.
#[no_mangle]
pub unsafe extern "C" fn rust_executor_poll_task(task: ptr::NonNull<Task>) {
propagate_panic(move || unsafe { Arc::from_raw(task.as_ptr()).poll() });
Expand All @@ -66,12 +65,12 @@ pub unsafe extern "C" fn rust_executor_poll_task(task: ptr::NonNull<Task>) {
/// [`WAKE_PORT`]. When received, Dart must poll it by calling the
/// [`rust_executor_poll_task()`] function.
///
/// # Safety
/// # Panics
///
/// - Dart-driven async [`Task`] executor must be initialized.
/// If Dart-driven async [`Task`] executor is not initialized.
fn task_wake(task: Arc<Task>) {
let wake_port = WAKE_PORT.load(atomic::Ordering::Acquire);
debug_assert!(wake_port > 0, "`WAKE_PORT` address must be initialized");
assert!(wake_port > 0, "`WAKE_PORT` address must be initialized");
let task = Arc::into_raw(task);

let mut task_addr = Dart_CObject {
Expand Down
38 changes: 21 additions & 17 deletions src/platform/dart/executor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,22 @@ pub struct Task {
/// [`Task`].
is_scheduled: AtomicBool,

/// Thread that this task was created on and must be polled on.
thread: ThreadId,
/// ID of the thread this [`Task`] was created on and must be polled on.
thread_id: ThreadId,
}

/// [`Task`] can be sent across threads safely because it ensures that the
/// underlying [`Future`] will only be touched from a single thread it was
/// created on.
unsafe impl Send for Task {}
/// [`Task`] can be shared across threads safely because it ensures that the
/// underlying [`Future`] will only be touched from a single thread it was
/// created on.
unsafe impl Sync for Task {}

impl ArcWake for Task {
/// Calls the `task_wake()` function by the provided reference if this
/// [`Task`] s incomplete and there are no [`Poll::Pending`] awake requests
/// already.
/// Commands an external Dart executor to poll this [`Task`] if it's
/// incomplete and there are no [`Poll::Pending`] awake requests already.
///
/// [`Poll::Pending`]: task::Poll::Pending
fn wake_by_ref(arc_self: &Arc<Self>) {
Expand All @@ -69,14 +77,15 @@ impl ArcWake for Task {
impl Task {
/// Spawns a new [`Task`] that will drive the given [`Future`].
///
/// Must be called on the same thread where [`Task`] will be polled.
/// Must be called on the same thread where the [`Task`] will be polled,
/// otherwise polling will panic.
///
/// [`Future`]: std::future::Future
pub fn spawn(future: LocalBoxFuture<'static, ()>) {
let this = Arc::new(Self {
inner: RefCell::new(None),
is_scheduled: AtomicBool::new(false),
thread: thread::current().id(),
thread_id: thread::current().id(),
});

let waker = task::waker(Arc::clone(&this));
Expand All @@ -91,15 +100,16 @@ impl Task {
///
/// # Panics
///
/// If called not on the same thread where [`Task`] was originally created.
/// If called not on the same thread where this [`Task`] was originally
/// created.
///
/// [`Future`]: std::future::Future
pub fn poll(&self) {
assert_eq!(
self.thread,
self.thread_id,
thread::current().id(),
"A Future can only be polled on the same thread where it was \
originally created."
"`dart::executor::Task` can only be polled on the same thread \
where it was originally created",
);

let mut borrow = self.inner.borrow_mut();
Expand All @@ -122,9 +132,3 @@ impl Task {
}
}
}

// `Task` can be sent across threads safely because it ensures that
// the underlying Future will only be touched from a single thread it
// was created on.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
14 changes: 7 additions & 7 deletions src/platform/dart/utils/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,19 @@ impl Callback {
// some gymnastics to reclaim Rust memory:
// Dart will call `callback_finalizer` when the object becomes
// unreachable. Since this callback might be called on a different
// thread we can't reclaim Rust side resources there. So
// `callback_finalizer` will signal the main thread to do actual
// thread we can't reclaim Rust side resources there. So,
// `callback_finalizer` will signal the main thread to do the actual
// memory reclamation.
let (finalizer_tx, finalizer_rx) = oneshot::channel::<()>();
unsafe {
_ = dart_api::new_finalizable_handle(
handle,
Box::into_raw(Box::new(finalizer_tx)).cast(),
// 128 is the approximate size of the channel and memory
// reclamation closure as of rustc 1.81 and futures
// v0.3.31. Ideally, it should be revisited occasionally,
// but it is okay for this value to be approximate since it
// works as a hint for the Dart's GC.
// `128` is the approximate size of the channel and memory
// reclamation closure as of Rust 1.81 and `futures` crate
// `0.3.31`. Ideally, it should be revisited occasionally,
// but it's OK for this value to be approximate, since it
// works only as a hint for the Dart's GC.
(size_of::<Self>() + 128) as libc::intptr_t,
Some(callback_finalizer),
);
Expand Down
6 changes: 2 additions & 4 deletions src/platform/dart/utils/dart_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ pub unsafe fn post_c_object(
/// associated with the handle.
///
/// Once finalizable handle is collected by GC, the provided `callback` is
/// called.
///
/// The callback can be executed on any thread, will have an isolate group,
/// but will not have a current isolate.
/// called. It may be executed on any thread, will have an isolate group, but
/// won't have the current isolate.
///
/// `peer` argument will be provided to the `callback` on finalize.
///
Expand Down

0 comments on commit b7783ee

Please sign in to comment.