diff --git a/Cargo.toml b/Cargo.toml index 6b7ba72b..1579a69b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,7 @@ tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } # reqwest = { version = "0.11", features = ["json"], default-features = false } async-raft-ext = "0.6.3" thiserror = "1.0.20" -clap = { version = "4.3", features = ["derive"] } +clap = { version = "4.5", features = ["derive"] } #inject bean_factory = "0.1.4" diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 00000000..65605d11 --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,25 @@ +use clap::{Args, Parser, Subcommand, ValueEnum}; + +/// A fictional versioning CLI +#[derive(Debug, Parser)] // requires `derive` feature +#[command(name = "git")] +#[command(about = "rnacos cli", long_about = None)] +pub struct Cli { + #[command(subcommand)] + pub command: Option, + /// env file path + #[arg(short, long, default_value = "")] + pub env_file: String, +} + +#[derive(Debug, Subcommand)] +pub enum Commands { + /// transfer middle data to sqlite + #[command(arg_required_else_help = true)] + DataToSqlite { + /// the transfer middle data file + file: String, + /// out to sqlite db file + out: String, + }, +} diff --git a/src/common/constant.rs b/src/common/constant.rs index 90db93ef..e0b29056 100644 --- a/src/common/constant.rs +++ b/src/common/constant.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -pub const APP_VERSION: &str = "0.6.1"; - pub const EMPTY_STR: &str = ""; pub const HTTP_METHOD_GET: &str = "GET"; diff --git a/src/common/mod.rs b/src/common/mod.rs index e997b920..0f93f5d3 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -276,3 +276,7 @@ pub fn gen_uuid() -> i64 { ((msb << 32) | lsb) as i64 } + +pub fn get_app_version() -> &'static str { + env!("CARGO_PKG_VERSION") +} diff --git a/src/console/user_api.rs b/src/console/user_api.rs index dcc09418..40934b59 100644 --- a/src/console/user_api.rs +++ b/src/console/user_api.rs @@ -7,17 +7,17 @@ use actix_web::{ }; use serde::{Deserialize, Serialize}; +use super::model::user_model::{UpdateUserInfoParam, UserInfo, UserPageParams, UserPermissions}; +use crate::common::get_app_version; use crate::{ common::{ appdata::AppShareData, - constant::{APP_VERSION, EMPTY_STR}, + constant::EMPTY_STR, model::{ApiResult, PageResultOld, UserSession}, }, user::{model::UserDto, permission::UserRole, UserManagerReq, UserManagerResult}, }; -use super::model::user_model::{UpdateUserInfoParam, UserInfo, UserPageParams, UserPermissions}; - #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ResetPasswordParam { @@ -51,7 +51,7 @@ pub async fn get_user_web_resources(req: HttpRequest) -> actix_web::Result actix_web::Result Result<(), Box> { - init_env(); + let cli_opt = cli::Cli::parse(); + init_env(&cli_opt.env_file); let rust_log = std::env::var("RUST_LOG").unwrap_or("info".to_owned()); - println!("version:{}, RUST_LOG:{}", APP_VERSION, &rust_log); std::env::set_var("RUST_LOG", &rust_log); let sys_config = Arc::new(AppSysConfig::init_from_env()); - println!("data dir:{}", sys_config.local_db_dir); let timezone_fmt = Arc::new(TimeZoneFormatEnv::new( sys_config.gmt_fixed_offset_hours.map(|v| v * 60 * 60), Some(TimestampPrecision::Micros), @@ -66,6 +59,11 @@ async fn main() -> Result<(), Box> { env_logger::Builder::from_default_env() .format(move |buf, record| TimeZoneFormat::new(buf, &timezone_fmt).write(record)) .init(); + if let Some(cmd) = cli_opt.command { + return run_subcommand(cmd).await; + } + println!("version:{}, RUST_LOG:{}", get_app_version(), &rust_log); + println!("data dir:{}", sys_config.local_db_dir); let factory_data = config_factory(sys_config.clone()).await?; let app_data = build_share_data(factory_data.clone())?; let http_addr = sys_config.get_http_addr(); @@ -131,9 +129,7 @@ async fn main() -> Result<(), Box> { Ok(()) } -fn init_env() { - let app_opt = AppOpt::parse(); - let env_path = app_opt.env_file; +fn init_env(env_path: &str) { //let env_path = std::env::var("RNACOS_ENV_FILE").unwrap_or_default(); if env_path.is_empty() { dotenv::dotenv().ok(); @@ -142,6 +138,16 @@ fn init_env() { } } +async fn run_subcommand(commands: Commands) -> Result<(), Box> { + match commands { + Commands::DataToSqlite { file, out } => { + log::info!("middle data to sqlite, from:{file} to:{out}"); + data_to_sqlite(&file, &out).await?; + } + } + Ok(()) +} + async fn run_console_web(source_app_data: Arc) { let http_console_addr = source_app_data.sys_config.get_http_console_addr(); log::info!("new console server http addr:{}", &http_console_addr); diff --git a/src/transfer/data_to_sqlite.rs b/src/transfer/data_to_sqlite.rs new file mode 100644 index 00000000..8b77ce4e --- /dev/null +++ b/src/transfer/data_to_sqlite.rs @@ -0,0 +1,206 @@ +use crate::common::constant::{CONFIG_TREE_NAME, NAMESPACE_TREE_NAME, USER_TREE_NAME}; +use crate::config::core::{ConfigKey, ConfigValue}; +use crate::config::model::ConfigValueDO; +use crate::namespace::model::NamespaceDO; +use crate::transfer::model::TransferRecordRef; +use crate::transfer::reader::{reader_transfer_record, TransferFileReader}; +use crate::transfer::sqlite::dao::config::{ConfigDO, ConfigDao}; +use crate::transfer::sqlite::dao::config_history::{ConfigHistoryDO, ConfigHistoryDao}; +use crate::transfer::sqlite::dao::tenant::{TenantDO, TenantDao}; +use crate::transfer::sqlite::dao::user::{UserDO, UserDao}; +use crate::user::model::UserDo; +use rusqlite::Connection; + +#[derive(Debug, Default)] +pub struct TableSeq { + pub(crate) config_id: i64, + pub(crate) config_history_id: i64, + pub(crate) tenant_id: i64, + pub(crate) user_id: i64, +} + +impl TableSeq { + pub fn next_config_id(&mut self) -> i64 { + self.config_id += 1; + self.config_id + } + + pub fn next_config_history_id(&mut self) -> i64 { + self.config_history_id += 1; + self.config_history_id + } + + pub fn next_tenant_id(&mut self) -> i64 { + self.tenant_id += 1; + self.tenant_id + } + pub fn next_user_id(&mut self) -> i64 { + self.user_id += 1; + self.user_id + } +} + +pub async fn data_to_sqlite(data_file: &str, db_path: &str) -> anyhow::Result<()> { + let mut file_reader = TransferFileReader::new(data_file).await?; + let conn = open_init_db(db_path)?; + let mut config_count = 0; + let mut tenant_count = 0; + let mut user_count = 0; + let mut ignore = 0; + let mut table_seq = TableSeq::default(); + let config_dao = ConfigDao::new(&conn); + let config_history_dao = ConfigHistoryDao::new(&conn); + let user_dao = UserDao::new(&conn); + let tenant_dao = TenantDao::new(&conn); + while let Ok(Some(vec)) = file_reader.read_record_vec().await { + let record = reader_transfer_record(&vec, &file_reader.header)?; + if record.table_name.as_str() == CONFIG_TREE_NAME.as_str() { + config_count += 1; + insert_config(&mut table_seq, &config_dao, &config_history_dao, record)?; + } else if record.table_name.as_str() == NAMESPACE_TREE_NAME.as_str() { + tenant_count += 1; + insert_namespace(&mut table_seq, &tenant_dao, record)? + } else if record.table_name.as_str() == USER_TREE_NAME.as_str() { + user_count += 1; + insert_user(&mut table_seq, &user_dao, record)? + } else { + ignore += 1; + } + } + log::info!( + "transfer to sqlite db finished,config count:{},tenant count:{},use count:{},ignore count:{}", + config_count, + tenant_count, + user_count, + ignore + ); + Ok(()) +} + +fn insert_config( + table_seq: &mut TableSeq, + config_dao: &ConfigDao<'_>, + config_history_dao: &ConfigHistoryDao<'_>, + record: TransferRecordRef<'_>, +) -> anyhow::Result<()> { + let value_do = ConfigValueDO::from_bytes(&record.value)?; + let key = String::from_utf8_lossy(&record.key).to_string(); + let key: ConfigKey = (&key as &str).into(); + let config_value: ConfigValue = value_do.into(); + let config_do = ConfigDO { + id: Some(table_seq.next_config_id()), + data_id: Some(key.data_id.clone()), + group_id: Some(key.group.clone()), + tenant_id: Some(key.tenant.clone()), + content: Some(config_value.content.clone()), + config_type: config_value.config_type.clone(), + config_desc: config_value.desc, + last_time: Some(config_value.last_modified), + }; + config_dao.insert(&config_do)?; + for history_item in config_value.histories { + let history = ConfigHistoryDO { + id: Some(table_seq.next_config_history_id()), + data_id: Some(key.data_id.clone()), + group_id: Some(key.group.clone()), + tenant_id: Some(key.tenant.clone()), + content: Some(history_item.content.clone()), + config_type: None, + config_desc: None, + op_user: history_item.op_user.clone(), + last_time: Some(history_item.modified_time), + }; + config_history_dao.insert(&history)?; + } + Ok(()) +} + +fn insert_namespace( + table_seq: &mut TableSeq, + tenant_dao: &TenantDao<'_>, + record: TransferRecordRef<'_>, +) -> anyhow::Result<()> { + let value_do: NamespaceDO = NamespaceDO::from_bytes(&record.value)?; + let tenant_do = TenantDO { + id: Some(table_seq.next_tenant_id()), + tenant_id: value_do.namespace_id, + tenant_name: value_do.namespace_name, + tenant_desc: None, + create_flag: None, + }; + tenant_dao.insert(&tenant_do)?; + Ok(()) +} + +fn insert_user( + table_seq: &mut TableSeq, + user_dao: &UserDao<'_>, + record: TransferRecordRef<'_>, +) -> anyhow::Result<()> { + let value_do = UserDo::from_bytes(&record.value)?; + let user_do = UserDO { + id: Some(table_seq.next_user_id()), + username: Some(value_do.username), + nickname: Some(value_do.nickname), + password_hash: value_do.password_hash, + gmt_create: Some(value_do.gmt_create as i64), + gmt_modified: Some(value_do.gmt_modified as i64), + enabled: Some(value_do.enable.to_string()), + roles: Some(serde_json::to_string(&value_do.roles)?), + extend_info: Some(serde_json::to_string(&value_do.extend_info)?), + }; + user_dao.insert(&user_do)?; + Ok(()) +} + +pub fn open_init_db(db_path: &str) -> anyhow::Result { + let conn = Connection::open(db_path)?; + let create_table_sql = r" +create table if not exists tb_config( + id integer primary key autoincrement, + data_id text, + group_id text, + tenant_id text, + content text, + config_type text, + config_desc text, + last_time long +); +create index if not exists tb_config_key_idx on tb_config(data_id,group_id,tenant_id); + +create table if not exists tb_config_history( + id integer primary key autoincrement, + data_id text, + group_id text, + tenant_id text, + content text, + config_type text, + config_desc text, + op_user text, + last_time long +); +create index if not exists tb_config_history_key_idx on tb_config_history(data_id,group_id,tenant_id); + +create table if not exists tb_tenant( + id integer primary key autoincrement, + tenant_id text, + tenant_name text, + tenant_desc text, + create_flag integer +); + +create table if not exists tb_user( + id integer primary key autoincrement, + username text, + nickname text, + password_hash text, + gmt_create integer, + gmt_modified integer, + enabled text, + roles text, + extend_info text +); + "; + conn.execute_batch(create_table_sql)?; + Ok(conn) +} diff --git a/src/transfer/mod.rs b/src/transfer/mod.rs index f50f6342..79cca655 100644 --- a/src/transfer/mod.rs +++ b/src/transfer/mod.rs @@ -1,3 +1,4 @@ +pub mod data_to_sqlite; pub mod model; pub mod reader; pub mod sqlite; diff --git a/src/transfer/model.rs b/src/transfer/model.rs index 6ff73d93..bbddcf54 100644 --- a/src/transfer/model.rs +++ b/src/transfer/model.rs @@ -1,7 +1,7 @@ -///导入导出中间文件对象 -use crate::common::constant; use crate::common::pb::transfer::{TableNameMapEntity, TransferHeader, TransferItem}; use crate::common::tempfile::TempFile; +///导入导出中间文件对象 +use crate::common::{constant, get_app_version}; use crate::now_millis; use crate::transfer::writer::TransferWriterActor; use actix::{Addr, Message}; @@ -53,7 +53,7 @@ impl TransferHeaderDto { Self { version, modify_time: now_millis(), - from_sys: Some(format!("r-nacos_{}", constant::APP_VERSION)), + from_sys: Some(format!("r-nacos_{}", get_app_version())), name_to_id: Default::default(), id_to_name: Default::default(), max_id: 0, diff --git a/src/transfer/reader.rs b/src/transfer/reader.rs index a59f94d3..875d8900 100644 --- a/src/transfer/reader.rs +++ b/src/transfer/reader.rs @@ -2,7 +2,7 @@ use crate::common::constant::{ CACHE_TREE_NAME, CONFIG_TREE_NAME, EMPTY_ARC_STRING, NAMESPACE_TREE_NAME, USER_TREE_NAME, }; use crate::common::pb::transfer::{TransferHeader, TransferItem}; -use crate::common::protobuf_utils::MessageBufReader; +use crate::common::protobuf_utils::{FileMessageReader, MessageBufReader}; use crate::common::sequence_utils::CacheSequence; use crate::config::core::{ConfigActor, ConfigCmd, ConfigResult, ConfigValue}; use crate::config::model::ConfigValueDO; @@ -24,6 +24,37 @@ use binrw::BinReaderExt; use quick_protobuf::BytesReader; use std::io::Cursor; use std::sync::Arc; +use tokio::fs::OpenOptions; + +pub(crate) fn reader_transfer_record<'a>( + v: &'a [u8], + header: &'a TransferHeaderDto, +) -> anyhow::Result> { + let mut reader = BytesReader::from_bytes(v); + let record_do: TransferItem = reader.read_message(v)?; + let table_name = if record_do.table_id == 0 { + if CONFIG_TREE_NAME.as_str() == record_do.table_name.as_ref() { + CONFIG_TREE_NAME.clone() + } else if USER_TREE_NAME.as_str() == record_do.table_name.as_ref() { + USER_TREE_NAME.clone() + } else if CACHE_TREE_NAME.as_str() == record_do.table_name.as_ref() { + CACHE_TREE_NAME.clone() + } else if NAMESPACE_TREE_NAME.as_str() == record_do.table_name.as_ref() { + NAMESPACE_TREE_NAME.clone() + } else { + //ignore + EMPTY_ARC_STRING.clone() + } + } else { + header + .id_to_name + .get(&record_do.table_id) + .cloned() + .unwrap_or(EMPTY_ARC_STRING.clone()) + }; + let record = TransferRecordRef::new(table_name, record_do); + Ok(record) +} pub struct TransferReader { message_reader: MessageBufReader, @@ -55,29 +86,7 @@ impl TransferReader { pub fn read_record(&mut self) -> anyhow::Result> { if let Some(v) = self.message_reader.next_message_vec() { - let mut reader = BytesReader::from_bytes(v); - let record_do: TransferItem = reader.read_message(v)?; - let table_name = if record_do.table_id == 0 { - if CONFIG_TREE_NAME.as_str() == record_do.table_name.as_ref() { - CONFIG_TREE_NAME.clone() - } else if USER_TREE_NAME.as_str() == record_do.table_name.as_ref() { - USER_TREE_NAME.clone() - } else if CACHE_TREE_NAME.as_str() == record_do.table_name.as_ref() { - CACHE_TREE_NAME.clone() - } else if NAMESPACE_TREE_NAME.as_str() == record_do.table_name.as_ref() { - NAMESPACE_TREE_NAME.clone() - } else { - //ignore - EMPTY_ARC_STRING.clone() - } - } else { - self.header - .id_to_name - .get(&record_do.table_id) - .cloned() - .unwrap_or(EMPTY_ARC_STRING.clone()) - }; - let record = TransferRecordRef::new(table_name, record_do); + let record = reader_transfer_record(v, &self.header)?; Ok(Some(record)) } else { Ok(None) @@ -85,6 +94,36 @@ impl TransferReader { } } +pub struct TransferFileReader { + message_reader: FileMessageReader, + //prefix: TransferPrefix, + pub(crate) header: TransferHeaderDto, +} + +impl TransferFileReader { + pub async fn new(path: &str) -> anyhow::Result { + let file = OpenOptions::new().read(true).open(&path).await?; + let mut message_reader = FileMessageReader::new(file, 8); + message_reader.seek_start(8).await?; + let header = if let Ok(v) = message_reader.read_next().await { + let mut reader = BytesReader::from_bytes(&v); + let header_do: TransferHeader = reader.read_message(&v)?; + header_do.into() + } else { + return Err(anyhow::anyhow!("read header error from transfer file")); + }; + Ok(Self { + message_reader, + header, + }) + } + + pub async fn read_record_vec(&mut self) -> anyhow::Result>> { + let v = self.message_reader.read_next().await?; + Ok(Some(v)) + } +} + #[derive(Clone)] pub struct ConfigCacheSequence { seq: CacheSequence, diff --git a/src/transfer/sqlite/dao/config.rs b/src/transfer/sqlite/dao/config.rs index df82283d..b1980272 100644 --- a/src/transfer/sqlite/dao/config.rs +++ b/src/transfer/sqlite/dao/config.rs @@ -2,34 +2,34 @@ use rsql_builder::B; use rusqlite::{Connection, Row}; use serde::{Deserialize, Serialize}; -use std::rc::Rc; +use std::sync::Arc; use crate::common::rusqlite_utils::{ - get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, + get_row_arc_value, get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, }; #[derive(Debug, Default, Serialize, Deserialize)] pub struct ConfigDO { pub id: Option, - pub data_id: Option, - pub group_id: Option, - pub tenant_id: Option, - pub content: Option, - pub config_type: Option, - pub config_desc: Option, - pub last_time: Option, + pub data_id: Option>, + pub group_id: Option>, + pub tenant_id: Option>, + pub content: Option>, + pub config_type: Option>, + pub config_desc: Option>, + pub last_time: Option, } impl ConfigDO { fn from_row(r: &Row) -> Self { let mut s = Self::default(); s.id = get_row_value(r, "id"); - s.data_id = get_row_value(r, "data_id"); - s.group_id = get_row_value(r, "group_id"); - s.tenant_id = get_row_value(r, "tenant_id"); - s.content = get_row_value(r, "content"); - s.config_type = get_row_value(r, "config_type"); - s.config_desc = get_row_value(r, "config_desc"); + s.data_id = get_row_arc_value(r, "data_id"); + s.group_id = get_row_arc_value(r, "group_id"); + s.tenant_id = get_row_arc_value(r, "tenant_id"); + s.content = get_row_arc_value(r, "content"); + s.config_type = get_row_arc_value(r, "config_type"); + s.config_desc = get_row_arc_value(r, "config_desc"); s.last_time = get_row_value(r, "last_time"); s } @@ -148,13 +148,13 @@ impl ConfigSql { } } -pub struct ConfigDao { - conn: Rc, +pub struct ConfigDao<'a> { + conn: &'a Connection, inner: ConfigSql, } -impl ConfigDao { - pub fn new(conn: Rc) -> Self { +impl<'a> ConfigDao<'a> { + pub fn new(conn: &'a Connection) -> Self { Self { conn, inner: ConfigSql {}, diff --git a/src/transfer/sqlite/dao/config_history.rs b/src/transfer/sqlite/dao/config_history.rs index 748d9d4a..21ac9386 100644 --- a/src/transfer/sqlite/dao/config_history.rs +++ b/src/transfer/sqlite/dao/config_history.rs @@ -2,36 +2,36 @@ use rsql_builder::B; use rusqlite::{Connection, Row}; use serde::{Deserialize, Serialize}; -use std::rc::Rc; +use std::sync::Arc; use crate::common::rusqlite_utils::{ - get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, + get_row_arc_value, get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, }; #[derive(Debug, Default, Serialize, Deserialize)] pub struct ConfigHistoryDO { pub id: Option, - pub data_id: Option, - pub group_id: Option, - pub tenant_id: Option, - pub content: Option, + pub data_id: Option>, + pub group_id: Option>, + pub tenant_id: Option>, + pub content: Option>, pub config_type: Option, pub config_desc: Option, - pub op_user: Option, - pub last_time: Option, + pub op_user: Option>, + pub last_time: Option, } impl ConfigHistoryDO { fn from_row(r: &Row) -> Self { let mut s = Self::default(); s.id = get_row_value(r, "id"); - s.data_id = get_row_value(r, "data_id"); - s.group_id = get_row_value(r, "group_id"); - s.tenant_id = get_row_value(r, "tenant_id"); - s.content = get_row_value(r, "content"); + s.data_id = get_row_arc_value(r, "data_id"); + s.group_id = get_row_arc_value(r, "group_id"); + s.tenant_id = get_row_arc_value(r, "tenant_id"); + s.content = get_row_arc_value(r, "content"); s.config_type = get_row_value(r, "config_type"); s.config_desc = get_row_value(r, "config_desc"); - s.op_user = get_row_value(r, "op_user"); + s.op_user = get_row_arc_value(r, "op_user"); s.last_time = get_row_value(r, "last_time"); s } @@ -159,13 +159,13 @@ impl ConfigHistorySql { } } -pub struct ConfigHistoryDao { - conn: Rc, +pub struct ConfigHistoryDao<'a> { + conn: &'a Connection, inner: ConfigHistorySql, } -impl ConfigHistoryDao { - pub fn new(conn: Rc) -> Self { +impl<'a> ConfigHistoryDao<'a> { + pub fn new(conn: &'a Connection) -> Self { Self { conn, inner: ConfigHistorySql {}, diff --git a/src/transfer/sqlite/dao/tenant.rs b/src/transfer/sqlite/dao/tenant.rs index cb8fd032..2914e3fa 100644 --- a/src/transfer/sqlite/dao/tenant.rs +++ b/src/transfer/sqlite/dao/tenant.rs @@ -2,7 +2,6 @@ use rsql_builder::B; use rusqlite::{Connection, Row}; use serde::{Deserialize, Serialize}; -use std::rc::Rc; use crate::common::rusqlite_utils::{ get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, @@ -123,13 +122,13 @@ impl TenantSql { } } -pub struct TenantDao { - conn: Rc, +pub struct TenantDao<'a> { + conn: &'a Connection, inner: TenantSql, } -impl TenantDao { - pub fn new(conn: Rc) -> Self { +impl<'a> TenantDao<'a> { + pub fn new(conn: &'a Connection) -> Self { Self { conn, inner: TenantSql {}, diff --git a/src/transfer/sqlite/dao/user.rs b/src/transfer/sqlite/dao/user.rs index f0a71eec..a0bb47ac 100644 --- a/src/transfer/sqlite/dao/user.rs +++ b/src/transfer/sqlite/dao/user.rs @@ -2,7 +2,6 @@ use rsql_builder::B; use rusqlite::{Connection, Row}; use serde::{Deserialize, Serialize}; -use std::rc::Rc; use crate::common::rusqlite_utils::{ get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, @@ -157,13 +156,13 @@ impl UserSql { } } -pub struct UserDao { - conn: Rc, +pub struct UserDao<'a> { + conn: &'a Connection, inner: UserSql, } -impl UserDao { - pub fn new(conn: Rc) -> Self { +impl<'a> UserDao<'a> { + pub fn new(conn: &'a Connection) -> Self { Self { conn, inner: UserSql {},