Skip to content

Commit

Permalink
Merge pull request #103 from ariard/2023-11-finish-connect-db
Browse files Browse the repository at this point in the history
Add finish connect DB
  • Loading branch information
ariard authored Dec 1, 2023
2 parents f659c74 + efc45dc commit c4b0a20
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 37 deletions.
9 changes: 5 additions & 4 deletions src/clienthandler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl ClientHandler {
}
}
},
ClientEvents::Credential { client_id, event } => {
ClientEvents::Credential { client_id, deliverance_id, event } => {
if client_id == map_client_id {
let random_sub_id = SubscriptionId::generate();
let relay_message = RelayMessage::new_event(random_sub_id, event.clone());
Expand Down Expand Up @@ -400,16 +400,17 @@ impl ClientHandler {
}
let msg_2 = msg.clone();
if is_credential(&msg_2) {
self.deliverance_counter += 1;
println!("[CIVKITD] - NOSTR: credential msg received");
let credential = ClientEvents::Credential { client_id: id, event: *msg_2.clone() };
let credential = ClientEvents::Credential { client_id: id, deliverance_id: self.deliverance_counter, event: *msg_2.clone() };
query_credential_gateway.push(credential);
}
self.filter_events(*msg).await;
//TODO: we should link our filtering policy to our db storing,
//otherwise this is a severe DoS vector
//TODO: move is_ephemeral check when receive result from CredentialGateway is in NoteProcessor
//TODO: move is_ephemeral check when receive result from CredentialGateway is in NoteProcessor ?
if !is_ephemeral(&msg_2) {
self.deliverance_counter += 1;
//TODO: we reuse the self.deliverance_counter
let db_request = DbRequest::WriteEvent { client_id: id, deliverance_id: self.deliverance_counter, ev: *msg_2 };
write_db.push(db_request);
}
Expand Down
18 changes: 9 additions & 9 deletions src/credentialgateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ struct RedemptionManager {
}

impl RedemptionManager {
fn validate_service_deliverance(&mut self, client_id: u64, credential_msg_bytes: Vec<u8>, secret_key: &SecretKey) -> Result<Event, RedemptionError> {
fn validate_service_deliverance(&mut self, client_id: u64, deliverance_id: u64, credential_msg_bytes: Vec<u8>, secret_key: &SecretKey) -> Result<(u64, Event), RedemptionError> {

let secp_ctx = Secp256k1::new();

Expand Down Expand Up @@ -186,7 +186,7 @@ impl RedemptionManager {
let server_event_keys = Keys::generate();

if let Ok(credential_carrier) = EventBuilder::new_text_note("", tags).to_event(&server_event_keys) {
return Ok(credential_carrier);
return Ok((deliverance_id, credential_carrier));
}
Err(RedemptionError::EventGenerationError)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ impl CredentialGateway {
//let mut deliverance_result_queue = Vec::new();
for event in credential_queue {
match event {
ClientEvents::Credential { client_id, event } => {
ClientEvents::Credential { client_id, deliverance_id, event } => {
if let Ok((credential_type, credential_msg_bytes)) = self.get_credential_bytes_and_type(event) {
match credential_type {
//TODO: decode and check the exact credential requested from client
Expand All @@ -334,10 +334,10 @@ impl CredentialGateway {
},
1 => { println!("[CIVKITD] - CREDENTIAL event error: gateway should not receive CredentialAuthenticationResult"); },
3 => {
match self.redemption_manager.validate_service_deliverance(client_id, credential_msg_bytes, &self.sec_key) {
match self.redemption_manager.validate_service_deliverance(client_id, deliverance_id, credential_msg_bytes, &self.sec_key) {
Ok(result) => {
println!("[CIVKITD] - CREDENTIAL: service deliverance validation result");
redemption_result.push(result);
redemption_result.push((client_id, result));
},
Err(error) => {
println!("[CIVKITD - CREDENTIAL: authentication request error {:?}", error);
Expand Down Expand Up @@ -383,14 +383,14 @@ impl CredentialGateway {
{
for (client_id, event) in authentication_result_queue {
let mut send_credential_lock = self.send_credential_events_gateway.lock();
send_credential_lock.await.send(ClientEvents::Credential { client_id, event: event });
send_credential_lock.await.send(ClientEvents::Credential { client_id, deliverance_id: 0, event: event });
}
}

{
for result in redemption_result {
let mut send_credential_lock = self.send_credential_events_gateway.lock();
//TODO: send back event
for (client_id, result) in redemption_result {
let mut send_validation_result_gateway_lock = self.send_validation_result_gateway.lock();
send_validation_result_gateway_lock.await.send(ClientEvents::Credential { client_id, deliverance_id: result.0, event: result.1 });
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub enum ClientEvents {
SubscribedEvent { client_id: u64, sub_id: SubscriptionId, event: Event },
OkEvent { event_id: EventId, ret: bool, msg: Option<String> },
ServiceRegistration { pubkey: PublicKey, credential_policy: CredentialPolicy, service_policy: ServicePolicy },
Credential { client_id: u64, event: Event },
Credential { client_id: u64, deliverance_id: u64, event: Event },
ValidationResult { client_id: u64, deliverance_id: u64, event: Event },
}

#[derive(Debug)]
Expand Down
44 changes: 21 additions & 23 deletions src/kindprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ impl NoteProcessor {
let mut receive_db_requests_lock = self.receive_db_requests.lock();
if let Ok(db_request) = receive_db_requests_lock.await.try_recv() {
match db_request {
DbRequest::WriteEvent { client_id, deliverance_id, ev } => {
if let Some(write_queue) = self.pending_write_db.get_mut(&client_id) {
write_queue.push((deliverance_id, ev));
} else {
self.pending_write_db.insert(client_id, vec![(deliverance_id, ev)]);
}
},
DbRequest::WriteEvent { client_id, deliverance_id, ev } => { self.pending_write_db.insert(client_id, vec![(deliverance_id, ev)]); },
DbRequest::WriteSub(ns) => { write_new_subscription_db(ns); },
DbRequest::WriteClient(ct) => { write_new_client_db(ct).await; },
DbRequest::ReplayEvents { client_id, filters } => { replay_request.push((client_id, filters)); },
Expand All @@ -124,22 +118,26 @@ impl NoteProcessor {

for client_ev in paid_and_validated_events {
match client_ev {
ClientEvents::Credential { client_id, event } => {
//TODO: pair validation result with pending event to be written
//self.pending_write_db;
//let event_id = event.id;
//if is_replaceable(&event) {
// //TODO: build filter and replace event
// //TODO: If two events have the same timestamp, the event with the lowest id SHOULD be retained, and the other discarded
// let filter = Filter::new();
// if let Ok(old_ev) = query_events_db(filter) {
// //TODO: check if you should query for multiple replaced events
// write_new_event_db(event, Some(old_ev)).await;
// }
//} else {
// let ret = write_new_event_db(event, None).await;
// if ret { ok_events.push(event_id); }
//}
ClientEvents::ValidationResult { client_id, deliverance_id, event } => {
if let Some(queue_events) = self.pending_write_db.get(&client_id) {
for queue_event in queue_events {
if queue_event.0 == deliverance_id {
let event_id = event.id;
if is_replaceable(&event) {
//TODO: build filter and replace event
//TODO: If two events have the same timestamp, the event with the lowest id SHOULD be retained, and the other discarded
let filter = Filter::new();
if let Ok(old_ev) = query_events_db(filter) {
//TODO: check if you should query for multiple replaced events
write_new_event_db(event.clone(), Some(old_ev)).await;
}
} else {
let ret = write_new_event_db(event.clone(), None).await;
if ret { ok_events.push(event_id); }
}
}
}
}
},
_ => {},
}
Expand Down

0 comments on commit c4b0a20

Please sign in to comment.