Add first pass at using inotify to watch the job directory for changes

There are number of cases which currently aren't handled:

 * moving a file out of or into the directory
 * a file being touched: should that be used to reset that last_run
   value?
 * when jobs are removed, it is regardless of thread state so there
   may be children that never get joined
This commit is contained in:
Kienan Stewart 2022-09-25 18:36:40 -04:00
parent 518f0b284f
commit 4b36b75d9f
3 changed files with 98 additions and 15 deletions

23
Cargo.lock generated
View File

@ -620,6 +620,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"ansi-to-html", "ansi-to-html",
"chrono", "chrono",
"inotify",
"prettydiff", "prettydiff",
"rss", "rss",
"scraper", "scraper",
@ -757,6 +758,28 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "inotify"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9"
dependencies = [
"bitflags",
"futures-core",
"inotify-sys",
"libc",
"tokio",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.12" version = "0.1.12"

View File

@ -8,6 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
ansi-to-html = "0.1" ansi-to-html = "0.1"
chrono = "0.4" chrono = "0.4"
inotify = "0.10"
prettydiff = "0.6" prettydiff = "0.6"
rss = "2" rss = "2"
scraper = "0.13" scraper = "0.13"

View File

@ -1,10 +1,11 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::env; use std::env;
use std::path::Path; use std::path::{Path,PathBuf};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use inotify;
use thirtyfour_sync::WebDriverCommands; use thirtyfour_sync::WebDriverCommands;
mod conf; mod conf;
@ -99,20 +100,11 @@ fn main() {
println!("Skipping '{}': not a file", _entry.path().display()); println!("Skipping '{}': not a file", _entry.path().display());
continue; continue;
} }
match _entry.path().extension() {
Some(x) => { if !Job::has_job_file_extension(&_entry.path()) {
if ! "job".eq(x) { println!("Skipping '{}': does not have '.job' extension", _entry.path().display());
println!("Skipping '{}': does not have '.job' extension", continue;
_entry.path().display()); }
continue;
}
},
None => {
println!("Skipping '{}': does not have '.job' extension",
_entry.path().display());
continue;
}
};
let job = match Job::from_file(&_entry.path(), &conf) { let job = match Job::from_file(&_entry.path(), &conf) {
Err(why) => { Err(why) => {
@ -128,6 +120,14 @@ fn main() {
}); });
} }
let mut inotify = inotify::Inotify::init()
.expect("Failed to initialize inotify");
inotify.add_watch(
conf.job_dir.to_str().unwrap().clone(),
inotify::WatchMask::CREATE | inotify::WatchMask::DELETE
).expect("Failed to start watching job dir for changes");
let mut notify_buffer = [0; 1024];
let max_running_tasks = 5; let max_running_tasks = 5;
loop { loop {
let mut running_tasks = 0; let mut running_tasks = 0;
@ -196,6 +196,65 @@ fn main() {
break; break;
} }
let events = inotify.read_events(&mut notify_buffer);
if events.is_ok() {
for event in events.unwrap() {
if event.name.is_none() {
continue;
}
let mut path = PathBuf::new();
path.push(&conf.job_dir.to_str().unwrap().clone());
path.push(event.name.unwrap());
match event.mask {
inotify::EventMask::CREATE => {
let md = match path.metadata() {
Err(why) => {
println!("Failed to read metadata from event file '{}': {}",
path.display(), why);
continue;
},
Ok(value) => value,
};
if !md.is_file() {
continue;
}
if !Job::has_job_file_extension(&path) {
continue;
}
let job = match Job::from_file(&path, &conf) {
Err(why) => {
println!("Failed to load job from '{}': {}", path.display(), why);
continue;
},
Ok(value) => value,
};
jobs.push(ThreadJob {
job: job,
handle: None,
last_result: None,
});
println!("Added job from '{}' being created", path.display());
},
inotify::EventMask::DELETE => {
let x = jobs.len();
// @TODO If a thread is currently running, we never join it again.
jobs.retain(
|tj|
!(tj.job.source_file.is_some() && path.to_str().eq(&tj.job.source_file.as_ref().unwrap().to_str()))
);
println!(
"{} jobs removed due to deletion of '{}'",
x - jobs.len(), path.display()
);
},
mask => {
println!("Unhandled event mask: {:?}", mask);
continue;
}
};
}
}
std::thread::sleep(Duration::new(1, 0)); std::thread::sleep(Duration::new(1, 0));
} }
} }