Skip to content

Commit

Permalink
feat: implement file locking mechanism for concurrent chunk read/writ…
Browse files Browse the repository at this point in the history
…e operations
  • Loading branch information
Mili committed Jan 29, 2025
1 parent 3404700 commit 1953773
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 32 deletions.
63 changes: 32 additions & 31 deletions pumpkin-world/src/chunk/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use pumpkin_config::ADVANCED_CONFIG;

use super::anvil::AnvilChunkFormat;
use super::{
ChunkData, ChunkReader, ChunkReadingError, ChunkSerializingError, ChunkWriter, CompressionError,
ChunkData, ChunkReader, ChunkReadingError, ChunkSerializingError, ChunkWriter,
CompressionError, FileLocksManager,
};

///The side size of a region in chunks (one region is 32x32 chunks)
Expand Down Expand Up @@ -250,19 +251,12 @@ impl LinearFile {
.to_bytes();

// Write/OverWrite the data to the file
let tmp_path = format!(
"{}.{}.tmp",
path.as_os_str().to_str().unwrap(),
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros(),
);

let mut file = OpenOptions::new()
.write(true)
.create_new(true)
.open(&tmp_path)
.create(true)
.truncate(true)
.open(path)
.map_err(|err| ChunkWritingError::IoError(err.kind()))?;

file.write_all(
Expand All @@ -276,8 +270,6 @@ impl LinearFile {
.as_slice(),
)
.map_err(|err| ChunkWritingError::IoError(err.kind()))?;

std::fs::rename(tmp_path, path).map_err(|err| ChunkWritingError::IoError(err.kind()))?;
Ok(())
}

Expand Down Expand Up @@ -379,9 +371,12 @@ impl ChunkReader for LinearChunkFormat {
.region_folder
.join(format!("./r.{}.{}.linear", region_x, region_z));

let file_data = LinearFile::load(&path)?;

file_data.get_chunk(at)
tokio::task::block_in_place(|| {
let file_lock = FileLocksManager::get_file_lock(&path);
let _reader_lock = file_lock.blocking_read();
dbg!("Reading chunk at {:?}", at);
LinearFile::load(&path)?.get_chunk(at)
})
}
}

Expand All @@ -398,21 +393,27 @@ impl ChunkWriter for LinearChunkFormat {
.region_folder
.join(format!("./r.{}.{}.linear", region_x, region_z));

let mut file_data = match LinearFile::load(&path) {
Ok(file_data) => file_data,
Err(ChunkReadingError::ChunkNotExist) => LinearFile::new(),
Err(ChunkReadingError::IoError(err)) => {
error!("Error reading the data before write: {}", err);
return Err(ChunkWritingError::IoError(err));
}
Err(_) => return Err(ChunkWritingError::IoError(std::io::ErrorKind::Other)),
};

file_data
.put_chunk(chunk, at)
.map_err(|err| ChunkWritingError::ChunkSerializingError(err.to_string()))?;

file_data.save(&path)
tokio::task::block_in_place(|| {
let file_lock = FileLocksManager::get_file_lock(&path);
let _writer_lock = file_lock.blocking_write();
dbg!("Writing chunk at {:?}", at);

let mut file_data = match LinearFile::load(&path) {
Ok(file_data) => file_data,
Err(ChunkReadingError::ChunkNotExist) => LinearFile::new(),
Err(ChunkReadingError::IoError(err)) => {
error!("Error reading the data before write: {}", err);
return Err(ChunkWritingError::IoError(err));
}
Err(_) => return Err(ChunkWritingError::IoError(std::io::ErrorKind::Other)),
};

file_data
.put_chunk(chunk, at)
.map_err(|err| ChunkWritingError::ChunkSerializingError(err.to_string()))?;

file_data.save(&path)
})
}
}

Expand Down
46 changes: 45 additions & 1 deletion pumpkin-world/src/chunk/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use fastnbt::LongArray;
use log::warn;
use pumpkin_data::chunk::ChunkStatus;
use pumpkin_util::math::{ceil_log2, vector2::Vector2};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, iter::repeat_with};
use std::{
collections::HashMap,
iter::repeat_with,
path::{Path, PathBuf},
sync::{Arc, LazyLock},
};
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};

use crate::{
block::BlockState,
Expand All @@ -20,6 +27,8 @@ pub const SUBCHUNK_VOLUME: usize = CHUNK_AREA * 16;
pub const SUBCHUNKS_COUNT: usize = WORLD_HEIGHT / 16;
pub const CHUNK_VOLUME: usize = CHUNK_AREA * WORLD_HEIGHT;

// Manejador global para múltiples archivos
static FILE_LOCK_MANAGER: LazyLock<FileLocksManager> = LazyLock::new(FileLocksManager::default);
pub trait ChunkReader: Sync + Send {
fn read_chunk(
&self,
Expand Down Expand Up @@ -77,6 +86,14 @@ pub enum CompressionError {
ZstdError(std::io::Error),
}

type FileLocks = HashMap<PathBuf, Arc<RwLock<()>>>;
/// Central File Lock Manager for chunk files
/// This is used to prevent multiple threads from writing to the same file at the same time
#[derive(Clone, Default)]
pub struct FileLocksManager {
locks: Arc<Mutex<FileLocks>>,
}

pub struct ChunkData {
/// See description in `Subchunks`
pub subchunks: Subchunks,
Expand Down Expand Up @@ -165,6 +182,33 @@ struct ChunkNbt {
heightmaps: ChunkHeightmaps,
}

impl FileLocksManager {
pub fn get_file_lock(path: &Path) -> Arc<RwLock<()>> {
tokio::task::block_in_place(|| {
let mut file_locks = FILE_LOCK_MANAGER.locks.blocking_lock();

if let Some(file_lock) = file_locks.get(path).cloned() {
file_lock
} else {
file_locks
.entry(path.to_path_buf())
.or_insert_with(|| {
warn!("Creating new FileLock for {:?}", path);
Arc::new(RwLock::new(()))
})
.clone()
}
})
}

pub fn remove_file_lock(path: &Path) {
tokio::task::block_in_place(|| {
FILE_LOCK_MANAGER.locks.blocking_lock().remove(path);
warn!("Removed FileLock for {:?}", path);
})
}
}

/// The Heightmap for a completely empty chunk
impl Default for ChunkHeightmaps {
fn default() -> Self {
Expand Down

0 comments on commit 1953773

Please sign in to comment.