From f2d9c40bdb457f12577a2599e5cc2aae61f3172d Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 5 May 2022 22:17:21 -0700 Subject: [PATCH 1/2] For write version in transaction --- scripts/create_schema.sql | 1 + src/postgres_client.rs | 4 +++- .../postgres_client_transaction.rs | 21 +++++++++++++------ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/scripts/create_schema.sql b/scripts/create_schema.sql index 9745276..3c1598e 100644 --- a/scripts/create_schema.sql +++ b/scripts/create_schema.sql @@ -162,6 +162,7 @@ CREATE TABLE transaction ( signatures BYTEA[], message_hash BYTEA, meta "TransactionStatusMeta", + write_version BIGINT, updated_on TIMESTAMP NOT NULL, CONSTRAINT transaction_pk PRIMARY KEY (slot, signature) ); diff --git a/src/postgres_client.rs b/src/postgres_client.rs index b9f22a4..51d36e2 100644 --- a/src/postgres_client.rs +++ b/src/postgres_client.rs @@ -27,7 +27,7 @@ use { std::{ collections::HashSet, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, AtomicU64, Ordering}, Arc, Mutex, }, thread::{self, sleep, Builder, JoinHandle}, @@ -1032,6 +1032,7 @@ pub struct ParallelPostgresClient { initialized_worker_count: Arc, sender: Sender, last_report: AtomicInterval, + transaction_write_version: AtomicU64, } impl ParallelPostgresClient { @@ -1095,6 +1096,7 @@ impl ParallelPostgresClient { startup_done_count, initialized_worker_count, sender, + transaction_write_version: AtomicU64::default(), }) } diff --git a/src/postgres_client/postgres_client_transaction.rs b/src/postgres_client/postgres_client_transaction.rs index 48eaa9d..819c5a6 100644 --- a/src/postgres_client/postgres_client_transaction.rs +++ b/src/postgres_client/postgres_client_transaction.rs @@ -24,6 +24,7 @@ use { solana_transaction_status::{ InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance, }, + std::sync::atomic::Ordering, }; const MAX_TRANSACTION_STATUS_LEN: usize = 256; @@ -144,6 +145,8 @@ pub struct DbTransaction { pub message_hash: Vec, pub meta: DbTransactionStatusMeta, pub signatures: Vec>, + /// This can be used to tell the order of transaction within a block + pub write_version: i64, } pub struct LogTransactionRequest { @@ -474,7 +477,7 @@ impl From<&TransactionStatusMeta> for DbTransactionStatusMeta { } } -fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> DbTransaction { +fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo, transaction_write_version: u64) -> DbTransaction { DbTransaction { signature: transaction_info.signature.as_ref().to_vec(), is_vote: transaction_info.is_vote, @@ -505,6 +508,7 @@ fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> .as_ref() .to_vec(), meta: DbTransactionStatusMeta::from(transaction_info.transaction_status_meta), + write_version: transaction_write_version as i64, } } @@ -514,8 +518,8 @@ impl SimplePostgresClient { config: &GeyserPluginPostgresConfig, ) -> Result { let stmt = "INSERT INTO transaction AS txn (signature, is_vote, slot, message_type, legacy_message, \ - v0_loaded_message, signatures, message_hash, meta, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ + v0_loaded_message, signatures, message_hash, meta, write_version, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ ON CONFLICT (slot, signature) DO UPDATE SET is_vote=excluded.is_vote, \ message_type=excluded.message_type, \ legacy_message=excluded.legacy_message, \ @@ -523,6 +527,7 @@ impl SimplePostgresClient { signatures=excluded.signatures, \ message_hash=excluded.message_hash, \ meta=excluded.meta, \ + write_version=excluded.write_version, \ updated_on=excluded.updated_on"; let stmt = client.prepare(stmt); @@ -562,6 +567,7 @@ impl SimplePostgresClient { &transaction_info.signatures, &transaction_info.message_hash, &transaction_info.meta, + &transaction_info.write_version, &updated_on, ], ); @@ -583,9 +589,10 @@ impl ParallelPostgresClient { fn build_transaction_request( slot: u64, transaction_info: &ReplicaTransactionInfo, + transaction_write_version: u64 ) -> LogTransactionRequest { LogTransactionRequest { - transaction_info: build_db_transaction(slot, transaction_info), + transaction_info: build_db_transaction(slot, transaction_info, transaction_write_version), } } @@ -594,9 +601,11 @@ impl ParallelPostgresClient { transaction_info: &ReplicaTransactionInfo, slot: u64, ) -> Result<(), GeyserPluginError> { + self.transaction_write_version.fetch_add(1, Ordering::Relaxed); let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request( slot, transaction_info, + self.transaction_write_version.load(Ordering::Relaxed) ))); if let Err(err) = self.sender.send(wrk_item) { @@ -1339,7 +1348,7 @@ pub(crate) mod tests { }; let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); + let db_transaction = build_db_transaction(slot, &transaction_info, 1); check_transaction(slot, &transaction_info, &db_transaction); } @@ -1383,7 +1392,7 @@ pub(crate) mod tests { }; let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); + let db_transaction = build_db_transaction(slot, &transaction_info, 1); check_transaction(slot, &transaction_info, &db_transaction); } } From 85e6c6c6a8993549507ee38c998061c0e14d3c01 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 6 May 2022 15:42:20 -0700 Subject: [PATCH 2/2] Updated documentations --- src/postgres_client.rs | 2 +- .../postgres_client_transaction.rs | 21 ++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/postgres_client.rs b/src/postgres_client.rs index 51d36e2..9cd001d 100644 --- a/src/postgres_client.rs +++ b/src/postgres_client.rs @@ -27,7 +27,7 @@ use { std::{ collections::HashSet, sync::{ - atomic::{AtomicBool, AtomicUsize, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, thread::{self, sleep, Builder, JoinHandle}, diff --git a/src/postgres_client/postgres_client_transaction.rs b/src/postgres_client/postgres_client_transaction.rs index 819c5a6..a9c215c 100644 --- a/src/postgres_client/postgres_client_transaction.rs +++ b/src/postgres_client/postgres_client_transaction.rs @@ -146,6 +146,8 @@ pub struct DbTransaction { pub meta: DbTransactionStatusMeta, pub signatures: Vec>, /// This can be used to tell the order of transaction within a block + /// Given a slot, the transaction with a smaller write_version appears + /// before transactions with higher write_versions in a shred. pub write_version: i64, } @@ -477,7 +479,11 @@ impl From<&TransactionStatusMeta> for DbTransactionStatusMeta { } } -fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo, transaction_write_version: u64) -> DbTransaction { +fn build_db_transaction( + slot: u64, + transaction_info: &ReplicaTransactionInfo, + transaction_write_version: u64, +) -> DbTransaction { DbTransaction { signature: transaction_info.signature.as_ref().to_vec(), is_vote: transaction_info.is_vote, @@ -589,10 +595,14 @@ impl ParallelPostgresClient { fn build_transaction_request( slot: u64, transaction_info: &ReplicaTransactionInfo, - transaction_write_version: u64 + transaction_write_version: u64, ) -> LogTransactionRequest { LogTransactionRequest { - transaction_info: build_db_transaction(slot, transaction_info, transaction_write_version), + transaction_info: build_db_transaction( + slot, + transaction_info, + transaction_write_version, + ), } } @@ -601,11 +611,12 @@ impl ParallelPostgresClient { transaction_info: &ReplicaTransactionInfo, slot: u64, ) -> Result<(), GeyserPluginError> { - self.transaction_write_version.fetch_add(1, Ordering::Relaxed); + self.transaction_write_version + .fetch_add(1, Ordering::Relaxed); let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request( slot, transaction_info, - self.transaction_write_version.load(Ordering::Relaxed) + self.transaction_write_version.load(Ordering::Relaxed), ))); if let Err(err) = self.sender.send(wrk_item) {