From d53009cef997b9884a726e11910bcb60535956a3 Mon Sep 17 00:00:00 2001
From: j-mendez
Date: Sat, 25 Nov 2023 10:02:21 -0500
Subject: [PATCH] feat(cron): add cron feature flag

---
 Cargo.lock                  |  22 ++-
 examples/Cargo.toml         |   4 +-
 spider/Cargo.toml           |  10 +-
 spider/README.md            |  52 +++++-
 spider/src/features/cron.rs | 346 ++++++++++++++++++++++++++++++++++++
 spider/src/features/mod.rs  |   3 +
 spider/src/lib.rs           |   1 +
 spider/src/website.rs       | 138 ++++++++++++++
 spider_cli/Cargo.toml       |   4 +-
 spider_worker/Cargo.toml    |   4 +-
 10 files changed, 564 insertions(+), 20 deletions(-)
 create mode 100644 spider/src/features/cron.rs

diff --git a/Cargo.lock b/Cargo.lock
index 35e78c1e4a..9c3d6e1d21 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1102,6 +1102,17 @@ dependencies = [
  "itertools",
 ]
 
+[[package]]
+name = "cron"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ff76b51e4c068c52bfd2866e1567bee7c567ae8f24ada09fd4307019e25eab7"
+dependencies = [
+ "chrono",
+ "nom",
+ "once_cell",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.3"
@@ -3740,13 +3751,16 @@ dependencies = [
 
 [[package]]
 name = "spider"
-version = "1.49.13"
+version = "1.50.0"
 dependencies = [
  "ahash",
+ "async-trait",
  "bytes",
  "case_insensitive_string",
  "chromiumoxide",
+ "chrono",
  "compact_str",
+ "cron",
  "cssparser",
  "ego-tree",
  "fast_html5ever",
@@ -3775,7 +3789,7 @@ dependencies = [
 
 [[package]]
 name = "spider_cli"
-version = "1.49.13"
+version = "1.50.0"
 dependencies = [
  "clap 4.4.8",
  "env_logger 0.9.3",
@@ -3787,7 +3801,7 @@ dependencies = [
 
 [[package]]
 name = "spider_examples"
-version = "1.49.13"
+version = "1.50.0"
 dependencies = [
  "convert_case",
  "env_logger 0.9.3",
@@ -3808,7 +3822,7 @@ dependencies = [
 
 [[package]]
 name = "spider_worker"
-version = "1.49.13"
+version = "1.50.0"
 dependencies = [
  "env_logger 0.10.1",
  "lazy_static",
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 61dd71f3ed..141c151960 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_examples"
-version = "1.49.13"
+version = "1.50.0"
 authors = ["madeindjs ", "j-mendez "]
 description = "Multithreaded web crawler written in Rust."
 repository = "https://github.com/spider-rs/spider"
@@ -22,7 +22,7 @@ htr = "0.5.27"
 flexbuffers = "2.0.0"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.0"
 path = "../spider"
 features = ["serde"]
diff --git a/spider/Cargo.toml b/spider/Cargo.toml
index 7ccf764feb..081da64f8f 100644
--- a/spider/Cargo.toml
+++ b/spider/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider"
-version = "1.49.13"
+version = "1.50.0"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler written in Rust."
repository = "https://github.com/spider-rs/spider" @@ -43,12 +43,15 @@ case_insensitive_string = { version = "0.1.7", features = [ "compact", "serde" ] jsdom = { version = "0.0.11-alpha.1", optional = true, features = [ "hashbrown", "tokio" ] } chromiumoxide = { version = "0.5.6", optional = true, features = ["tokio-runtime", "bytes"], default-features = false } sitemap = { version = "0.4.1", optional = true } +chrono = "0.4.31" +cron = "0.12.0" +async-trait = "0.1.74" [target.'cfg(all(not(windows), not(target_os = "android"), not(target_env = "musl")))'.dependencies] tikv-jemallocator = { version = "0.5.0", optional = true } [features] -default = ["sync"] +default = ["sync", "cron"] regex = ["dep:regex"] glob = ["dep:regex", "dep:itertools"] ua_generator = ["dep:ua_generator"] @@ -70,4 +73,5 @@ chrome = ["dep:chromiumoxide"] chrome_headed = ["chrome"] chrome_cpu = ["chrome"] chrome_stealth = ["chrome"] -cookies = ["reqwest/cookies"] \ No newline at end of file +cookies = ["reqwest/cookies"] +cron = [] diff --git a/spider/README.md b/spider/README.md index 231b8ab899..41172b13c9 100644 --- a/spider/README.md +++ b/spider/README.md @@ -16,7 +16,7 @@ This is a basic async example crawling a web page, add spider to your `Cargo.tom ```toml [dependencies] -spider = "1.49.13" +spider = "1.50.0" ``` And then the code: @@ -87,7 +87,7 @@ We have a couple optional feature flags. Regex blacklisting, jemaloc backend, gl ```toml [dependencies] -spider = { version = "1.49.13", features = ["regex", "ua_generator"] } +spider = { version = "1.50.0", features = ["regex", "ua_generator"] } ``` 1. `ua_generator`: Enables auto generating a random real User-Agent. @@ -117,7 +117,7 @@ Move processing to a worker, drastically increases performance even if worker is ```toml [dependencies] -spider = { version = "1.49.13", features = ["decentralized"] } +spider = { version = "1.50.0", features = ["decentralized"] } ``` ```sh @@ -137,7 +137,7 @@ Use the subscribe method to get a broadcast channel. ```toml [dependencies] -spider = { version = "1.49.13", features = ["sync"] } +spider = { version = "1.50.0", features = ["sync"] } ``` ```rust,no_run @@ -167,7 +167,7 @@ Allow regex for blacklisting routes ```toml [dependencies] -spider = { version = "1.49.13", features = ["regex"] } +spider = { version = "1.50.0", features = ["regex"] } ``` ```rust,no_run @@ -194,7 +194,7 @@ If you are performing large workloads you may need to control the crawler by ena ```toml [dependencies] -spider = { version = "1.49.13", features = ["control"] } +spider = { version = "1.50.0", features = ["control"] } ``` ```rust @@ -258,11 +258,49 @@ async fn main() { } ``` +### Cron Jobs + +Use cron jobs to run crawls continuously at anytime. + +```toml +[dependencies] +spider = { version = "1.50.0", features = ["sync", "cron"] } +``` + +```rust,no_run +extern crate spider; + +use spider::website::{Website, run_cron}; +use spider::tokio; + +#[tokio::main] +async fn main() { + let mut website: Website = Website::new("https://choosealicense.com"); + // set the cron to run or use the builder pattern `website.with_cron`. + website.cron_str = "1/5 * * * * *".into(); + + let mut rx2 = website.subscribe(16).unwrap(); + + let join_handle = tokio::spawn(async move { + while let Ok(res) = rx2.recv().await { + println!("{:?}", res.get_url()); + } + }); + + // take ownership of the website. You can also use website.run_cron, except you need to perform abort manually on handles created. 
+    let runner = run_cron(website).await;
+    // This controls when to stop. You do not need the sleep if your program stays alive after the crawls.
+    println!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
+    let _ = tokio::join!(runner.stop(), join_handle);
+}
+```
+
 ### Chrome
 
 ```toml
 [dependencies]
-spider = { version = "1.49.13", features = ["chrome"] }
+spider = { version = "1.50.0", features = ["chrome"] }
 ```
 
 You can use `website.crawl_concurrent_raw` to perform a crawl without chromium when needed. Use the feature flag `chrome_headed` to enable headful browser usage if needed to debug.
diff --git a/spider/src/features/cron.rs b/spider/src/features/cron.rs
new file mode 100644
index 0000000000..3dc9791b30
--- /dev/null
+++ b/spider/src/features/cron.rs
@@ -0,0 +1,346 @@
+//! # Cron: a simple cron runner - a fork of crony with changes that allow async.
+//!
+//! Use the `Job` trait to create your cron job struct, pass it to the `Runner`, and then start it via the `run()` method.
+//! The runner will spawn a new task where it loops through the jobs and runs their handle
+//! method once the scheduled time is reached.
+//!
+//!
+extern crate chrono;
+extern crate cron;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Duration, Utc};
+pub use cron::Schedule;
+use lazy_static::lazy_static;
+use log::{debug, error, info};
+use std::panic;
+use std::sync::mpsc::{Receiver, Sender};
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    mpsc, Arc, Mutex,
+};
+use tokio::task::JoinHandle;
+
+lazy_static! {
+    /// Singleton instance of a tracker that won't allow the
+    /// same job to run again while it's already running,
+    /// unless you specifically allow the job to run in
+    /// parallel with itself
+    pub static ref TRACKER: Mutex<Tracker> = Mutex::new(Tracker::new());
+}
+
+#[async_trait]
+pub trait Job: Send + Sync {
+    /// The default implementation of the is_active method will
+    /// make this job always active
+    fn is_active(&self) -> bool {
+        true
+    }
+
+    /// In case your job takes longer to finish and it's scheduled
+    /// to start again (while it's still running), the default behaviour
+    /// will skip the next run while one instance is already running.
+    /// (if your OS has enough threads, and is spawning a thread for the next job)
+    ///
+    /// To override this behaviour and enable it to run in parallel
+    /// with other instances of itself, return `true` on this instance.
+    fn allow_parallel_runs(&self) -> bool {
+        false
+    }
+
+    /// Define the run schedule for your job
+    fn schedule(&self) -> Option<Schedule>;
+
+    /// This is where your job's magic happens: define the action that
+    /// will happen once the cron starts running your job
+    ///
+    /// If this method panics, your entire job will panic and that may
+    /// or may not make the whole runner panic. Handle your errors
+    /// properly and don't let it panic.
+    async fn handle(&mut self);
+
+    /// Decide whether or not to start running your job
+    fn should_run(&self) -> bool {
+        if self.is_active() {
+            match self.schedule() {
+                Some(schedule) => {
+                    for item in schedule.upcoming(Utc).take(1) {
+                        let now = Utc::now();
+                        let difference = item - now;
+                        if difference <= Duration::milliseconds(100) {
+                            return true;
+                        }
+                    }
+                }
+                _ => (),
+            }
+        }
+
+        false
+    }
+
+    /// Simple output that will return the current time so you don't have to do so
+    /// in your job if you wish to display the time of the run.
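+    // Note: the 100ms window in `should_run` above pairs with the 100ms poll
+    // sleep in the runner loop, so a scheduled tick is caught by the poll
+    // that directly precedes it.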
+    fn now(&self) -> DateTime<Utc> {
+        Utc::now()
+    }
+}
+
+/// Struct for marking jobs as running
+pub struct Tracker(Vec<usize>);
+
+impl Default for Tracker {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Tracker {
+    /// Return a new instance of the tracker
+    pub fn new() -> Self {
+        Tracker(vec![])
+    }
+
+    /// Check if the id of the job is marked as running
+    pub fn running(&self, id: &usize) -> bool {
+        self.0.contains(id)
+    }
+
+    /// Set the job id as running
+    pub fn start(&mut self, id: &usize) -> usize {
+        if !self.running(id) {
+            self.0.push(*id);
+        }
+        self.0.len()
+    }
+
+    /// Unmark the job from running
+    pub fn stop(&mut self, id: &usize) -> usize {
+        if self.running(id) {
+            match self.0.iter().position(|&r| r == *id) {
+                Some(i) => self.0.remove(i),
+                None => 0,
+            };
+        }
+        self.0.len()
+    }
+}
+
+/// Runner that will hold all the jobs and will start up the execution
+/// and eventually will stop it.
+pub struct Runner {
+    /// the current jobs
+    pub jobs: Vec<Box<dyn Job>>,
+    /// the task that is running the handle
+    pub thread: Option<JoinHandle<()>>,
+    /// is the task running or not
+    pub running: bool,
+    /// channel for sending the stop message
+    pub tx: Option<Sender<Result<(), ()>>>,
+    /// flag used to track whether a cron job is working
+    pub working: Arc<AtomicBool>,
+}
+
+impl Default for Runner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Runner {
+    /// Create a new runner
+    pub fn new() -> Self {
+        Runner {
+            jobs: vec![],
+            thread: None,
+            running: false,
+            tx: None,
+            working: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    /// Add jobs into the runner
+    ///
+    /// **panics** if you try to push a job onto an already started runner
+    #[allow(clippy::should_implement_trait)]
+    pub fn add(mut self, job: Box<dyn Job>) -> Self {
+        if self.running {
+            panic!("Cannot push a job onto the runner once the runner is started!");
+        }
+        self.jobs.push(job);
+        self
+    }
+
+    /// Number of jobs ready to start running
+    pub fn jobs_to_run(&self) -> usize {
+        self.jobs.len()
+    }
+
+    /// Start the loop and job execution
+    pub async fn run(self) -> Self {
+        if self.jobs.is_empty() {
+            return self;
+        }
+
+        let working = Arc::new(AtomicBool::new(false));
+        let (thread, tx) = spawn(self, working.clone()).await;
+
+        Self {
+            thread,
+            jobs: vec![],
+            running: true,
+            tx,
+            working,
+        }
+    }
+
+    /// Stop the spawned runner
+    pub async fn stop(mut self) {
+        if !self.running {
+            return;
+        }
+        if let Some(thread) = self.thread.take() {
+            if let Some(tx) = self.tx {
+                match tx.send(Ok(())) {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not send stop signal to cron runner thread: {}", e),
+                };
+            }
+            thread.abort()
+        }
+    }
+
+    /// Lets us know if the cron worker is running
+    pub fn is_running(&self) -> bool {
+        self.running
+    }
+
+    /// Lets us know if the worker is currently in the process of executing a job
+    pub fn is_working(&self) -> bool {
+        self.working.load(Ordering::Relaxed)
+    }
+}
+
+/// Spawn the task for the runner and return the sender used to stop it
+async fn spawn(
+    runner: Runner,
+    working: Arc<AtomicBool>,
+) -> (Option<JoinHandle<()>>, Option<Sender<Result<(), ()>>>) {
+    let (tx, rx): (Sender<Result<(), ()>>, Receiver<Result<(), ()>>) = mpsc::channel();
+
+    let handler = tokio::spawn(async move {
+        let mut jobs = runner.jobs;
+
+        loop {
+            if rx.try_recv().is_ok() {
+                info!("Stopping the cron runner thread");
+                break;
+            }
+
+            for (id, job) in jobs.iter_mut().enumerate() {
+                let no = (id + 1).to_string();
+
+                if job.should_run()
+                    && (job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id))
+                {
+                    TRACKER.lock().unwrap().start(&id);
+
+                    let now = Utc::now();
+                    debug!(
+                        "START: {} --- {}",
+                        format!("cron-job-thread-{}", no),
+                        now.format("%H:%M:%S%.f")
+                    );
+
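+                    // The job runs inline on this single runner task, so a
+                    // long-running `handle` delays every other job on the
+                    // same runner until it completes.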
+                    working.store(true, Ordering::Relaxed);
+
+                    // keep the work on the same task for now.
+                    job.handle().await;
+
+                    working.store(TRACKER.lock().unwrap().stop(&id) != 0, Ordering::Relaxed);
+
+                    debug!(
+                        "FINISH: {} --- {}",
+                        format!("cron-job-thread-{}", no),
+                        now.format("%H:%M:%S%.f")
+                    );
+                }
+            }
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+    });
+
+    (Some(handler), Some(tx))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{Job, Runner};
+    use async_trait::async_trait;
+    use cron::Schedule;
+    use std::str::FromStr;
+    struct SomeJob;
+
+    #[async_trait]
+    impl Job for SomeJob {
+        fn schedule(&self) -> Option<Schedule> {
+            Some(Schedule::from_str("0 * * * * *").unwrap())
+        }
+
+        async fn handle(&mut self) {}
+    }
+    struct AnotherJob;
+    #[async_trait]
+    impl Job for AnotherJob {
+        fn schedule(&self) -> Option<Schedule> {
+            Some(Schedule::from_str("0 * * * * *").unwrap())
+        }
+
+        async fn handle(&mut self) {}
+    }
+    #[tokio::test]
+    async fn create_job() {
+        let mut some_job = SomeJob;
+
+        assert_eq!(some_job.handle().await, ());
+    }
+
+    #[tokio::test]
+    async fn test_adding_jobs_to_runner() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job));
+
+        assert_eq!(runner.jobs_to_run(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_jobs_are_empty_after_runner_starts() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job))
+            .run()
+            .await;
+
+        assert_eq!(runner.jobs_to_run(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_stopping_the_runner() {
+        let some_job = SomeJob;
+        let another_job = AnotherJob;
+
+        let runner = Runner::new()
+            .add(Box::new(some_job))
+            .add(Box::new(another_job))
+            .run()
+            .await;
+
+        assert_eq!(runner.stop().await, ());
+    }
+}
diff --git a/spider/src/features/mod.rs b/spider/src/features/mod.rs
index b743cf4104..5724c5ba1b 100644
--- a/spider/src/features/mod.rs
+++ b/spider/src/features/mod.rs
@@ -1,6 +1,9 @@
 /// Chrome utils
 #[cfg(feature = "chrome")]
 pub mod chrome;
+/// Cron jobs
+#[cfg(feature = "cron")]
+pub mod cron;
 /// URL globbing
 #[cfg(feature = "glob")]
 pub mod glob;
diff --git a/spider/src/lib.rs b/spider/src/lib.rs
index 83deeb80a0..b6b3955008 100644
--- a/spider/src/lib.rs
+++ b/spider/src/lib.rs
@@ -64,6 +64,7 @@
 //! - `chrome_cpu`: Disable gpu usage for chrome browser.
 //! - `chrome_stealth`: Enables stealth mode to make it harder to be detected as a bot.
 //! - `cookies`: Enables cookies storing and setting to use for request.
+//! - `cron`: Enables the ability to start cron jobs for the website.
 
 pub extern crate bytes;
 pub extern crate compact_str;
diff --git a/spider/src/website.rs b/spider/src/website.rs
index db412f6a79..f32c49d4b0 100644
--- a/spider/src/website.rs
+++ b/spider/src/website.rs
@@ -1,9 +1,14 @@
 use crate::black_list::contains;
 use crate::configuration::{get_ua, Configuration};
 use crate::packages::robotparser::parser::RobotFileParser;
 use crate::page::{build, get_page_selectors, Page};
 use crate::utils::log;
 use crate::CaseInsensitiveString;
+
+#[cfg(feature = "cron")]
+use crate::features::cron::Job;
+#[cfg(feature = "cron")]
+use async_trait::async_trait;
 use compact_str::CompactString;
 
 #[cfg(feature = "budget")]
@@ -115,6 +120,17 @@ pub enum CrawlStatus {
     Paused,
 }
 
+#[cfg(feature = "cron")]
+/// The type of cron job to run
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub enum CronType {
+    #[default]
+    /// Crawl collecting links, page data, etc.
+    Crawl,
+    /// Scrape collecting links, page data as bytes to store, etc.
+    Scrape,
+}
+
 /// Represents a website to crawl and gather all links.
 /// ```rust
 /// use spider::website::Website;
@@ -157,6 +173,12 @@ pub struct Website {
     #[cfg(feature = "cookies")]
     /// Cookie string to use for network requests ex: "foo=bar; Domain=blog.spider"
     pub cookie_str: String,
+    #[cfg(feature = "cron")]
+    /// Cron string to perform crawls - use a cron expression generator to help produce a valid cron for your needs.
+    pub cron_str: String,
+    #[cfg(feature = "cron")]
+    /// The type of cron to run, either crawl or scrape
+    pub cron_type: CronType,
 }
 
 impl Website {
@@ -2122,6 +2144,14 @@ impl Website {
         self
     }
 
+    #[cfg(feature = "cron")]
+    /// Setup cron jobs to run
+    pub fn with_cron(&mut self, cron_str: &str, cron_type: CronType) -> &mut Self {
+        self.cron_str = cron_str.into();
+        self.cron_type = cron_type;
+        self
+    }
+
     /// Build the website configuration when using with_builder
     pub fn build(&self) -> Result<Website, Self> {
         if self.domain_parsed.is_none() {
@@ -2152,6 +2182,50 @@
 
         Some(rx2)
     }
+
+    #[cfg(feature = "cron")]
+    /// Start a cron job - if you use subscribe on another thread you need to abort the handle in conjunction with runner.stop.
+    pub async fn run_cron(&self) -> crate::features::cron::Runner {
+        crate::features::cron::Runner::new()
+            .add(Box::new(self.clone()))
+            .run()
+            .await
+    }
+}
+
+#[cfg(feature = "cron")]
+/// Start a cron job taking ownership of the website
+pub async fn run_cron(website: Website) -> crate::features::cron::Runner {
+    crate::features::cron::Runner::new()
+        .add(Box::new(website))
+        .run()
+        .await
+}
+
+#[cfg(feature = "cron")]
+#[async_trait]
+impl Job for Website {
+    fn schedule(&self) -> Option<crate::features::cron::Schedule> {
+        match self.cron_str.parse() {
+            Ok(schedule) => Some(schedule),
+            Err(e) => {
+                log::error!("{:?}", e);
+                None
+            }
+        }
+    }
+    async fn handle(&mut self) {
+        log::info!(
+            "CRON: {} - cron job running {}",
+            self.get_domain().as_ref(),
+            self.now()
+        );
+        if self.cron_type == CronType::Crawl {
+            self.crawl().await;
+        } else {
+            self.scrape().await;
+        }
+    }
 }
 
 #[cfg(not(feature = "decentralized"))]
@@ -2169,6 +2243,70 @@ async fn crawl() {
     );
 }
 
+#[cfg(feature = "cron")]
+#[tokio::test]
+async fn crawl_cron() {
+    let url = "https://choosealicense.com";
+    let mut website: Website = Website::new(&url)
+        .with_cron("1/5 * * * * *", Default::default())
+        .build()
+        .unwrap();
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    // handle an event on every cron
+    let join_handle = tokio::spawn(async move {
+        let mut links_visited = HashSet::new();
+        while let Ok(res) = rx2.recv().await {
+            let url = res.get_url();
+            links_visited.insert(CaseInsensitiveString::new(url));
+        }
+        assert!(
+            links_visited
+                .contains::<CaseInsensitiveString>(&"https://choosealicense.com/licenses/".into()),
+            "{:?}",
+            links_visited
+        );
+    });
+
+    let runner = website.run_cron().await;
+    log::debug!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(Duration::from_secs(10)).await;
+    runner.stop().await;
+    join_handle.abort();
+    let _ = join_handle.await;
+}
+
+#[cfg(feature = "cron")]
+#[tokio::test]
+async fn crawl_cron_own() {
+    let url = "https://choosealicense.com";
+    let mut website: Website = Website::new(&url)
+        .with_cron("1/5 * * * * *", Default::default())
+        .build()
+        .unwrap();
+    let mut rx2 = website.subscribe(16).unwrap();
+
+    // handle an event on every cron
+    let join_handle = tokio::spawn(async move {
+        let mut links_visited = HashSet::new();
+        while let Ok(res) = rx2.recv().await {
+            let url = res.get_url();
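+            // record each url emitted by the cron-triggered crawls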
+            links_visited.insert(CaseInsensitiveString::new(url));
+        }
+        assert!(
+            links_visited
+                .contains::<CaseInsensitiveString>(&"https://choosealicense.com/licenses/".into()),
+            "{:?}",
+            links_visited
+        );
+    });
+
+    let runner = run_cron(website).await;
+    log::debug!("Starting the Runner for 10 seconds");
+    tokio::time::sleep(Duration::from_secs(10)).await;
+    let _ = tokio::join!(runner.stop(), join_handle);
+}
+
 #[cfg(not(feature = "decentralized"))]
 #[tokio::test]
 async fn scrape() {
diff --git a/spider_cli/Cargo.toml b/spider_cli/Cargo.toml
index 198cf70f71..7f0b7e7646 100644
--- a/spider_cli/Cargo.toml
+++ b/spider_cli/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_cli"
-version = "1.49.13"
+version = "1.50.0"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler CLI written in Rust."
 repository = "https://github.com/spider-rs/spider"
@@ -26,7 +26,7 @@ quote = "1.0.18"
 failure_derive = "0.1.8"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.0"
 path = "../spider"
 
 [[bin]]
diff --git a/spider_worker/Cargo.toml b/spider_worker/Cargo.toml
index 08031145cf..4fb5f6e599 100644
--- a/spider_worker/Cargo.toml
+++ b/spider_worker/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "spider_worker"
-version = "1.49.13"
+version = "1.50.0"
 authors = ["madeindjs ", "j-mendez "]
 description = "The fastest web crawler as a worker or proxy."
 repository = "https://github.com/spider-rs/spider"
@@ -22,7 +22,7 @@ lazy_static = "1.4.0"
 env_logger = "0.10.0"
 
 [dependencies.spider]
-version = "1.49.13"
+version = "1.50.0"
 path = "../spider"
 features = ["serde", "flexbuffers"]
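As a usage note for this patch: the `Job` trait and `Runner` can also be driven directly, without going through `Website`. Below is a minimal sketch, assuming the module is reachable downstream as `spider::features::cron` the way `features/mod.rs` exports it, and that the caller depends on `async-trait` itself; `LogJob` and its 30-second schedule are illustrative only and not part of this patch.

```rust
extern crate spider;

use async_trait::async_trait;
use spider::features::cron::{Job, Runner, Schedule};
use spider::tokio;
use std::str::FromStr;

/// Hypothetical job used only for illustration.
struct LogJob;

#[async_trait]
impl Job for LogJob {
    fn schedule(&self) -> Option<Schedule> {
        // Six-field expression (seconds first), as the `cron` crate expects:
        // fire every 30 seconds.
        Schedule::from_str("0/30 * * * * *").ok()
    }

    async fn handle(&mut self) {
        println!("LogJob ran at {}", self.now());
    }
}

#[tokio::main]
async fn main() {
    // `add` panics once the runner has started, so compose all jobs first.
    let runner = Runner::new().add(Box::new(LogJob)).run().await;
    // Let the schedule fire at least once before shutting the loop down.
    tokio::time::sleep(std::time::Duration::from_secs(35)).await;
    runner.stop().await;
}
```

Returning `None` from `schedule` (for example on a parse error, as the `Job` impl for `Website` does) means the job simply never fires; the runner keeps polling any remaining jobs.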