# Image Preprocessing
The preprocessing module provides comprehensive image quality assessment and data cleaning capabilities for mosquito breeding spot images.
## Overview
The `ImagePreprocessor` class uses the fastdup library to automatically identify and categorize problematic images, including:
- Invalid images: Corrupted or unreadable files
- Duplicates: Nearly identical or repeated images
- Outliers: Images significantly different from the dataset
- Dark images: Poorly lit or underexposed images
- Blurry images: Out-of-focus or motion-blurred images
## Quick Start
```python
from prismh.core.preprocess import ImagePreprocessor

# Initialize the preprocessor
preprocessor = ImagePreprocessor(
    input_dir="path/to/raw/images",
    output_dir="path/to/results",
    ccthreshold=0.9,
    outlier_distance=0.68
)

# Run the complete preprocessing pipeline
preprocessor.run_preprocessing()
```
## Command Line Usage
```bash
# Basic usage
python -m prismh.core.preprocess --data_dir images/ --output_dir results/

# With custom parameters
python -m prismh.core.preprocess \
    --data_dir /path/to/images \
    --output_dir /path/to/results \
    --ccthreshold 0.85 \
    --outlier_distance 0.70
```
## API Reference
### ImagePreprocessor
A class that uses the fastdup library to preprocess images by identifying
invalid files, duplicates, outliers, dark, and blurry images, and segregating
them into 'clean' and 'problematic' sets.
Source code in `src/prismh/core/preprocess.py`:

```python
class ImagePreprocessor:
    """
    A class that uses the fastdup library to preprocess images by identifying
    invalid files, duplicates, outliers, dark, and blurry images, and segregating
    them into 'clean' and 'problematic' sets.
    """

    def __init__(self,
                 input_dir: str,
                 output_dir: str,
                 ccthreshold: float = 0.9,
                 outlier_distance: float = 0.68):
        """
        :param input_dir: Path to the folder that contains all your images.
        :param output_dir: Path where the cleaned and problematic folders will be created.
        :param ccthreshold: Threshold for similarity detection in fastdup (default 0.9).
        :param outlier_distance: Distance threshold for outlier detection (default 0.68).
        """
        # Convert to absolute paths using pathlib
        self.input_dir = Path(input_dir).resolve()
        self.output_dir = Path(output_dir).resolve()
        self.ccthreshold = ccthreshold
        self.outlier_distance = outlier_distance

        # Folders for final categorized images using pathlib
        self.clean_folder = self.output_dir / "clean"
        self.problematic_folder = self.output_dir / "problematic"
        self.invalid_folder = self.problematic_folder / "invalid"
        self.duplicates_folder = self.problematic_folder / "duplicates"
        self.outliers_folder = self.problematic_folder / "outliers"
        self.dark_folder = self.problematic_folder / "dark"
        self.blurry_folder = self.problematic_folder / "blurry"

        # Create output directories
        self._create_directories()

    def _create_directories(self):
        """Create the output directory structure using pathlib."""
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.clean_folder.mkdir(parents=True, exist_ok=True)
        self.problematic_folder.mkdir(parents=True, exist_ok=True)
        self.invalid_folder.mkdir(parents=True, exist_ok=True)
        self.duplicates_folder.mkdir(parents=True, exist_ok=True)
        self.outliers_folder.mkdir(parents=True, exist_ok=True)
        self.dark_folder.mkdir(parents=True, exist_ok=True)
        self.blurry_folder.mkdir(parents=True, exist_ok=True)

    def _extract_filename(self, path):
        """Extract just the filename from a path (relative or absolute) using pathlib."""
        # Assuming path is a string from fastdup, convert to Path first
        return Path(path).name

    def run_preprocessing(self):
        """
        Run the entire fastdup-based preprocessing pipeline:
        1) Detect invalid images
        2) Compute similarity and connected components (duplicates)
        3) Identify outliers, dark images, and blurry images
        4) Copy images to respective categories
        """
        # 1) Create a FastDup object and run the analysis
        fd = fastdup.create(input_dir=self.input_dir)
        fd.run(ccthreshold=self.ccthreshold)

        # 2) Identify invalid images
        broken_images_df = fd.invalid_instances()
        broken_filenames = [self._extract_filename(path) for path in broken_images_df['filename'].tolist()]
        print(f"Found {len(broken_filenames)} invalid images.")

        # 3) Find duplicates via connected components
        connected_components_df, _ = fd.connected_components()
        clusters_df = self._get_clusters(
            connected_components_df,
            sort_by='count',
            min_count=2,
            ascending=False
        )
        keep_filenames = []
        duplicate_filenames = []
        for cluster_file_list in clusters_df.filename:
            if not cluster_file_list:  # Skip empty lists
                continue
            # We'll keep the first one and mark the rest as duplicates
            keep = self._extract_filename(cluster_file_list[0])
            discard = [self._extract_filename(path) for path in cluster_file_list[1:]]
            keep_filenames.append(keep)
            duplicate_filenames.extend(discard)
        print(f"Found {len(set(duplicate_filenames))} duplicates.")

        # 4) Find outliers (distance < outlier_distance)
        outlier_df = fd.outliers()
        outlier_filenames = [
            self._extract_filename(path)
            for path in outlier_df[outlier_df.distance < self.outlier_distance].filename_outlier.tolist()
        ]
        print(f"Found {len(outlier_filenames)} outliers with distance < {self.outlier_distance}.")

        # 5) Dark and blurry images from stats
        stats_df = fd.img_stats()
        dark_images = stats_df[stats_df['mean'] < 13]  # threshold for darkness
        dark_filenames = [self._extract_filename(path) for path in dark_images['filename'].tolist()]
        print(f"Found {len(dark_filenames)} dark images (mean < 13).")

        blurry_images = stats_df[stats_df['blur'] < 50]  # threshold for blur
        blurry_filenames = [self._extract_filename(path) for path in blurry_images['filename'].tolist()]
        print(f"Found {len(blurry_filenames)} blurry images (blur < 50).")

        # 6) Collect all problematic filenames
        broken_set = set(broken_filenames)
        duplicates_set = set(duplicate_filenames)
        outlier_set = set(outlier_filenames)
        dark_set = set(dark_filenames)
        blurry_set = set(blurry_filenames)
        keep_set = set(keep_filenames)

        # 7) Build sets for processing
        all_problematic = broken_set.union(duplicates_set, outlier_set, dark_set, blurry_set)
        print(f"Total problematic images: {len(all_problematic)}")
        print(f"Images to keep from clusters: {len(keep_set)}")

        # 8) Process all files in the input directory
        problematic_count = {
            "invalid": 0,
            "duplicates": 0,
            "outliers": 0,
            "dark": 0,
            "blurry": 0
        }
        clean_count = 0
        kept_duplicates = 0

        # Get a list of all files using pathlib (assuming flat structure or recursion
        # handled by fastdup already, adjust if needed).
        # Using rglob to find all files recursively; filter for actual files.
        all_paths = [p for p in self.input_dir.rglob('*') if p.is_file()]
        all_files = [(p, p.name) for p in all_paths]
        print(f"Found {len(all_files)} total files in input directory: {self.input_dir}")

        # Process each file
        for full_path, filename in all_files:
            # Copy to the problematic folders if needed
            if filename in broken_set:
                self._copy_to_folder(full_path, self.invalid_folder)
                problematic_count["invalid"] += 1
            if filename in duplicates_set:
                self._copy_to_folder(full_path, self.duplicates_folder)
                problematic_count["duplicates"] += 1
            if filename in outlier_set:
                self._copy_to_folder(full_path, self.outliers_folder)
                problematic_count["outliers"] += 1
            if filename in dark_set:
                self._copy_to_folder(full_path, self.dark_folder)
                problematic_count["dark"] += 1
            if filename in blurry_set:
                self._copy_to_folder(full_path, self.blurry_folder)
                problematic_count["blurry"] += 1

            # Copy to clean folder if not problematic or if it's a keeper
            if filename not in all_problematic or filename in keep_set:
                self._copy_to_folder(full_path, self.clean_folder)
                clean_count += 1
                if filename in keep_set:
                    kept_duplicates += 1

        # Print summary
        print("Copying results:")
        print(f"- Invalid: {problematic_count['invalid']}/{len(broken_set)}")
        print(f"- Duplicates: {problematic_count['duplicates']}/{len(duplicates_set)}")
        print(f"- Outliers: {problematic_count['outliers']}/{len(outlier_set)}")
        print(f"- Dark: {problematic_count['dark']}/{len(dark_set)}")
        print(f"- Blurry: {problematic_count['blurry']}/{len(blurry_set)}")
        print(f"- Clean: {clean_count} (including {kept_duplicates} kept duplicates)")

    def _copy_to_folder(self, src_path, dest_folder):
        """Copy a file to the destination folder using pathlib."""
        # Ensure both paths are Path objects and join them with pathlib
        src_path_obj = Path(src_path)
        dest_path = Path(dest_folder) / src_path_obj.name
        try:
            shutil.copy2(src_path_obj, dest_path)
            return True
        except Exception as e:
            print(f"Error copying {src_path} to {dest_folder}: {e}")
            return False

    def _get_clusters(self, df, sort_by='count', min_count=2, ascending=False):
        """
        Given a connected_components DataFrame from fastdup, group into clusters
        with the specified sorting options.
        """
        agg_dict = {'filename': list, 'mean_distance': 'max', 'count': 'count'}
        if 'label' in df.columns:
            agg_dict['label'] = list
        # Only consider rows where 'count' >= min_count
        df = df[df['count'] >= min_count]
        grouped_df = df.groupby('component_id').agg(agg_dict)
        grouped_df = grouped_df.sort_values(by=[sort_by], ascending=ascending)
        return grouped_df
```
#### `__init__`

```python
__init__(input_dir: str, output_dir: str, ccthreshold: float = 0.9, outlier_distance: float = 0.68)
```

Parameters:

- `input_dir`: Path to the folder that contains all your images.
- `output_dir`: Path where the cleaned and problematic folders will be created.
- `ccthreshold`: Threshold for similarity detection in fastdup (default 0.9).
- `outlier_distance`: Distance threshold for outlier detection (default 0.68).

Source code in `src/prismh/core/preprocess.py`:

```python
def __init__(self,
             input_dir: str,
             output_dir: str,
             ccthreshold: float = 0.9,
             outlier_distance: float = 0.68):
    """
    :param input_dir: Path to the folder that contains all your images.
    :param output_dir: Path where the cleaned and problematic folders will be created.
    :param ccthreshold: Threshold for similarity detection in fastdup (default 0.9).
    :param outlier_distance: Distance threshold for outlier detection (default 0.68).
    """
    # Convert to absolute paths using pathlib
    self.input_dir = Path(input_dir).resolve()
    self.output_dir = Path(output_dir).resolve()
    self.ccthreshold = ccthreshold
    self.outlier_distance = outlier_distance

    # Folders for final categorized images using pathlib
    self.clean_folder = self.output_dir / "clean"
    self.problematic_folder = self.output_dir / "problematic"
    self.invalid_folder = self.problematic_folder / "invalid"
    self.duplicates_folder = self.problematic_folder / "duplicates"
    self.outliers_folder = self.problematic_folder / "outliers"
    self.dark_folder = self.problematic_folder / "dark"
    self.blurry_folder = self.problematic_folder / "blurry"

    # Create output directories
    self._create_directories()
```
#### `run_preprocessing`

Run the entire fastdup-based preprocessing pipeline:

1. Detect invalid images
2. Compute similarity and connected components (duplicates)
3. Identify outliers, dark images, and blurry images
4. Copy images to respective categories

Source code in `src/prismh/core/preprocess.py`:

```python
def run_preprocessing(self):
    """
    Run the entire fastdup-based preprocessing pipeline:
    1) Detect invalid images
    2) Compute similarity and connected components (duplicates)
    3) Identify outliers, dark images, and blurry images
    4) Copy images to respective categories
    """
    # 1) Create a FastDup object and run the analysis
    fd = fastdup.create(input_dir=self.input_dir)
    fd.run(ccthreshold=self.ccthreshold)

    # 2) Identify invalid images
    broken_images_df = fd.invalid_instances()
    broken_filenames = [self._extract_filename(path) for path in broken_images_df['filename'].tolist()]
    print(f"Found {len(broken_filenames)} invalid images.")

    # 3) Find duplicates via connected components
    connected_components_df, _ = fd.connected_components()
    clusters_df = self._get_clusters(
        connected_components_df,
        sort_by='count',
        min_count=2,
        ascending=False
    )
    keep_filenames = []
    duplicate_filenames = []
    for cluster_file_list in clusters_df.filename:
        if not cluster_file_list:  # Skip empty lists
            continue
        # We'll keep the first one and mark the rest as duplicates
        keep = self._extract_filename(cluster_file_list[0])
        discard = [self._extract_filename(path) for path in cluster_file_list[1:]]
        keep_filenames.append(keep)
        duplicate_filenames.extend(discard)
    print(f"Found {len(set(duplicate_filenames))} duplicates.")

    # 4) Find outliers (distance < outlier_distance)
    outlier_df = fd.outliers()
    outlier_filenames = [
        self._extract_filename(path)
        for path in outlier_df[outlier_df.distance < self.outlier_distance].filename_outlier.tolist()
    ]
    print(f"Found {len(outlier_filenames)} outliers with distance < {self.outlier_distance}.")

    # 5) Dark and blurry images from stats
    stats_df = fd.img_stats()
    dark_images = stats_df[stats_df['mean'] < 13]  # threshold for darkness
    dark_filenames = [self._extract_filename(path) for path in dark_images['filename'].tolist()]
    print(f"Found {len(dark_filenames)} dark images (mean < 13).")

    blurry_images = stats_df[stats_df['blur'] < 50]  # threshold for blur
    blurry_filenames = [self._extract_filename(path) for path in blurry_images['filename'].tolist()]
    print(f"Found {len(blurry_filenames)} blurry images (blur < 50).")

    # 6) Collect all problematic filenames
    broken_set = set(broken_filenames)
    duplicates_set = set(duplicate_filenames)
    outlier_set = set(outlier_filenames)
    dark_set = set(dark_filenames)
    blurry_set = set(blurry_filenames)
    keep_set = set(keep_filenames)

    # 7) Build sets for processing
    all_problematic = broken_set.union(duplicates_set, outlier_set, dark_set, blurry_set)
    print(f"Total problematic images: {len(all_problematic)}")
    print(f"Images to keep from clusters: {len(keep_set)}")

    # 8) Process all files in the input directory
    problematic_count = {
        "invalid": 0,
        "duplicates": 0,
        "outliers": 0,
        "dark": 0,
        "blurry": 0
    }
    clean_count = 0
    kept_duplicates = 0

    # Get a list of all files using pathlib (assuming flat structure or recursion
    # handled by fastdup already, adjust if needed).
    # Using rglob to find all files recursively; filter for actual files.
    all_paths = [p for p in self.input_dir.rglob('*') if p.is_file()]
    all_files = [(p, p.name) for p in all_paths]
    print(f"Found {len(all_files)} total files in input directory: {self.input_dir}")

    # Process each file
    for full_path, filename in all_files:
        # Copy to the problematic folders if needed
        if filename in broken_set:
            self._copy_to_folder(full_path, self.invalid_folder)
            problematic_count["invalid"] += 1
        if filename in duplicates_set:
            self._copy_to_folder(full_path, self.duplicates_folder)
            problematic_count["duplicates"] += 1
        if filename in outlier_set:
            self._copy_to_folder(full_path, self.outliers_folder)
            problematic_count["outliers"] += 1
        if filename in dark_set:
            self._copy_to_folder(full_path, self.dark_folder)
            problematic_count["dark"] += 1
        if filename in blurry_set:
            self._copy_to_folder(full_path, self.blurry_folder)
            problematic_count["blurry"] += 1

        # Copy to clean folder if not problematic or if it's a keeper
        if filename not in all_problematic or filename in keep_set:
            self._copy_to_folder(full_path, self.clean_folder)
            clean_count += 1
            if filename in keep_set:
                kept_duplicates += 1

    # Print summary
    print("Copying results:")
    print(f"- Invalid: {problematic_count['invalid']}/{len(broken_set)}")
    print(f"- Duplicates: {problematic_count['duplicates']}/{len(duplicates_set)}")
    print(f"- Outliers: {problematic_count['outliers']}/{len(outlier_set)}")
    print(f"- Dark: {problematic_count['dark']}/{len(dark_set)}")
    print(f"- Blurry: {problematic_count['blurry']}/{len(blurry_set)}")
    print(f"- Clean: {clean_count} (including {kept_duplicates} kept duplicates)")
```
## Configuration Parameters

### Quality Thresholds

| Parameter | Description | Default | Range |
|---|---|---|---|
| `ccthreshold` | Similarity threshold for duplicate detection | 0.9 | 0.0-1.0 |
| `outlier_distance` | Distance threshold for outlier detection | 0.68 | 0.0-1.0 |
### Image Quality Metrics

| Metric | Threshold | Purpose |
|---|---|---|
| Mean brightness | < 13 | Detect dark images |
| Blur variance | < 50 | Detect blurry images |
| File validity | - | Detect corrupted files |
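For intuition, the darkness and blur checks roughly correspond to the mean grayscale intensity and the variance of the Laplacian of an image. The snippet below is an illustrative approximation using OpenCV, not the code fastdup runs internally; the statistics the pipeline actually uses come from `fd.img_stats()`.

```python
import cv2

def approximate_quality_metrics(image_path: str) -> dict:
    """Roughly reproduce the darkness and blur checks for a single image."""
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        # Unreadable file, which corresponds to the "invalid" category
        return {"valid": False}
    return {
        "valid": True,
        "mean_brightness": float(gray.mean()),  # flagged as dark if < 13
        "blur_variance": float(cv2.Laplacian(gray, cv2.CV_64F).var()),  # flagged as blurry if < 50
    }
```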
## Output Structure

The preprocessing pipeline creates the following directory structure:

```
output_dir/
├── clean/              # High-quality images
└── problematic/        # Filtered images
    ├── invalid/        # Corrupted files
    ├── duplicates/     # Duplicate images
    ├── outliers/       # Unusual images
    ├── dark/           # Dark/underexposed images
    └── blurry/         # Blurry images
```
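To keep track of which category each image ended up in, you can build a simple manifest from this tree after a run. A minimal sketch (the `manifest.csv` name is just an example):

```python
import csv
from pathlib import Path

output_dir = Path("path/to/results")

# One row per copied image: filename plus the folder it landed in
rows = []
category_dirs = [output_dir / "clean", *sorted((output_dir / "problematic").iterdir())]
for category_dir in category_dirs:
    if category_dir.is_dir():
        for img in category_dir.glob("*"):
            if img.is_file():
                rows.append({"filename": img.name, "category": category_dir.name})

with open(output_dir / "manifest.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["filename", "category"])
    writer.writeheader()
    writer.writerows(rows)
```

Note that an image flagged for more than one reason (for example both dark and blurry) is copied into each matching folder, so a filename can occur more than once in the manifest.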
## Usage Examples

### Basic Preprocessing

```python
from prismh.core.preprocess import ImagePreprocessor

# Simple preprocessing
preprocessor = ImagePreprocessor(
    input_dir="raw_images/",
    output_dir="processed/"
)
preprocessor.run_preprocessing()
```
### Custom Configuration

```python
# Advanced configuration
preprocessor = ImagePreprocessor(
    input_dir="raw_images/",
    output_dir="processed/",
    ccthreshold=0.85,       # Lower similarity threshold: flags more images as duplicates
    outlier_distance=0.75   # Higher distance threshold: flags more images as outliers
)
preprocessor.run_preprocessing()
```
### Preprocessing with Metadata

```python
import json
from pathlib import Path

from prismh.core.preprocess import ImagePreprocessor

# Load metadata
metadata_path = Path("metadata/annotations.json")
with open(metadata_path) as f:
    metadata = json.load(f)

# Process with metadata context
preprocessor = ImagePreprocessor(
    input_dir="raw_images/",
    output_dir="processed/"
)
preprocessor.run_preprocessing()

# Analyze results with metadata
results_summary = {
    "total_processed": len(list(Path("processed/clean").glob("*"))),
    "problematic_count": len(list(Path("processed/problematic").rglob("*"))),
    "metadata_entries": len(metadata)
}
```
### Memory Management

```python
# For large datasets, process in batches
import os
os.environ['FASTDUP_BATCH_SIZE'] = '1000'

preprocessor = ImagePreprocessor(
    input_dir="large_dataset/",
    output_dir="results/"
)
```
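An alternative that does not rely on fastdup configuration is to split a very large collection into chunks and preprocess each chunk separately. A rough sketch, where the chunk size and the staging/results layout are illustrative choices:

```python
import shutil
from pathlib import Path

from prismh.core.preprocess import ImagePreprocessor

def preprocess_in_chunks(input_dir: str, output_dir: str, chunk_size: int = 5000):
    """Stage images into fixed-size chunks and run the preprocessor on each chunk."""
    images = sorted(p for p in Path(input_dir).rglob("*") if p.is_file())
    for i in range(0, len(images), chunk_size):
        chunk_id = i // chunk_size
        chunk_input = Path(output_dir) / f"chunk_{chunk_id:03d}_input"
        chunk_input.mkdir(parents=True, exist_ok=True)
        for img in images[i:i + chunk_size]:
            shutil.copy2(img, chunk_input / img.name)
        ImagePreprocessor(
            input_dir=str(chunk_input),
            output_dir=str(Path(output_dir) / f"chunk_{chunk_id:03d}_results"),
        ).run_preprocessing()
```

Keep in mind that duplicates and outliers are then only detected within each chunk, not across the whole dataset.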
### Parallel Processing

```python
# Utilize multiple CPU cores
import multiprocessing
import os

os.environ['FASTDUP_NUM_WORKERS'] = str(multiprocessing.cpu_count())
```
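fastdup already uses multiple threads within a single run, so the main opportunity for extra parallelism is processing several independent datasets at once, one preprocessor per worker process. A sketch with placeholder paths; keep `max_workers` modest since each run is itself multi-threaded:

```python
from concurrent.futures import ProcessPoolExecutor

from prismh.core.preprocess import ImagePreprocessor

def _preprocess(paths):
    input_dir, output_dir = paths
    ImagePreprocessor(input_dir=input_dir, output_dir=output_dir).run_preprocessing()

if __name__ == "__main__":
    datasets = [
        ("site_a/raw", "site_a/processed"),
        ("site_b/raw", "site_b/processed"),
    ]
    with ProcessPoolExecutor(max_workers=2) as pool:
        list(pool.map(_preprocess, datasets))
```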
## Quality Metrics

The preprocessing pipeline reports detailed quality counts as it runs: `run_preprocessing()` prints how many images were flagged as invalid, duplicate, outlier, dark, or blurry, and how many were kept as clean. The same counts can be recovered afterwards from the output folders:

```python
from pathlib import Path

output_dir = Path("processed")

def count_files(folder: Path) -> int:
    return sum(1 for p in folder.glob("*") if p.is_file())

print(f"Clean images: {count_files(output_dir / 'clean')}")
for category in ["invalid", "duplicates", "outliers", "dark", "blurry"]:
    print(f"{category}: {count_files(output_dir / 'problematic' / category)}")
```
## Integration with Other Modules

### With Feature Extraction

```python
from prismh.core.preprocess import ImagePreprocessor
from prismh.core.extract_embeddings import extract_embeddings_main

# Step 1: Preprocess
preprocessor = ImagePreprocessor("raw/", "processed/")
preprocessor.run_preprocessing()

# Step 2: Extract features from clean images
# (Configure paths in extract_embeddings.py)
extract_embeddings_main()
```
### With Clustering

```python
from prismh.core.cluster_embeddings import cluster_main

# After preprocessing and feature extraction
cluster_main()
```
## Troubleshooting

### Common Issues

**Memory errors with large datasets:**

```python
# Reduce batch size or process in chunks
import os
os.environ['FASTDUP_BATCH_SIZE'] = '500'
```

**Path-related errors:**

```python
# Use absolute paths
from pathlib import Path

input_path = Path("images").resolve()
output_path = Path("results").resolve()
```

**Permission errors:**

```bash
# Ensure write permissions
chmod -R 755 output_directory/
```
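Write access can also be checked from Python before starting a long run; a small check, assuming `results/` is the intended output directory:

```python
import os
from pathlib import Path

output_dir = Path("results").resolve()
output_dir.mkdir(parents=True, exist_ok=True)
if not os.access(output_dir, os.W_OK):
    raise PermissionError(f"No write access to {output_dir}")
```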
## Advanced Features

### Custom Quality Filters

```python
from prismh.core.preprocess import ImagePreprocessor

class CustomImagePreprocessor(ImagePreprocessor):
    def custom_quality_check(self, image_path):
        """Add custom quality assessment logic"""
        # Implement custom quality checks for a single image
        pass

    def apply_custom_filters(self):
        """Run custom_quality_check over the images in the clean folder"""
        for image_path in self.clean_folder.glob("*"):
            self.custom_quality_check(image_path)

    def run_preprocessing(self):
        # Run standard preprocessing
        super().run_preprocessing()
        # Add custom processing steps
        self.apply_custom_filters()
```
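As a concrete illustration of such a hook, the subclass below moves clean images that fall under a minimum resolution into an extra folder. The Pillow dependency, the `MIN_SIDE` value, and the `low_resolution` folder are assumptions for this example, not part of the shipped class:

```python
import shutil

from PIL import Image

from prismh.core.preprocess import ImagePreprocessor

class MinResolutionPreprocessor(ImagePreprocessor):
    """Additionally set aside clean images below a minimum resolution."""

    MIN_SIDE = 224  # illustrative threshold in pixels

    def apply_custom_filters(self):
        low_res_folder = self.problematic_folder / "low_resolution"
        low_res_folder.mkdir(parents=True, exist_ok=True)
        for img_path in self.clean_folder.glob("*"):
            if not img_path.is_file():
                continue
            try:
                with Image.open(img_path) as img:
                    too_small = min(img.size) < self.MIN_SIDE
            except OSError:
                continue  # unreadable files are already handled by the base pipeline
            if too_small:
                shutil.move(str(img_path), str(low_res_folder / img_path.name))

    def run_preprocessing(self):
        super().run_preprocessing()
        self.apply_custom_filters()
```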
### Batch Processing

```python
from prismh.core.preprocess import ImagePreprocessor

def process_multiple_datasets(datasets):
    """Process multiple image datasets"""
    for dataset_info in datasets:
        preprocessor = ImagePreprocessor(
            input_dir=dataset_info['input'],
            output_dir=dataset_info['output']
        )
        preprocessor.run_preprocessing()
        print(f"Completed: {dataset_info['name']}")
```