Skip to content

Commit

Permalink
Merge branch 'master' into ff
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV committed May 9, 2024
2 parents 972527f + 648a3c0 commit 25e3dcb
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches: [ "alephzero" ]
paths-ignore:
- '**/README.md'
- "**/README.md"
pull_request:
branches: [ "alephzero" ]

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Subway is build with middleware pattern.
- Inject optional `defaultBlock` parameter to requests to ensure downstream middleware such as cache can work properly.
- Subscription
- Forward requests to upstream servers.
- TODO: Merge duplicated subscriptions.
- TODO: Rate Limit
- Merge duplicated subscriptions.
- Rate Limit
- Rate limit requests from downstream middleware.
- TODO: Parameter filter
- Deny requests with invalid parameters.
Expand Down
34 changes: 33 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,13 @@ pub struct MiddlewaresConfig {
pub subscriptions: Vec<String>,
}

#[derive(Debug)]
#[derive(Debug, Validate)]
#[garde(allow_unvalidated)]
pub struct Config {
#[garde(dive)]
pub extensions: ExtensionsConfig,
pub middlewares: MiddlewaresConfig,
#[garde(dive)]
pub rpcs: RpcDefinitions,
}

Expand Down Expand Up @@ -227,9 +230,38 @@ fn validate_config(config: &Config) -> Result<(), anyhow::Error> {
} else if has_optional {
bail!("Method {} has required param after optional param", method.method);
}
(None, None) => env::var(&caps[1]),
_ => Err(env::VarError::NotPresent),
}
};

// replace every matches with early return
// when encountering error
for caps in re.captures_iter(templated_config_str) {
let m = caps
.get(0)
.expect("i==0 means implicit unnamed group that includes the entire match, which is infalliable");
config_str.push_str(&templated_config_str[last_match..m.start()]);
config_str.push_str(
&replacement(&caps).with_context(|| format!("Unable to replace environment variable {}", &caps[1]))?,
);
last_match = m.end();
}
config_str.push_str(&templated_config_str[last_match..]);
Ok(config_str)
}

pub async fn validate(config: &Config) -> Result<(), anyhow::Error> {
// validate use garde::Validate
config.validate(&())?;
// since endpoints connection test is async
// we can't intergrate it into garde::Validate
// and it's not a static validation like format, length, .etc
if let Some(client_config) = &config.extensions.client {
if !client_config.all_endpoints_can_be_connected().await {
anyhow::bail!("Unable to connect to all endpoints");
}
}
Ok(())
}

Expand Down
120 changes: 118 additions & 2 deletions src/config/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use garde::Validate;
use jsonrpsee::core::JsonValue;
use serde::Deserialize;

Expand All @@ -20,13 +21,15 @@ pub struct MethodParam {
pub inject: bool,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct RpcMethod {
pub method: String,

#[serde(default)]
pub cache: Option<CacheParams>,

#[garde(custom(validate_params_with_name(&self.method)))]
#[serde(default)]
pub params: Vec<MethodParam>,

Expand All @@ -48,6 +51,31 @@ pub struct RpcMethod {
pub rate_limit_weight: u32,
}

fn validate_params_with_name(method_name: &str) -> impl FnOnce(&[MethodParam], &()) -> garde::Result + '_ {
move |params, _| {
// ensure each method has only one param with inject=true
if params.iter().filter(|x| x.inject).count() > 1 {
return Err(garde::Error::new(format!(
"method {} has more than one inject param",
method_name
)));
}
// ensure there is no required param after optional param
let mut has_optional = false;
for param in params {
if param.optional {
has_optional = true;
} else if has_optional {
return Err(garde::Error::new(format!(
"method {} has required param after optional param",
method_name
)));
}
}
Ok(())
}
}

fn default_rate_limit_weight() -> u32 {
1
}
Expand All @@ -71,11 +99,99 @@ pub struct RpcSubscription {
pub merge_strategy: Option<MergeStrategy>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct RpcDefinitions {
#[garde(dive)]
pub methods: Vec<RpcMethod>,
#[serde(default)]
pub subscriptions: Vec<RpcSubscription>,
#[serde(default)]
pub aliases: Vec<(String, String)>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn validate_params_succeeds_for_valid_params() {
let valid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: false,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
];
let method_name = "test";
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&valid_params, &()).is_ok());
}

#[test]
fn validate_params_fails_for_more_than_one_param_has_inject_equals_true() {
let another_invalid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
];
let method_name = "test";
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&another_invalid_params, &()).is_err());
}

#[test]
fn validate_params_fails_for_optional_params_are_not_the_last() {
let method_name = "test";
let invalid_params = vec![
MethodParam {
name: "param1".to_string(),
ty: "u64".to_string(),
optional: false,
inject: false,
},
MethodParam {
name: "param2".to_string(),
ty: "u64".to_string(),
optional: true,
inject: false,
},
MethodParam {
name: "param3".to_string(),
ty: "u64".to_string(),
optional: false,
inject: true,
},
];
let test_fn = validate_params_with_name(method_name);
assert!(test_fn(&invalid_params, &()).is_err());
}
}
4 changes: 3 additions & 1 deletion src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl Drop for Client {
}
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Validate, Debug)]
#[garde(allow_unvalidated)]
pub struct ClientConfig {
#[garde(inner(custom(validate_endpoint)))]
pub endpoints: Vec<String>,
#[serde(default = "bool_true")]
pub shuffle_endpoints: bool,
Expand Down
9 changes: 6 additions & 3 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,15 @@ impl ExtensionRegistry {
macro_rules! define_all_extensions {
(
$(
$ext_name:ident: $ext_type:ty
$(#[$attr:meta])* $ext_name:ident: $ext_type:ty
),* $(,)?
) => {
#[derive(Deserialize, Debug, Default)]
use garde::Validate;
#[derive(Deserialize, Debug, Validate, Default)]
#[garde(allow_unvalidated)]
pub struct ExtensionsConfig {
$(
#[serde(default)]
$(#[$attr])*
pub $ext_name: Option<<$ext_type as Extension>::Config>,
)*
}
Expand Down Expand Up @@ -133,6 +135,7 @@ macro_rules! define_all_extensions {
define_all_extensions! {
telemetry: telemetry::Telemetry,
cache: cache::Cache,
#[garde(dive)]
client: client::Client,
merge_subscription: merge_subscription::MergeSubscription,
substrate_api: api::SubstrateApi,
Expand Down
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ async fn main() -> anyhow::Result<()> {
let config = subway::config::read_config(&cli.config)?;

subway::logger::enable_logger();
let cli = subway::cli::parse_args();
let config = subway::config::read_config(&cli.config)?;
tracing::trace!("{:#?}", config);
subway::config::validate(&config).await?;
// early return if we're just validating the config
if cli.is_validate() {
return Ok(());
}

let subway_server = subway::server::build(config).await?;
tracing::info!("Server running at {}", subway_server.addr);
Expand Down
57 changes: 57 additions & 0 deletions tests/configs/broken_endpoints.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
- wss://example.com
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
telemetry:
provider: none
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscription:
keep_alive_seconds: 60
server:
port: 9944
listen_address: '0.0.0.0'
max_connections: 2000
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash
cors: all
rate_limit: # these are for demo purpose only, please adjust to your needs
connection: # 20 RPC requests per second per connection
burst: 20
period_secs: 1
ip: # 500 RPC requests per 10 seconds per ip
burst: 500
period_secs: 10
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false

middlewares:
methods:
- delay
- response
- inject_params
- cache
- upstream
subscriptions:
- merge_subscription
- upstream

rpcs: substrate
56 changes: 56 additions & 0 deletions tests/configs/config_with_env.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
telemetry:
provider: none
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscription:
keep_alive_seconds: 60
server:
port: ${SUBWAY_PORT:-9944}
listen_address: '0.0.0.0'
max_connections: ${SUBWAY_MAX_CONNECTIONS:-2000}
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash
cors: all
rate_limit: # these are for demo purpose only, please adjust to your needs
connection: # 20 RPC requests per second per connection
burst: 20
period_secs: 1
ip: # 500 RPC requests per 10 seconds per ip
burst: 500
period_secs: 10
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false

middlewares:
methods:
- delay
- response
- inject_params
- cache
- upstream
subscriptions:
- merge_subscription
- upstream

rpcs: substrate

0 comments on commit 25e3dcb

Please sign in to comment.