Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate internal state manager #26

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion echo-library/src/boilerplate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ derive-enum-from-into = "0.1.1"
serde_derive = "1.0.192"
paste = "1.0.7"
dyn-clone = "1.0.7"
workflow_macro = "0.0.3"
workflow_macro = {git="https://github.com/HugoByte/aurras", branch = "feat/modify-workflow-macro-for-state-management", package = "workflow_macro"}
openwhisk-rust = "0.1.2"
serde_json = { version = "1.0", features = ["raw_value"] }
serde = { version = "1.0.192", features = ["derive"] }
Expand Down
51 changes: 37 additions & 14 deletions echo-library/src/boilerplate/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
#![allow(unused_imports)]
use paste::paste;
use super::*;
use alloc::task;
use paste::paste;
use workflow_macro::Flow;

#[derive(Debug, Flow)]
pub struct WorkflowGraph {
edges: Vec<(usize, usize)>,
nodes: Vec<Box<dyn Execute>>,
pub state_manager: StateManager,
}

impl WorkflowGraph {
pub fn new(size: usize) -> Self {
WorkflowGraph {
nodes: Vec::with_capacity(size),
edges: Vec::new(),
state_manager: StateManager::init(),
}
}
}
Expand All @@ -20,22 +25,34 @@ impl WorkflowGraph {
macro_rules! impl_execute_trait {
($ ($struct : ty), *) => {

paste!{
$( impl Execute for $struct {
fn execute(&mut self) -> Result<(),String>{
self.run()
}
paste!{$(
impl Execute for $struct {
fn execute(&mut self) -> Result<(),String>{
self.run()
}

fn get_task_output(&self) -> Value {
self.output().clone().into()
}
fn get_task_output(&self) -> Value {
self.output().clone().into()
}

fn set_output_to_task(&mut self, input: Value) {
self.setter(input)
}

fn get_action_name(&self) -> String{
self.action_name.clone()
}

fn get_json_string(&self) -> String{
serde_json::to_string(&self).unwrap()
}

fn set_result_output(&mut self, inp: Value) {
self.set_result_output(inp)
}

fn set_output_to_task(&mut self, input: Value) {
self.setter(input)
}
}
)*
}
)*}
};
}

Expand Down Expand Up @@ -65,6 +82,12 @@ pub unsafe extern "C" fn free_memory(ptr: *mut u8, size: u32, alignment: u32) {
extern "C" {
pub fn set_output(ptr: i32, size: i32);
}

#[link(wasm_import_module = "host")]
extern "C" {
pub fn set_state(ptr: i32, size: i32);
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Output {
pub result: Value,
Expand Down
2 changes: 2 additions & 0 deletions echo-library/src/boilerplate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use workflow_macro::Flow;
extern crate alloc;
use codec::{Decode, Encode};
use core::alloc::Layout;
mod state_manager;
use state_manager::*;

#[no_mangle]
pub fn _start(ptr: *mut u8, length: i32) {
Expand Down
61 changes: 31 additions & 30 deletions echo-library/src/boilerplate/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ macro_rules! make_input_struct {
[$($der:ident),*]
) => {
#[derive($($der),*)]
pub struct $x {
pub struct $x {
$(
$(#[serde(default=$default_derive)])?
$visibility $element: $ty
Expand Down Expand Up @@ -42,6 +42,11 @@ macro_rules! make_main_struct {
pub fn output(&self) -> Value {
self.$output_field.clone()
}

pub fn set_result_output(&mut self, inp: Value) {
self.$output_field = inp
}

}
}
}
Expand All @@ -61,7 +66,7 @@ macro_rules! impl_new {
..Default::default()
},
..Default::default()
}
}
}
}
};
Expand All @@ -79,7 +84,7 @@ macro_rules! impl_new {
..Default::default()
},
..Default::default()
}
}
}
}
}
Expand All @@ -106,48 +111,45 @@ macro_rules! impl_setter {
macro_rules! impl_map_setter {
(
$name:ty,
$element:ident : $key:expr,
$element:ident : $key:expr,
$typ_name : ty,
$out:expr
) => {
impl $name {
pub fn setter(&mut self, val: Value) {

let value = val.get($key).unwrap();
let value = serde_json::from_value::<Vec<$typ_name>>(value.clone()).unwrap();
let mut map: HashMap<_, _> = value
.iter()
.map(|x| {
self.input.$element = x.to_owned() as $typ_name;
self.run();
(x.to_owned(), self.output.get($out).unwrap().to_owned())
})
.collect();
self.mapout = to_value(map).unwrap();

let value = val.get($key).unwrap();
let value = serde_json::from_value::<Vec<$typ_name>>(value.clone()).unwrap();
let mut map: HashMap<_, _> = value
.iter()
.map(|x| {
self.input.$element = x.to_owned() as $typ_name;
self.run();
(x.to_owned(), self.output.get($out).unwrap().to_owned())
})
.collect();
self.mapout = to_value(map).unwrap();
}
}
}
}
};
}

#[macro_export]
macro_rules! impl_concat_setter {
(
$name:ty,
$input:ident
) => {
impl $name{
pub fn setter(&mut self, val: Value) {

let val: Vec<Value> = serde_json::from_value(val).unwrap();
let res = join_hashmap(
serde_json::from_value(val[0].to_owned()).unwrap(),
serde_json::from_value(val[1].to_owned()).unwrap(),
);
self.input.$input = res;
impl $name {
pub fn setter(&mut self, value: Value) {
let value: Vec<Value> = serde_json::from_value(value).unwrap();
let response = join_hashmap(
serde_json::from_value(value[0].to_owned()).unwrap(),
serde_json::from_value(value[1].to_owned()).unwrap(),
);
self.input.$input = response;
}
}
}
};
}

#[allow(unused)]
Expand Down Expand Up @@ -176,4 +178,3 @@ macro_rules! impl_combine_setter {
}
}
}

105 changes: 105 additions & 0 deletions echo-library/src/boilerplate/src/state_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use super::*;
use crate::WorkflowGraph;
use core::default;

#[derive(Debug, Serialize, Deserialize, Clone)]
enum ExecutionState {
Init,
Running,
Paused,
Failed,
Success,
}

impl Default for ExecutionState {
fn default() -> Self {
ExecutionState::Init
}
}

#[derive(Default, Debug)]
pub struct StateManager{
action_name: String,
task_index: isize,
execution_state: ExecutionState,
output: Option<Value>,
error: Option<String>,
}

impl StateManager {

fn update_state_data(&self) {
let state_data: serde_json::Value = serde_json::json!(
{
"action_name": self.action_name,
"task_index": self.task_index,
"execution_state": self.execution_state,
"output": self.output,
"error": self.error
}
);

let serialized = serde_json::to_vec(&state_data).unwrap();
let size = serialized.len() as i32;
let ptr = serialized.as_ptr();

std::mem::forget(ptr);

unsafe {
super::set_state(ptr as i32, size);
}
}

pub fn init() -> Self {
let state_data = StateManager {
action_name: "Initializing Workflow".to_string(),
execution_state: ExecutionState::Init,
output: None,
task_index: -1,
error: None,
};

state_data.update_state_data();
state_data
}

pub fn update_workflow_initialized(&mut self) {
self.execution_state = ExecutionState::Success;
self.task_index = -1;
self.error = None;
self.update_state_data();
}

pub fn update_running(&mut self, action_name: &str, task_index: isize) {
self.action_name = action_name.to_string();
self.task_index = task_index;
self.execution_state = ExecutionState::Running;
self.output = None;
self.update_state_data();
}

pub fn update_pause(&mut self) {
self.execution_state = ExecutionState::Paused;
self.update_state_data();
}

pub fn update_success(&mut self, output: Value) {
self.output = Some(output);
self.execution_state = ExecutionState::Success;
self.update_state_data();
}

pub fn update_restore_success(&mut self, action_name: &str, task_index: isize, output: Value) {
self.action_name = action_name.to_string();
self.task_index = task_index;
self.execution_state = ExecutionState::Success;
self.output = Some(output);
self.update_state_data();
}

pub fn update_err(&mut self, error: &str) {
self.execution_state = ExecutionState::Failed;
self.error = Some(error.to_string());
self.update_state_data();
}
}
10 changes: 6 additions & 4 deletions echo-library/src/boilerplate/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@

use super::*;

pub trait Execute : Debug + DynClone {
fn execute(&mut self)-> Result<(),String>;
fn get_task_output(&self)->Value;
pub trait Execute: Debug + DynClone {
fn execute(&mut self) -> Result<(), String>;
fn get_task_output(&self) -> Value;
fn set_result_output(&mut self, inp: Value);
fn set_output_to_task(&mut self, inp: Value);
fn get_action_name(&self) -> String;
fn get_json_string(&self) -> String;
}

clone_trait_object!(Execute);
4 changes: 4 additions & 0 deletions echo-library/src/common/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const COMMON: &str = include_str!("../boilerplate/src/common.rs");
const LIB: &str = include_str!("../boilerplate/src/lib.rs");
const TRAIT: &str = include_str!("../boilerplate/src/traits.rs");
const MACROS: &str = include_str!("../boilerplate/src/macros.rs");
const STATE_MANAGER: &str = include_str!("../boilerplate/src/state_manager.rs");
const CARGO: &str = include_str!("../boilerplate/Cargo.toml");

#[derive(Debug, ProvidesStaticType, Default)]
Expand Down Expand Up @@ -136,6 +137,9 @@ impl Composer {
let temp_path = src_curr.as_path().join("macros.rs");
std::fs::write(temp_path, MACROS)?;

let temp_path = src_curr.as_path().join("state_manager.rs");
std::fs::write(temp_path, STATE_MANAGER)?;

let cargo_path = curr.join("Cargo.toml");
std::fs::write(cargo_path.clone(), CARGO)?;

Expand Down
Loading