Compare commits

...

2 Commits

Author SHA1 Message Date
89bf0a047b Update threading usage docs
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-06-04 14:58:11 +01:00
3b4df2785d Parallelize disk scanning
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-06-04 14:57:44 +01:00
3 changed files with 127 additions and 66 deletions

View File

@@ -61,7 +61,7 @@ Verify possible duplicates with a full-file hash pass:
disk-checker ~/Downloads --verify-full disk-checker ~/Downloads --verify-full
``` ```
Limit hashing workers: Limit scanning and hashing workers:
```bash ```bash
disk-checker ~/Downloads --threads 4 disk-checker ~/Downloads --threads 4

View File

@@ -2,8 +2,10 @@ use std::collections::BTreeMap;
use std::fs::{self, File, Metadata}; use std::fs::{self, File, Metadata};
use std::io::{self, BufReader, Read, Write}; use std::io::{self, BufReader, Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::thread;
use ignore::WalkBuilder; use ignore::{WalkBuilder, WalkState};
use rayon::prelude::*; use rayon::prelude::*;
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
@@ -19,6 +21,7 @@ pub struct ScanConfig {
pub hash_bytes: u64, pub hash_bytes: u64,
pub follow_links: bool, pub follow_links: bool,
pub verify_full: bool, pub verify_full: bool,
pub threads: Option<usize>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@@ -26,6 +29,7 @@ pub struct ScanReport {
#[serde(serialize_with = "serialize_paths")] #[serde(serialize_with = "serialize_paths")]
pub scanned_paths: Vec<PathBuf>, pub scanned_paths: Vec<PathBuf>,
pub hash_bytes: u64, pub hash_bytes: u64,
pub worker_threads: usize,
pub followed_symlinks: bool, pub followed_symlinks: bool,
pub full_verification: bool, pub full_verification: bool,
pub summary: ScanSummary, pub summary: ScanSummary,
@@ -135,6 +139,15 @@ enum HashOutcome {
Issue(ScanIssue), Issue(ScanIssue),
} }
#[derive(Debug, Clone)]
enum ScannedEntry {
File(FileEntry),
Directory,
Symlink(SymlinkInfo),
Special(SpecialEntry),
Issue(ScanIssue),
}
pub fn parse_byte_count(input: &str) -> Result<u64, String> { pub fn parse_byte_count(input: &str) -> Result<u64, String> {
let trimmed = input.trim(); let trimmed = input.trim();
if trimmed.is_empty() { if trimmed.is_empty() {
@@ -184,6 +197,7 @@ pub fn parse_byte_count(input: &str) -> Result<u64, String> {
pub fn scan_paths(config: ScanConfig) -> ScanReport { pub fn scan_paths(config: ScanConfig) -> ScanReport {
let hash_bytes = config.hash_bytes.max(1); let hash_bytes = config.hash_bytes.max(1);
let worker_threads = worker_threads(config.threads);
let mut files = Vec::new(); let mut files = Vec::new();
let mut symlinks = Vec::new(); let mut symlinks = Vec::new();
let mut special_entries = Vec::new(); let mut special_entries = Vec::new();
@@ -194,6 +208,7 @@ pub fn scan_paths(config: ScanConfig) -> ScanReport {
for root in &config.paths { for root in &config.paths {
let mut builder = WalkBuilder::new(root); let mut builder = WalkBuilder::new(root);
builder builder
.threads(worker_threads)
.follow_links(config.follow_links) .follow_links(config.follow_links)
.hidden(false) .hidden(false)
.ignore(false) .ignore(false)
@@ -202,58 +217,32 @@ pub fn scan_paths(config: ScanConfig) -> ScanReport {
.git_exclude(false) .git_exclude(false)
.parents(false); .parents(false);
for entry in builder.build() { let (sender, receiver) = mpsc::channel();
match entry { builder.build_parallel().run(|| {
Ok(entry) => { let sender = sender.clone();
let path = entry.path().to_path_buf(); let follow_links = config.follow_links;
let metadata = match fs::symlink_metadata(&path) { Box::new(move |entry| {
Ok(metadata) => metadata, for scanned_entry in classify_walk_entry(entry, follow_links) {
Err(error) => { if sender.send(scanned_entry).is_err() {
errors.push(issue(path, format!("could not read metadata: {error}"))); return WalkState::Quit;
continue;
} }
};
if metadata.file_type().is_symlink() {
symlinks.push(describe_symlink(&path));
if !config.follow_links {
continue;
} }
WalkState::Continue
})
});
drop(sender);
match fs::metadata(&path) { for scanned_entry in receiver {
Ok(target_metadata) => { collect_scanned_entry(
process_non_symlink_entry( scanned_entry,
path,
&target_metadata,
&mut files, &mut files,
&mut symlinks,
&mut special_entries, &mut special_entries,
&mut errors,
&mut directories, &mut directories,
&mut total_file_bytes, &mut total_file_bytes,
); );
} }
Err(error) => {
errors.push(issue(
path,
format!("could not follow symlink target: {error}"),
));
}
}
} else {
process_non_symlink_entry(
path,
&metadata,
&mut files,
&mut special_entries,
&mut directories,
&mut total_file_bytes,
);
}
}
Err(error) => {
errors.push(issue(PathBuf::from("<walk>"), error.to_string()));
}
}
}
} }
files.sort_by(|left, right| left.path.cmp(&right.path)); files.sort_by(|left, right| left.path.cmp(&right.path));
@@ -301,6 +290,7 @@ pub fn scan_paths(config: ScanConfig) -> ScanReport {
ScanReport { ScanReport {
scanned_paths: config.paths, scanned_paths: config.paths,
hash_bytes, hash_bytes,
worker_threads,
followed_symlinks: config.follow_links, followed_symlinks: config.follow_links,
full_verification: config.verify_full, full_verification: config.verify_full,
summary: ScanSummary { summary: ScanSummary {
@@ -327,30 +317,93 @@ pub fn scan_paths(config: ScanConfig) -> ScanReport {
} }
} }
fn process_non_symlink_entry( fn worker_threads(configured_threads: Option<usize>) -> usize {
path: PathBuf, configured_threads.unwrap_or_else(|| {
metadata: &Metadata, thread::available_parallelism()
files: &mut Vec<FileEntry>, .map(usize::from)
special_entries: &mut Vec<SpecialEntry>, .unwrap_or(1)
directories: &mut usize, })
total_file_bytes: &mut u64, }
) {
fn classify_walk_entry(
entry: Result<ignore::DirEntry, ignore::Error>,
follow_links: bool,
) -> Vec<ScannedEntry> {
match entry {
Ok(entry) => classify_path(entry.path().to_path_buf(), follow_links),
Err(error) => vec![ScannedEntry::Issue(issue(
PathBuf::from("<walk>"),
error.to_string(),
))],
}
}
fn classify_path(path: PathBuf, follow_links: bool) -> Vec<ScannedEntry> {
let metadata = match fs::symlink_metadata(&path) {
Ok(metadata) => metadata,
Err(error) => {
return vec![ScannedEntry::Issue(issue(
path,
format!("could not read metadata: {error}"),
))];
}
};
if !metadata.file_type().is_symlink() {
return vec![non_symlink_entry(path, &metadata)];
}
let mut entries = vec![ScannedEntry::Symlink(describe_symlink(&path))];
if follow_links {
match fs::metadata(&path) {
Ok(target_metadata) => entries.push(non_symlink_entry(path, &target_metadata)),
Err(error) => entries.push(ScannedEntry::Issue(issue(
path,
format!("could not follow symlink target: {error}"),
))),
}
}
entries
}
fn non_symlink_entry(path: PathBuf, metadata: &Metadata) -> ScannedEntry {
let file_type = metadata.file_type(); let file_type = metadata.file_type();
if file_type.is_file() { if file_type.is_file() {
*total_file_bytes = total_file_bytes.saturating_add(metadata.len()); ScannedEntry::File(FileEntry {
files.push(FileEntry {
path, path,
size: metadata.len(), size: metadata.len(),
device: metadata.dev(), device: metadata.dev(),
inode: metadata.ino(), inode: metadata.ino(),
}); })
} else if file_type.is_dir() { } else if file_type.is_dir() {
*directories += 1; ScannedEntry::Directory
} else { } else {
special_entries.push(SpecialEntry { ScannedEntry::Special(SpecialEntry {
path, path,
kind: special_entry_kind(&file_type), kind: special_entry_kind(&file_type),
}); })
}
}
fn collect_scanned_entry(
entry: ScannedEntry,
files: &mut Vec<FileEntry>,
symlinks: &mut Vec<SymlinkInfo>,
special_entries: &mut Vec<SpecialEntry>,
errors: &mut Vec<ScanIssue>,
directories: &mut usize,
total_file_bytes: &mut u64,
) {
match entry {
ScannedEntry::File(file) => {
*total_file_bytes = total_file_bytes.saturating_add(file.size);
files.push(file);
}
ScannedEntry::Directory => *directories += 1,
ScannedEntry::Symlink(symlink) => symlinks.push(symlink),
ScannedEntry::Special(special_entry) => special_entries.push(special_entry),
ScannedEntry::Issue(error) => errors.push(error),
} }
} }
@@ -555,6 +608,7 @@ pub fn write_human_report(mut writer: impl Write, report: &ScanReport) -> io::Re
join_paths(&report.scanned_paths) join_paths(&report.scanned_paths)
)?; )?;
writeln!(writer, "Hash window: {}", format_bytes(report.hash_bytes))?; writeln!(writer, "Hash window: {}", format_bytes(report.hash_bytes))?;
writeln!(writer, "Worker threads: {}", report.worker_threads)?;
writeln!( writeln!(
writer, writer,
"Symlink traversal: {}", "Symlink traversal: {}",
@@ -874,6 +928,7 @@ mod tests {
hash_bytes: 3, hash_bytes: 3,
follow_links: false, follow_links: false,
verify_full: false, verify_full: false,
threads: None,
}); });
assert_eq!(report.summary.files, 3); assert_eq!(report.summary.files, 3);
@@ -897,6 +952,7 @@ mod tests {
hash_bytes: 3, hash_bytes: 3,
follow_links: false, follow_links: false,
verify_full: true, verify_full: true,
threads: None,
}); });
assert_eq!(report.possible_duplicates.len(), 1); assert_eq!(report.possible_duplicates.len(), 1);
@@ -919,6 +975,7 @@ mod tests {
hash_bytes: DEFAULT_HASH_BYTES, hash_bytes: DEFAULT_HASH_BYTES,
follow_links: false, follow_links: false,
verify_full: false, verify_full: false,
threads: None,
}); });
assert_eq!(report.summary.files, 1); assert_eq!(report.summary.files, 1);
@@ -941,6 +998,7 @@ mod tests {
hash_bytes: DEFAULT_HASH_BYTES, hash_bytes: DEFAULT_HASH_BYTES,
follow_links: false, follow_links: false,
verify_full: false, verify_full: false,
threads: None,
}); });
assert_eq!(report.summary.files, 2); assert_eq!(report.summary.files, 2);
@@ -965,6 +1023,7 @@ mod tests {
hash_bytes: DEFAULT_HASH_BYTES, hash_bytes: DEFAULT_HASH_BYTES,
follow_links: false, follow_links: false,
verify_full: false, verify_full: false,
threads: None,
}); });
let json = serde_json::to_string(&report).expect("serialize report with lossy path"); let json = serde_json::to_string(&report).expect("serialize report with lossy path");
@@ -977,6 +1036,7 @@ mod tests {
let report = ScanReport { let report = ScanReport {
scanned_paths: vec![PathBuf::from(".")], scanned_paths: vec![PathBuf::from(".")],
hash_bytes: DEFAULT_HASH_BYTES, hash_bytes: DEFAULT_HASH_BYTES,
worker_threads: 1,
followed_symlinks: false, followed_symlinks: false,
full_verification: false, full_verification: false,
summary: ScanSummary { summary: ScanSummary {

View File

@@ -31,7 +31,7 @@ struct Cli {
#[arg(long)] #[arg(long)]
verify_full: bool, verify_full: bool,
/// Number of worker threads used for hashing. Defaults to Rayon automatic sizing. /// Number of worker threads used for scanning and hashing. Defaults to CPU parallelism.
#[arg(long, value_parser = parse_thread_count)] #[arg(long, value_parser = parse_thread_count)]
threads: Option<usize>, threads: Option<usize>,
@@ -72,6 +72,7 @@ fn main() -> anyhow::Result<ExitCode> {
hash_bytes: cli.hash_bytes, hash_bytes: cli.hash_bytes,
follow_links: cli.follow_links, follow_links: cli.follow_links,
verify_full: cli.verify_full, verify_full: cli.verify_full,
threads: cli.threads,
}); });
let stdout = io::stdout(); let stdout = io::stdout();