From 4b36b75d9f599d716c16eda691b717545f142ab8 Mon Sep 17 00:00:00 2001 From: Kienan Stewart Date: Sun, 25 Sep 2022 18:36:40 -0400 Subject: [PATCH] Add first pass at using inotify to watch the job directory for changes There are number of cases which currently aren't handled: * moving a file out of or into the directory * a file being touched: should that be used to reset that last_run value? * when jobs are removed, it is regardless of thread state so there may be children that never get joined --- Cargo.lock | 23 ++++++++++++++ Cargo.toml | 1 + src/main.rs | 89 ++++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 98 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4adbf17..a8d70d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -620,6 +620,7 @@ version = "0.1.0" dependencies = [ "ansi-to-html", "chrono", + "inotify", "prettydiff", "rss", "scraper", @@ -757,6 +758,28 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inotify" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9" +dependencies = [ + "bitflags", + "futures-core", + "inotify-sys", + "libc", + "tokio", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index 5d5c547..3160476 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] ansi-to-html = "0.1" chrono = "0.4" +inotify = "0.10" prettydiff = "0.6" rss = "2" scraper = "0.13" diff --git a/src/main.rs b/src/main.rs index e5ebd8f..a1bedc5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,11 @@ use std::cmp::Ordering; use std::env; -use std::path::Path; +use std::path::{Path,PathBuf}; use std::thread; use std::time::Duration; use std::time::Instant; +use inotify; use thirtyfour_sync::WebDriverCommands; mod conf; @@ -99,20 +100,11 @@ fn main() { println!("Skipping '{}': not a file", _entry.path().display()); continue; } - match _entry.path().extension() { - Some(x) => { - if ! "job".eq(x) { - println!("Skipping '{}': does not have '.job' extension", - _entry.path().display()); - continue; - } - }, - None => { - println!("Skipping '{}': does not have '.job' extension", - _entry.path().display()); - continue; - } - }; + + if !Job::has_job_file_extension(&_entry.path()) { + println!("Skipping '{}': does not have '.job' extension", _entry.path().display()); + continue; + } let job = match Job::from_file(&_entry.path(), &conf) { Err(why) => { @@ -128,6 +120,14 @@ fn main() { }); } + let mut inotify = inotify::Inotify::init() + .expect("Failed to initialize inotify"); + inotify.add_watch( + conf.job_dir.to_str().unwrap().clone(), + inotify::WatchMask::CREATE | inotify::WatchMask::DELETE + ).expect("Failed to start watching job dir for changes"); + let mut notify_buffer = [0; 1024]; + let max_running_tasks = 5; loop { let mut running_tasks = 0; @@ -196,6 +196,65 @@ fn main() { break; } + let events = inotify.read_events(&mut notify_buffer); + if events.is_ok() { + for event in events.unwrap() { + if event.name.is_none() { + continue; + } + let mut path = PathBuf::new(); + path.push(&conf.job_dir.to_str().unwrap().clone()); + path.push(event.name.unwrap()); + match event.mask { + inotify::EventMask::CREATE => { + let md = match path.metadata() { + Err(why) => { + println!("Failed to read metadata from event file '{}': {}", + path.display(), why); + continue; + }, + Ok(value) => value, + }; + if !md.is_file() { + continue; + } + if !Job::has_job_file_extension(&path) { + continue; + } + + let job = match Job::from_file(&path, &conf) { + Err(why) => { + println!("Failed to load job from '{}': {}", path.display(), why); + continue; + }, + Ok(value) => value, + }; + jobs.push(ThreadJob { + job: job, + handle: None, + last_result: None, + }); + println!("Added job from '{}' being created", path.display()); + }, + inotify::EventMask::DELETE => { + let x = jobs.len(); + // @TODO If a thread is currently running, we never join it again. + jobs.retain( + |tj| + !(tj.job.source_file.is_some() && path.to_str().eq(&tj.job.source_file.as_ref().unwrap().to_str())) + ); + println!( + "{} jobs removed due to deletion of '{}'", + x - jobs.len(), path.display() + ); + }, + mask => { + println!("Unhandled event mask: {:?}", mask); + continue; + } + }; + } + } std::thread::sleep(Duration::new(1, 0)); } }