Skip to content

Commit

Permalink
refactor(server): refactoring tanour server (#20)
Browse files Browse the repository at this point in the history
* refactor(server): refactoring tanour server

* wip: updating tanour server

* adding read_page and write_page apis

* exported functions argument as byte slice

* executing contract from server

* installing capnp for github action

* updating ChangeLog
  • Loading branch information
b00f authored Mar 5, 2023
1 parent 733be9d commit 4d7839a
Show file tree
Hide file tree
Showing 20 changed files with 233 additions and 371 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- name: Install CapnProto
run: sudo apt-get install capnproto

- name: Install Rust
run: |
rustup toolchain install nightly --component llvm-tools-preview rustfmt clippy
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Adding storage API to read from the storage file and write into storage file
- Updating Kelk version to 0.3.0 in test-contract
- Updating Wasmer version to 3.1.0
- Refactoring Tanour Server to call Tanour executor using CapnProto APIs

## 0.1.0

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = [
"tanour",
# "tanour-server",
"tanour-server",
]
exclude = ["test-contract"]
16 changes: 11 additions & 5 deletions tanour-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tanour-server"
version = "0.1.0"
version = "0.2.0"
authors = ["Pactus blockchain <[email protected]>"]
edition = "2021"

Expand All @@ -14,13 +14,19 @@ path = "src/main.rs"
capnpc = { git = "https://github.com/capnproto/capnproto-rust" }

[dependencies]
capnp = { git = "https://github.com/capnproto/capnproto-rust" }
capnp = { git = "https://github.com/capnproto/capnproto-rust" }
capnp-rpc = { git = "https://github.com/capnproto/capnproto-rust" }
futures = "0.3.0"
tokio = { version = "0.2.0", features = ["time", "sync", "rt-util", "rt-core", "net", "macros"]}
tokio = { version = "0.2.0", features = [
"time",
"sync",
"rt-util",
"rt-core",
"net",
"macros",
] }
tokio-util = { version = "0.3.0", features = ["compat"] }
tanour = { path = "../tanour" }
primitive-types = "0.7.2"
tanour = { version = "0.2.0" }
async-std = "1.5.0"
log = "0.4"
simple_logger = "1.4.0"
76 changes: 76 additions & 0 deletions tanour-server/src/adaptor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::tanour_capnp;

use log::debug;

use tanour::{blockchain_api::BlockchainAPI, Address};

unsafe impl Send for tanour_capnp::provider::Client {}

pub struct BlockchainAdaptor {
client: tanour_capnp::provider::Client,
}

impl BlockchainAdaptor {
pub fn new(client: tanour_capnp::provider::Client) -> Self {
BlockchainAdaptor { client }
}
}

impl BlockchainAPI for BlockchainAdaptor {
fn page_size(&self) -> Result<u32, tanour::error::Error> {
let req = self.client.page_size_request();

let handle = async move {
debug!("Try ot call `page_size` method in client");
let result = req.send().promise.await.unwrap(); //TODO: no unwrap
result.get().unwrap().get_size() //TODO: no unwrap
};

Ok(futures::executor::block_on(handle))
}

fn read_page(&self, page_no: u32) -> Result<Vec<u8>, tanour::error::Error> {
let mut req = self.client.read_page_request();
req.get().set_page_no(page_no);

let handle = async move {
debug!("Try ot call `read_page` method in client");
let result = req.send().promise.await.unwrap(); //TODO: no unwrap
result.get().unwrap().get_data().unwrap().to_vec() //TODO: no unwrap
};

Ok(futures::executor::block_on(handle)) //TODO: no unwrap
}

fn write_page(&self, page_no: u32, data: &[u8]) -> Result<(), tanour::error::Error> {
let mut req = self.client.write_page_request();
req.get().set_page_no(page_no);
req.get().set_data(data);

let handle = async move {
debug!("Try ot call `write_page` method in client");
let result = req.send().promise.await.unwrap(); //TODO: no unwrap
result.get().unwrap(); //TODO: no unwrap
};

futures::executor::block_on(handle);
Ok(()) //TODO: no unwrap
}

fn exist(&self, address: &Address) -> Result<bool, tanour::error::Error> {
let mut req = self.client.exists_request();
req.get().set_address(address);

let handle = async move {
debug!("Try ot call `exists` method in client");
let result = req.send().promise.await.unwrap(); //TODO: no unwrap
result.get().unwrap().get_exist() //TODO: no unwrap
};

Ok(futures::executor::block_on(handle))
}

fn current_block_number(&self) -> u32 {
todo!()
}
}
8 changes: 5 additions & 3 deletions tanour-server/src/build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

fn main() {
::capnpc::CompilerCommand::new().file("tanour.capnp").run().unwrap();
}
::capnpc::CompilerCommand::new()
.file("tanour.capnp")
.run()
.unwrap();
}
111 changes: 39 additions & 72 deletions tanour-server/src/executor_impl.rs
Original file line number Diff line number Diff line change
@@ -1,109 +1,76 @@
use crate::provider_adaptor::ProviderAdaptor;
use crate::adaptor::BlockchainAdaptor;
use crate::tanour_capnp;
use crate::tanour_capnp::executor;
use capnp::capability::Promise;
use capnp::Error;
use capnp_rpc::pry;
use log::debug;
use primitive_types::{H256, U256};
use tanour::Address;
use tanour::address_from_bytes;
use tanour::contract::Params;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;

impl<'a> From<tanour_capnp::transaction::Reader<'a>>
for Result<tanour::transaction::Transaction, Error>
{
fn from(reader: tanour_capnp::transaction::Reader<'a>) -> Self {
let sender = Address::from_slice(reader.get_sender()?);
let value = U256::from_little_endian(reader.get_value()?);
let gas = U256::from_little_endian(reader.get_gas()?);
let gas_price = U256::from_little_endian(reader.get_gas_price()?);
let args = reader.get_args()?.to_vec();
let action = match reader.get_action().which()? {
tanour_capnp::transaction::action::Create(create) => {
let code = create.get_code()?.to_vec();
let salt = H256::from_slice(create.get_salt()?);
tanour::transaction::Action::Create(code, salt)
}
tanour_capnp::transaction::action::Call(call) => {
let address = Address::from_slice(call.get_address()?);
tanour::transaction::Action::Call(address)
}
};

Ok(tanour::transaction::Transaction {
sender: sender,
value: value,
gas: gas,
gas_price: gas_price,
action: action,
args: args,
})
}
}

pub struct ExecutorImpl {}

impl ExecutorImpl {
pub fn new() -> Self {
ExecutorImpl {}
}
}

unsafe impl Send for tanour_capnp::provider::Client {}
//unsafe impl Sync for tanour_capnp::provider::Client {}
pub struct ExecutorImpl;

// TODO: ??? why ???
#[allow(clippy::async_yields_async)]
impl executor::Server for ExecutorImpl {
fn execute(
&mut self,
params: executor::ExecuteParams,
mut results: executor::ExecuteResults,
) -> Promise<(), Error> {
let provider_client = pry!(pry!(params.get()).get_provider());
let transaction = pry!(pry!(pry!(params.get()).get_transaction()).into());
let (tx, mut rx) = oneshot::channel();

tokio::task::spawn(async move {
debug!("provider: {:?}", std::thread::current().id());
let mut adaptor = ProviderAdaptor::new(provider_client);

let result = tanour::execute::execute(&mut adaptor, &transaction).unwrap();

tx.send(result).unwrap();
tokio::task::spawn_local(async move {
let provider_client = pry!(pry!(params.get()).get_provider());
let transaction = pry!(pry!(params.get()).get_transaction());
let adaptor = BlockchainAdaptor::new(provider_client);
let msg = pry!(transaction.get_args());
let address = address_from_bytes(pry!(transaction.get_address()));
let code = pry!(transaction.get_code());
let params = Params {
memory_limit_page: 1000,
metering_limit: 11100,
};

let mut contract =
tanour::contract::Contract::new(Box::new(adaptor), &address, code, params).unwrap(); // TODO: no unwrap

let res = match pry!(transaction.get_action().which()) {
tanour_capnp::transaction::action::Instantiate(_) => {
contract.call_instantiate(msg).unwrap() // TODO: no unwrap
}
tanour_capnp::transaction::action::Process(_) => {
contract.call_process(msg).unwrap() // TODO: no unwrap
}
tanour_capnp::transaction::action::Query(_) => {
contract.call_query(msg).unwrap() // TODO: no unwrap
}
};

tx.send(res).unwrap(); // TODO: no unwrap
Promise::<(), Error>::ok(())
});
debug!("executor: {:?}", std::thread::current().id());

Promise::from_future(async move {
loop {
let msg = rx.try_recv();
match msg {
Err(TryRecvError::Empty) => {}
Err(e) => {
return Err(Error::failed(format!("{}", e)));
return Err(Error::failed(format!("{e}")));
}
Ok(result_data) => {
tokio::time::delay_for(std::time::Duration::from_millis(10 as u64)).await;

let mut tmp = Vec::new();
tmp.resize(32, 0);
tokio::time::delay_for(std::time::Duration::from_millis(10_u64)).await;

let mut builder = results.get().get_result_data().unwrap();

result_data.gas_left.to_little_endian(&mut tmp);
builder.set_gas_left(&tmp);
builder.set_data(&result_data.data);
builder.set_contract(&result_data.contract.as_bytes());

// TODO: Implement it later
//builder.set_logs();
builder.set_data(&result_data);

break;
}
};

//print!(".");
tokio::task::yield_now().await;
//tokio::time::delay_for(std::time::Duration::from_millis(10 as u64)).await;
tokio::task::yield_now().await
}

Ok(())
Expand Down
10 changes: 4 additions & 6 deletions tanour-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
pub mod tanour_capnp {
include!(concat!(env!("OUT_DIR"), "/tanour_capnp.rs"));
}
mod adaptor;
mod executor_impl;
mod provider_adaptor;

use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use tanour_capnp::executor;
use executor_impl::ExecutorImpl;
use futures::{AsyncReadExt, FutureExt, TryFutureExt};
use std::net::ToSocketAddrs;
use tanour_capnp::executor;
use tokio::net::TcpListener;

#[tokio::main]
Expand All @@ -30,7 +30,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::task::LocalSet::new()
.run_until(async move {
let mut listener = TcpListener::bind(&addr).await?;
let executor_impl = ExecutorImpl::new();
let executor_impl = ExecutorImpl {};
let executor: executor::Client = capnp_rpc::new_client(executor_impl);

loop {
Expand All @@ -47,9 +47,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {

let rpc_system = RpcSystem::new(Box::new(network), Some(executor.clone().client));
tokio::task::spawn_local(Box::pin(
rpc_system
.map_err(|e| println!("error: {:?}", e))
.map(|_| ()),
rpc_system.map_err(|e| println!("error: {e:?}")).map(|_| ()),
));
}
})
Expand Down
Loading

0 comments on commit 4d7839a

Please sign in to comment.