Skip to content

Commit

Permalink
improvement view_state and remove state limit (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii authored Dec 13, 2023
1 parent c1e12b8 commit 0154109
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 50 deletions.
5 changes: 4 additions & 1 deletion database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ pub trait ReaderDbManager {
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> anyhow::Result<readnode_primitives::StateValue>;
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
);

/// Returns the near_primitives::account::Account at the given block height
async fn get_account(
Expand Down
27 changes: 18 additions & 9 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,28 @@ impl crate::ReaderDbManager for PostgresDBManager {
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> anyhow::Result<readnode_primitives::StateValue> {
let result = crate::models::StateChangesData::get_state_key_value(
Self::get_connection(&self.pg_pool).await?,
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
) {
let connection = if let Ok(pg_connection) = Self::get_connection(&self.pg_pool).await {
pg_connection
} else {
return (key_data, readnode_primitives::StateValue::default());
};
let result = if let Ok(result) = crate::models::StateChangesData::get_state_key_value(
connection,
account_id,
block_height,
hex::encode(key_data),
hex::encode(key_data.clone()),
)
.await?;
if let Some(value) = result {
Ok(value)
.await
{
result.unwrap_or_default()
} else {
anyhow::bail!("State value not found")
}
readnode_primitives::StateValue::default()
};
(key_data, result)
}

async fn get_account(
Expand Down
47 changes: 31 additions & 16 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::scylladb::ScyllaStorageManager;
use borsh::{BorshDeserialize, BorshSerialize};
use futures::StreamExt;
use num_traits::ToPrimitive;
use scylla::{prepared_statement::PreparedStatement, IntoTypedRows};
use std::convert::TryFrom;
Expand Down Expand Up @@ -169,16 +170,19 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
account_id: &near_primitives::types::AccountId,
) -> anyhow::Result<Vec<readnode_primitives::StateKey>> {
let result = Self::execute_prepared_query(
&self.scylla_session,
&self.get_all_state_keys,
(account_id.to_string(),),
)
.await?
.rows_typed::<(String,)>()?
.filter_map(|row| row.ok().and_then(|(value,)| hex::decode(value).ok()));

Ok(result.collect())
let mut paged_query = self.get_all_state_keys.clone();
paged_query.set_page_size(25000);
let mut rows_stream = self
.scylla_session
.execute_iter(paged_query, (account_id.to_string(),))
.await?
.into_typed::<(String,)>();
let mut stata_keys = vec![];
while let Some(next_row_res) = rows_stream.next().await {
let (value,): (String,) = next_row_res?;
stata_keys.push(hex::decode(value)?);
}
Ok(stata_keys)
}

/// Returns state keys for the given account id filtered by the given prefix
Expand Down Expand Up @@ -209,8 +213,11 @@ impl crate::ReaderDbManager for ScyllaDBManager {
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> anyhow::Result<readnode_primitives::StateValue> {
let result = Self::execute_prepared_query(
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
) {
let value = match Self::execute_prepared_query(
&self.scylla_session,
&self.get_state_key_value,
(
Expand All @@ -219,10 +226,18 @@ impl crate::ReaderDbManager for ScyllaDBManager {
hex::encode(&key_data).to_string(),
),
)
.await?
.single_row_typed::<(readnode_primitives::StateValue,)>()?;

Ok(result.0)
.await
{
Ok(result) => {
let (value,) = result
.single_row_typed::<(readnode_primitives::StateValue,)>()
.unwrap_or_default();
value
}
Err(_) => readnode_primitives::StateValue::default(),
};

(key_data, value)
}

/// Returns the near_primitives::account::Account at the given block height
Expand Down
22 changes: 8 additions & 14 deletions rpc-server/src/modules/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::collections::HashMap;
pub mod methods;
pub mod utils;

const MAX_LIMIT: u8 = 100;

pub type Result<T> = ::std::result::Result<T, near_vm_runner::logic::VMLogicError>;

pub struct CodeStorage {
Expand Down Expand Up @@ -69,14 +67,12 @@ impl near_vm_runner::logic::External for CodeStorage {
let get_db_data =
self.db_manager
.get_state_key_value(&self.account_id, self.block_height, key.to_vec());
match block_on(get_db_data) {
Ok(data) => Ok(if !data.is_empty() {
Some(Box::new(StorageValuePtr { value: data }) as Box<_>)
} else {
None
}),
Err(_) => Ok(None),
}
let (_, data) = block_on(get_db_data);
Ok(if !data.is_empty() {
Some(Box::new(StorageValuePtr { value: data }) as Box<_>)
} else {
None
})
}

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))]
Expand Down Expand Up @@ -109,10 +105,8 @@ impl near_vm_runner::logic::External for CodeStorage {
let get_db_state_keys =
self.db_manager
.get_state_key_value(&self.account_id, self.block_height, key.to_vec());
match block_on(get_db_state_keys) {
Ok(data) => Ok(!data.is_empty()),
Err(_) => Ok(false),
}
let (_, data) = block_on(get_db_state_keys);
Ok(!data.is_empty())
}

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))]
Expand Down
21 changes: 11 additions & 10 deletions rpc-server/src/modules/queries/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::task;

use crate::config::CompiledCodeCache;
use crate::errors::FunctionCallError;
use crate::modules::queries::{CodeStorage, MAX_LIMIT};
use crate::modules::queries::CodeStorage;

pub struct RunContractResponse {
pub result: Vec<u8>,
Expand Down Expand Up @@ -48,18 +48,19 @@ pub async fn get_state_keys_from_db(
};
match result {
Ok(state_keys) => {
for state_key in state_keys {
let state_value_result = db_manager
.get_state_key_value(account_id, block_height, state_key.clone())
.await;
if let Ok(state_value) = state_value_result {
for state_keys_chunk in state_keys.chunks(1000) {
// TODO: 1000 is hardcoded value. Make it configurable.
let mut tasks_futures = vec![];
for state_key in state_keys_chunk {
let state_value_result_future =
db_manager.get_state_key_value(account_id, block_height, state_key.clone());
tasks_futures.push(state_value_result_future);
}
let results = futures::future::join_all(tasks_futures).await;
for (state_key, state_value) in results.into_iter() {
if !state_value.is_empty() {
data.insert(state_key, state_value);
}
};
let keys_count = data.keys().len() as u8;
if keys_count > MAX_LIMIT {
return data;
}
}
data
Expand Down

0 comments on commit 0154109

Please sign in to comment.