Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect more DB with validation results #102

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/credentialgateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub struct CredentialGateway {
receive_bitcoind_result_handler: Mutex<mpsc::UnboundedReceiver<BitcoindResult>>,

receive_events_gateway: Mutex<mpsc::UnboundedReceiver<ClientEvents>>,
send_validation_result_gateway: Mutex<mpsc::UnboundedSender<ClientEvents>>,

issuance_manager: IssuanceManager,
redemption_manager: RedemptionManager,
Expand All @@ -227,7 +228,7 @@ pub struct CredentialGateway {
}

impl CredentialGateway {
pub fn new(receive_credential_event_gateway: mpsc::UnboundedReceiver<ClientEvents>, send_credential_events_gateway: mpsc::UnboundedSender<ClientEvents>, send_bitcoind_request_gateway: mpsc::UnboundedSender<BitcoindRequest>, receive_bitcoind_result_gateway: mpsc::UnboundedReceiver<BitcoindResult>, receive_events_gateway: mpsc::UnboundedReceiver<ClientEvents>) -> Self {
pub fn new(receive_credential_event_gateway: mpsc::UnboundedReceiver<ClientEvents>, send_credential_events_gateway: mpsc::UnboundedSender<ClientEvents>, send_bitcoind_request_gateway: mpsc::UnboundedSender<BitcoindRequest>, receive_bitcoind_result_gateway: mpsc::UnboundedReceiver<BitcoindResult>, receive_events_gateway: mpsc::UnboundedReceiver<ClientEvents>, send_validation_result_gateway: mpsc::UnboundedSender<ClientEvents>) -> Self {
let bitcoind_client = BitcoindClient::new(String::new(), "0".to_string(), String::new(), String::new());
let secp_ctx = Secp256k1::new();

Expand Down Expand Up @@ -265,6 +266,7 @@ impl CredentialGateway {
send_bitcoind_request_gateway: Mutex::new(send_bitcoind_request_gateway),
receive_bitcoind_result_handler: Mutex::new(receive_bitcoind_result_gateway),
receive_events_gateway: Mutex::new(receive_events_gateway),
send_validation_result_gateway: Mutex::new(send_validation_result_gateway),
issuance_manager: issuance_manager,
redemption_manager: redemption_manager,
sec_key: secret_key,
Expand Down
47 changes: 32 additions & 15 deletions src/kindprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ pub struct NoteProcessor {
send_db_result_handler: TokioMutex<mpsc::UnboundedSender<ClientEvents>>,

receive_db_requests_manager: TokioMutex<mpsc::UnboundedReceiver<DbRequest>>,
receive_validation_dbrequests_manager: TokioMutex<mpsc::UnboundedReceiver<ClientEvents>>,

pending_write_db: HashMap<u64, Vec<(u64, Event)>>,

config: Config,
}

impl NoteProcessor {
pub fn new(receive_db_requests: mpsc::UnboundedReceiver<DbRequest>, receive_db_requests_manager: mpsc::UnboundedReceiver<DbRequest>, send_db_result_handler: mpsc::UnboundedSender<ClientEvents>, our_config: Config) -> Self {
pub fn new(receive_db_requests: mpsc::UnboundedReceiver<DbRequest>, receive_db_requests_manager: mpsc::UnboundedReceiver<DbRequest>, send_db_result_handler: mpsc::UnboundedSender<ClientEvents>, receive_validation_dbrequests_manager: mpsc::UnboundedReceiver<ClientEvents>, our_config: Config) -> Self {
NoteProcessor {
note_counters: Mutex::new(0),
current_height: 0,
Expand All @@ -59,6 +60,7 @@ impl NoteProcessor {
send_db_result_handler: TokioMutex::new(send_db_result_handler),

receive_db_requests_manager: TokioMutex::new(receive_db_requests_manager),
receive_validation_dbrequests_manager: TokioMutex::new(receive_validation_dbrequests_manager),

pending_write_db: HashMap::new(),

Expand Down Expand Up @@ -112,21 +114,36 @@ impl NoteProcessor {
}
}

//TODO: once service deliverance validation is okay write events to DB.
//let event_id = ev.id;
//if is_replaceable(&ev) {
// //TODO: build filter and replace event
// //TODO: If two events have the same timestamp, the event with the lowest id SHOULD be retained, and the other discarded
// let filter = Filter::new();
// if let Ok(old_ev) = query_events_db(filter) {
// //TODO: check if you should query for multiple replaced events
// write_new_event_db(ev, Some(old_ev)).await;
// }
//} else {
// let ret = write_new_event_db(ev, None).await;
// if ret { ok_events.push(event_id); }
//}
let mut paid_and_validated_events = Vec::new();
{
let mut receive_validation_dbrequests_manager_lock = self.receive_validation_dbrequests_manager.lock();
if let Ok(paid_and_validated_event) = receive_validation_dbrequests_manager_lock.await.try_recv() {
paid_and_validated_events.push(paid_and_validated_event);
}
}

for client_ev in paid_and_validated_events {
match client_ev {
ClientEvents::Credential { client_id, event } => {
//TODO: pair validation result with pending event to be written
//self.pending_write_db;
//let event_id = event.id;
//if is_replaceable(&event) {
// //TODO: build filter and replace event
// //TODO: If two events have the same timestamp, the event with the lowest id SHOULD be retained, and the other discarded
// let filter = Filter::new();
// if let Ok(old_ev) = query_events_db(filter) {
// //TODO: check if you should query for multiple replaced events
// write_new_event_db(event, Some(old_ev)).await;
// }
//} else {
// let ret = write_new_event_db(event, None).await;
// if ret { ok_events.push(event_id); }
//}
},
_ => {},
}
}

{
let mut receive_db_requests_manager_lock = self.receive_db_requests_manager.lock();
Expand Down
6 changes: 4 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,17 +411,19 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

let (send_events_gateway, receive_events_gateway) = mpsc::unbounded_channel::<ClientEvents>();

let (send_validation_result_gateway, receive_validation_result_dbrequests_manager) = mpsc::unbounded_channel::<ClientEvents>();

// The onion message handler...quite empty for now.
let onion_box = OnionBox::new();

// The noise peers handler...almost empty for now.
let noise_gateway = NoiseGateway::new(gateway_receive);

// The staking credentials handler...quite empty for now.
let mut credential_gateway = CredentialGateway::new(receive_credential_event_gateway, send_credential_events_gateway, send_bitcoind_request_gateway, receive_bitcoind_result_handler, receive_events_gateway);
let mut credential_gateway = CredentialGateway::new(receive_credential_event_gateway, send_credential_events_gateway, send_bitcoind_request_gateway, receive_bitcoind_result_handler, receive_events_gateway, send_validation_result_gateway);

// The note or service provider...quite empty for now.
let mut note_processor = NoteProcessor::new(processor_receive_dbrequests, receive_dbrequests_manager, send_db_result_handler, config.clone());
let mut note_processor = NoteProcessor::new(processor_receive_dbrequests, receive_dbrequests_manager, send_db_result_handler, receive_validation_result_dbrequests_manager, config.clone());

// The service provider signer...quite empty for now.
let node_signer = Arc::new(NodeSigner::new());
Expand Down