Skip to content

Commit

Permalink
Add an end-to-end test for trim gap handling using snapshots (#2463)
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov authored Jan 8, 2025
1 parent dd3ab02 commit 22fcef1
Show file tree
Hide file tree
Showing 12 changed files with 849 additions and 588 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ description = "Restate makes distributed applications easy!"
[workspace.dependencies]
# Own crates
codederror = { path = "crates/codederror" }
mock-service-endpoint = { path = "tools/mock-service-endpoint" }
restate-admin = { path = "crates/admin" }
restate-admin-rest-model = { path = "crates/admin-rest-model" }
restate-base64-util = { path = "crates/base64-util" }
Expand Down
30 changes: 27 additions & 3 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Node {
nodes
}

/// Start this Node, providing the base_dir and the cluster_name of the cluster its
/// Start this Node, providing the base_dir and the cluster_name of the cluster it's
/// expected to attach to. All relative file paths addresses specified in the node config
/// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
/// and cluster name present in config will be overwritten.
Expand Down Expand Up @@ -557,7 +557,7 @@ impl StartedNode {
pid
);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().unwrap()),
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Ok(()) => (&mut self.status).await,
Expand All @@ -582,7 +582,7 @@ impl StartedNode {
pid
);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().unwrap()),
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGTERM,
) {
Err(nix::errno::Errno::ESRCH) => {
Expand Down Expand Up @@ -807,6 +807,30 @@ impl StartedNode {
}
}

impl Drop for StartedNode {
fn drop(&mut self) {
if let StartedNodeStatus::Running { pid, .. } = self.status {
warn!(
"Node {} (pid {}) dropped without explicit shutdown",
self.config.node_name(),
pid,
);
match nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid.try_into().expect("pid_t = i32")),
nix::sys::signal::SIGKILL,
) {
Ok(()) | Err(nix::errno::Errno::ESRCH) => {}
err => error!(
"Failed to send SIGKILL to running node {} (pid {}): {:?}",
self.config.node_name(),
pid,
err,
),
}
}
}
}

#[derive(Debug, Clone, Copy)]
pub enum HealthCheck {
Admin,
Expand Down
3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ restate-core = { workspace = true, features = ["test-util"] }
restate-local-cluster-runner = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
mock-service-endpoint = { workspace = true }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -81,6 +82,8 @@ test-log = { workspace = true }
tonic = { workspace = true, features = ["transport", "prost"] }
tower = { workspace = true }
tracing-subscriber = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
url = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
Expand Down
5 changes: 2 additions & 3 deletions server/tests/common/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub struct TestEnv {
pub loglet: Arc<dyn Loglet>,
pub metadata_writer: MetadataWriter,
pub metadata_store_client: MetadataStoreClient,
pub cluster: StartedCluster,
}

pub async fn run_in_test_env<F, O>(
Expand Down Expand Up @@ -118,7 +117,7 @@ where

RocksDbManager::init(Configuration::mapped_updateable(|c| &c.common));

let cluster = Cluster::builder()
let mut cluster = Cluster::builder()
.base_dir(base_dir.as_path().to_owned())
.nodes(nodes)
.build()
Expand Down Expand Up @@ -165,12 +164,12 @@ where
future(TestEnv {
bifrost,
loglet,
cluster,
metadata_writer,
metadata_store_client,
})
.await?;

cluster.graceful_shutdown(Duration::from_secs(1)).await?;
TaskCenter::shutdown_node("test completed", 0).await;
RocksDbManager::get().shutdown().await;
Ok(())
Expand Down
169 changes: 0 additions & 169 deletions server/tests/snapshots.rs

This file was deleted.

Loading

0 comments on commit 22fcef1

Please sign in to comment.