Run least recently run jobs up to max_running_tasks
This commit is contained in:
		
							parent
							
								
									24baa3a6a6
								
							
						
					
					
						commit
						cf5ebbb457
					
				
							
								
								
									
										62
									
								
								src/main.rs
								
								
								
								
							
							
						
						
									
										62
									
								
								src/main.rs
								
								
								
								
							|  | @ -1,8 +1,10 @@ | |||
| use std::cmp::Ordering; | ||||
| use std::env; | ||||
| use std::str::FromStr; | ||||
| use std::thread; | ||||
| use std::time::Duration; | ||||
| use std::time::Instant; | ||||
| 
 | ||||
| use thirtyfour_sync::WebDriverCommands; | ||||
| 
 | ||||
| mod conf; | ||||
|  | @ -25,6 +27,23 @@ struct ThreadJob<'a> { | |||
|     last_result: Option<String>, | ||||
| } | ||||
| 
 | ||||
| impl ThreadJob<'_> { | ||||
|     fn lru_not_running(&self, b: &ThreadJob) -> Option<Ordering> { | ||||
|         // the "greater" value is one that is running, but we
 | ||||
|         // don't recheck the thread state every time. If there's
 | ||||
|         // a handle that isn't none, assume it's running.
 | ||||
|         if self.handle.is_some() != b.handle.is_some() { | ||||
|             if self.handle.is_some() { | ||||
|                 return Some(Ordering::Greater); | ||||
|             } | ||||
|             else { | ||||
|                 return Some(Ordering::Less); | ||||
|             } | ||||
|         } | ||||
|         return self.job.last_run.partial_cmp(&b.job.last_run); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| fn main() { | ||||
|     let mut conf = Conf { | ||||
|         job_dir: String::from_str("jobs.d").unwrap(), | ||||
|  | @ -75,18 +94,12 @@ fn main() { | |||
|     //
 | ||||
|     // thread '<unnamed>' panicked at 'failed to get url: UnknownError(WebDriverErrorInfo { status: 500, error: "", value: WebDriverErrorValue { message: "unknown error: session deleted because of page crash\nfrom tab crashed\n  (Session info: chrome=105.0.5195.52)
 | ||||
|     //
 | ||||
|     // This should just run single jobs consecutively as a result.
 | ||||
|     // This runs single jobs consecutively as a result.
 | ||||
|     let max_running_tasks = 1; | ||||
|     loop { | ||||
|         for tj in jobs.iter_mut() { | ||||
|             let should_run_by_time = tj.job.last_run.is_some() && Instant::now().duration_since(tj.job.last_run.unwrap()).ge(&tj.job.every); | ||||
|             if tj.handle.is_none() && (should_run_by_time || tj.job.last_run.is_none()) { | ||||
|                 tj.handle = Some(thread::spawn(|| { | ||||
|                     return get_source(driver, tj.job.url); | ||||
|                 })); | ||||
|                 println!("Started thread for '{}'", tj.job.url); | ||||
|                 tj.job.last_run = Some(Instant::now()); | ||||
|                 continue; | ||||
|             } | ||||
|         let mut running_tasks = 0; | ||||
|         for tj in jobs.iter_mut().filter(|job| job.handle.is_some()) { | ||||
|             // Check if the task is done
 | ||||
|             if tj.handle.is_some() && tj.handle.as_ref().unwrap().is_finished() { | ||||
|                 let duration = Instant::now().duration_since(tj.job.last_run.unwrap()); | ||||
|                 tj.job.last_run = Some(Instant::now()); | ||||
|  | @ -116,6 +129,31 @@ fn main() { | |||
|                 } | ||||
|                 tj.handle = None; | ||||
|             } | ||||
|             else if tj.handle.is_some() { | ||||
|                 running_tasks += 1; | ||||
|             } | ||||
|         } | ||||
|         while running_tasks < max_running_tasks { | ||||
|             // Sort by least recently run
 | ||||
|             // According to the docs, unstable_by is preferred for speed +
 | ||||
|             // reduced memory allocations, but doesn't guarantee order of
 | ||||
|             // equal elements.
 | ||||
|             jobs.sort_unstable_by(|a, b| a.lru_not_running(b).unwrap()); | ||||
|             for tj in jobs.iter_mut() { | ||||
|                 let should_run_by_time = tj.job.last_run.is_some() && Instant::now().duration_since(tj.job.last_run.unwrap()).ge(&tj.job.every); | ||||
|                 if tj.handle.is_none() && (should_run_by_time || tj.job.last_run.is_none()) { | ||||
|                     tj.handle = Some(thread::spawn(|| { | ||||
|                         return get_source(driver, tj.job.url); | ||||
|                     })); | ||||
|                     println!("Started thread for '{}'", tj.job.url); | ||||
|                     tj.job.last_run = Some(Instant::now()); | ||||
|                     running_tasks += 1; | ||||
|                     if running_tasks >= max_running_tasks { | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             break; | ||||
|         } | ||||
|         std::thread::sleep(Duration::new(1, 0)); | ||||
|     } | ||||
|  | @ -129,5 +167,3 @@ fn get_source(driver: &str, url: &str) -> Result<String, &'static str> { | |||
|     driver.quit().expect("failed to close session"); | ||||
|     return Ok(source); | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue