feat: expanded dedup.py, removed dups

2025-12-16 13:34:18 +02:00
parent 194d2c2a28
commit e110cfa9cf
20 changed files with 690 additions and 36 deletions

dedup.py

@@ -1,71 +1,382 @@
#!/usr/bin/env python3
"""Find and remove duplicate emoji files based on content hash."""
"""Find and remove duplicate emoji files using perceptual hashing."""
import argparse
import hashlib
from collections import defaultdict
from pathlib import Path
from dataclasses import dataclass
import imagehash
from PIL import Image
EMOJI_DIR = Path("emoji")
EXTENSIONS = (".png", ".gif", ".jpg", ".jpeg")
# Number of hash algorithms that must agree for images to be considered similar
MIN_HASH_AGREEMENT = 4
# Maximum file size difference ratio for duplicates (e.g., 0.05 = 5% difference allowed)
MAX_SIZE_DIFF_RATIO = 0.02
@dataclass
class ImageInfo:
"""Container for image metadata and hashes."""
phash: imagehash.ImageHash
ahash: imagehash.ImageHash
dhash: imagehash.ImageHash
colorhash: imagehash.ImageHash
width: int
height: int
n_frames: int # 1 for static images
md5: str # File content hash for exact duplicate detection
def _has_degenerate_hash(self) -> bool:
"""Check if this image has degenerate (all-zero) hashes, indicating mostly transparent content."""
zero_hash = "0000000000000000"
# If 3+ hashes are all zeros, the image is likely mostly transparent
zero_count = sum(1 for h in [str(self.phash), str(self.ahash), str(self.dhash)] if h == zero_hash)
return zero_count >= 3
def is_candidate(self, other: "ImageInfo", threshold: int) -> tuple[bool, int, int]:
"""
Check if two images are candidate duplicates based on metadata and hashes.
Returns (is_candidate, agreements, total_distance).
This is a fast pre-filter. GIFs require additional frame verification.
"""
# Dimensions must match exactly
if self.width != other.width or self.height != other.height:
return False, 0, 999
# Frame count must match for animated images
if self.n_frames != other.n_frames:
return False, 0, 999
# Calculate perceptual hash distances
distances = [
self.phash - other.phash,
self.ahash - other.ahash,
self.dhash - other.dhash,
self.colorhash - other.colorhash,
]
total_distance = sum(distances)
agreements = sum(1 for d in distances if d <= threshold)
# For static images: detect re-compressed/re-exported duplicates
# Require identical structure AND color, with small perceptual variance:
# - aHash=0 AND dHash=0 AND colorHash=0 AND pHash <= 10
# - OR all 4 hashes match exactly (total_distance = 0)
if self.n_frames == 1:
phash_dist = self.phash - other.phash
ahash_dist = self.ahash - other.ahash
dhash_dist = self.dhash - other.dhash
chash_dist = self.colorhash - other.colorhash
# Identical structure + color, small perceptual variance = re-compressed image
if ahash_dist == 0 and dhash_dist == 0 and chash_dist == 0 and phash_dist <= 10:
return True, agreements, total_distance
# All hashes match exactly
if total_distance == 0:
return True, agreements, total_distance
return False, agreements, total_distance
# For animated images: require all 4 hashes to agree (will be verified by frame check)
return agreements >= MIN_HASH_AGREEMENT, agreements, total_distance
def is_animated(self) -> bool:
"""Check if this is an animated image (multiple frames)."""
return self.n_frames > 1
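# Illustrative example (annotation, not part of the committed file): for two
# same-size static images with per-hash distances phash=2, ahash=0, dhash=0,
# colorhash=0, total_distance is 2 and only 3 of 4 hashes "agree" at threshold 0,
# yet the static-image rule (ahash == dhash == colorhash == 0 and phash <= 10)
# still flags the pair as a re-compressed duplicate candidate.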
class UnionFind:
"""Union-Find data structure for clustering similar images."""
def __init__(self):
self.parent = {}
def find(self, x):
if x not in self.parent:
self.parent[x] = x
if self.parent[x] != x:
self.parent[x] = self.find(self.parent[x])
return self.parent[x]
def union(self, x, y):
px, py = self.find(x), self.find(y)
if px != py:
self.parent[px] = py
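# Illustrative sketch (annotation, not part of the committed file): pairwise
# unions chain indices into one cluster that shares a single root, e.g.:
#   uf = UnionFind()
#   uf.union(0, 1)
#   uf.union(1, 2)
#   assert uf.find(0) == uf.find(2)  # all three indices now share one root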
def _compute_hashes(img: Image.Image) -> tuple[imagehash.ImageHash, ...]:
"""Compute all hash types for a single image/frame."""
# Convert to RGBA to handle transparency consistently
if img.mode != "RGBA":
img = img.convert("RGBA")
return (
imagehash.phash(img),
imagehash.average_hash(img),
imagehash.dhash(img),
imagehash.colorhash(img),
)
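# Note (annotation, not part of the committed file): imagehash.ImageHash objects
# support subtraction, which returns the Hamming distance between two hashes, e.g.:
#   h1 = imagehash.phash(Image.open("a.png"))  # hypothetical file names
#   h2 = imagehash.phash(Image.open("b.png"))
#   h1 - h2  # 0 means an identical 64-bit pHash; larger values mean more difference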
def _compute_md5(path: Path) -> str:
"""Compute MD5 hash of file contents."""
md5 = hashlib.md5()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
md5.update(chunk)
return md5.hexdigest()
def _get_gif_frame_info(path: Path) -> list[tuple[str, int]] | None:
"""
Get perceptual hash and duration for each frame of a GIF.
Returns list of (hash_string, duration_ms) tuples, or None if not a multi-frame image.
"""
try:
with Image.open(path) as img:
n_frames = getattr(img, "n_frames", 1)
if n_frames <= 1:
return None
frame_info = []
for i in range(n_frames):
img.seek(i)
frame = img.copy()
if frame.mode != "RGBA":
frame = frame.convert("RGBA")
duration = img.info.get("duration", 0)
frame_info.append((str(imagehash.phash(frame)), duration))
return frame_info
except Exception:
return None
def _gifs_are_identical(path1: Path, path2: Path) -> bool:
"""
Compare two GIFs frame-by-frame to check if they have identical content AND timing.
Returns True only if all frames and durations match.
"""
info1 = _get_gif_frame_info(path1)
info2 = _get_gif_frame_info(path2)
# If either isn't a multi-frame GIF, fall back to MD5 comparison
if info1 is None or info2 is None:
return _compute_md5(path1) == _compute_md5(path2)
# Frame counts must match
if len(info1) != len(info2):
return False
# All frames AND durations must match
return info1 == info2
def compute_image_info(path: Path) -> ImageInfo | None:
"""
Compute image metadata and perceptual hashes.
For animated GIFs, samples middle frame to avoid blank first-frame issues.
Returns None if image can't be processed.
"""
try:
md5 = _compute_md5(path)
with Image.open(path) as img:
width, height = img.size
n_frames = getattr(img, "n_frames", 1)
is_animated = getattr(img, "is_animated", False)
if not is_animated:
hashes = _compute_hashes(img)
else:
# For animated images, use middle frame for hashing
middle_frame = n_frames // 2
try:
img.seek(middle_frame)
hashes = _compute_hashes(img.copy())
except EOFError:
img.seek(0)
hashes = _compute_hashes(img)
return ImageInfo(
phash=hashes[0],
ahash=hashes[1],
dhash=hashes[2],
colorhash=hashes[3],
width=width,
height=height,
n_frames=n_frames,
md5=md5,
)
except Exception as e:
print(f" Warning: Could not process {path.name}: {e}")
return None
def _files_size_similar(path1: Path, path2: Path) -> bool:
"""Check if two files have similar sizes (within MAX_SIZE_DIFF_RATIO)."""
size1 = path1.stat().st_size
size2 = path2.stat().st_size
if size1 == 0 or size2 == 0:
return size1 == size2
ratio = abs(size1 - size2) / max(size1, size2)
return ratio <= MAX_SIZE_DIFF_RATIO
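# Worked example (annotation, not part of the committed file): files of 100_000
# and 101_500 bytes give ratio = 1_500 / 101_500 ≈ 0.0148, which is within
# MAX_SIZE_DIFF_RATIO (0.02), so the pair stays a duplicate candidate.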
def _verify_duplicate_pair(
path_i: Path, info_i: ImageInfo, path_j: Path, info_j: ImageInfo, threshold: int
) -> bool:
"""
Verify if two candidate images are true duplicates.
For animated GIFs, compares frames and timing. For static images, perceptual match is sufficient.
"""
# For animated images, verify frame-by-frame including timing
if info_i.is_animated() and info_j.is_animated():
return _gifs_are_identical(path_i, path_j)
# For static images, perceptual hash agreement is sufficient
# (handles re-compressed/re-exported duplicates with different file sizes)
return True
def find_similar_groups(
files: list[Path], threshold: int
) -> list[list[tuple[Path, ImageInfo]]]:
"""Find groups of similar images using multi-hash consensus and union-find."""
# Compute image info for all files
images: list[tuple[Path, ImageInfo]] = []
for file in files:
info = compute_image_info(file)
if info is not None:
# Skip images with degenerate (all-zero) hashes - they can't be meaningfully compared
if not info._has_degenerate_hash():
images.append((file, info))
if not images:
return []
# Use union-find to cluster similar images
# First pass: find candidates based on hashes and metadata
# Second pass: verify GIFs with frame comparison
uf = UnionFind()
for i, (path_i, info_i) in enumerate(images):
uf.find(i) # Initialize
for j in range(i + 1, len(images)):
path_j, info_j = images[j]
# Check if candidates based on hashes/metadata
is_candidate, _, _ = info_i.is_candidate(info_j, threshold)
if not is_candidate:
continue
# For animated images, also check file size similarity
# (static images may have different compression, so skip size check)
if info_i.is_animated() and not _files_size_similar(path_i, path_j):
continue
# Verify: for GIFs, compare frames; for static, already verified by hashes
if _verify_duplicate_pair(path_i, info_i, path_j, info_j, threshold):
uf.union(i, j)
# Group by cluster
clusters: dict[int, list[tuple[Path, ImageInfo]]] = {}
for i, (path, info) in enumerate(images):
root = uf.find(i)
if root not in clusters:
clusters[root] = []
clusters[root].append((path, info))
# Return only groups with duplicates
return [group for group in clusters.values() if len(group) > 1]
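# Shape of the return value (annotation, not part of the committed file): a list
# of clusters, each cluster a list of (Path, ImageInfo) pairs, e.g. with
# hypothetical file names:
#   [[(Path("emoji/party1.png"), <ImageInfo>), (Path("emoji/party2.png"), <ImageInfo>)]]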
def deduplicate(
groups: list[list[tuple[Path, ImageInfo]]], dry_run: bool, threshold: int
) -> tuple[int, int]:
"""Remove duplicates, keeping first alphabetically. Returns (groups, removed)."""
total_removed = 0
for group in groups:
# Sort by filename alphabetically
sorted_group = sorted(group, key=lambda x: x[0].name.lower())
keep_path, keep_info = sorted_group[0]
remove = sorted_group[1:]
print(f"\nDuplicate group ({len(paths)} files):")
print(f" KEEP: {keep.name}")
for path in remove:
print(f" DELETE: {path.name}")
path.unlink()
total_removed += 1
# Calculate agreement info for display
agreements_info = [keep_info.is_candidate(info, threshold) for _, info in remove]
min_agreements = min(a for _, a, _ in agreements_info)
frames_str = f", {keep_info.n_frames} frames" if keep_info.is_animated() else ""
print(f"\nSimilar group ({len(group)} files, {keep_info.width}x{keep_info.height}{frames_str}):")
print(f" KEEP: {keep_path.name}")
for (path, info), (_, agreements, total_dist) in zip(remove, agreements_info):
action = "WOULD DELETE" if dry_run else "DELETE"
print(f" {action}: {path.name} (agreements: {agreements}/4, dist: {total_dist})")
if not dry_run:
path.unlink()
total_removed += 1
if dry_run:
return len(groups), sum(len(g) - 1 for g in groups)
return len(groups), total_removed
def main():
parser = argparse.ArgumentParser(
description="Find and remove duplicate emoji files using perceptual hashing."
)
parser.add_argument(
"--threshold",
type=int,
default=0,
help="Similarity threshold (0=exact, default=0)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show duplicates without deleting",
)
parser.add_argument(
"--dir",
type=Path,
default=Path("emoji"),
help="Directory to scan (default: emoji/)",
)
args = parser.parse_args()
emoji_dir = args.dir
if not emoji_dir.exists():
print(f"Error: Directory '{emoji_dir}' does not exist.")
return
files = [f for f in emoji_dir.iterdir() if f.suffix.lower() in EXTENSIONS]
if not files:
print("No image files found in emoji/ folder.")
print(f"No image files found in {emoji_dir}/ folder.")
return
print(f"Scanning {len(files)} files...")
print(f"Scanning {len(files)} files (threshold: {args.threshold})...")
if args.dry_run:
print("(dry-run mode - no files will be deleted)")
groups = find_similar_groups(files, args.threshold)
if not groups:
print("\nNo similar images found.")
return
group_count, removed = deduplicate(groups, args.dry_run, args.threshold)
print(f"\n--- Summary ---")
print(f"Files scanned: {len(files)}")
print(f"Duplicate groups: {groups}")
print(f"Files removed: {removed}")
print(f"Similar groups: {group_count}")
if args.dry_run:
print(f"Files to remove: {removed}")
else:
print(f"Files removed: {removed}")
if __name__ == "__main__":