-
Notifications
You must be signed in to change notification settings - Fork 290
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write the dag relationship in batch #4304
Conversation
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe changes in this pull request primarily enhance database operations and worker management within the system. Key modifications include the removal of the Changes
Possibly related PRs
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (8)
storage/src/batch/mod.rs (2)
60-64
: Consider adding validation and documentation for WriteBatchDataWhile the structure is well-designed, consider these improvements:
- Add documentation explaining the purpose and usage of this structure
- Consider validating the column name (e.g., non-empty, format constraints)
#[derive(Debug, Default, Clone)] +/// WriteBatchData represents a batch of write operations for a specific column family +/// in the storage system. pub struct WriteBatchData { + /// The name of the column family. Must not be empty. pub column: String, + /// The batch of write operations to be performed pub row_data: WriteBatch, }
66-69
: Consider enhancing WriteBatchWithColumn with additional functionalityThe structure could benefit from these architectural improvements:
- Add methods for common operations (add_batch, clear, etc.)
- Consider implementing size limits to prevent memory issues
- Add builder pattern for easier construction
Example implementation:
impl WriteBatchWithColumn { /// Maximum number of batches to prevent memory issues const MAX_BATCHES: usize = 1000; /// Adds a new batch for a column family pub fn add_batch(&mut self, column: String, batch: WriteBatch) -> Result<()> { if self.data.len() >= Self::MAX_BATCHES { return Err(anyhow::anyhow!("Exceeded maximum number of batches")); } self.data.push(WriteBatchData { column, row_data: batch }); Ok(()) } /// Clears all batches pub fn clear(&mut self) { self.data.clear(); } }storage/src/cache_storage/mod.rs (1)
94-118
: Implementation looks good with room for optimizationThe implementation correctly handles column-based batch writes while maintaining consistency with existing patterns. Consider these performance optimizations:
- Preallocate the final vector with an estimated capacity
- Avoid intermediate vector collection in the
flat_map
operationfn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { + // Estimate total capacity to avoid reallocations + let total_size: usize = batch.data.iter().map(|data| data.row_data.rows.len()).sum(); + let mut rows = Vec::with_capacity(total_size); + + for data in batch.data { + rows.extend( + data.row_data + .rows + .into_iter() + .map(|(k, v)| (compose_key(Some(&data.column), k), v)) + ); + } - let rows = batch - .data - .into_iter() - .flat_map(|data| { - data.row_data - .rows - .iter() - .cloned() - .map(|(k, v)| (compose_key(Some(&data.column), k), v)) - .collect::<Vec<_>>() - }) - .collect(); let batch = WriteBatch { rows };storage/src/db_storage/mod.rs (1)
417-439
: Consider enhancing metrics with column-specific informationThe implementation correctly handles batch writes across multiple columns in a single transaction. However, the metrics could be more detailed.
Consider updating the metrics to include column-specific information:
-record_metrics( - "db", - "write_batch_column", - "write_batch", - self.metrics.as_ref(), -) +batch.data.iter().for_each(|data| { + record_metrics( + "db", + &data.column, + "write_batch", + self.metrics.as_ref(), + ); +});storage/src/storage.rs (2)
43-43
: Add documentation for the new trait method.Consider adding documentation comments to explain the purpose, parameters, and behavior of the
write_batch_with_column
method. This will help maintain consistency with other trait methods and improve code maintainability.+ /// Writes a batch of operations with column family information. + /// + /// # Arguments + /// * `batch` - A batch of write operations with their associated column families + /// + /// # Returns + /// * `Result<()>` - Ok if the batch was written successfully, Err otherwise fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()>;
207-216
: Consider improving error handling and atomicity.The implementation has two potential areas for improvement:
- Error messages could be more descriptive for better debugging.
- In the CacheAndDb case, a failure in cache write after a successful DB write could leave the system in an inconsistent state.
Consider applying these improvements:
fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { match self { - Self::CACHE { cache } => cache.write_batch_with_column(batch), - Self::DB { db } => db.write_batch_with_column(batch), + Self::CACHE { cache } => cache.write_batch_with_column(batch) + .map_err(|e| format_err!("Failed to write batch to cache: {}", e)), + Self::DB { db } => db.write_batch_with_column(batch) + .map_err(|e| format_err!("Failed to write batch to db: {}", e)), Self::CacheAndDb { cache, db } => { - db.write_batch_with_column(batch.clone())?; - cache.write_batch_with_column(batch) + db.write_batch_with_column(batch.clone()) + .map_err(|e| format_err!("Failed to write batch to db: {}", e))?; + cache.write_batch_with_column(batch) + .map_err(|e| { + // Consider adding a compensating transaction or logging mechanism + warn!("Failed to write batch to cache after successful db write: {}", e); + e + }) } } }storage/src/tests/test_storage.rs (2)
680-789
: Consider refactoring duplicated code in thematch
armsThe code within each arm of the
match
statement from lines 680 to 789 is largely identical, differing only in the data source (cache
,db
, or both). To improve maintainability and reduce code duplication, consider refactoring the common logic into a helper function that takes the data source as a parameter.Here's how you might refactor the code:
fn verify_blocks( storage: &impl InnerStore, block1: &Block, block2: &Block, dag_block1: &DagSyncBlock, dag_block2: &DagSyncBlock, ) -> Result<()> { let read_block1 = Block::decode( &storage .get(BLOCK_PREFIX_NAME, block1.id().encode()?)? .expect("failed to get the block"), )?; assert_eq!(read_block1, *block1); let read_block2 = Block::decode( &storage .get(BLOCK_PREFIX_NAME, block2.id().encode()?)? .expect("failed to get the block"), )?; assert_eq!(read_block2, *block2); let read_dag_block1 = DagSyncBlock::decode( &storage .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? .expect("failed to get the dag block"), )?; assert_eq!(read_dag_block1, *dag_block1); let read_dag_block2 = DagSyncBlock::decode( &storage .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? .expect("failed to get the dag block"), )?; assert_eq!(read_dag_block2, *dag_block2); Ok(()) } match instance { StorageInstance::CACHE { cache } => { verify_blocks(cache, &block1, &block2, &dag_block1, &dag_block2)?; } StorageInstance::DB { db } => { verify_blocks(db, &block1, &block2, &dag_block1, &dag_block2)?; } StorageInstance::CacheAndDb { cache, db } => { verify_blocks(cache, &block1, &block2, &dag_block1, &dag_block2)?; verify_blocks(db, &block1, &block2, &dag_block1, &dag_block2)?; } }
793-821
: Add documentation comments to new test functionsConsider adding Rust documentation comments (
///
) to the new test functionstest_batch_write_for_cache_and_db
,test_batch_write_for_db
, andtest_batch_write_for_cache
. This will enhance code readability and provide clarity on the purpose and functionality of each test.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (6)
flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)storage/src/batch/mod.rs
(1 hunks)storage/src/cache_storage/mod.rs
(2 hunks)storage/src/db_storage/mod.rs
(2 hunks)storage/src/storage.rs
(3 hunks)storage/src/tests/test_storage.rs
(2 hunks)
🔇 Additional comments (10)
storage/src/batch/mod.rs (1)
59-69
: Verify batch processing implementation across the codebase
Let's verify the implementation and usage patterns of these new structures.
✅ Verification successful
Based on the results, I can now generate the final response:
Implementation of WriteBatchWithColumn is consistent and well-integrated
The structure is properly integrated across the codebase with consistent usage patterns:
- Implemented in both cache and DB storage layers (
cache_storage/mod.rs
,db_storage/mod.rs
) - Used appropriately in test cases (
test_storage.rs
) - Integrated into consensus relations (
consensus_relations.rs
) - No unbounded iterations or performance concerns found in batch processing
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check usage patterns and potential performance implications
# Find all usages of WriteBatchWithColumn
echo "Checking WriteBatchWithColumn usage patterns:"
rg -A 5 "WriteBatchWithColumn"
# Look for potential size-related issues
echo "Checking for large batch operations:"
ast-grep --pattern 'for $_ in $_.data { $$$$ }'
Length of output: 5529
storage/src/cache_storage/mod.rs (2)
4-4
: LGTM: Import added appropriately
The WriteBatchWithColumn
import is correctly placed alongside related batch imports.
108-113
: Verify metrics consistency
The metrics prefix differs from the write_batch
method:
write_batch
uses:"cache", prefix_name, "write_batch"
write_batch_with_column
uses:"cache", "write_batch_column_prefix", "write_batch"
Let's verify the metrics usage pattern across the codebase:
✅ Verification successful
Metrics prefix inconsistency confirmed
The codebase search reveals a clear pattern where the second parameter in record_metrics
calls consistently follows these rules:
- For regular operations: uses the provided
prefix_name
- For column-specific batch operations: uses a dedicated prefix
- DB storage uses "write_batch_column"
- Cache storage uses "write_batch_column_prefix"
The current implementation follows the established pattern correctly:
- Regular batch:
record_metrics("cache", prefix_name, "write_batch", ...)
- Column batch:
record_metrics("cache", "write_batch_column_prefix", "write_batch", ...)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for record_metrics calls to verify consistency
rg -A 2 'record_metrics\(' --type rust
Length of output: 3396
storage/src/db_storage/mod.rs (3)
5-5
: LGTM: Import changes are well-organized
The addition of WriteBatchWithColumn
import is properly grouped with related imports.
417-439
: LGTM: Well-integrated with existing functionality
The new method maintains consistency with existing batch operations and follows established patterns for error handling and metrics recording.
417-439
: Verify handling of large batches
While the implementation is efficient for normal use cases, we should verify its behavior with large batches.
✅ Verification successful
Let me generate additional scripts to check for batch size limits and tests specifically related to write batches:
Based on the analysis of the code and test results, I can now provide a final response:
Write batch implementation has appropriate safeguards
The implementation has several built-in safeguards for handling large batches:
- RocksDB write buffer configuration is set with reasonable defaults (max_write_buffer_number=5)
- The implementation uses DBWriteBatch which has built-in memory management
- Write operations are wrapped with metrics recording for monitoring
- Tests verify batch operations with various sizes including multi-operation batches
The current implementation is safe for production use as it leverages RocksDB's battle-tested write batch implementation with appropriate configuration. The metrics wrapper allows monitoring batch sizes and performance in production.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any batch size limits or memory management patterns in the codebase
# Look for any existing batch size limits or configurations
rg -A 2 "batch.*size|size.*batch"
# Check for any memory-related configurations that might affect batch processing
rg -A 2 "memory|mem_table|write_buffer"
# Look for existing batch-related tests that might provide insights into expected batch sizes
ast-grep --pattern 'fn test_$$$batch$$$'
Length of output: 113238
Script:
#!/bin/bash
# Look for any write batch related tests and size limits
ast-grep --pattern 'fn test_$$$write_batch$$$'
# Look for write batch related constants and configurations
rg -A 2 "write_batch|batch.*size.*write|write.*batch.*size"
# Look for any write buffer size configurations in RocksDB
rg -A 2 "write_buffer_size|max_write_buffer"
Length of output: 18943
storage/src/storage.rs (2)
6-6
: LGTM! Import is properly organized.
The WriteBatchWithColumn
import is correctly placed within the existing use statement block, maintaining good code organization.
207-216
: Verify consistent usage of batch operations.
Since this implementation is part of a larger change for DAG relationships, we should verify:
- All callers properly handle the new error cases
- Identify any places where the old
write_batch
method should be replaced withwrite_batch_with_column
flexidag/src/consensusdb/consensus_relations.rs (1)
136-164
: Efficient batch writing implementation
The use of WriteBatchWithColumn
to handle batch writes for both parents and children is correct and enhances the efficiency of database operations. The error handling and data structures are appropriately utilized.
storage/src/tests/test_storage.rs (1)
606-791
: Tests accurately validate batch write functionality
The run_write_batch
function and associated test functions correctly implement batch write tests for different storage instances. The logic is sound, and the tests effectively verify that data is written and retrieved as expected across cache, database, and combined storage configurations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
flexidag/src/consensusdb/consensus_relations.rs (1)
137-164
: Consider simplifying error handling in batch constructionThe current implementation has multiple error transformation points that could be simplified. Consider creating a helper function to handle the encoding operations and error transformations.
+ impl DbRelationsStore { + fn encode_with_store_error<T: BCSCodec>(value: T) -> Result<Vec<u8>, StoreError> { + value + .encode() + .map_err(|e| StoreError::EncodeError(e.to_string())) + } + } let batch = WriteBatchWithColumn { data: vec![ WriteBatchData { column: PARENTS_CF.to_string(), row_data: WriteBatch::new_with_rows(vec![( - hash.encode() - .map_err(|e| StoreError::EncodeError(e.to_string()))?, + Self::encode_with_store_error(hash)?, WriteOp::Value( - parents - .encode() - .map_err(|e| StoreError::EncodeError(e.to_string()))?, + Self::encode_with_store_error(parents)?, ), )]), },
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)
🔇 Additional comments (3)
flexidag/src/consensusdb/consensus_relations.rs (3)
4-4
: LGTM: Import changes align with the new batch processing architecture
The updated imports correctly reflect the transition to using WriteBatchWithColumn
for batch operations.
Also applies to: 9-10, 12-12
125-126
: LGTM: Previously reported critical bug is now fixed
The code now correctly adds the new hash to the parent's children list, fixing the relationship inconsistency issue reported in the previous review.
162-164
: Verify storage layer's batch write implementation
The transition to batch writes requires proper transaction handling in the storage layer. Let's verify the implementation:
✅ Verification successful
Based on the search results, I can now provide a final response regarding the batch write implementation:
Batch write implementation is properly handled with transaction safety
The storage layer has a robust implementation of batch writes with proper transaction handling:
- The
write_batch_with_column
implementation indb_storage/mod.rs
uses RocksDB'sWriteBatch
with proper column family handling and atomic writes - The implementation includes:
- Transaction safety through RocksDB's atomic batch operations
- Error handling and rollback capabilities
- Proper column family management
- Performance metrics tracking
- Sync/async write options
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for write_batch_with_column implementation
ast-grep --pattern 'fn write_batch_with_column($params) {
$$$
}'
# Search for transaction handling in batch writes
rg -A 5 'write_batch_with_column.*transaction'
Length of output: 117
Script:
#!/bin/bash
# Search for write_batch_with_column implementation with broader pattern
rg -A 10 "write_batch_with_column"
# Search for batch write related implementations
rg -A 10 "WriteBatch|write_batch"
# Look for storage/db related files
fd "db|storage" --type f
Length of output: 81050
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
flexidag/src/consensusdb/consensus_relations.rs (1)
137-161
: Consider using a builder pattern for WriteBatchWithColumn constructionThe current batch construction is verbose and error-prone. Consider extracting this into a builder pattern to improve readability and maintainability.
Example implementation:
struct WriteBatchBuilder { batches: Vec<WriteBatchData>, } impl WriteBatchBuilder { fn new() -> Self { Self { batches: vec![] } } fn add_batch(mut self, column: &str, rows: Vec<(Vec<u8>, WriteOp)>) -> Self { self.batches.push(WriteBatchData { column: column.to_string(), row_data: WriteBatch::new_with_rows(rows), }); self } fn build(self) -> WriteBatchWithColumn { WriteBatchWithColumn { data: self.batches } } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)
🔇 Additional comments (3)
flexidag/src/consensusdb/consensus_relations.rs (3)
4-4
: LGTM: Import changes align with the new batch processing approach
The new imports correctly support the transition to using WriteBatchWithColumn
for batch operations.
Also applies to: 7-7, 9-12
123-134
:
Add safety checks for parent validation and children deduplication
The current implementation still lacks important safety checks that were highlighted in previous reviews.
Apply this diff to add the necessary validations:
let mut parent_to_children = HashMap::new();
for parent in parents.iter().cloned() {
+ if !self.has(parent)? {
+ return Err(StoreError::KeyNotFound(parent.to_string()));
+ }
let mut children = (*self.get_children(parent)?).clone();
+ if !children.contains(&hash) {
children.push(hash);
+ }
parent_to_children.insert(
parent
.encode()
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
children
.encode()
.map_err(|e| StoreError::EncodeError(e.to_string()))?,
);
}
123-165
: Verify the impact of batch processing changes
The transition from individual writes to batch processing could affect system behavior under high load or failure conditions.
✅ Verification successful
Based on the shell script results, I can now generate the final response:
Batch processing implementation follows established patterns
The changes to batch processing in consensus_relations.rs
align with the codebase's established patterns:
- Error handling follows the standard approach using
map_err
for DB errors - The batch structure matches other implementations in the codebase
- The atomic write operation using
write_batch_with_column
is consistent with usage across storage modules
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential issues with the new batch processing approach
# Look for other uses of WriteBatch in the codebase that might need similar updates
echo "Checking for other WriteBatch usages..."
rg "WriteBatch::" --type rust
# Look for error handling patterns in similar batch operations
echo "Checking error handling patterns in batch operations..."
rg "write_batch" -B 2 -A 2 --type rust
# Check for transaction management patterns
echo "Checking transaction patterns..."
rg "begin_transaction|commit_transaction|rollback" --type rust
Length of output: 32320
2, and write batch sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
storage/src/storage.rs (2)
208-218
: Avoid Cloningbatch
to Improve PerformanceIn the
Self::CacheAndDb
match arm ofwrite_batch_with_column
, thebatch
is cloned before passing todb.write_batch_with_column(batch.clone())
. Cloning large batches can introduce performance overhead.Consider passing
batch
by reference to avoid unnecessary cloning:Self::CacheAndDb { cache, db } => { - db.write_batch_with_column(batch.clone())?; + db.write_batch_with_column(&batch)?; cache.write_batch_with_column(batch) }This change requires that the
write_batch_with_column
method inDBStorage
accepts a reference (&WriteBatchWithColumn
) instead of taking ownership.
258-268
: Avoid Cloning inwrite_batch_with_column_sync
Similar to the previous comment, cloning
batch
inwrite_batch_with_column_sync
may impact performance.Consider modifying the code to pass
batch
by reference:Self::CacheAndDb { cache, db } => { - db.write_batch_with_column_sync(batch.clone())?; + db.write_batch_with_column_sync(&batch)?; cache.write_batch_with_column_sync(batch) }Ensure that the
write_batch_with_column_sync
method inDBStorage
accepts a reference toWriteBatchWithColumn
.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)storage/src/cache_storage/mod.rs
(3 hunks)storage/src/db_storage/mod.rs
(2 hunks)storage/src/storage.rs
(4 hunks)
🔇 Additional comments (13)
storage/src/cache_storage/mod.rs (4)
4-4
: LGTM: Import added for new batch functionality
The import of WriteBatchWithColumn
is correctly placed and necessary for the new batch processing capabilities.
140-142
: LGTM: Sync implementation follows established pattern
The sync implementation correctly delegates to the async version, maintaining consistency with other sync methods in the trait.
94-118
: 🛠️ Refactor suggestion
Consider optimizing performance and memory usage
The current implementation has several areas for potential optimization:
- Multiple intermediate collections and clones are created during data transformation
- Each row is cloned unnecessarily
- The metrics prefix differs from the write_batch method ("write_batch_column_prefix" vs prefix_name)
Consider this more efficient implementation:
fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> {
- let rows = batch
- .data
- .into_iter()
- .flat_map(|data| {
- data.row_data
- .rows
- .iter()
- .cloned()
- .map(|(k, v)| (compose_key(Some(&data.column), k), v))
- .collect::<Vec<_>>()
- })
- .collect();
+ let mut rows = Vec::with_capacity(
+ batch.data.iter().map(|d| d.row_data.rows.len()).sum()
+ );
+ for data in batch.data {
+ rows.extend(
+ data.row_data.rows.into_iter().map(|(k, v)|
+ (compose_key(Some(&data.column), k), v)
+ )
+ );
+ }
let batch = WriteBatch { rows };
record_metrics(
"cache",
- "write_batch_column_prefix",
+ "batch_with_column", // More consistent naming
"write_batch",
self.metrics.as_ref(),
)
.call(|| {
self.write_batch_inner(batch);
Ok(())
})
}
Let's verify the column name validation in the codebase:
Line range hint 94-142
: Document architectural changes and update PR description
These changes introduce significant modifications to the batch processing capabilities:
- The new column-aware batch processing represents an architectural shift in how data is written
- This change might affect other components that interact with the storage layer
Please update the PR description to:
- Document the rationale behind this architectural change
- Explain the impact on existing batch processing flows
- Provide migration guidelines for code using the old batch processing
Let's check for existing usages that might need migration:
flexidag/src/consensusdb/consensus_relations.rs (5)
4-4
: LGTM: Import changes align with batch processing implementation
The new imports appropriately support the batch processing functionality.
Also applies to: 8-9, 11-11
123-129
:
Add parent existence validation
The code handles missing children but doesn't validate parent existence. This could lead to inconsistencies if non-existent parents are provided.
Apply this diff:
let mut parent_to_children = HashMap::new();
for parent in parents.iter().cloned() {
+ if !self.has(parent)? {
+ return Err(StoreError::KeyNotFound(parent.to_string()));
+ }
let mut children = match self.get_children(parent) {
131-134
:
Add children deduplication
The code doesn't check for duplicate children before adding the new hash, which could lead to duplicate entries.
Apply this diff:
- children.push(hash);
+ if !children.contains(&hash) {
+ children.push(hash);
+ }
parent_to_children.insert(
160-162
: 🛠️ Refactor suggestion
Enhance error handling for batch operations
The current error handling converts all database errors to a generic string, losing valuable context.
Apply this diff:
self.db
.write_batch_with_column(batch)
- .map_err(|e| StoreError::DBIoError(e.to_string()))?;
+ .map_err(|e| StoreError::DBIoError(format!("Failed to write batch for hash {}: {}", hash, e)))?;
137-159
: Consider adding batch size limits
Large batches could impact memory usage and database performance. Consider adding size limits and splitting into multiple batches if necessary.
Let's check if there are any existing batch size limits in the codebase:
✅ Verification successful
Let me search for any batch size limits specifically in the consensus relations or similar storage implementations:
Let me search for any existing size limits or batch processing configurations in the consensus database:
Let me check the RocksDB configuration to see if there are any batch size limits:
Based on the search results, I can now provide a final response:
Batch size limits are already enforced by RocksDB configuration
The codebase already has batch size management through RocksDB's configuration:
- RocksDB is configured with
max_write_buffer_number = 5
instorage/src/db_storage/mod.rs
- The write buffer settings help control memory usage and database performance
- The database automatically handles batch flushing and compaction based on these settings
Additionally, the batches in this code are naturally limited by their usage context - they only contain parent-child relationships for a single block.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for batch size limits in storage implementation
rg -A 5 "batch.*size|size.*batch" --type rust
Length of output: 18799
Script:
#!/bin/bash
# Search for batch size limits in storage and consensus implementations
rg -A 5 "WriteBatch|write_batch" --type rust
Length of output: 53180
Script:
#!/bin/bash
# Search for configuration settings and constants in consensus database
rg -A 5 "const|config" --type rust flexidag/src/consensusdb/
Length of output: 7516
Script:
#!/bin/bash
# Search for RocksDB configuration settings
rg -A 10 "RocksdbConfig|write_buffer_size|max_write_buffer_number" --type rust
Length of output: 67149
storage/src/db_storage/mod.rs (2)
5-5
: LGTM: Import addition is appropriate
The addition of WriteBatchWithColumn
to the imports is correctly placed and necessary for the new batch processing functionality.
417-462
: Verify batch size handling and usage patterns
Let's verify how these new batch methods are being used in the codebase to ensure they're handling batch sizes appropriately.
✅ Verification successful
Based on the search results, I can now generate the final response since I have sufficient information about batch handling in the codebase:
Batch operations are properly handled, no size concerns
The write_batch_with_column
and write_batch_with_column_sync
implementations are safe because:
- They are used in a controlled manner through the storage trait interface
- The batch operations are processed atomically within RocksDB's write batch mechanism
- No evidence of unbounded batch sizes in the codebase
- The codebase shows consistent patterns of batch size control in similar operations (e.g., sync tasks use explicit batch sizes)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for usage of WriteBatchWithColumn to understand typical batch sizes
echo "Searching for WriteBatchWithColumn usage patterns..."
rg -A 5 "WriteBatchWithColumn::new|write_batch_with_column" --type rust
# Look for any existing batch size limits or chunking mechanisms
echo "Checking for existing batch size handling..."
rg -A 5 "batch_size|chunk_size|MAX_BATCH" --type rust
Length of output: 24117
storage/src/storage.rs (2)
6-6
: Import WriteBatchWithColumn
Added
The import of WriteBatchWithColumn
is necessary for the new batch writing methods introduced.
43-43
: New Methods Added to InnerStore
Trait
The methods write_batch_with_column
and write_batch_with_column_sync
have been added to the InnerStore
trait. Ensure that all implementors of this trait provide implementations for these methods to maintain trait contract compliance.
Also applies to: 48-48
2, put empty vec as children when insering a new hash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
flexidag/src/consensusdb/consensus_relations.rs (1)
136-166
: Improve batch preparation readabilityThe nested data structure creation is complex and could benefit from being broken down into smaller, more readable functions.
+ fn create_parents_batch_data( + hash: Hash, + parents: &BlockHashes, + ) -> Result<WriteBatchData, StoreError> { + Ok(WriteBatchData { + column: PARENTS_CF.to_string(), + row_data: WriteBatch::new_with_rows(vec![( + hash.to_vec(), + WriteOp::Value( + <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(parents)?, + ), + )]), + }) + } + + fn create_children_batch_data( + parent_to_children: &HashMap<Hash, Vec<Hash>>, + ) -> Result<WriteBatchData, StoreError> { + Ok(WriteBatchData { + column: CHILDREN_CF.to_string(), + row_data: WriteBatch::new_with_rows( + parent_to_children + .iter() + .map(|(key, value)| { + Ok(( + key.to_vec(), + WriteOp::Value(<Arc<Vec<Hash>> as ValueCodec< + RelationChildren, + >>::encode_value( + &Arc::new(value.clone()) + )?), + )) + }) + .collect::<Result<Vec<_>, StoreError>>()?, + ), + }) + } let batch = WriteBatchWithColumn { data: vec![ - WriteBatchData { - column: PARENTS_CF.to_string(), - row_data: WriteBatch::new_with_rows(vec![( - hash.to_vec(), - WriteOp::Value( - <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(&parents)?, - ), - )]), - }, - WriteBatchData { - column: CHILDREN_CF.to_string(), - row_data: WriteBatch::new_with_rows( - parent_to_children - .iter() - .map(|(key, value)| { - Ok(( - key.to_vec(), - WriteOp::Value(<Arc<Vec<Hash>> as ValueCodec< - RelationChildren, - >>::encode_value( - &Arc::new(value.clone()) - )?), - )) - }) - .collect::<Result<Vec<_>, StoreError>>()?, - ), - }, + create_parents_batch_data(hash, &parents)?, + create_children_batch_data(&parent_to_children)?, ], };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
flexidag/src/consensusdb/access.rs
(1 hunks)flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)
🔇 Additional comments (6)
flexidag/src/consensusdb/access.rs (2)
111-116
: Verify concurrent safety with batch operations.
Since this method is part of a concurrent database access implementation (CachedDbAccess
), we should ensure that cache updates during batch processing don't lead to race conditions or inconsistencies.
Let's check the batch processing implementation and concurrent access patterns:
✅ Verification successful
Cache updates are thread-safe, no concurrent safety issues found
The implementation is safe for concurrent access because:
- The
DagCache
is wrapped in anArc
(atomic reference counting), providing thread-safe shared access - Cache operations (
insert
,remove
, etc.) are atomic as they use an underlying thread-safeGCacheStorage
- The
flush_cache
method only updates the cache layer, while database writes are handled separately through theWriteBatch
mechanism with proper synchronization
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Look for batch processing implementations
ast-grep --pattern 'WriteBatch$$$'
# Check for synchronization mechanisms in cache access
rg -A 5 'Mutex|RwLock|atomic' flexidag/src/consensusdb/cache.rs
Length of output: 102
Script:
#!/bin/bash
# Let's check the implementation of CachedDbAccess and its cache field
rg -A 10 "struct CachedDbAccess" flexidag/src/consensusdb/
# Look for concurrent access patterns and synchronization
rg -A 5 "Arc|Mutex|RwLock" flexidag/src/consensusdb/
# Check cache implementation details
rg -A 5 "cache" flexidag/src/consensusdb/access.rs
# Look for any batch processing related code
rg -A 5 "batch|flush" flexidag/src/consensusdb/
Length of output: 52611
111-116
: 🛠️ Refactor suggestion
Add documentation and optimize cloning operations.
The new flush_cache
method lacks documentation explaining its purpose and usage. Additionally, the implementation performs unnecessary cloning of both keys and values.
Consider these improvements:
+ /// Bulk inserts key-value pairs into the cache without affecting the underlying database.
+ /// This is typically used in conjunction with batch processing operations.
+ ///
+ /// # Arguments
+ /// * `data` - A slice of key-value pairs to be cached
+ ///
+ /// # Returns
+ /// * `Result<(), StoreError>` - Success or failure of the cache operation
pub fn flush_cache(&self, data: &[(S::Key, S::Value)]) -> Result<(), StoreError> {
for (key, value) in data {
- self.cache.insert(key.clone(), value.clone());
+ self.cache.insert(key.to_owned(), value.to_owned());
}
Ok(())
}
Let's verify the cache implementation to understand potential size limits:
flexidag/src/consensusdb/consensus_relations.rs (4)
4-4
: LGTM: Import changes align with the new batch processing architecture
The updated imports correctly reflect the transition to using WriteBatchWithColumn
for batch operations.
Also applies to: 8-9, 11-11
167-169
: 🛠️ Refactor suggestion
Enhance error handling for batch operations
The current error handling loses important context about which part of the batch operation failed.
self.db
.write_batch_with_column(batch)
- .map_err(|e| StoreError::DBIoError(e.to_string()))?;
+ .map_err(|e| StoreError::DBIoError(format!(
+ "Failed to write batch for hash {}: {}", hash, e
+ )))?;
Likely invalid or redundant comment.
121-133
: 🛠️ Refactor suggestion
Add parent existence validation
While the code handles missing children gracefully, it should validate parent existence before proceeding with updates to maintain data integrity.
let mut parent_to_children = HashMap::new();
parent_to_children.insert(hash, vec![]);
for parent in parents.iter().cloned() {
+ // Validate parent exists in the DAG
+ if !self.has(parent)? {
+ return Err(StoreError::KeyNotFound(format!(
+ "Parent {} not found in DAG", parent
+ )));
+ }
let mut children = match self.get_children(parent) {
Likely invalid or redundant comment.
121-178
: Consider adding optimistic locking for concurrent updates
While the batch write is atomic, there's still a potential race condition in the get-then-update pattern when reading children lists. Consider implementing optimistic locking.
Suggested approach:
- Add a version field to child lists
- Implement optimistic locking:
// Example implementation
struct VersionedChildren {
version: u64,
children: Vec<Hash>
}
// In insert method:
let (version, mut children) = self.get_children_with_version(parent)?;
children.push(hash);
self.update_children_if_version_matches(parent, children, version)?;
e4f72af
to
aa0c54d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 11
🧹 Outside diff range and nitpick comments (29)
sync/tests/test_parallel_scheduler.rs (1)
15-28
: Consider improving worker monitoring mechanismThe current implementation has several potential issues:
- The infinite loop with 1-second sleep interval is not efficient for worker monitoring
- No error handling for worker_start() and worker_exits()
- No graceful shutdown mechanism beyond check_if_stop()
Consider implementing a more event-driven approach using channels or proper async notification mechanisms instead of polling with sleep.
flexidag/src/prune/pruning_point_manager.rs (1)
78-82
: LGTM! The debug logging enhances observability.The addition of debug logging at this critical point is valuable for troubleshooting the pruning point calculation process. The logged parameters provide essential context for understanding the state during execution.
Consider structuring the debug output as key-value pairs for better log parsing:
- debug!("previous_pruning_point: {:?}, previous_ghostdata: {:?}, next_ghostdata: {:?}, pruning_depth: {:?}, pruning_finality: {:?}", - previous_pruning_point, previous_ghostdata, next_ghostdata, - pruning_depth, pruning_finality, - ); + debug!( + previous_pruning_point = ?previous_pruning_point, + previous_ghostdata = ?previous_ghostdata, + next_ghostdata = ?next_ghostdata, + pruning_depth = ?pruning_depth, + pruning_finality = ?pruning_finality, + "calculating next pruning point" + );sync/src/tasks/inner_sync_task.rs (2)
161-161
: Consider adding error handling for scheduler operationsWhile the
worker_scheduler
is correctly passed toBlockCollector
, consider adding error handling for potential scheduler failures during block collection operations.Example pattern:
let block_collector = BlockCollector::new_with_handle( // ... other parameters ... self.worker_scheduler.clone(), ).map_err(|e| TaskError::BreakError(format_err!("Failed to initialize block collector: {}", e)))?;
42-42
: Document the purpose and behavior of WorkerScheduler integrationThe addition of
WorkerScheduler
appears to be part of a larger change to enable parallel processing, but its specific purpose and behavior are not documented. Consider:
- Adding documentation comments explaining the role of
WorkerScheduler
- Updating the PR description to clarify the motivation for this change
- Including examples of how this affects sync performance
Also applies to: 63-63, 161-161
sync/src/tasks/tests_dag.rs (2)
39-39
: Consider moving WorkerScheduler creation outside the loopCreating a new
WorkerScheduler
in each loop iteration could be inefficient. Consider creating it once before the loop and reusing the instance.async fn sync_block_process( target_node: Arc<SyncNodeMocker>, local_node: Arc<SyncNodeMocker>, registry: &ServiceRef<RegistryService>, ) -> Result<(Arc<SyncNodeMocker>, Arc<SyncNodeMocker>)> { + let worker_scheduler = Arc::new(WorkerScheduler::new()); loop { - let worker_scheduler = Arc::new(WorkerScheduler::new());
Line range hint
89-89
: Consider adding worker scheduler-specific assertionsThe test cases thoroughly verify block synchronization but could benefit from additional assertions to verify the worker scheduler's behavior, such as:
- Verifying that all workers complete their tasks
- Checking for any task failures or timeouts
- Ensuring proper resource cleanup
Would you like me to provide example assertions for these scenarios?
Also applies to: 143-143
sync/src/tasks/test_tools.rs (1)
Line range hint
129-203
: Consider improving test reliability.The test involves multiple async operations and timing-sensitive worker scheduling. Consider adding retry mechanisms or additional assertions to make the test more reliable:
- Add timeouts to prevent test hangs
- Add retries for worker scheduler operations
- Add assertions to verify worker state transitions
Example timeout wrapper:
async fn with_timeout<F, T>(future: F, duration: Duration) -> Result<T> where F: Future<Output = Result<T>>, { tokio::time::timeout(duration, future) .await .map_err(|_| anyhow::anyhow!("Operation timed out"))? }sync/src/parallel/sender.rs (2)
Line range hint
75-116
: Consider refactoring duplicate logic in dispatch_to_workerThe method contains duplicate logic for checking
ExecuteState::Executed
. This redundancy could impact the efficiency of worker scheduling and make maintenance more difficult.Consider consolidating the duplicate checks into a single loop:
async fn dispatch_to_worker(&mut self, block: &Block) -> anyhow::Result<bool> { // First priority: Find an executor working on parent blocks for executor in &mut self.executors { if let ExecuteState::Executing(header_id) = &executor.state { if *header_id == block.header().parent_hash() || block.header.parents_hash().contains(header_id) { executor.state = ExecuteState::Executing(block.id()); executor .sender_to_executor .send(Some(block.clone())) .await?; return anyhow::Ok(true); } } } - // Remove duplicate loops and combine into a single loop - for executor in &mut self.executors { - match &executor.state { - ExecuteState::Executed(_) => { - executor.state = ExecuteState::Executing(block.id()); - executor - .sender_to_executor - .send(Some(block.clone())) - .await?; - return anyhow::Ok(true); - } - _ => continue, - } - } // Second priority: Find any available executed worker for executor in &mut self.executors { if let ExecuteState::Executed(_) = &executor.state { executor.state = ExecuteState::Executing(block.id()); executor .sender_to_executor .send(Some(block.clone())) .await?; return anyhow::Ok(true); } } anyhow::Ok(false) }
Line range hint
199-236
: Consider enhancing error context in wait_for_finishWhile the error handling is functional, consider adding more context to errors in
wait_for_finish
to aid in debugging production issues.Consider enhancing error handling:
for worker in self.executors { - worker.handle.await?; + worker.handle.await.map_err(|e| { + anyhow::anyhow!("Worker task failed to complete: {}", e) + })?; }flexidag/src/consensusdb/consensus_relations.rs (1)
136-166
: Improve memory efficiency in batch constructionThe current implementation creates multiple clones of data during batch construction. Consider using references where possible.
let batch = WriteBatchWithColumn { data: vec![ WriteBatchData { column: PARENTS_CF.to_string(), row_data: WriteBatch::new_with_rows(vec![( hash.to_vec(), - WriteOp::Value( - <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(&parents)?, - ), + WriteOp::Value(ValueCodec::<RelationParent>::encode_value(&Arc::new(parents.to_vec())?)), )]), },sync/src/tasks/mock.rs (2)
141-141
: Add documentation for the worker_scheduler fieldConsider adding documentation to explain the purpose and usage of this field, especially since it's public. This will help other developers understand when and how to use it in their test implementations.
+ /// Scheduler for managing concurrent worker tasks in test scenarios. pub worker_scheduler: Arc<WorkerScheduler>,
162-162
: Consider standardizing WorkerScheduler initializationThe current implementation creates a new
WorkerScheduler
instance in each constructor method. Consider:
- Accepting
worker_scheduler
as an optional parameter in all constructors- Creating a shared instance only when not provided
- Using a consistent pattern across all constructors
This would:
- Improve resource utilization in tests
- Allow better coordination of workers across test instances
- Make the API more flexible
Example refactor for the
new
method:pub fn new( net: ChainNetwork, delay_milliseconds: u64, random_error_percent: u32, + worker_scheduler: Option<Arc<WorkerScheduler>>, ) -> Result<Self> { // ... existing code ... - let worker_scheduler = Arc::new(WorkerScheduler::new()); + let worker_scheduler = worker_scheduler.unwrap_or_else(|| + Arc::new(WorkerScheduler::new()) + ); Ok(Self::new_inner( // ... existing parameters ... worker_scheduler, )) }Also applies to: 170-170, 201-201, 222-222, 241-241, 252-252, 260-260
flexidag/src/ghostdag/protocol.rs (2)
213-234
: LGTM! Consider enhancing error handling.The blue set verification logic is well-implemented with appropriate logging. The two-step verification process provides a robust check for data consistency.
Consider wrapping the error message in a custom error type to make it more structured and easier to handle upstream:
+#[derive(Debug, thiserror::Error)] +pub enum GhostdagError { + #[error("Blue set mismatch for block {block_id:?}: expected {expected:?}, got {actual:?}")] + BlueSetsNotEqual { + block_id: Hash, + expected: HashSet<Hash>, + actual: HashSet<Hash>, + } +} -bail!("The ghost data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::<Vec<_>>(), ghostdata.mergeset_blues); +Err(GhostdagError::BlueSetsNotEqual { + block_id: header.id(), + expected: remote_blue_set, + actual: ghostdata.mergeset_blues.iter().skip(1).cloned().collect(), +}.into())
449-452
: LGTM! Consider enhancing the log message.The logging is well-placed for early detection of size violations.
Consider including the actual size and the maximum allowed size (K) in the log message for better context:
-info!("Checking blue candidate: {} failed, mergeset blues size is K+1", blue_candidate); +info!("Checking blue candidate: {} failed, mergeset blues size {} exceeds maximum allowed size {}", blue_candidate, new_block_data.mergeset_blues.len(), self.k);sync/src/tasks/mod.rs (1)
627-627
: Consider refactoring to reduce parameter countThe addition of
worker_scheduler
makes sense for parallel processing, but the function now has 14 parameters. Consider introducing a configuration struct to group related parameters and improve maintainability.Example refactor:
- pub fn full_sync_task<H, A, F, N>( - current_block_id: HashValue, - target: SyncTarget, - skip_pow_verify: bool, - time_service: Arc<dyn TimeService>, - storage: Arc<dyn Store>, - block_event_handle: H, - fetcher: Arc<F>, - ancestor_event_handle: A, - peer_provider: N, - max_retry_times: u64, - sync_metrics: Option<SyncMetrics>, - vm_metrics: Option<VMMetrics>, - dag: BlockDAG, - sync_dag_store: SyncDagStore, - worker_scheduler: Arc<WorkerScheduler>, - ) -> Result<...> + pub struct SyncTaskConfig<H, A, F, N> { + current_block_id: HashValue, + target: SyncTarget, + skip_pow_verify: bool, + time_service: Arc<dyn TimeService>, + storage: Arc<dyn Store>, + block_event_handle: H, + fetcher: Arc<F>, + ancestor_event_handle: A, + peer_provider: N, + max_retry_times: u64, + sync_metrics: Option<SyncMetrics>, + vm_metrics: Option<VMMetrics>, + dag: BlockDAG, + sync_dag_store: SyncDagStore, + worker_scheduler: Arc<WorkerScheduler>, + } + + pub fn full_sync_task<H, A, F, N>( + config: SyncTaskConfig<H, A, F, N> + ) -> Result<...>sync/src/tasks/tests.rs (1)
Line range hint
1029-1053
: Consider refactoring synchronous blocks in async contextWhile the worker state management is correct, using
block_on
in an async context could lead to potential deadlocks or performance issues. Consider refactoring to maintain async flow throughout.Consider refactoring to avoid
block_on
:-async_std::task::block_on(async { - worker_scheduler.tell_worker_to_stop().await; - worker_scheduler.wait_for_worker().await; -}); +worker_scheduler.tell_worker_to_stop().await; +worker_scheduler.wait_for_worker().await; // ... sync_task creation ... -async_std::task::block_on(async { - worker_scheduler.tell_worker_to_start().await; -}); +worker_scheduler.tell_worker_to_start().await;sync/src/parallel/worker_scheduler.rs (2)
63-72
: Use synchronization primitives to wait for workers more efficientlyThe
wait_for_worker
method uses a loop withyield_now
andsleep
, which can cause unnecessary delays and CPU usage. Consider usingtokio::sync::Notify
to efficiently await the condition without busy-waiting.Here's how you might refactor
WorkerScheduler
to useNotify
:use tokio::sync::Notify; pub struct WorkerScheduler { state: RwLock<WorkerSchedulerState>, worker_count: AtomicU64, notify: Notify, } impl WorkerScheduler { pub fn new() -> Self { Self { state: RwLock::new(WorkerSchedulerState::Inactive), worker_count: AtomicU64::new(0), notify: Notify::new(), } } pub fn worker_exits(&self) { self.worker_count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); self.notify.notify_waiters(); } pub async fn wait_for_worker(&self) { while self.check_worker_count() != 0 { self.notify.notified().await; } } }This approach reduces CPU usage by awaiting notifications rather than polling.
31-34
: Consider replacingRwLock
with atomic operations forstate
The
state
field is protected by anRwLock
, but since it represents a simple enum with two states, using an atomic variable likeAtomicU8
orAtomicBool
could improve performance by avoiding asynchronous locking.Example refactor using
AtomicU8
:use std::sync::atomic::{AtomicU8, Ordering}; #[repr(u8)] enum WorkerSchedulerState { Inactive = 0, Active = 1, } pub struct WorkerScheduler { state: AtomicU8, worker_count: AtomicU64, } impl WorkerScheduler { pub fn new() -> Self { Self { state: AtomicU8::new(WorkerSchedulerState::Inactive as u8), worker_count: AtomicU64::new(0), } } pub fn tell_worker_to_stop(&self) { self.state.store(WorkerSchedulerState::Inactive as u8, Ordering::SeqCst); } pub fn tell_worker_to_start(&self) { self.state.store(WorkerSchedulerState::Active as u8, Ordering::SeqCst); } pub fn check_if_stop(&self) -> bool { self.state.load(Ordering::SeqCst) == WorkerSchedulerState::Inactive as u8 } }This change removes the need for
RwLock
andasync
in methods related tostate
, simplifying the code and potentially improving performance.Also applies to: 36-39, 41-47
sync/src/parallel/executor.rs (3)
37-46
: Clarify the purpose ofExecutorDeconstructor
The
ExecutorDeconstructor
struct is introduced to manage theworker_scheduler
. Its sole purpose appears to be invokingworker_start
on creation andworker_exits
on drop. Consider the following:
- If
ExecutorDeconstructor
is intended as a guard to manage the worker's lifecycle, consider renaming it to reflect its role, such asWorkerGuard
orWorkerLifecycleManager
.- Ensure that the
worker_start
andworker_exits
methods are correctly managing the worker state without race conditions.Apply this diff to rename
ExecutorDeconstructor
for clarity:-struct ExecutorDeconstructor { +struct WorkerGuard { worker_scheduler: Arc<WorkerScheduler>, } -impl ExecutorDeconstructor { +impl WorkerGuard { pub fn new(worker_scheduler: Arc<WorkerScheduler>) -> Self { worker_scheduler.worker_start(); Self { worker_scheduler } } }
104-107
: Review the placement of the stop-check conditionThe stop-check condition within the loop:
if self.worker_scheduler.check_if_stop().await { info!("sync worker scheduler stopped"); return; }is placed before the block reception. Ensure that this condition is also checked appropriately within long-running operations or additional loops to allow timely termination upon stop signals.
Line range hint
149-175
: Handle potential errors when forking the chainIn the block where the chain is forked:
if old_chain.status().head().id() != block.header().parent_hash() { chain = match old_chain.fork(block.header().parent_hash()) { Ok(new_chain) => Some(new_chain), Err(e) => { error!("failed to fork in parallel for: {:?}", e); return; } }; } else { chain = Some(old_chain); }Consider providing more context in the error logs and handling the error more gracefully if possible. Abruptly returning may not be the best approach if the executor needs to process additional blocks.
[error_handling]
sync/src/tasks/block_sync_task.rs (1)
Line range hint
465-474
: Avoid magic numbers by defining100000
as a constantIn the instantiation of
DagBlockSender::new
, the hard-coded value100000
is used. Define this value as a constant or configurable parameter to improve readability and maintainability.storage/src/tests/test_storage.rs (3)
610-623
: Consider refactoring block creation code to reduce duplicationThe code for creating
block1
andblock2
is almost identical except for the block number. Consider refactoring this into a helper function to reduce code duplication and improve maintainability.For example, you could create a function like:
fn create_block(number: u64, body: &BlockBody) -> Block { Block::new( BlockHeaderBuilder::new() .with_body_hash(body.hash()) .with_number(number) .build(), body.clone(), ) }Then use it as:
-let block1 = Block::new( - BlockHeaderBuilder::new() - .with_body_hash(body.hash()) - .with_number(1) - .build(), - body.clone(), -); +let block1 = create_block(1, &body); -let block2 = Block::new( - BlockHeaderBuilder::new() - .with_body_hash(body.hash()) - .with_number(2) - .build(), - body.clone(), -); +let block2 = create_block(2, &body);
625-645
: RefactorDagSyncBlock
creation to eliminate duplicate codeThe creation of
dag_block1
anddag_block2
is repetitive and only differs in the block number. Consider creating a helper function to reduce duplication and enhance readability.For example:
fn create_dag_sync_block(number: u64, body: &BlockBody) -> DagSyncBlock { DagSyncBlock { block: Block::new( BlockHeaderBuilder::new() .with_body_hash(body.hash()) .with_number(number) .build(), body.clone(), ), children: vec![Block::random().id(), Block::random().id()], } }Then use it as:
-let dag_block1 = DagSyncBlock { - block: Block::new( - BlockHeaderBuilder::new() - .with_body_hash(body.hash()) - .with_number(3) - .build(), - body.clone(), - ), - children: vec![Block::random().id(), Block::random().id()], -}; +let dag_block1 = create_dag_sync_block(3, &body); -let dag_block2 = DagSyncBlock { - block: Block::new( - BlockHeaderBuilder::new() - .with_body_hash(body.hash()) - .with_number(4) - .build(), - body.clone(), - ), - children: vec![Block::random().id(), Block::random().id()], -}; +let dag_block2 = create_dag_sync_block(4, &body);
680-789
: Eliminate code duplication inmatch
arms by abstracting common logicEach arm of the
match
statement contains nearly identical code for reading and asserting blocks and DAG sync blocks. Refactoring this common logic into helper functions will enhance maintainability and reduce repetition.For example, you could introduce helper functions:
fn verify_block<B: InnerStore>( storage: &B, prefix: &str, block_id: HashValue, expected_block: &Block, ) -> Result<()> { let data = storage.get(prefix, block_id.encode()?)?.expect("failed to get the block"); let read_block = Block::decode(&data)?; assert_eq!(read_block, *expected_block); Ok(()) } fn verify_dag_block<B: InnerStore>( storage: &B, prefix: &str, block_id: HashValue, expected_dag_block: &DagSyncBlock, ) -> Result<()> { let data = storage.get(prefix, block_id.encode()?)?.expect("failed to get the dag block"); let read_dag_block = DagSyncBlock::decode(&data)?; assert_eq!(read_dag_block, *expected_dag_block); Ok(()) }Then, in your
match
arms, you can use these functions:verify_block(&cache, BLOCK_PREFIX_NAME, block1.id(), &block1)?; verify_block(&cache, BLOCK_PREFIX_NAME, block2.id(), &block2)?; verify_dag_block(&cache, DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id(), &dag_block1)?; verify_dag_block(&cache, DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id(), &dag_block2)?;flexidag/tests/tests.rs (4)
22-23
: Verify the usage of newly added importsImports
consensus_header::ConsensusHeader
andU256
have been added. Please ensure these are utilized within the code to prevent any unused import warnings.
Line range hint
446-494
: Optimize the repeated use of empty iteratorsIn multiple instances within
test_reachability_check_ancestor
, you're creating empty iterators using&mut vec![].into_iter()
. Consider creating a constant empty iterator or refactoring the code to avoid redundancy and enhance readability.
1143-1147
: Add documentation to the new test functionThe test function
test_verification_blue_block_inconsistent
lacks descriptive comments. Adding a brief explanation of its purpose will improve code readability and assist future maintainability.
1149-1329
: Refactorloop_to_blue
for enhanced clarityThe
loop_to_blue
function contains complex loops and nested logic, which may be challenging to follow. Consider adding comments or refactoring portions of the code to improve clarity and ensure that the function's purpose is easily understood.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (26)
commons/stream-task/src/event.rs
(1 hunks)flexidag/src/blockdag.rs
(2 hunks)flexidag/src/consensusdb/access.rs
(1 hunks)flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)flexidag/src/ghostdag/protocol.rs
(3 hunks)flexidag/src/prune/pruning_point_manager.rs
(1 hunks)flexidag/tests/tests.rs
(7 hunks)kube/manifest/starcoin-vega.yaml
(1 hunks)storage/src/batch/mod.rs
(1 hunks)storage/src/cache_storage/mod.rs
(3 hunks)storage/src/db_storage/mod.rs
(2 hunks)storage/src/storage.rs
(4 hunks)storage/src/tests/test_storage.rs
(2 hunks)sync/src/parallel/executor.rs
(6 hunks)sync/src/parallel/mod.rs
(1 hunks)sync/src/parallel/sender.rs
(5 hunks)sync/src/parallel/worker_scheduler.rs
(1 hunks)sync/src/sync.rs
(12 hunks)sync/src/tasks/block_sync_task.rs
(6 hunks)sync/src/tasks/inner_sync_task.rs
(5 hunks)sync/src/tasks/mock.rs
(7 hunks)sync/src/tasks/mod.rs
(3 hunks)sync/src/tasks/test_tools.rs
(5 hunks)sync/src/tasks/tests.rs
(19 hunks)sync/src/tasks/tests_dag.rs
(3 hunks)sync/tests/test_parallel_scheduler.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- sync/src/parallel/mod.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- flexidag/src/consensusdb/access.rs
- storage/src/batch/mod.rs
- storage/src/cache_storage/mod.rs
- storage/src/db_storage/mod.rs
- storage/src/storage.rs
🔇 Additional comments (45)
sync/tests/test_parallel_scheduler.rs (2)
1-9
: LGTM! Clean struct design with appropriate threading primitives
The Worker struct is well-designed with thread-safe reference counting using Arc, which is appropriate for concurrent worker management.
3-3
: Verify thread-safety of WorkerScheduler methods
Let's verify that the WorkerScheduler's methods are properly synchronized to prevent race conditions.
✅ Verification successful
Thread-safety is properly implemented in WorkerScheduler
The implementation shows proper synchronization:
- State management uses
RwLock
for safe concurrent access - Worker count uses
AtomicU64
withSeqCst
ordering for thread-safe operations - All state-modifying methods are properly guarded:
tell_worker_to_stop/start
use write lockscheck_if_stop
uses read lockworker_exits/start
use atomic operations
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for WorkerScheduler implementation to verify synchronization
ast-grep --pattern 'struct WorkerScheduler {
$$$
}'
# Look for potential race conditions in method implementations
ast-grep --pattern 'impl WorkerScheduler {
$$$
}'
Length of output: 3622
kube/manifest/starcoin-vega.yaml (2)
Line range hint 31-39
: LGTM: Robust initialization script
The initialization script is well-structured:
- Performs necessary cleanup
- Handles node key configuration
- Includes comprehensive P2P and JSON-RPC configurations
- Sets appropriate resource quotas
31-31
: Verify the impact of removing the sync directory
The addition of /sc-data/vega/sync
to the cleanup command needs verification to ensure it doesn't affect node synchronization after startup.
Let's search for references to this directory in the codebase:
Consider documenting:
- Why the sync directory needs to be cleaned up
- What data is stored in this directory
- Any potential impact on node synchronization
✅ Verification successful
Let me gather more context about the sync directory's usage and purpose.
Based on the search results, I can now provide a definitive response about the sync directory cleanup:
Cleanup of sync directory is a safe operation
The sync directory (/sc-data/vega/sync
) is used as a temporary storage location during node synchronization. From the codebase:
- It's configured via
StorageConfig
and defaults tosync/db
under the data directory - The directory is recreated when the node starts and contains only temporary synchronization data
- No persistent state is stored here, as it's separate from the main blockchain storage
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for references to the sync directory
rg -i "sync.*directory|sync.*dir"
# Search for any configuration or documentation about the sync directory
fd -t f "README|CONTRIBUTING|CHANGELOG|config" -X rg -i "sync.*directory|sync.*dir"
Length of output: 751
Script:
#!/bin/bash
# Search for the sync directory implementation and usage
ast-grep --pattern 'fn sync_dir($$$) { $$$ }'
# Look for any references to sync directory cleanup or initialization
rg -i "rm.*sync|clean.*sync" -A 2 -B 2
# Check storage configuration and initialization
rg "StorageConfig" -A 5 -B 5
# Look for any documentation about data persistence or cleanup
fd -t f "README|doc" -X rg -i "data.*persist|cleanup|reset"
Length of output: 31856
flexidag/src/prune/pruning_point_manager.rs (1)
78-82
: Verify interaction with batch writing changes
Since this PR focuses on batch DAG relationship writing, let's verify if this function interacts with any batch operations.
sync/src/tasks/inner_sync_task.rs (3)
18-18
: LGTM: Import statement is correctly placed
The import follows the codebase's module organization pattern.
63-63
: LGTM: Constructor properly initialized
The worker_scheduler
parameter and field assignment are correctly implemented, maintaining consistent ownership patterns.
Also applies to: 77-77
42-42
: Verify WorkerScheduler usage across the codebase
The Arc<WorkerScheduler>
field addition suggests concurrent access. Let's verify its usage pattern across the codebase.
✅ Verification successful
WorkerScheduler usage pattern is consistent and correct
The verification shows proper usage of WorkerScheduler
across the codebase:
- Correctly wrapped in
Arc
for shared ownership in all instances - Used consistently for parallel execution control with proper clone patterns
- Implements worker lifecycle management (start/stop) through atomic state handling
- Thread-safe implementation using
RwLock
andAtomicU64
for internal state - Clone patterns appear in appropriate concurrent contexts (tests, task spawning)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for WorkerScheduler usage patterns
echo "=== WorkerScheduler Usage Patterns ==="
rg "WorkerScheduler" -A 3 --type rust
echo -e "\n=== Concurrent Access Patterns ==="
ast-grep --pattern 'worker_scheduler.clone()'
Length of output: 12931
sync/src/tasks/tests_dag.rs (3)
3-3
: LGTM: Import addition is well-organized
The WorkerScheduler
import is properly placed within the internal imports group.
Line range hint 39-56
: Ensure proper cleanup of WorkerScheduler resources
The WorkerScheduler
might hold resources that need proper cleanup. Consider implementing cleanup logic when the sync process completes or fails.
Let's check if WorkerScheduler
has any cleanup requirements:
Line range hint 39-56
: Verify thread safety in parallel block processing
The integration of WorkerScheduler
introduces parallel processing. Let's verify the thread safety of shared resources.
✅ Verification successful
Let me gather more specific information about the thread safety mechanisms in the worker scheduler implementation.
Let me check the actual WorkerScheduler implementation to verify its thread safety mechanisms.
Thread safety is properly implemented in WorkerScheduler
The WorkerScheduler implementation is thread-safe with appropriate synchronization primitives:
- Uses
RwLock
for state management with proper read/write locking - Uses
AtomicU64
for worker count with correct memory ordering (SeqCst) - All state mutations are properly synchronized through async locks or atomic operations
- No shared mutable state is exposed without synchronization
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for potential race conditions in shared resource access
rg -A 5 'Arc::clone|\.clone\(\)'
# Check for proper synchronization primitives
rg -A 5 'Mutex|RwLock|atomic'
# Look for any existing race condition related tests
ast-grep --pattern 'test_$$$race$$$'
Length of output: 101042
Script:
#!/bin/bash
# Check WorkerScheduler implementation for synchronization primitives
rg -A 10 'WorkerScheduler::new|impl WorkerScheduler'
# Look for any concurrent access patterns in the worker scheduler
rg -A 10 'worker_scheduler' sync/src/tasks/
# Check for any race condition related tests
rg -A 10 'test.*worker.*scheduler' sync/src/
Length of output: 41742
Script:
#!/bin/bash
# Check the WorkerScheduler implementation
cat sync/src/parallel/worker_scheduler.rs
Length of output: 1828
sync/src/tasks/test_tools.rs (1)
129-130
: LGTM: Worker scheduler initialization looks good!
The initialization sequence properly handles thread-safe sharing of the WorkerScheduler and follows good practices by stopping the worker before starting the sync task.
Also applies to: 148-149
sync/src/parallel/sender.rs (3)
19-22
: LGTM: Proper thread-safe integration of WorkerScheduler
The addition of WorkerScheduler
with Arc
for thread-safety is well-structured and follows Rust's concurrency patterns.
Also applies to: 40-40
52-52
: LGTM: Constructor properly initialized with WorkerScheduler
The constructor changes maintain consistency with existing patterns and properly handle the new worker_scheduler
field.
Also applies to: 63-63
150-150
: LGTM: Proper integration with DagBlockExecutor
The worker_scheduler
is correctly cloned and passed to new executors, maintaining thread-safety.
flexidag/src/consensusdb/consensus_relations.rs (4)
4-4
: LGTM: Import changes align with new batch processing approach
The new imports properly support the transition to using WriteBatchWithColumn
for batch operations.
Also applies to: 8-9, 11-11
121-178
: Address potential race condition in concurrent updates
The get-then-update pattern on children lists could lead to lost updates in concurrent scenarios. Consider implementing one of these solutions:
- Use database transactions if supported
- Implement optimistic locking with version checks
- Add a mutex for parent-child updates
Let's verify the concurrent access patterns in the codebase:
#!/bin/bash
# Search for concurrent access patterns
rg -A 5 "Arc::clone|\.clone\(\)" --type rust
# Search for mutex usage
rg -A 5 "Mutex|RwLock" --type rust
# Search for transaction support
ast-grep --pattern 'fn $_ ($_) -> Result<$_, $_> {
$$$
transaction
$$$
}'
167-169
: 🛠️ Refactor suggestion
Enhance error handling for database operations
The current error handling converts all database errors to a generic string. Consider preserving more error context and adding transaction rollback capability.
self.db
.write_batch_with_column(batch)
- .map_err(|e| StoreError::DBIoError(e.to_string()))?;
+ .map_err(|e| StoreError::DBIoError(format!("Failed to write batch for hash {}: {}", hash, e)))?;
125-131
: 🛠️ Refactor suggestion
Consider adding explicit parent validation
The current error handling for missing parents silently creates an empty children list. This might mask configuration errors or data inconsistencies.
for parent in parents.iter().cloned() {
+ // Explicitly validate parent exists unless it's a genesis block
+ if !self.has(parent)? && !parents.is_empty() {
+ return Err(StoreError::KeyNotFound(
+ format!("Parent {} not found", parent)
+ ));
+ }
let mut children = match self.get_children(parent) {
Ok(children) => (*children).clone(),
- Err(e) => match e {
- StoreError::KeyNotFound(_) => vec![],
- _ => return std::result::Result::Err(e),
- },
+ Err(e) => return Err(e),
};
Likely invalid or redundant comment.
sync/src/tasks/mock.rs (2)
4-4
: LGTM: Import statement is correctly placed and used
The import of WorkerScheduler
is properly placed and follows the codebase's import organization pattern.
Line range hint 141-260
: Verify WorkerScheduler cleanup in tests
Since WorkerScheduler
manages concurrent tasks, we should verify that resources are properly cleaned up after tests.
✅ Verification successful
Based on the shell script results, I can now generate the final response as there is clear evidence of proper cleanup patterns in the codebase.
WorkerScheduler is properly managed in tests
The codebase demonstrates consistent and proper cleanup of WorkerScheduler resources:
- Tests consistently call
tell_worker_to_stop()
before starting new tasks wait_for_worker()
is called to ensure proper shutdown- Worker lifecycle is properly managed with
tell_worker_to_start()
after task setup
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for proper cleanup patterns in test files
# Look for test files that use SyncNodeMocker
echo "Searching for test files using SyncNodeMocker..."
rg -l "SyncNodeMocker" --type rust --glob "*test*.rs"
# Check for cleanup patterns in tests
echo "Checking for cleanup patterns..."
rg "worker_scheduler" --type rust --glob "*test*.rs" -A 5 -B 5
Length of output: 25318
flexidag/src/ghostdag/protocol.rs (2)
387-390
: LGTM! Appropriate logging added.
The log message is well-placed and provides useful context for debugging k-cluster violations.
397-400
: LGTM! Detailed logging for k-cluster violation.
The log message provides comprehensive context by including both the candidate and the violating block hash.
sync/src/tasks/mod.rs (2)
5-5
: LGTM: Import follows standard conventions
The import is properly scoped and follows Rust's standard import organization.
740-740
: Verify worker scheduler integration in InnerSyncTask
The worker scheduler is properly passed to InnerSyncTask
, but we should verify its implementation.
✅ Verification successful
Worker scheduler integration is properly implemented
The verification shows that:
worker_scheduler
is correctly defined as a field inInnerSyncTask
struct usingArc<WorkerScheduler>
- It's properly initialized in the constructor
- It's used in the implementation, being passed to
block_sync_task
- The synchronization pattern follows proper concurrency practices with
Arc
for thread-safe reference counting
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check InnerSyncTask implementation for worker scheduler usage
ast-grep --pattern $'struct InnerSyncTask {
$$$
worker_scheduler: Arc<WorkerScheduler>
$$$
}'
# Check for worker scheduler usage in do_sync method
ast-grep --pattern $'impl InnerSyncTask {
$$$
fn do_sync($$$) {
$$$
worker_scheduler$$$
$$$
}
$$$
}'
Length of output: 222
Script:
#!/bin/bash
# Let's try a broader search for InnerSyncTask struct and its implementation
rg -A 10 "struct InnerSyncTask"
# Search for worker_scheduler usage in sync-related files
rg "worker_scheduler" sync/src/tasks/mod.rs -A 5
# Check if WorkerScheduler is used in any impl blocks
rg "impl.*InnerSyncTask" -A 20
Length of output: 2914
Script:
#!/bin/bash
# Let's check the implementation details of InnerSyncTask's do_sync method
rg "fn do_sync" sync/src/tasks/inner_sync_task.rs -A 10
# Check for any worker_scheduler usage in inner_sync_task.rs
rg "worker_scheduler" sync/src/tasks/inner_sync_task.rs -A 5
Length of output: 1408
sync/src/tasks/tests.rs (6)
5-5
: LGTM: WorkerScheduler import added
The addition of WorkerScheduler import is appropriate for managing worker states during sync operations.
Line range hint 75-90
: LGTM: Proper worker state management added
The worker state management sequence is well-structured:
- Stop worker before block collection
- Wait for complete stop
- Initialize block collector
- Restart worker
This prevents potential race conditions during test execution.
Line range hint 122-141
: LGTM: Consistent worker state management in fork sync tests
The worker state management pattern is consistently applied across both initial sync and fork sync scenarios, maintaining proper synchronization control.
Also applies to: 161-180
Line range hint 216-235
: LGTM: Worker management properly implemented in genesis fork sync
The worker state management pattern is correctly implemented for genesis fork synchronization testing.
Line range hint 273-292
: LGTM: Worker management properly implemented in continuous sync
The worker state management pattern is consistently applied across both initial and continued sync scenarios.
Also applies to: 316-335
Line range hint 373-404
: LGTM: Enhanced worker management in cancellation scenario
The implementation properly handles worker state during cancellation, including verification of complete worker stoppage. This ensures clean test cancellation.
sync/src/parallel/executor.rs (5)
19-19
: Verify the necessity of deriving Clone
for ExecuteState
The ExecuteState
enum now derives Clone
. Ensure that all variants within ExecuteState
can be safely cloned and that cloning is indeed required for your use cases.
34-34
: Consider the impact of cloning Arc<WorkerScheduler>
The worker_scheduler
field is added to DagBlockExecutor
as an Arc<WorkerScheduler>
. While Arc
provides thread-safe reference counting, cloning an Arc
increases the reference count. Verify that this does not lead to unintended prolonged lifetimes or resource leaks of the WorkerScheduler
.
48-51
: Ensure proper synchronization in Drop
implementation
In the Drop
implementation of ExecutorDeconstructor
(or WorkerGuard
), worker_scheduler.worker_exits()
is called. Verify that this method is thread-safe and handles concurrent access appropriately, preventing potential data races or deadlocks.
127-130
: Optimize the parent-waiting loop for responsiveness
Within the inner loop that waits for parent blocks:
if self.worker_scheduler.check_if_stop().await {
info!("sync worker scheduler stopped");
return;
}
Consider adding a small sleep duration to prevent tight looping, which can lead to high CPU usage.
[performance]
Apply this diff to introduce a sleep duration:
-}
+} else {
+ tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
72-72
: Ensure worker_scheduler
is properly initialized
When creating the DagBlockExecutor
, the worker_scheduler
is cloned and stored. Confirm that the worker_scheduler
is fully initialized and that cloning it at this point does not lead to unintended behavior.
flexidag/src/blockdag.rs (2)
226-231
: Refactored merge set calculation enhances code clarity
The updated code uses unordered_mergeset_without_selected_parent
, which improves encapsulation and makes the merge set calculation more straightforward.
487-488
: Improved logging provides better insight into mergeset computation
The enhanced logging statement now includes detailed information about the tips, pruning points, selected parents, and blue sets, aiding in debugging and monitoring the mergeset and tips calculation process.
sync/src/tasks/block_sync_task.rs (2)
219-219
: LGTM: Addition of worker_scheduler
to BlockCollector
.
Adding worker_scheduler
to BlockCollector
enhances task scheduling capabilities within the synchronization process.
270-270
: LGTM: Including worker_scheduler
in new_with_handle
.
The addition of the worker_scheduler
parameter to the new_with_handle
method ensures proper initialization of BlockCollector
with task scheduling capabilities.
sync/src/sync.rs (2)
535-539
: Avoid blocking within async contexts; consider alternative solutions
This code segment uses tokio::task::block_in_place
and tokio::runtime::Handle::current().block_on
, similar to the pattern in the cancel_task
method. It's advisable to avoid blocking within asynchronous contexts to prevent performance bottlenecks. Please consider refactoring as suggested previously.
561-565
: Avoid blocking within async contexts; consider alternative solutions
This code segment repeats the blocking pattern noted earlier. Consistently avoiding blocking operations within async code enhances performance and prevents potential deadlocks. Please refer to the previous comments for guidance.
storage/src/tests/test_storage.rs (2)
607-791
: run_write_batch
function correctly implements batch writing and verification
The run_write_batch
function effectively performs batch writing of blocks and DAG sync blocks to the storage instance and verifies the integrity of the data. The use of WriteBatchWithColumn
and the handling of different StorageInstance
variants are appropriate.
793-821
: Test functions are appropriately structured and validate batch writing across storage instances
The test functions test_batch_write_for_cache_and_db
, test_batch_write_for_db
, and test_batch_write_for_cache
properly set up different storage instances and invoke run_write_batch
to verify batch writing functionality in various contexts.
flexidag/tests/tests.rs (1)
508-519
: Confirm the correctness of ancestor checks
The assertions using dag.check_ancestor_of
are crucial for verifying ancestor relationships. Ensure that these checks align with the intended DAG structure and that all variables (selected_parent_uncle1
, uncle1
, uncle2
, parent
, child
) have been correctly initialized and represent the expected nodes in the graph.
println!( | ||
"Worker is working, worker_count = {:?}", | ||
self.worker_scheduler.check_worker_count().await | ||
); | ||
time::sleep(Duration::from_secs(1)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Replace println! with proper logging
Using println! for logging in production code is not recommended. Consider using the logging framework (e.g., tracing, log) for better observability and control.
- println!(
- "Worker is working, worker_count = {:?}",
- self.worker_scheduler.check_worker_count().await
- );
+ tracing::debug!(
+ worker_count = ?self.worker_scheduler.check_worker_count().await,
+ "Worker is working"
+ );
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
println!( | |
"Worker is working, worker_count = {:?}", | |
self.worker_scheduler.check_worker_count().await | |
); | |
time::sleep(Duration::from_secs(1)).await; | |
tracing::debug!( | |
worker_count = ?self.worker_scheduler.check_worker_count().await, | |
"Worker is working" | |
); | |
time::sleep(Duration::from_secs(1)).await; |
@@ -158,7 +162,9 @@ pub async fn full_sync_new_node() -> Result<()> { | |||
None, | |||
dag.clone(), | |||
node2.sync_dag_store.clone(), | |||
worker_scheduler.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add error handling for worker scheduler operations.
While the sequence of operations is correct, the tell_worker_to_start/stop
calls could fail. Consider adding error handling:
- worker_scheduler.tell_worker_to_stop().await;
+ worker_scheduler.tell_worker_to_stop().await.map_err(|e| {
+ error!("Failed to stop worker: {}", e);
+ e
+ })?;
let (sync_task, _task_handle, task_event_counter) = full_sync_task(
/* ... */
worker_scheduler.clone(),
)?;
- worker_scheduler.tell_worker_to_start().await;
+ worker_scheduler.tell_worker_to_start().await.map_err(|e| {
+ error!("Failed to start worker: {}", e);
+ e
+ })?;
Also applies to: 167-167
pub async fn check_worker_count(&self) -> u64 { | ||
self.worker_count.load(std::sync::atomic::Ordering::SeqCst) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Remove unnecessary async
from check_worker_count
and its usage
The method check_worker_count
is declared async
but does not perform any asynchronous operations. Making it synchronous eliminates unnecessary overhead and simplifies the code. Similarly, update its usage in wait_for_worker
.
Apply this diff to remove async
and adjust the call:
-pub async fn check_worker_count(&self) -> u64 {
+pub fn check_worker_count(&self) -> u64 {
self.worker_count.load(std::sync::atomic::Ordering::SeqCst)
}
-if 0 == self.check_worker_count().await {
+if 0 == self.check_worker_count() == 0 {
Also applies to: 65-65
self.sync_dag_store.delete_all_dag_sync_block()?; | ||
self.find_absent_ancestor(vec![block_header.clone()]) | ||
.await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential data inconsistency due to deletion of DAG sync blocks
The call to self.sync_dag_store.delete_all_dag_sync_block()?
removes all DAG sync blocks, which may lead to data inconsistency or loss of synchronization state. Ensure this operation is necessary and consider the impact on ongoing synchronization processes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (5)
sync/src/parallel/worker_scheduler.rs (3)
12-22
: Consider deriving Default instead of manual implementation.The manual Default implementation can be replaced with a derive attribute since all fields are initialized to their default values in
new()
.-#[derive(Debug)] +#[derive(Debug, Default)] pub struct WorkerScheduler { state: RwLock<WorkerSchedulerState>, worker_count: AtomicU64, } -impl Default for WorkerScheduler { - fn default() -> Self { - Self::new() - } -}
57-65
: Consider using Relaxed ordering for worker count operations.The SeqCst ordering is unnecessarily strong for simple counter operations. Relaxed ordering would be sufficient and more performant.
pub fn worker_exits(&self) { self.worker_count - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); } pub fn worker_start(&self) { self.worker_count - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); }
44-51
: Consider adding metrics for worker state transitions.Adding metrics for state transitions would help monitor system health and debug issues.
Consider integrating with a metrics system to track:
- Number of state transitions
- Time spent in each state
- Number of active workers over time
sync/src/tasks/tests_dag.rs (2)
57-59
: Verify worker cleanup after test completionThe WorkerScheduler is started but there's no explicit cleanup. Consider:
- Adding cleanup in a finally block or using test fixtures
- Ensuring worker is stopped even if test fails
Consider implementing a test helper or fixture to manage WorkerScheduler lifecycle consistently across tests.
Line range hint
89-89
: Consider adding specific worker-related test casesWhile the existing tests cover the functionality, consider adding test cases for:
- Worker failure scenarios
- Concurrent sync operations
- Worker restart scenarios
Would you like me to help draft additional test cases focusing on these scenarios?
Also applies to: 143-143
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
sync/src/parallel/worker_scheduler.rs
(1 hunks)sync/src/tasks/tests_dag.rs
(3 hunks)
🔇 Additional comments (3)
sync/src/parallel/worker_scheduler.rs (2)
1-10
: LGTM! Well-structured type definitions and imports.
The imports and type definitions are appropriate for implementing thread-safe worker state management.
53-55
: 🛠️ Refactor suggestion
Remove unnecessary async
and consider relaxing memory ordering.
The check_worker_count
method doesn't perform any async operations and can be simplified.
-pub async fn check_worker_count(&self) -> u64 {
- self.worker_count.load(std::sync::atomic::Ordering::SeqCst)
+pub fn check_worker_count(&self) -> u64 {
+ self.worker_count.load(std::sync::atomic::Ordering::Acquire)
}
sync/src/tasks/tests_dag.rs (1)
3-3
: LGTM: WorkerScheduler import added appropriately
The new import is correctly placed and is used within the modified code.
pub async fn wait_for_worker(&self) { | ||
loop { | ||
if 0 == self.check_worker_count().await { | ||
break; | ||
} else { | ||
tokio::task::yield_now().await; | ||
tokio::time::sleep(Duration::from_millis(200)).await; | ||
debug!("wait for worker to exit"); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve wait_for_worker implementation.
Several improvements can be made to the waiting logic:
- The comparison can be simplified
- The fixed sleep duration could cause unnecessary delays
- Consider adding a timeout to prevent infinite waiting
pub async fn wait_for_worker(&self) {
+ const MAX_ATTEMPTS: u32 = 150; // 30 seconds total
+ const INITIAL_DELAY: Duration = Duration::from_millis(50);
+ let mut delay = INITIAL_DELAY;
+ let mut attempts = 0;
loop {
- if 0 == self.check_worker_count().await {
+ if self.check_worker_count() == 0 {
break;
- } else {
- tokio::task::yield_now().await;
- tokio::time::sleep(Duration::from_millis(200)).await;
- debug!("wait for worker to exit");
+ }
+ attempts += 1;
+ if attempts >= MAX_ATTEMPTS {
+ debug!("Timeout waiting for workers to exit");
+ break;
}
+ tokio::time::sleep(delay).await;
+ delay = std::cmp::min(delay * 2, Duration::from_secs(1));
+ debug!("waiting for worker to exit, attempt {}", attempts);
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn wait_for_worker(&self) { | |
loop { | |
if 0 == self.check_worker_count().await { | |
break; | |
} else { | |
tokio::task::yield_now().await; | |
tokio::time::sleep(Duration::from_millis(200)).await; | |
debug!("wait for worker to exit"); | |
} | |
} | |
} | |
pub async fn wait_for_worker(&self) { | |
const MAX_ATTEMPTS: u32 = 150; // 30 seconds total | |
const INITIAL_DELAY: Duration = Duration::from_millis(50); | |
let mut delay = INITIAL_DELAY; | |
let mut attempts = 0; | |
loop { | |
if self.check_worker_count() == 0 { | |
break; | |
} | |
attempts += 1; | |
if attempts >= MAX_ATTEMPTS { | |
debug!("Timeout waiting for workers to exit"); | |
break; | |
} | |
tokio::time::sleep(delay).await; | |
delay = std::cmp::min(delay * 2, Duration::from_secs(1)); | |
debug!("waiting for worker to exit, attempt {}", attempts); | |
} | |
} |
sync/src/tasks/tests_dag.rs
Outdated
let worker_scheduler = Arc::new(WorkerScheduler::new()); | ||
loop { | ||
worker_scheduler.tell_worker_to_stop().await; | ||
worker_scheduler.wait_for_worker().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Add timeout and error handling to worker lifecycle operations
The current implementation of wait_for_worker
has a basic timeout mechanism (200ms sleep between checks) but lacks:
- Maximum wait time limit
- Error propagation
- Proper cleanup on timeout
Specific issues:
wait_for_worker
loops indefinitely without a maximum duration- No error handling for worker state transitions
- Debug-level logging may not be sufficient for production issues
🔗 Analysis chain
Consider adding error handling for worker lifecycle operations
The worker stop/wait operations could potentially fail or deadlock. Consider:
- Adding timeouts to prevent indefinite waiting
- Error handling for failed worker operations
- Logging worker state transitions for debugging
Let me check the implementation of the worker scheduler to better understand the error handling and timeout mechanisms.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for similar worker management patterns in tests
rg -A 3 "tell_worker_to_stop|wait_for_worker" --type rust
Length of output: 6966
Script:
#!/bin/bash
# Check WorkerScheduler implementation for error handling and timeouts
ast-grep --pattern 'pub struct WorkerScheduler {
$$$
}'
# Also check wait_for_worker implementation
ast-grep --pattern 'pub async fn wait_for_worker(&self) {
$$$
}'
Length of output: 1198
Benchmark for 19ba03eClick to view benchmark
|
753ac53
to
e4f72af
Compare
Benchmark for a8a8c83Click to view benchmark
|
Benchmark for 0a5c137Click to view benchmark
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (13)
sync/src/parallel/worker_scheduler.rs (1)
7-11
: Consider making WorkerSchedulerState publicThe enum is currently private but used across multiple components. Consider making it public to improve API documentation and visibility.
-enum WorkerSchedulerState { +pub enum WorkerSchedulerState {sync/src/tasks/block_sync_task.rs (2)
Line range hint
461-474
: Critical: Potential race condition in DAG block deletionThe deletion of all DAG sync blocks followed by parallel execution could lead to race conditions:
- Blocks might be deleted while other workers are still processing them
- No synchronization mechanism ensures all workers have stopped before deletion
Consider this safer approach:
- self.sync_dag_store.delete_all_dag_sync_block()?; + // Ensure all workers have stopped + self.worker_scheduler.tell_worker_to_stop().await?; + self.worker_scheduler.wait_for_worker().await?; + + // Safe to delete now + self.sync_dag_store.delete_all_dag_sync_block()?; + + // Resume workers + self.worker_scheduler.tell_worker_to_start().await?;
Line range hint
465-474
: Consider adding retry mechanism for parallel executionThe parallel execution setup lacks error handling for temporary failures.
Add retry logic:
let parallel_execute = DagBlockSender::new( self.sync_dag_store.clone(), 100000, self.chain.time_service(), self.local_store.clone(), None, self.chain.dag(), self, worker_scheduler, ); - parallel_execute.process_absent_blocks().await?; + let mut attempts = 0; + const MAX_ATTEMPTS: u32 = 3; + while attempts < MAX_ATTEMPTS { + match parallel_execute.process_absent_blocks().await { + Ok(_) => break, + Err(e) if attempts < MAX_ATTEMPTS - 1 => { + debug!("Retrying parallel execution after error: {:?}", e); + attempts += 1; + tokio::time::sleep(Duration::from_secs(1 << attempts)).await; + } + Err(e) => return Err(e), + } + }sync/src/sync.rs (1)
69-69
: Consider adding error handling for WorkerScheduler initializationThe
WorkerScheduler
is initialized without any error handling. While the current implementation works, it would be more robust to handle potential initialization failures.- let worker_scheduler = Arc::new(WorkerScheduler::new()); + let worker_scheduler = Arc::new(WorkerScheduler::new().map_err(|e| format_err!("Failed to initialize worker scheduler: {}", e))?);Also applies to: 97-106
sync/src/tasks/tests.rs (1)
75-77
: Consider extracting test helper for worker lifecycle managementThe pattern of stopping, waiting, and starting workers is repeated across multiple tests. Consider extracting this into a test helper method to improve test maintainability.
+async fn setup_worker(scheduler: &Arc<WorkerScheduler>) -> Result<()> { + scheduler.tell_worker_to_stop().await; + scheduler.wait_for_worker().await?; + Ok(()) +} +async fn start_worker(scheduler: &Arc<WorkerScheduler>) { + scheduler.tell_worker_to_start().await; +}Also applies to: 88-90
flexidag/src/consensusdb/consensus_relations.rs (2)
136-166
: Enhance error handling in batch constructionThe batch construction logic is correct, but consider adding error handling for the encoding operations.
Consider wrapping the encoding operations in a try block:
let batch = WriteBatchWithColumn { data: vec![ WriteBatchData { column: PARENTS_CF.to_string(), - row_data: WriteBatch::new_with_rows(vec![( - hash.to_vec(), - WriteOp::Value( - <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(&parents)?, - ), - )]), + row_data: WriteBatch::new_with_rows(vec![( + hash.to_vec(), + WriteOp::Value( + <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(&parents) + .map_err(|e| StoreError::EncodeError( + format!("Failed to encode parents: {}", e) + ))?, + ), + )]), },
167-178
: Consider implementing transaction-like behaviorThe current implementation updates both DB and cache separately. If the cache update fails after a successful DB write, it could lead to inconsistency.
Consider implementing a transaction-like pattern:
- Prepare both DB and cache operations
- Execute DB operation
- Execute cache operation with rollback capability
- Handle rollback if cache operation fails
This would ensure better consistency between DB and cache.
storage/src/storage.rs (2)
43-43
: Add documentation for new trait methodsThe new trait methods lack documentation explaining their purpose and usage.
Consider adding documentation:
+ /// Writes a batch of operations with column family information. + /// The batch can contain multiple operations across different column families. fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()>; + /// Writes a batch of operations with column family information synchronously. + /// This ensures the batch is persisted to disk before returning. fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()>;Also applies to: 48-48
208-217
: Enhance error handling in StorageInstance implementationThe implementation should provide more specific error messages for better debugging.
Consider improving error handling:
fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { match self { Self::CACHE { cache } => cache.write_batch_with_column(batch), Self::DB { db } => db.write_batch_with_column(batch), Self::CacheAndDb { cache, db } => { - db.write_batch_with_column(batch.clone())?; - cache.write_batch_with_column(batch) + db.write_batch_with_column(batch.clone()) + .map_err(|e| format_err!("Failed to write batch to DB: {}", e))?; + cache.write_batch_with_column(batch) + .map_err(|e| format_err!("Failed to write batch to cache: {}", e)) } } }storage/src/tests/test_storage.rs (1)
607-791
: LGTM! Well-structured batch write test implementation.The function provides comprehensive testing of batch writes across different storage types with proper error handling.
Consider extracting the verification logic into a helper function to reduce code duplication across storage types:
+fn verify_blocks( + storage: &dyn CodecKVStore, + block1: &Block, + block2: &Block, + dag_block1: &DagSyncBlock, + dag_block2: &DagSyncBlock, +) -> Result<()> { + let read_block1 = Block::decode( + &storage + .get(BLOCK_PREFIX_NAME, block1.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block1, *block1); + + let read_block2 = Block::decode( + &storage + .get(BLOCK_PREFIX_NAME, block2.id().encode()?)? + .expect("failed to get the block"), + )?; + assert_eq!(read_block2, *block2); + + let read_dag_block1 = DagSyncBlock::decode( + &storage + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block1.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block1, *dag_block1); + + let read_dag_block2 = DagSyncBlock::decode( + &storage + .get(DAG_SYNC_BLOCK_PREFIX_NAME, dag_block2.block.id().encode()?)? + .expect("failed to get the dag block"), + )?; + assert_eq!(read_dag_block2, *dag_block2); + Ok(()) +}flexidag/tests/tests.rs (2)
1143-1328
: Add documentation and extract magic numbers as constants.The test's purpose and verification steps are not immediately clear. Consider:
- Adding documentation to explain the test's purpose and verification strategy
- Extracting magic numbers as named constants
Add documentation and constants:
+/// Number of blocks to generate in the bottom layer +const BOTTOM_LAYER_BLOCKS: u64 = 500; + +/// Default block difficulty +const DEFAULT_DIFFICULTY: u64 = 10; + +/// High difficulty for testing block selection +const HIGH_DIFFICULTY: u64 = 99999999; + +/// Tests the consistency of blue block verification by: +/// 1. Creating a chain of blocks with consistent difficulty +/// 2. Introducing a high difficulty block to test selection +/// 3. Verifying that ghost data remains consistent #[test] fn test_verification_blue_block_inconsistent() -> anyhow::Result<()> {
Line range hint
1329-1462
: Clean up commented assertions.The test contains commented out assertions that should either be removed or uncommented if they are still relevant:
// assert!(observer1.blue_score < observer2.blue_score); // assert!(observer1.selected_parent != observer2.selected_parent); // assert_eq!(observer3.blue_score, observer2.blue_score); // assert_eq!(observer3.selected_parent, observer2.selected_parent);sync/src/tasks/inner_sync_task.rs (1)
42-42
: Consider documenting the architectural changes.The integration of
WorkerScheduler
represents a significant architectural change to the synchronization process:
- How does this relate to the "dag relationship in batch" mentioned in the PR title?
- What are the performance implications of this change?
- Are there any changes needed in error handling or recovery processes?
Please update the PR description to include:
- Current vs. new behavior
- Performance expectations
- Impact on system resources
Also applies to: 161-161
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (23)
flexidag/src/consensusdb/access.rs
(1 hunks)flexidag/src/consensusdb/consensus_relations.rs
(2 hunks)flexidag/src/ghostdag/protocol.rs
(2 hunks)flexidag/src/prune/pruning_point_manager.rs
(1 hunks)flexidag/tests/tests.rs
(7 hunks)kube/manifest/starcoin-vega.yaml
(1 hunks)storage/src/batch/mod.rs
(1 hunks)storage/src/cache_storage/mod.rs
(3 hunks)storage/src/db_storage/mod.rs
(2 hunks)storage/src/storage.rs
(4 hunks)storage/src/tests/test_storage.rs
(2 hunks)sync/src/parallel/executor.rs
(6 hunks)sync/src/parallel/mod.rs
(1 hunks)sync/src/parallel/sender.rs
(5 hunks)sync/src/parallel/worker_scheduler.rs
(1 hunks)sync/src/sync.rs
(12 hunks)sync/src/tasks/block_sync_task.rs
(6 hunks)sync/src/tasks/inner_sync_task.rs
(5 hunks)sync/src/tasks/mock.rs
(7 hunks)sync/src/tasks/mod.rs
(3 hunks)sync/src/tasks/test_tools.rs
(5 hunks)sync/src/tasks/tests.rs
(19 hunks)sync/src/tasks/tests_dag.rs
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (12)
- sync/src/parallel/mod.rs
- flexidag/src/prune/pruning_point_manager.rs
- kube/manifest/starcoin-vega.yaml
- sync/src/tasks/tests_dag.rs
- storage/src/batch/mod.rs
- flexidag/src/consensusdb/access.rs
- sync/src/tasks/mod.rs
- sync/src/parallel/sender.rs
- flexidag/src/ghostdag/protocol.rs
- storage/src/cache_storage/mod.rs
- sync/src/parallel/executor.rs
- sync/src/tasks/mock.rs
🔇 Additional comments (15)
sync/src/tasks/test_tools.rs (3)
129-130
: LGTM: Import statement is correctly placed.
The import follows the codebase's organization pattern.
Line range hint 148-167
: Add error handling for worker scheduler operations.
While the sequence of operations is correct, the tell_worker_to_start/stop
calls could fail. Consider adding error handling:
- worker_scheduler.tell_worker_to_stop().await;
+ worker_scheduler.tell_worker_to_stop().await.map_err(|e| {
+ error!("Failed to stop worker: {}", e);
+ e
+ })?;
let (sync_task, _task_handle, task_event_counter) = full_sync_task(
/* ... */
worker_scheduler.clone(),
)?;
- worker_scheduler.tell_worker_to_start().await;
+ worker_scheduler.tell_worker_to_start().await.map_err(|e| {
+ error!("Failed to start worker: {}", e);
+ e
+ })?;
Verify worker scheduler integration.
Let's verify that the worker scheduler is properly integrated with the sync task system.
#!/bin/bash
# Description: Check worker scheduler usage in sync tasks
# Check if worker scheduler is consistently used across sync tasks
rg -A 5 "WorkerScheduler::new"
# Check if all sync tasks receive worker scheduler
ast-grep --pattern 'full_sync_task($$$, worker_scheduler.clone())'
# Check for any potential worker lifecycle management issues
rg -A 5 "tell_worker_to_(start|stop)"
Line range hint 184-203
: Refactor duplicated sync task code and fix operation sequence.
Two issues to address:
- The sync task setup code is duplicated. Consider extracting a helper function to reduce duplication and improve maintainability.
- The sequence of operations differs from the first sync task - the worker start happens after
join_handle
setup, while in the first sync it happens before.
+ async fn setup_sync_task(
+ current_header_id: HashValue,
+ target: SyncTarget,
+ storage: Arc<Storage>,
+ sender_1: UnboundedSender<BlockConnectEvent>,
+ arc_node1: Arc<SyncNodeMocker>,
+ sender_2: UnboundedSender<BlockConnectEvent>,
+ dag: Arc<BlockDAG>,
+ sync_dag_store: Arc<SyncDagStore>,
+ worker_scheduler: Arc<WorkerScheduler>,
+ net: ChainNetwork,
+ ) -> Result<(JoinHandle<SyncNodeMocker>, impl Future<Output = Result<Branch>>)> {
+ worker_scheduler.tell_worker_to_stop().await?;
+ let (sync_task, _task_handle, _) = full_sync_task(
+ current_header_id,
+ target,
+ false,
+ net.time_service(),
+ storage,
+ sender_1,
+ arc_node1,
+ sender_2,
+ DummyNetworkService::default(),
+ 15,
+ None,
+ None,
+ dag,
+ sync_dag_store,
+ worker_scheduler.clone(),
+ )?;
+ let join_handle = node2.process_block_connect_event(receiver_1).await;
+ worker_scheduler.tell_worker_to_start().await?;
+ Ok((join_handle, sync_task))
+ }
Fix inconsistent worker start sequence.
The worker start operation happens at different points in the two sync tasks:
- First sync task: worker is started before join_handle setup
- Second sync task: worker is started after join_handle setup
This inconsistency could lead to race conditions. Standardize the sequence across both sync tasks.
sync/src/sync.rs (2)
237-242
: Well-structured worker lifecycle management
The implementation properly manages the worker lifecycle by:
- Stopping existing workers
- Waiting for complete stop
- Logging each step for better observability
406-413
: Avoid blocking operations in async context
The use of block_in_place
and block_on
for stopping workers can lead to performance issues.
flexidag/src/consensusdb/consensus_relations.rs (1)
4-11
: LGTM! Well-organized imports
The new imports for batch operations are properly organized and necessary for the implementation.
storage/src/storage.rs (1)
258-267
: Apply consistent error handling pattern
The sync implementation should follow the same error handling pattern as the non-sync version.
Apply the same error handling improvements as suggested for the non-sync version.
storage/src/tests/test_storage.rs (3)
793-802
: LGTM! Clear and focused test case.
The test properly sets up a combined cache and db storage instance for batch write testing.
804-814
: LGTM! Clear and focused test case.
The test properly sets up a db-only storage instance for batch write testing.
816-821
: LGTM! Clear and focused test case.
The test properly sets up a cache-only storage instance for batch write testing.
flexidag/tests/tests.rs (1)
784-807
:
Fix incorrect duration measurement.
The duration is calculated immediately after starting the timer without any intervening operations, resulting in a meaningless duration measurement.
Apply this diff to fix the timing logic:
- let start = Instant::now();
- let duration = start.elapsed();
+ // Start timing after header creation
+ let start = Instant::now();
+ // Perform the operation you want to time here
+ let duration = start.elapsed();
sync/src/tasks/inner_sync_task.rs (4)
18-18
: LGTM: Import statement is correctly placed.
The import for WorkerScheduler
follows the module organization pattern.
63-63
: LGTM: Constructor updated correctly.
The worker_scheduler
parameter and field initialization maintain good practices:
- Added at the end for backward compatibility
- Consistent ordering with struct fields
Also applies to: 77-77
42-42
: Verify thread-safety requirements for WorkerScheduler.
The Arc
wrapper indicates concurrent access. Let's verify that WorkerScheduler
implements the necessary thread-safety traits.
✅ Verification successful
Thread-safety requirements for WorkerScheduler are properly implemented
The verification shows that WorkerScheduler
is thread-safe:
- It uses
RwLock
for internal state management (state: RwLock<WorkerSchedulerState>
) - It uses
AtomicU64
for worker count tracking (worker_count: AtomicU64
) - All public methods that modify state use proper synchronization primitives
- The struct is consistently wrapped in
Arc
across all usages in the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if WorkerScheduler implements Send + Sync traits
ast-grep --pattern 'struct WorkerScheduler {
$$$
}'
# Check for potential unsafe implementations
rg -A 5 "unsafe impl.*WorkerScheduler"
Length of output: 96
Script:
#!/bin/bash
# Let's first locate the WorkerScheduler definition
fd "worker.*scheduler" --type f
# Then search for its implementation and usage patterns
rg -i "WorkerScheduler" -A 5
# Also check for any potential thread-safety related imports
rg "use std::sync::(Arc|Mutex|RwLock).*WorkerScheduler"
Length of output: 15054
161-161
: Document the worker scheduling integration in BlockCollector.
The addition of worker_scheduler
to BlockCollector
suggests a significant change in behavior:
- Please add documentation explaining how worker scheduling affects the block collection process.
- Consider updating the PR description to clarify these changes.
pub async fn wait_for_worker(&self) -> anyhow::Result<()> { | ||
const MAX_ATTEMPTS: u32 = 150; | ||
const INITIAL_DELAY: Duration = Duration::from_secs(30); | ||
let mut delay = INITIAL_DELAY; | ||
let mut attempts: u32 = 0; | ||
loop { | ||
if 0 == self.check_worker_count() { | ||
break; | ||
} | ||
attempts = attempts.saturating_add(1); | ||
if attempts >= MAX_ATTEMPTS { | ||
bail!("Timeout waiting for workers to exit"); | ||
} | ||
tokio::task::yield_now().await; | ||
debug!("waiting for worker to exit, attempt {}", attempts); | ||
tokio::time::sleep(delay).await; | ||
delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize wait_for_worker implementation
The current implementation has potential issues:
- Initial delay of 30 seconds is too long for the first attempt
- Exponential backoff with 2-hour maximum delay is excessive
- No timeout duration configuration option
Apply this diff to improve the implementation:
pub async fn wait_for_worker(&self) -> anyhow::Result<()> {
- const MAX_ATTEMPTS: u32 = 150;
- const INITIAL_DELAY: Duration = Duration::from_secs(30);
+ const MAX_ATTEMPTS: u32 = 300; // 5 minutes total
+ const INITIAL_DELAY: Duration = Duration::from_millis(100);
+ const MAX_DELAY: Duration = Duration::from_secs(10);
let mut delay = INITIAL_DELAY;
let mut attempts: u32 = 0;
loop {
if 0 == self.check_worker_count() {
break;
}
attempts = attempts.saturating_add(1);
if attempts >= MAX_ATTEMPTS {
bail!("Timeout waiting for workers to exit");
}
tokio::task::yield_now().await;
debug!("waiting for worker to exit, attempt {}", attempts);
tokio::time::sleep(delay).await;
- delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2));
+ delay = std::cmp::min(delay.saturating_mul(2), MAX_DELAY);
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
pub async fn wait_for_worker(&self) -> anyhow::Result<()> { | |
const MAX_ATTEMPTS: u32 = 150; | |
const INITIAL_DELAY: Duration = Duration::from_secs(30); | |
let mut delay = INITIAL_DELAY; | |
let mut attempts: u32 = 0; | |
loop { | |
if 0 == self.check_worker_count() { | |
break; | |
} | |
attempts = attempts.saturating_add(1); | |
if attempts >= MAX_ATTEMPTS { | |
bail!("Timeout waiting for workers to exit"); | |
} | |
tokio::task::yield_now().await; | |
debug!("waiting for worker to exit, attempt {}", attempts); | |
tokio::time::sleep(delay).await; | |
delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2)); | |
} | |
pub async fn wait_for_worker(&self) -> anyhow::Result<()> { | |
const MAX_ATTEMPTS: u32 = 300; // 5 minutes total | |
const INITIAL_DELAY: Duration = Duration::from_millis(100); | |
const MAX_DELAY: Duration = Duration::from_secs(10); | |
let mut delay = INITIAL_DELAY; | |
let mut attempts: u32 = 0; | |
loop { | |
if 0 == self.check_worker_count() { | |
break; | |
} | |
attempts = attempts.saturating_add(1); | |
if attempts >= MAX_ATTEMPTS { | |
bail!("Timeout waiting for workers to exit"); | |
} | |
tokio::task::yield_now().await; | |
debug!("waiting for worker to exit, attempt {}", attempts); | |
tokio::time::sleep(delay).await; | |
delay = std::cmp::min(delay.saturating_mul(2), MAX_DELAY); | |
} |
tokio::task::block_in_place(|| { | ||
tokio::runtime::Handle::current().block_on(async { | ||
worker_scheduler.tell_worker_to_stop().await; | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Extract common worker stopping pattern
The worker stopping pattern is duplicated across multiple locations. Consider extracting this into a helper method to improve maintainability and reduce duplication.
+ async fn stop_worker_scheduler(scheduler: &Arc<WorkerScheduler>) -> Result<()> {
+ scheduler.tell_worker_to_stop().await;
+ scheduler.wait_for_worker().await
+ }
- tokio::task::block_in_place(|| {
- tokio::runtime::Handle::current().block_on(async {
- worker_scheduler.tell_worker_to_stop().await;
- });
- });
+ stop_worker_scheduler(&worker_scheduler).await?;
Also applies to: 561-565
async_std::task::block_on(async { | ||
worker_scheduler.tell_worker_to_stop().await; | ||
worker_scheduler | ||
.wait_for_worker() | ||
.await | ||
.expect("the sync workers do not exist"); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid using block_on in async context
Using block_on
within an async context can lead to deadlocks and performance issues. Consider refactoring to use proper async/await patterns.
- async_std::task::block_on(async {
- worker_scheduler.tell_worker_to_stop().await;
- worker_scheduler
- .wait_for_worker()
- .await
- .expect("the sync workers do not exist");
- });
+ worker_scheduler.tell_worker_to_stop().await;
+ worker_scheduler
+ .wait_for_worker()
+ .await
+ .expect("the sync workers do not exist");
- async_std::task::block_on(async {
- worker_scheduler.tell_worker_to_start().await;
- });
+ worker_scheduler.tell_worker_to_start().await;
Also applies to: 1054-1056
while !node2_worker_handle.check_if_stop().await { | ||
println!("wait for worker stop."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Replace polling loop with async wait
The current implementation uses a polling loop which could lead to busy waiting. Consider using an async notification mechanism or a timeout-based approach.
- while !node2_worker_handle.check_if_stop().await {
- println!("wait for worker stop.");
- }
+ let mut retries = 0;
+ while !node2_worker_handle.check_if_stop().await {
+ if retries >= MAX_RETRIES {
+ return Err(format_err!("Worker failed to stop after {} retries", MAX_RETRIES));
+ }
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ retries += 1;
+ }
Committable suggestion skipped: line range outside the PR's diff.
let mut parent_to_children = HashMap::new(); | ||
parent_to_children.insert(hash, vec![]); | ||
|
||
// The new hash has no children yet | ||
self.children_access.write( | ||
DirectDbWriter::new(&self.db), | ||
hash, | ||
BlockHashes::new(Vec::new()), | ||
)?; | ||
|
||
// Update `children` for each parent | ||
for parent in parents.iter().cloned() { | ||
let mut children = (*self.get_children(parent)?).clone(); | ||
let mut children = match self.get_children(parent) { | ||
Ok(children) => (*children).clone(), | ||
Err(e) => match e { | ||
StoreError::KeyNotFound(_) => vec![], | ||
_ => return std::result::Result::Err(e), | ||
}, | ||
}; | ||
children.push(hash); | ||
self.children_access.write( | ||
DirectDbWriter::new(&self.db), | ||
parent, | ||
BlockHashes::new(children), | ||
)?; | ||
parent_to_children.insert(parent, children); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Optimize HashMap initialization and error handling
A few suggestions for improvement:
- The initial empty children vector for the new hash (line 122) is unnecessary as it's not used anywhere.
- The error handling for missing parents could be more explicit.
Consider this improved implementation:
- let mut parent_to_children = HashMap::new();
- parent_to_children.insert(hash, vec![]);
+ let mut parent_to_children = HashMap::with_capacity(parents.len());
for parent in parents.iter().cloned() {
let mut children = match self.get_children(parent) {
Ok(children) => (*children).clone(),
Err(e) => match e {
- StoreError::KeyNotFound(_) => vec![],
+ StoreError::KeyNotFound(_) => {
+ return Err(StoreError::KeyNotFound(
+ format!("Parent block {} not found", parent)
+ ));
+ }
_ => return std::result::Result::Err(e),
},
};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut parent_to_children = HashMap::new(); | |
parent_to_children.insert(hash, vec![]); | |
// The new hash has no children yet | |
self.children_access.write( | |
DirectDbWriter::new(&self.db), | |
hash, | |
BlockHashes::new(Vec::new()), | |
)?; | |
// Update `children` for each parent | |
for parent in parents.iter().cloned() { | |
let mut children = (*self.get_children(parent)?).clone(); | |
let mut children = match self.get_children(parent) { | |
Ok(children) => (*children).clone(), | |
Err(e) => match e { | |
StoreError::KeyNotFound(_) => vec![], | |
_ => return std::result::Result::Err(e), | |
}, | |
}; | |
children.push(hash); | |
self.children_access.write( | |
DirectDbWriter::new(&self.db), | |
parent, | |
BlockHashes::new(children), | |
)?; | |
parent_to_children.insert(parent, children); | |
let mut parent_to_children = HashMap::with_capacity(parents.len()); | |
for parent in parents.iter().cloned() { | |
let mut children = match self.get_children(parent) { | |
Ok(children) => (*children).clone(), | |
Err(e) => match e { | |
StoreError::KeyNotFound(_) => { | |
return Err(StoreError::KeyNotFound( | |
format!("Parent block {} not found", parent) | |
)); | |
} | |
_ => return std::result::Result::Err(e), | |
}, | |
}; | |
children.push(hash); | |
parent_to_children.insert(parent, children); |
fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> { | ||
let mut db_batch = DBWriteBatch::default(); | ||
batch.data.into_iter().for_each(|data| { | ||
let cf_handle = self.get_cf_handle(&data.column); | ||
for (key, write_op) in data.row_data.rows { | ||
match write_op { | ||
WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value), | ||
WriteOp::Deletion => db_batch.delete_cf(cf_handle, key), | ||
}; | ||
} | ||
}); | ||
record_metrics( | ||
"db", | ||
"write_batch_column", | ||
"write_batch", | ||
self.metrics.as_ref(), | ||
) | ||
.call(|| { | ||
self.db | ||
.write_opt(db_batch, &Self::default_write_options())?; | ||
Ok(()) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Reduce code duplication between batch write methods
The implementations of write_batch_with_column
and write_batch_with_column_sync
are nearly identical, differing only in the write options used.
Extract the common logic into a private helper method:
+ fn write_batch_with_column_internal(
+ &self,
+ batch: WriteBatchWithColumn,
+ write_opts: WriteOptions,
+ ) -> Result<()> {
+ let mut db_batch = DBWriteBatch::default();
+ batch.data.into_iter().for_each(|data| {
+ let cf_handle = self.get_cf_handle(&data.column);
+ for (key, write_op) in data.row_data.rows {
+ match write_op {
+ WriteOp::Value(value) => db_batch.put_cf(cf_handle, key, value),
+ WriteOp::Deletion => db_batch.delete_cf(cf_handle, key),
+ };
+ }
+ });
+ record_metrics(
+ "db",
+ "write_batch_column",
+ "write_batch",
+ self.metrics.as_ref(),
+ )
+ .call(|| {
+ self.db.write_opt(db_batch, &write_opts)?;
+ Ok(())
+ })
+ }
+
fn write_batch_with_column(&self, batch: WriteBatchWithColumn) -> Result<()> {
- let mut db_batch = DBWriteBatch::default();
- // ... existing implementation
+ self.write_batch_with_column_internal(batch, Self::default_write_options())
}
fn write_batch_with_column_sync(&self, batch: WriteBatchWithColumn) -> Result<()> {
- let mut db_batch = DBWriteBatch::default();
- // ... existing implementation
+ self.write_batch_with_column_internal(batch, Self::sync_write_options())
}
Also applies to: 441-462
Benchmark for 1972e10Click to view benchmark
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
sync/src/parallel/worker_scheduler.rs (1)
51-53
: Consider using Relaxed ordering for atomic counter.
SeqCst
ordering might be stronger than necessary for a simple counter operation.Relaxed
ordering would be sufficient here and potentially more performant.- self.worker_count.load(std::sync::atomic::Ordering::SeqCst) + self.worker_count.load(std::sync::atomic::Ordering::Relaxed)sync/src/parallel/executor.rs (2)
Line range hint
77-96
: Remove panic branch and improve error handling.The method has a few areas for improvement:
- The panic branch indicates a logic error that should be handled gracefully
- Error messages could be more descriptive
pub fn waiting_for_parents( chain: &BlockDAG, storage: Arc<dyn Store>, parents_hash: Vec<HashValue>, ) -> anyhow::Result<(bool, Option<HashValue>)> { for parent_id in parents_hash { let header = match storage.get_block_header_by_hash(parent_id)? { Some(header) => header, - None => return Ok((false, Some(parent_id))), + None => { + warn!("Parent block header not found: {}", parent_id); + return Ok((false, Some(parent_id))); + } }; if storage.get_block_info(header.id())?.is_none() { + warn!("Parent block info not found: {}", parent_id); return Ok((false, Some(parent_id))); } if !chain.has_dag_block(parent_id)? { + warn!("Parent block not in DAG: {}", parent_id); return Ok((false, Some(parent_id))); } } Ok((true, None)) }
126-159
: Extract exponential backoff logic to avoid duplication.The exponential backoff logic is duplicated between this method and
WorkerScheduler::wait_for_worker
. Consider extracting it into a shared utility.Create a new utility module with this implementation:
pub struct ExponentialBackoff { delay: Duration, attempts: u32, max_attempts: u32, max_delay: Duration, } impl ExponentialBackoff { pub fn new(initial_delay: Duration, max_attempts: u32, max_delay: Duration) -> Self { Self { delay: initial_delay, attempts: 0, max_attempts, max_delay, } } pub fn next_delay(&mut self) -> Option<Duration> { if self.attempts >= self.max_attempts { return None; } self.attempts = self.attempts.saturating_add(1); let current_delay = self.delay; self.delay = std::cmp::min(self.delay.saturating_mul(2), self.max_delay); Some(current_delay) } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
sync/src/parallel/executor.rs
(6 hunks)sync/src/parallel/worker_scheduler.rs
(1 hunks)
🔇 Additional comments (4)
sync/src/parallel/worker_scheduler.rs (3)
7-11
: LGTM! Well-designed state enum.
The enum clearly represents the binary state of worker scheduling with descriptive variants.
13-17
: LGTM! Thread-safe design with appropriate synchronization primitives.
The struct uses appropriate concurrency primitives:
RwLock
for state management allows multiple readersAtomicU64
for thread-safe counter operations
65-86
: 🛠️ Refactor suggestion
Optimize wait_for_worker timing parameters.
The current timing parameters may lead to inefficient waiting:
- Initial delay of 30 seconds is too long for the first attempt
- Maximum delay of 2 hours is excessive
- Consider adding configuration options for these parameters
Apply this diff to improve the implementation:
pub async fn wait_for_worker(&self) -> anyhow::Result<()> {
- const MAX_ATTEMPTS: u32 = 150;
- const INITIAL_DELAY: Duration = Duration::from_secs(30);
+ const MAX_ATTEMPTS: u32 = 300; // 5 minutes total with shorter intervals
+ const INITIAL_DELAY: Duration = Duration::from_millis(100);
+ const MAX_DELAY: Duration = Duration::from_secs(10);
let mut delay = INITIAL_DELAY;
let mut attempts: u32 = 0;
loop {
if 0 == self.check_worker_count() {
break;
}
attempts = attempts.saturating_add(1);
if attempts >= MAX_ATTEMPTS {
bail!("Timeout waiting for workers to exit");
}
tokio::task::yield_now().await;
debug!("waiting for worker to exit, attempt {}", attempts);
tokio::time::sleep(delay).await;
- delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2));
+ delay = std::cmp::min(delay.saturating_mul(2), MAX_DELAY);
}
Likely invalid or redundant comment.
sync/src/parallel/executor.rs (1)
37-51
: LGTM! Well-designed RAII pattern for worker lifecycle management.
The ExecutorDeconstructor
correctly implements the RAII pattern to manage worker lifecycle:
- Constructor starts the worker
- Drop trait ensures worker cleanup
Benchmark for 918f12eClick to view benchmark
|
Benchmark for d7560aeClick to view benchmark
|
This PR was merged into: #4358 and closed. |
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information
Summary by CodeRabbit
New Features
flush_cache
method for bulk insertion of key-value pairs in the caching system.write_batch_with_column
andwrite_batch_with_column_sync
methods for enhanced batch writing capabilities across multiple storage types.WorkerScheduler
for improved management of worker tasks during synchronization.Bug Fixes
verify_and_ghostdata
method.Tests
WorkerScheduler
for better control during block synchronization tasks.Documentation