Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for customized future executor #144

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

gaoqiangz
Copy link

Support to customize the runtime associated with spawn to solve the problem of across runtimes.
Scenario:
The connection connected by the runtime A, use by the runtime B, destroy A before B while connection are using will cause IO driver has terminated.

@djc
Copy link
Owner

djc commented Dec 6, 2022

What runtime are you using? Why do you want to use bb8 instead of another connection pool that already supports your runtime?

@gaoqiangz
Copy link
Author

Please see this example:

  • Cargo.toml
bb8 = "0.8.0"
bb8-tiberius = { version = "0.13.0", features = ["sql-browser"] }
  • main.rs
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let host = ".\\JQZ"; //local instance `JQZ`
    let conn_str = format!(
        "Server={host};Database=master;TrustServerCertificate=true;IntegratedSecurity=true;"
    );

    let mgr = bb8_tiberius::ConnectionManager::build(conn_str.as_str())?.using_named_connection();
    let pool = bb8::Pool::builder().max_size(1).build_unchecked(mgr);

    let (tx_connected, rx_connected) = tokio::sync::oneshot::channel();
    let thread_1 = std::thread::spawn({
        let pool = pool.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async {
                //connect in this runtime
                println!("thread_1 connecting");
                let conn = pool.get().await.unwrap();
                println!("thread_1 connected");
                tx_connected.send(());
                //make `thread_2` block at the `get` call
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
                //release `thread_2`
                println!("thread_1 release");
                drop(conn);
                tokio::time::sleep(tokio::time::Duration::from_secs(0)).await;
            });
            println!("thread_1 runtime dropping");
            //cause `thread_2` error
            drop(rt);
            println!("thread_1 runtime droped");
        }
    });

    let thread_2 = std::thread::spawn({
        let pool = pool.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            rt.block_on(async {
                rx_connected.await;
                //use here
                println!("thread_2 getting connection");
                let mut conn = pool.get().await.unwrap();
                println!("thread_2 querying");
                //sleep 10 seconds, should have a error `IO driver has terminated`
                let e = conn.simple_query("WAITFOR DELAY '00:00:10'").await.err();
                println!("thread_2 queried: {e:?}");
            });
        }
    });

    thread_1.join().unwrap();
    println!("thread_1 exited");
    thread_2.join().unwrap();
    println!("thread_2 exited");

    Ok(())
}
  • log
thread_1 connecting
thread_1 connected
thread_2 getting connection
thread_1 release
thread_2 querying
thread_1 runtime dropping
thread_2 queried: Some(Io { kind: Other, message: "IO driver has terminated" })
thread_1 runtime droped
thread_1 exited
thread_2 exited

@djc
Copy link
Owner

djc commented Dec 7, 2022

Okay, so you want to use a per-thread single-threaded Tokio runtime. Could have just said so...

@gaoqiangz
Copy link
Author

Yes, there are multiple runtimes in our program,such as independent timer threads.

bb8/src/inner.rs Outdated Show resolved Hide resolved
bb8/src/internals.rs Show resolved Hide resolved
bb8/src/api.rs Outdated Show resolved Hide resolved
bb8/src/api.rs Outdated Show resolved Hide resolved
bb8/src/inner.rs Outdated Show resolved Hide resolved
gaoqiangz and others added 3 commits December 7, 2022 17:54
@@ -1,9 +1,9 @@
[package]
name = "bb8"
version = "0.8.0"
description = "Full-featured async (tokio-based) connection pool (like r2d2)"
version = "0.8.0-rc.1"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't be able to merge this PR with this stuff in it, please revert these changes.

struct TokioExecutor;

impl Executor for TokioExecutor {
fn execute(&self, fut: Pin<Box<dyn Future<Output = ()> + Send>>) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer to call this spawn().

@@ -15,6 +15,7 @@ where
M: ManageConnection + Send,
{
pub(crate) statics: Builder<M>,
pub(crate) executor: Box<dyn Executor>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need a trait object? It would be nice if we didn't have to Box every future just to support this fairly niche use case.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants