From 1953773d2a038bb3be02c7c8ba95234c4b214ac8 Mon Sep 17 00:00:00 2001 From: Mili Date: Wed, 29 Jan 2025 01:38:00 +0000 Subject: [PATCH] feat: implement file locking mechanism for concurrent chunk read/write operations --- pumpkin-world/src/chunk/linear.rs | 63 ++++++++++++++++--------------- pumpkin-world/src/chunk/mod.rs | 46 +++++++++++++++++++++- 2 files changed, 77 insertions(+), 32 deletions(-) diff --git a/pumpkin-world/src/chunk/linear.rs b/pumpkin-world/src/chunk/linear.rs index 1d4edbe6..0cc384cc 100644 --- a/pumpkin-world/src/chunk/linear.rs +++ b/pumpkin-world/src/chunk/linear.rs @@ -10,7 +10,8 @@ use pumpkin_config::ADVANCED_CONFIG; use super::anvil::AnvilChunkFormat; use super::{ - ChunkData, ChunkReader, ChunkReadingError, ChunkSerializingError, ChunkWriter, CompressionError, + ChunkData, ChunkReader, ChunkReadingError, ChunkSerializingError, ChunkWriter, + CompressionError, FileLocksManager, }; ///The side size of a region in chunks (one region is 32x32 chunks) @@ -250,19 +251,12 @@ impl LinearFile { .to_bytes(); // Write/OverWrite the data to the file - let tmp_path = format!( - "{}.{}.tmp", - path.as_os_str().to_str().unwrap(), - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros(), - ); let mut file = OpenOptions::new() .write(true) - .create_new(true) - .open(&tmp_path) + .create(true) + .truncate(true) + .open(path) .map_err(|err| ChunkWritingError::IoError(err.kind()))?; file.write_all( @@ -276,8 +270,6 @@ impl LinearFile { .as_slice(), ) .map_err(|err| ChunkWritingError::IoError(err.kind()))?; - - std::fs::rename(tmp_path, path).map_err(|err| ChunkWritingError::IoError(err.kind()))?; Ok(()) } @@ -379,9 +371,12 @@ impl ChunkReader for LinearChunkFormat { .region_folder .join(format!("./r.{}.{}.linear", region_x, region_z)); - let file_data = LinearFile::load(&path)?; - - file_data.get_chunk(at) + tokio::task::block_in_place(|| { + let file_lock = FileLocksManager::get_file_lock(&path); + let _reader_lock = file_lock.blocking_read(); + dbg!("Reading chunk at {:?}", at); + LinearFile::load(&path)?.get_chunk(at) + }) } } @@ -398,21 +393,27 @@ impl ChunkWriter for LinearChunkFormat { .region_folder .join(format!("./r.{}.{}.linear", region_x, region_z)); - let mut file_data = match LinearFile::load(&path) { - Ok(file_data) => file_data, - Err(ChunkReadingError::ChunkNotExist) => LinearFile::new(), - Err(ChunkReadingError::IoError(err)) => { - error!("Error reading the data before write: {}", err); - return Err(ChunkWritingError::IoError(err)); - } - Err(_) => return Err(ChunkWritingError::IoError(std::io::ErrorKind::Other)), - }; - - file_data - .put_chunk(chunk, at) - .map_err(|err| ChunkWritingError::ChunkSerializingError(err.to_string()))?; - - file_data.save(&path) + tokio::task::block_in_place(|| { + let file_lock = FileLocksManager::get_file_lock(&path); + let _writer_lock = file_lock.blocking_write(); + dbg!("Writing chunk at {:?}", at); + + let mut file_data = match LinearFile::load(&path) { + Ok(file_data) => file_data, + Err(ChunkReadingError::ChunkNotExist) => LinearFile::new(), + Err(ChunkReadingError::IoError(err)) => { + error!("Error reading the data before write: {}", err); + return Err(ChunkWritingError::IoError(err)); + } + Err(_) => return Err(ChunkWritingError::IoError(std::io::ErrorKind::Other)), + }; + + file_data + .put_chunk(chunk, at) + .map_err(|err| ChunkWritingError::ChunkSerializingError(err.to_string()))?; + + file_data.save(&path) + }) } } diff --git a/pumpkin-world/src/chunk/mod.rs b/pumpkin-world/src/chunk/mod.rs index 60fb312b..68650b9b 100644 --- a/pumpkin-world/src/chunk/mod.rs +++ b/pumpkin-world/src/chunk/mod.rs @@ -1,9 +1,16 @@ use fastnbt::LongArray; +use log::warn; use pumpkin_data::chunk::ChunkStatus; use pumpkin_util::math::{ceil_log2, vector2::Vector2}; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, iter::repeat_with}; +use std::{ + collections::HashMap, + iter::repeat_with, + path::{Path, PathBuf}, + sync::{Arc, LazyLock}, +}; use thiserror::Error; +use tokio::sync::{Mutex, RwLock}; use crate::{ block::BlockState, @@ -20,6 +27,8 @@ pub const SUBCHUNK_VOLUME: usize = CHUNK_AREA * 16; pub const SUBCHUNKS_COUNT: usize = WORLD_HEIGHT / 16; pub const CHUNK_VOLUME: usize = CHUNK_AREA * WORLD_HEIGHT; +// Manejador global para mĂșltiples archivos +static FILE_LOCK_MANAGER: LazyLock = LazyLock::new(FileLocksManager::default); pub trait ChunkReader: Sync + Send { fn read_chunk( &self, @@ -77,6 +86,14 @@ pub enum CompressionError { ZstdError(std::io::Error), } +type FileLocks = HashMap>>; +/// Central File Lock Manager for chunk files +/// This is used to prevent multiple threads from writing to the same file at the same time +#[derive(Clone, Default)] +pub struct FileLocksManager { + locks: Arc>, +} + pub struct ChunkData { /// See description in `Subchunks` pub subchunks: Subchunks, @@ -165,6 +182,33 @@ struct ChunkNbt { heightmaps: ChunkHeightmaps, } +impl FileLocksManager { + pub fn get_file_lock(path: &Path) -> Arc> { + tokio::task::block_in_place(|| { + let mut file_locks = FILE_LOCK_MANAGER.locks.blocking_lock(); + + if let Some(file_lock) = file_locks.get(path).cloned() { + file_lock + } else { + file_locks + .entry(path.to_path_buf()) + .or_insert_with(|| { + warn!("Creating new FileLock for {:?}", path); + Arc::new(RwLock::new(())) + }) + .clone() + } + }) + } + + pub fn remove_file_lock(path: &Path) { + tokio::task::block_in_place(|| { + FILE_LOCK_MANAGER.locks.blocking_lock().remove(path); + warn!("Removed FileLock for {:?}", path); + }) + } +} + /// The Heightmap for a completely empty chunk impl Default for ChunkHeightmaps { fn default() -> Self {