#!/usr/bin/env python3 """Find and remove duplicate emoji files using perceptual hashing.""" import argparse import hashlib from pathlib import Path from dataclasses import dataclass import imagehash from PIL import Image EXTENSIONS = (".png", ".gif", ".jpg", ".jpeg") # Number of hash algorithms that must agree for images to be considered similar MIN_HASH_AGREEMENT = 4 # Maximum file size difference ratio for duplicates (e.g., 0.05 = 5% difference allowed) MAX_SIZE_DIFF_RATIO = 0.02 @dataclass class ImageInfo: """Container for image metadata and hashes.""" phash: imagehash.ImageHash ahash: imagehash.ImageHash dhash: imagehash.ImageHash colorhash: imagehash.ImageHash width: int height: int n_frames: int # 1 for static images md5: str # File content hash for exact duplicate detection def _has_degenerate_hash(self) -> bool: """Check if this image has degenerate (all-zero) hashes, indicating mostly transparent content.""" zero_hash = "0000000000000000" # If 3+ hashes are all zeros, the image is likely mostly transparent zero_count = sum(1 for h in [str(self.phash), str(self.ahash), str(self.dhash)] if h == zero_hash) return zero_count >= 3 def is_candidate(self, other: "ImageInfo", threshold: int) -> tuple[bool, int, int]: """ Check if two images are candidate duplicates based on metadata and hashes. Returns (is_candidate, agreements, total_distance). This is a fast pre-filter. GIFs require additional frame verification. """ # Dimensions must match exactly if self.width != other.width or self.height != other.height: return False, 0, 999 # Frame count must match for animated images if self.n_frames != other.n_frames: return False, 0, 999 # Calculate perceptual hash distances distances = [ self.phash - other.phash, self.ahash - other.ahash, self.dhash - other.dhash, self.colorhash - other.colorhash, ] total_distance = sum(distances) agreements = sum(1 for d in distances if d <= threshold) # For static images: detect re-compressed/re-exported duplicates # Require identical structure AND color, with small perceptual variance: # - aHash=0 AND dHash=0 AND colorHash=0 AND pHash <= 10 # - OR all 4 hashes match exactly (total_distance = 0) if self.n_frames == 1: phash_dist = self.phash - other.phash ahash_dist = self.ahash - other.ahash dhash_dist = self.dhash - other.dhash chash_dist = self.colorhash - other.colorhash # Identical structure + color, small perceptual variance = re-compressed image if ahash_dist == 0 and dhash_dist == 0 and chash_dist == 0 and phash_dist <= 10: return True, agreements, total_distance # All hashes match exactly if total_distance == 0: return True, agreements, total_distance return False, agreements, total_distance # For animated images: require all 4 hashes to agree (will be verified by frame check) return agreements >= MIN_HASH_AGREEMENT, agreements, total_distance def is_animated(self) -> bool: """Check if this is an animated image (multiple frames).""" return self.n_frames > 1 class UnionFind: """Union-Find data structure for clustering similar images.""" def __init__(self): self.parent = {} def find(self, x): if x not in self.parent: self.parent[x] = x if self.parent[x] != x: self.parent[x] = self.find(self.parent[x]) return self.parent[x] def union(self, x, y): px, py = self.find(x), self.find(y) if px != py: self.parent[px] = py def _compute_hashes(img: Image.Image) -> tuple[imagehash.ImageHash, ...]: """Compute all hash types for a single image/frame.""" # Convert to RGBA to handle transparency consistently if img.mode != "RGBA": img = img.convert("RGBA") return ( imagehash.phash(img), imagehash.average_hash(img), imagehash.dhash(img), imagehash.colorhash(img), ) def _compute_md5(path: Path) -> str: """Compute MD5 hash of file contents.""" md5 = hashlib.md5() with open(path, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): md5.update(chunk) return md5.hexdigest() def _get_gif_frame_info(path: Path) -> list[tuple[str, int]] | None: """ Get perceptual hash and duration for each frame of a GIF. Returns list of (hash_string, duration_ms) tuples, or None if not a multi-frame image. """ try: with Image.open(path) as img: n_frames = getattr(img, "n_frames", 1) if n_frames <= 1: return None frame_info = [] for i in range(n_frames): img.seek(i) frame = img.copy() if frame.mode != "RGBA": frame = frame.convert("RGBA") duration = img.info.get("duration", 0) frame_info.append((str(imagehash.phash(frame)), duration)) return frame_info except Exception: return None def _gifs_are_identical(path1: Path, path2: Path) -> bool: """ Compare two GIFs frame-by-frame to check if they have identical content AND timing. Returns True only if all frames and durations match. """ info1 = _get_gif_frame_info(path1) info2 = _get_gif_frame_info(path2) # If either isn't a multi-frame GIF, fall back to MD5 comparison if info1 is None or info2 is None: return _compute_md5(path1) == _compute_md5(path2) # Frame counts must match if len(info1) != len(info2): return False # All frames AND durations must match return info1 == info2 def compute_image_info(path: Path) -> ImageInfo | None: """ Compute image metadata and perceptual hashes. For animated GIFs, samples middle frame to avoid blank first-frame issues. Returns None if image can't be processed. """ try: md5 = _compute_md5(path) with Image.open(path) as img: width, height = img.size n_frames = getattr(img, "n_frames", 1) is_animated = getattr(img, "is_animated", False) if not is_animated: hashes = _compute_hashes(img) else: # For animated images, use middle frame for hashing middle_frame = n_frames // 2 try: img.seek(middle_frame) hashes = _compute_hashes(img.copy()) except EOFError: img.seek(0) hashes = _compute_hashes(img) return ImageInfo( phash=hashes[0], ahash=hashes[1], dhash=hashes[2], colorhash=hashes[3], width=width, height=height, n_frames=n_frames, md5=md5, ) except Exception as e: print(f" Warning: Could not process {path.name}: {e}") return None def _files_size_similar(path1: Path, path2: Path) -> bool: """Check if two files have similar sizes (within MAX_SIZE_DIFF_RATIO).""" size1 = path1.stat().st_size size2 = path2.stat().st_size if size1 == 0 or size2 == 0: return size1 == size2 ratio = abs(size1 - size2) / max(size1, size2) return ratio <= MAX_SIZE_DIFF_RATIO def _verify_duplicate_pair( path_i: Path, info_i: ImageInfo, path_j: Path, info_j: ImageInfo, threshold: int ) -> bool: """ Verify if two candidate images are true duplicates. For animated GIFs, compares frames and timing. For static images, perceptual match is sufficient. """ # For animated images, verify frame-by-frame including timing if info_i.is_animated() and info_j.is_animated(): return _gifs_are_identical(path_i, path_j) # For static images, perceptual hash agreement is sufficient # (handles re-compressed/re-exported duplicates with different file sizes) return True def find_similar_groups( files: list[Path], threshold: int ) -> list[list[tuple[Path, ImageInfo]]]: """Find groups of similar images using multi-hash consensus and union-find.""" # Compute image info for all files images: list[tuple[Path, ImageInfo]] = [] for file in files: info = compute_image_info(file) if info is not None: # Skip images with degenerate (all-zero) hashes - they can't be meaningfully compared if not info._has_degenerate_hash(): images.append((file, info)) if not images: return [] # Use union-find to cluster similar images # First pass: find candidates based on hashes and metadata # Second pass: verify GIFs with frame comparison uf = UnionFind() for i, (path_i, info_i) in enumerate(images): uf.find(i) # Initialize for j in range(i + 1, len(images)): path_j, info_j = images[j] # Check if candidates based on hashes/metadata is_candidate, _, _ = info_i.is_candidate(info_j, threshold) if not is_candidate: continue # For animated images, also check file size similarity # (static images may have different compression, so skip size check) if info_i.is_animated() and not _files_size_similar(path_i, path_j): continue # Verify: for GIFs, compare frames; for static, already verified by hashes if _verify_duplicate_pair(path_i, info_i, path_j, info_j, threshold): uf.union(i, j) # Group by cluster clusters: dict[int, list[tuple[Path, ImageInfo]]] = {} for i, (path, info) in enumerate(images): root = uf.find(i) if root not in clusters: clusters[root] = [] clusters[root].append((path, info)) # Return only groups with duplicates return [group for group in clusters.values() if len(group) > 1] def deduplicate( groups: list[list[tuple[Path, ImageInfo]]], dry_run: bool, threshold: int ) -> tuple[int, int]: """Remove duplicates, keeping first alphabetically. Returns (groups, removed).""" total_removed = 0 for group in groups: # Sort by filename alphabetically sorted_group = sorted(group, key=lambda x: x[0].name.lower()) keep_path, keep_info = sorted_group[0] remove = sorted_group[1:] # Calculate agreement info for display agreements_info = [keep_info.is_candidate(info, threshold) for _, info in remove] min_agreements = min(a for _, a, _ in agreements_info) frames_str = f", {keep_info.n_frames} frames" if keep_info.is_animated() else "" print(f"\nSimilar group ({len(group)} files, {keep_info.width}x{keep_info.height}{frames_str}):") print(f" KEEP: {keep_path.name}") for (path, info), (_, agreements, total_dist) in zip(remove, agreements_info): action = "WOULD DELETE" if dry_run else "DELETE" print(f" {action}: {path.name} (agreements: {agreements}/4, dist: {total_dist})") if not dry_run: path.unlink() total_removed += 1 if dry_run: return len(groups), sum(len(g) - 1 for g in groups) return len(groups), total_removed def main(): parser = argparse.ArgumentParser( description="Find and remove duplicate emoji files using perceptual hashing." ) parser.add_argument( "--threshold", type=int, default=0, help="Similarity threshold (0=exact, default=0)", ) parser.add_argument( "--dry-run", action="store_true", help="Show duplicates without deleting", ) parser.add_argument( "--dir", type=Path, default=Path("emoji"), help="Directory to scan (default: emoji/)", ) args = parser.parse_args() emoji_dir = args.dir if not emoji_dir.exists(): print(f"Error: Directory '{emoji_dir}' does not exist.") return files = [f for f in emoji_dir.iterdir() if f.suffix.lower() in EXTENSIONS] if not files: print(f"No image files found in {emoji_dir}/ folder.") return print(f"Scanning {len(files)} files (threshold: {args.threshold})...") if args.dry_run: print("(dry-run mode - no files will be deleted)") groups = find_similar_groups(files, args.threshold) if not groups: print("\nNo similar images found.") return group_count, removed = deduplicate(groups, args.dry_run, args.threshold) print(f"\n--- Summary ---") print(f"Files scanned: {len(files)}") print(f"Similar groups: {group_count}") if args.dry_run: print(f"Files to remove: {removed}") else: print(f"Files removed: {removed}") if __name__ == "__main__": main()