Skip to content

Commit

Permalink
[arrow-ballista] enable delete log periodically (apache#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Sep 26, 2022
1 parent d832399 commit 0c3a241
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 3 deletions.
72 changes: 72 additions & 0 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::execution_plans::{
};
use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
use chrono::{Duration as Cduration, DateTime, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
use datafusion::error::DataFusionError;
Expand Down Expand Up @@ -52,6 +53,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{fs::File, pin::Pin};
use log::{info, warn};
use tokio::fs;
use tonic::codegen::StdError;
use tonic::transport::{Channel, Error, Server};

Expand Down Expand Up @@ -356,3 +359,72 @@ pub fn collect_plan_metrics(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
});
metrics_array
}

/// Perform one cleanup pass over `work_dir`, deleting log files whose
/// last-modified time is older than `ttl_seconds` seconds ago.
///
/// This function is intended to be invoked periodically by the caller
/// (the executor/scheduler main loops drive it on an interval timer).
/// Only regular files are removed; sub-directories are left untouched.
pub async fn clean_log_loop(work_dir: &str, ttl_seconds: u32) -> Result<()> {
    let mut dir = fs::read_dir(work_dir).await?;
    let mut to_delete = Vec::new();
    // Files last modified strictly before this instant are expired.
    let cutoff = Utc::now() - Cduration::seconds(ttl_seconds as i64);
    while let Some(child) = dir.next_entry().await? {
        if let Ok(metadata) = child.metadata().await {
            // Only delete regular files (the log files); directories and
            // other entry types are skipped.
            if metadata.is_file() {
                let modified_time: DateTime<Utc> =
                    metadata.modified().map(chrono::DateTime::from)?;
                if modified_time < cutoff {
                    to_delete.push(child.path().into_os_string());
                }
            }
        } else {
            // Metadata lookup failed (e.g. permissions, or the entry raced
            // with removal); skip it rather than aborting the whole sweep.
            warn!(
                "Failed to read metadata for {:?} during log cleanup, skipping it.",
                child
            )
        }
    }
    info!(
        "The log files {:?} that have not been modified for {:?} seconds will be deleted",
        &to_delete, ttl_seconds
    );
    for del in to_delete {
        fs::remove_file(del).await?;
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use crate::utils::clean_log_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    /// End-to-end check of the cleanup pass: create one log file, let it
    /// age past a 1-second TTL, run `clean_log_loop`, and verify removal.
    #[tokio::test]
    async fn test_clean_up_log() {
        // Keep the `TempDir` guard alive so the directory is removed when
        // the test ends; `into_path()` would leak the temp dir on disk.
        let work_dir = TempDir::new().unwrap();
        let job_dir = work_dir.path().join("log");
        let file_path = job_dir.as_path().join("1.log");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                    Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir.clone()).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        // Exactly one log file exists before the cleanup pass.
        let count1 = fs::read_dir(job_dir.clone()).unwrap().count();
        assert_eq!(count1, 1);

        // Wait until the file is older than the 1-second TTL, then run a
        // single cleanup pass directly; the test is strictly sequential,
        // so no task spawn / join_all machinery is needed.
        tokio::time::sleep(Duration::from_secs(2)).await;
        clean_log_loop(job_dir.to_str().unwrap(), 1)
            .await
            .unwrap();

        // The expired log file has been deleted.
        let count2 = fs::read_dir(job_dir.clone()).unwrap().count();
        assert_eq!(count2, 0);
    }
}
6 changes: 6 additions & 0 deletions ballista/rust/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,9 @@ name = "log_level_setting"
type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

# Log-retention TTL for the executor: log files older than this many hours
# are deleted by the periodic cleanup loop started in main. 0 (the default)
# disables cleanup entirely.
[[param]]
name = "cleanup_log_ttl"
type = "u32"
doc = "The number of hours to retain log files on each node, zero means disable."
default = "0"
30 changes: 29 additions & 1 deletion ballista/rust/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,47 @@ async fn main() -> Result<()> {
let grpc_port = opt.bind_grpc_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
let cleanup_log_ttl = opt.cleanup_log_ttl;

let scheduler_name = format!("executor_{}_{}", bind_host, port);

// File layer
if let Some(log_dir) = log_dir {
let log_file = tracing_appender::rolling::daily(log_dir, &scheduler_name);
let log_file = if cleanup_log_ttl > 24 {
tracing_appender::rolling::daily(log_dir.clone(), &scheduler_name)
} else {
tracing_appender::rolling::hourly(log_dir.clone(), &scheduler_name)
};
tracing_subscriber::fmt()
.with_ansi(true)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(special_mod_log_level)
.init();

if cleanup_log_ttl > 0 {
// cleanup_log_ttl unit is hour
let ttl_seconds = opt.cleanup_log_ttl * 60 * 60;
let mut interval_time =
time::interval(Core_Duration::from_secs((ttl_seconds / 2) as u64));

info!(
"Enable cleanup log loop which cleanup_log_ttl is {} hours in log_dir {}",
cleanup_log_ttl, &log_dir
);

tokio::spawn(async move {
loop {
interval_time.tick().await;
if let Err(e) =
ballista_core::utils::clean_log_loop(&log_dir, ttl_seconds).await
{
error!("Ballista executor fail to clean_log {:?}", e)
}
}
});
}
} else {
//Console layer
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
Expand Down
6 changes: 6 additions & 0 deletions ballista/rust/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,9 @@ name = "log_level_setting"
type = "String"
doc = "special log level for sub mod. link: https://docs.rs/env_logger/latest/env_logger/#enabling-logging. For example we want whole level is INFO but datafusion mode is DEBUG"
default = "std::string::String::from(\"INFO,datafusion=INFO\")"

# Log-retention TTL for the scheduler: log files older than this many hours
# are deleted by the periodic cleanup loop started in main. 0 (the default)
# disables cleanup entirely.
[[param]]
name = "cleanup_log_ttl"
type = "u32"
doc = "The number of hours to retain log files on each node, zero means disable."
default = "0"
31 changes: 29 additions & 2 deletions ballista/rust/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use ballista_scheduler::scheduler_server::externalscaler::external_scaler_server
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
use std::time::Duration;
use std::{env, io, net::SocketAddr, sync::Arc};
use tonic::transport::server::Connected;
use tower::Service;
Expand All @@ -45,7 +46,8 @@ use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};

use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
use log::info;
use log::{error, info};
use tokio::time;

#[macro_use]
extern crate configure_me;
Expand Down Expand Up @@ -168,20 +170,45 @@ async fn main() -> Result<()> {
let port = opt.bind_port;
let log_dir = opt.log_dir;
let print_thread_info = opt.print_thread_info;
let cleanup_log_ttl = opt.cleanup_log_ttl;
let log_file_name_prefix =
format!("scheduler_{}_{}_{}", namespace, external_host, port);
let scheduler_name = format!("{}:{}", external_host, port);

// File layer
if let Some(log_dir) = log_dir {
let log_file = tracing_appender::rolling::daily(log_dir, &log_file_name_prefix);
let log_file = tracing_appender::rolling::daily(&log_dir, &log_file_name_prefix);
tracing_subscriber::fmt()
.with_ansi(true)
.with_thread_names(print_thread_info)
.with_thread_ids(print_thread_info)
.with_writer(log_file)
.with_env_filter(special_mod_log_level)
.init();

if cleanup_log_ttl > 0 {
// cleanup_log_ttl unit is hour
let ttl_seconds = opt.cleanup_log_ttl * 60 * 60;
let mut interval_time =
time::interval(Duration::from_secs((ttl_seconds / 2) as u64));
info!(
"Enable cleanup log loop which cleanup_log_ttl is {} hours in log_dir {}",
cleanup_log_ttl, &log_dir
);

tokio::spawn(async move {
loop {
interval_time.tick().await;
if let Err(e) =
ballista_core::utils::clean_log_loop(&log_dir, ttl_seconds).await
{
error!("Ballista executor fail to clean_log {:?}", e)
}
}
});
}


} else {
//Console layer
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
Expand Down

0 comments on commit 0c3a241

Please sign in to comment.