Skip to content

Commit

Permalink
Copy Phala crates
Browse files Browse the repository at this point in the history
  • Loading branch information
jasl committed Sep 3, 2021
1 parent c061137 commit 61a40ae
Show file tree
Hide file tree
Showing 438 changed files with 141,487 additions and 27 deletions.
393 changes: 366 additions & 27 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,22 @@ panic = "unwind"

[workspace]
resolver = "2"

exclude = [
"vendor/webpki",
"vendor/ring",
]

members = [
"node",
"parachains-common",
"crates/phala-types",
"crates/phala-mq",
"crates/phala-node-rpc-ext",
"crates/prpc",
"crates/phala-trie-storage",
"pallets/phala",
"pallets/phala/mq-runtime-api",
"pallets/bridge",
"pallets/bridge_transfer",
"pallets/parachain-info",
Expand Down
1 change: 1 addition & 0 deletions crates/phala-mq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/Cargo.lock
21 changes: 21 additions & 0 deletions crates/phala-mq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "phala-mq"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hex = { version = "0.4.3", default-features = false }
derive_more = { version = "0.99", default-features = false, features = ["display"] }
parity-scale-codec = { version = "2.2", default-features = false, features = ["derive"] }
primitive-types = { version = "0.10", default-features = false, features = ["codec", "byteorder"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.9", default-features = false }

spin = { version = "0.9", default-features = false, features = ["mutex", "use_ticket_mutex"], optional = true }

[features]
default = ["dispatcher", "queue", "signers"]
dispatcher = ["spin"]
queue = ["spin"]
signers = ["sp-core/full_crypto"]
168 changes: 168 additions & 0 deletions crates/phala-mq/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use core::marker::PhantomData;

use alloc::{collections::BTreeMap, vec::Vec};

use crate::simple_mpsc::{channel, ReceiveError, Receiver as RawReceiver, Sender, Seq};
use crate::types::{Message, Path};
use crate::{BindTopic, MessageOrigin};
use derive_more::Display;
use parity_scale_codec::{Decode, Error as CodecError};

impl Seq for (u64, Message) {
fn seq(&self) -> u64 {
self.0
}
}

#[derive(Default)]
pub struct MessageDispatcher {
subscribers: BTreeMap<Path, Vec<Sender<(u64, Message)>>>,
local_index: u64,
//match_subscribers: Vec<Matcher, Vec<Sender<Message>>>,
}

pub type Receiver<T> = RawReceiver<(u64, T)>;

impl MessageDispatcher {
pub fn new() -> Self {
MessageDispatcher {
subscribers: Default::default(),
local_index: 0,
}
}

/// Subscribe messages which are sent to `path`.
/// Returns a Receiver channel end.
pub fn subscribe(&mut self, path: impl Into<Path>) -> Receiver<Message> {
let (rx, tx) = channel();
let entry = self.subscribers.entry(path.into()).or_default();
entry.push(tx);
rx
}

/// Subscribe messages which implementing BindTopic
/// Returns a TypedReceiver channel end.
pub fn subscribe_bound<T: Decode + BindTopic>(&mut self) -> TypedReceiver<T> {
self.subscribe(<T as BindTopic>::topic()).into()
}

/// Dispatch a message.
/// Returns number of receivers dispatched to.
pub fn dispatch(&mut self, message: Message) -> usize {
let mut count = 0;
let sn = self.local_index;
self.local_index += 1;
if let Some(receivers) = self.subscribers.get_mut(message.destination.path()) {
receivers.retain(|receiver| {
if let Err(error) = receiver.send((sn, message.clone())) {
use crate::simple_mpsc::SendError::*;
match error {
ReceiverGone => false,
}
} else {
count += 1;
true
}
});
}
count
}

pub fn reset_local_index(&mut self) {
self.local_index = 0;
}

/// Drop all unhandled messages.
pub fn clear(&mut self) -> usize {
let mut count = 0;
for subscriber in self.subscribers.values_mut().flatten() {
count += subscriber.clear();
}
count
}
}

#[derive(Display, Debug)]
pub enum TypedReceiveError {
#[display(fmt = "All senders of the channel have gone")]
SenderGone,
#[display(fmt = "Decode message failed: {}", _0)]
CodecError(CodecError),
}

impl From<CodecError> for TypedReceiveError {
fn from(e: CodecError) -> Self {
Self::CodecError(e)
}
}

pub struct TypedReceiver<T> {
queue: Receiver<Message>,
_t: PhantomData<T>,
}

impl<T: Decode> TypedReceiver<T> {
pub fn try_next(&mut self) -> Result<Option<(u64, T, MessageOrigin)>, TypedReceiveError> {
let message = self.queue.try_next().map_err(|e| match e {
ReceiveError::SenderGone => TypedReceiveError::SenderGone,
})?;
let (sn, msg) = match message {
None => return Ok(None),
Some(m) => m,
};
let typed = Decode::decode(&mut &msg.payload[..])?;
Ok(Some((sn, typed, msg.sender)))
}

pub fn peek_ind(&self) -> Result<Option<u64>, ReceiveError> {
self.queue.peek_ind()
}
}

impl<T: Decode> From<Receiver<Message>> for TypedReceiver<T> {
fn from(queue: Receiver<Message>) -> Self {
Self {
queue,
_t: Default::default(),
}
}
}

#[macro_export]
macro_rules! select {
(
$( $bind:pat = $mq:expr => $block:expr, )+
) => {{
let mut min = None;
let mut min_ind = 0;
let mut ind = 0;
$({
match $mq.peek_ind() {
Ok(Some(sn)) => match min {
None => { min = Some(sn); min_ind = ind; }
Some(old) if sn < old => { min = Some(sn); min_ind = ind; }
_ => (),
},
Err(_) => { min = Some(0); min_ind = ind; }
_ => (),
}
ind += 1;
})+

let mut ind = 0;
let mut rv = None;
if min.is_some() {
$({
if min_ind == ind {
let msg = $mq.try_next().transpose();
rv = match msg {
Some($bind) => Some($block),
None => None
};
}
ind += 1;
})+
}
rv
}};
}
44 changes: 44 additions & 0 deletions crates/phala-mq/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#![no_std]

#[cfg(feature = "sgx")]
pub extern crate serde_sgx as serde;

extern crate alloc;

mod signer;
pub mod types;

#[cfg(feature = "dispatcher")]
mod dispatcher;
#[cfg(feature = "queue")]
mod send_queue;
#[cfg(any(feature = "queue", feature = "dispatcher"))]
mod simple_mpsc;

#[cfg(feature = "dispatcher")]
pub use dispatcher::{MessageDispatcher, TypedReceiveError, TypedReceiver};
#[cfg(feature = "queue")]
pub use send_queue::{MessageChannel, MessageSendQueue};
#[cfg(any(feature = "queue", feature = "dispatcher"))]
pub use simple_mpsc::{ReceiveError, Receiver};

pub use signer::MessageSigner;

pub use types::*;

// TODO.kevin: use std::sync::Mutex instead.
// See:
// https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
// https://matklad.github.io/2020/01/04/mutexes-are-faster-than-spinlocks.html
#[cfg(any(feature = "queue", feature = "dispatcher"))]
use spin::mutex::Mutex;

#[cfg(all(feature = "queue", feature = "signers"))]
pub use alias::*;

#[cfg(all(feature = "queue", feature = "signers"))]
mod alias {
use super::*;
use sp_core::sr25519;
pub type Sr25519MessageChannel = MessageChannel<sr25519::Pair>;
}
Loading

0 comments on commit 61a40ae

Please sign in to comment.