From cf5ebbb457dad7040b17506b432bfc6b23cc6862 Mon Sep 17 00:00:00 2001 From: Kienan Stewart Date: Sat, 24 Sep 2022 10:05:39 -0400 Subject: [PATCH] Run least recently run jobs up to max_running_tasks --- src/main.rs | 62 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2e3f42f..e6c8b2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,10 @@ +use std::cmp::Ordering; use std::env; use std::str::FromStr; use std::thread; use std::time::Duration; use std::time::Instant; + use thirtyfour_sync::WebDriverCommands; mod conf; @@ -25,6 +27,23 @@ struct ThreadJob<'a> { last_result: Option, } +impl ThreadJob<'_> { + fn lru_not_running(&self, b: &ThreadJob) -> Option { + // the "greater" value is one that is running, but we + // don't recheck the thread state every time. If there's + // a handle that isn't none, assume it's running. + if self.handle.is_some() != b.handle.is_some() { + if self.handle.is_some() { + return Some(Ordering::Greater); + } + else { + return Some(Ordering::Less); + } + } + return self.job.last_run.partial_cmp(&b.job.last_run); + } +} + fn main() { let mut conf = Conf { job_dir: String::from_str("jobs.d").unwrap(), @@ -75,18 +94,12 @@ fn main() { // // thread '' panicked at 'failed to get url: UnknownError(WebDriverErrorInfo { status: 500, error: "", value: WebDriverErrorValue { message: "unknown error: session deleted because of page crash\nfrom tab crashed\n (Session info: chrome=105.0.5195.52) // - // This should just run single jobs consecutively as a result. + // This runs single jobs consecutively as a result. + let max_running_tasks = 1; loop { - for tj in jobs.iter_mut() { - let should_run_by_time = tj.job.last_run.is_some() && Instant::now().duration_since(tj.job.last_run.unwrap()).ge(&tj.job.every); - if tj.handle.is_none() && (should_run_by_time || tj.job.last_run.is_none()) { - tj.handle = Some(thread::spawn(|| { - return get_source(driver, tj.job.url); - })); - println!("Started thread for '{}'", tj.job.url); - tj.job.last_run = Some(Instant::now()); - continue; - } + let mut running_tasks = 0; + for tj in jobs.iter_mut().filter(|job| job.handle.is_some()) { + // Check if the task is done if tj.handle.is_some() && tj.handle.as_ref().unwrap().is_finished() { let duration = Instant::now().duration_since(tj.job.last_run.unwrap()); tj.job.last_run = Some(Instant::now()); @@ -116,6 +129,31 @@ fn main() { } tj.handle = None; } + else if tj.handle.is_some() { + running_tasks += 1; + } + } + while running_tasks < max_running_tasks { + // Sort by least recently run + // According to the docs, unstable_by is preferred for speed + + // reduced memory allocations, but doesn't guarantee order of + // equal elements. + jobs.sort_unstable_by(|a, b| a.lru_not_running(b).unwrap()); + for tj in jobs.iter_mut() { + let should_run_by_time = tj.job.last_run.is_some() && Instant::now().duration_since(tj.job.last_run.unwrap()).ge(&tj.job.every); + if tj.handle.is_none() && (should_run_by_time || tj.job.last_run.is_none()) { + tj.handle = Some(thread::spawn(|| { + return get_source(driver, tj.job.url); + })); + println!("Started thread for '{}'", tj.job.url); + tj.job.last_run = Some(Instant::now()); + running_tasks += 1; + if running_tasks >= max_running_tasks { + break; + } + } + } + break; } std::thread::sleep(Duration::new(1, 0)); } @@ -129,5 +167,3 @@ fn get_source(driver: &str, url: &str) -> Result { driver.quit().expect("failed to close session"); return Ok(source); } - -