#!/usr/bin/env python3 """ Strumento avanzato per checksum: CRC32, MD5, SHA1, SHA256, SHA512. Supporta generazione e verifica, con parallelizzazione opzionale. Modalità automatica senza argomenti: - Cerca file di checksum noti (.sha256, .md5, .sha1, .sha512, .crc32) e verifica. - Se non trovato, genera automaticamente checksum SHA256 della directory corrente. # Generare un checksum SHA256 di una cartella (default) python checksum.py generate ./documenti # Generare un checksum CRC32 (velocissimo) senza ricorsione python checksum.py generate ./grande_archivio -a crc32 --no-recursive -o archivio.crc32 # Generare MD5 in parallelo per una cartella con molti file python checksum.py generate ./foto -a md5 -p # Verificare un file .sha1 (algoritmo dedotto automaticamente) python checksum.py verify checksum.sha1 # Verificare un file .md5 con algoritmo specificato e parallelismo python checksum.py verify backup.md5 -a md5 -p -b /mnt/backup # Modalità automatica (cerca un checksum nella cartella corrente) python checksum.py """ import argparse import hashlib import os import sys import zlib # per CRC32 from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path # Mappa algoritmo -> (funzione hash, estensione predefinita) ALGORITHMS = { 'crc32': (lambda: _CRC32Wrapper(), '.crc32'), 'md5': (hashlib.md5, '.md5'), 'sha1': (hashlib.sha1, '.sha1'), 'sha256': (hashlib.sha256,'.sha256'), 'sha512': (hashlib.sha512,'.sha512'), } class _CRC32Wrapper: """Wrapper per rendere zlib.crc32 compatibile con l'interfaccia hashlib.""" def __init__(self): self._crc = 0 def update(self, data): self._crc = zlib.crc32(data, self._crc) def hexdigest(self): return format(self._crc & 0xFFFFFFFF, '08x') def digest(self): return self._crc.to_bytes(4, 'big') def file_hash(filepath, algorithm): """Calcola l'hash di un file con l'algoritmo scelto.""" h = ALGORITHMS[algorithm][0]() with open(filepath, 'rb') as f: while True: chunk = f.read(65536) # 64 KB buffer if not chunk: break h.update(chunk) return h.hexdigest() def generate_checksums(path, output_file, algorithm='sha256', recursive=True, parallel=False): """Genera un file di checksum per un file o directory.""" base = Path(path).resolve() output = Path(output_file).resolve() if base.is_file(): checksum = file_hash(base, algorithm) rel_path = base.name with open(output, 'w', encoding='utf-8') as f: f.write(f"{checksum} {rel_path}\n") print(f"Generato checksum per 1 file in '{output}'") return if base.is_dir(): glob_pattern = '**/*' if recursive else '*' all_files = [p for p in base.glob(glob_pattern) if p.is_file() and p.resolve() != output] all_files.sort() if not all_files: print("Nessun file trovato.") return if parallel: # Calcolo parallelo con mappa (filepath -> (rel, hash)) from collections import OrderedDict results = {} with ThreadPoolExecutor() as executor: future_to_fp = { executor.submit(_hash_worker, fp, base, algorithm): fp for fp in all_files } for future in as_completed(future_to_fp): fp = future_to_fp[future] rel_str, h = future.result() results[rel_str] = h print(f"Calcolato hash: {rel_str}") # Scrittura ordinata per percorso relativo with open(output, 'w', encoding='utf-8') as f: for rel_str in sorted(results.keys()): f.write(f"{results[rel_str]} {rel_str}\n") else: with open(output, 'w', encoding='utf-8') as f: for fp in all_files: rel = fp.relative_to(base) rel_str = str(rel).replace('\\', '/') print(f"Calcolando hash: {rel_str}") h = file_hash(fp, algorithm) f.write(f"{h} {rel_str}\n") print(f"Generato checksum per {len(all_files)} file in '{output}'") return print(f"ERRORE: '{path}' non è un file o directory valido.") sys.exit(1) def _hash_worker(filepath, base_dir, algorithm): """Funzione di supporto per il calcolo parallelo.""" rel = filepath.relative_to(base_dir) rel_str = str(rel).replace('\\', '/') h = file_hash(filepath, algorithm) return rel_str, h def verify_checksums(checksum_file, base_dir=None, algorithm=None, parallel=False): """Verifica un file di checksum. Se algorithm è None, lo deduce dall'estensione.""" checksum_path = Path(checksum_file) if not checksum_path.is_file(): print(f"ERRORE: Il file di checksum '{checksum_file}' non esiste.") sys.exit(1) # Deduzione automatica dell'algoritmo dall'estensione if algorithm is None: ext = checksum_path.suffix.lower() # es. '.sha256' # Cerca corrispondenza inversa algo = None for a, (_, ext_def) in ALGORITHMS.items(): if ext == ext_def: algo = a break if algo is None: print(f"ERRORE: Impossibile dedurre l'algoritmo dall'estensione '{ext}'. Specificarlo con -a.") sys.exit(1) algorithm = algo print(f"Algoritmo dedotto dall'estensione: {algorithm}") if algorithm not in ALGORITHMS: print(f"ERRORE: Algoritmo '{algorithm}' non supportato. Scegli tra: {', '.join(ALGORITHMS.keys())}") sys.exit(1) if base_dir is None: base_dir = checksum_path.resolve().parent else: base_dir = Path(base_dir).resolve() # Legge il file di checksum entries = {} with open(checksum_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line or line.startswith('#'): continue if ' ' not in line: print(f"Riga ignorata (formato errato): {line}") continue hash_hex, rel_path = line.split(' ', 1) entries[rel_path] = hash_hex if not entries: print("Nessuna voce valida nel file di checksum.") return # Stato verifica status = {'ok': [], 'modified': [], 'missing': [], 'extra': []} # Raccoglie i file effettivamente presenti actual_files = set() for root, dirs, files in os.walk(base_dir): for fname in files: fp = Path(root) / fname try: rel = str(fp.relative_to(base_dir)).replace('\\', '/') except ValueError: continue actual_files.add(rel) # Funzione di verifica per un singolo file (per parallelizzazione opzionale) def check_file(rel_path, expected_hash): full_path = base_dir / rel_path if not full_path.is_file(): return ('missing', rel_path, None, expected_hash) current_hash = file_hash(full_path, algorithm) if current_hash == expected_hash: return ('ok', rel_path, None, expected_hash) else: return ('modified', rel_path, current_hash, expected_hash) # Esecuzione (sequenziale o parallela) if parallel: with ThreadPoolExecutor() as executor: futures = {executor.submit(check_file, rel, exp): rel for rel, exp in entries.items()} for future in as_completed(futures): tipo, rel, curr, exp = future.result() if tipo == 'ok': status['ok'].append(rel) elif tipo == 'modified': status['modified'].append((rel, curr, exp)) elif tipo == 'missing': status['missing'].append(rel) else: for rel_path, expected_hash in entries.items(): tipo, rel, curr, exp = check_file(rel_path, expected_hash) if tipo == 'ok': status['ok'].append(rel) elif tipo == 'modified': status['modified'].append((rel, curr, exp)) elif tipo == 'missing': status['missing'].append(rel) # File extra (presenti ma non nel checksum) tracked = set(entries.keys()) for rel in actual_files: if rel not in tracked: if Path(base_dir, rel).resolve() != checksum_path.resolve(): status['extra'].append(rel) status['extra'].sort() # Report print("\n" + "="*60) print(" REPORT DI VERIFICA") print("="*60) if status['ok']: print(f"\n✓ File integri ({len(status['ok'])}):") for f in status['ok']: print(f" {f}") if status['modified']: print(f"\n✗ File modificati ({len(status['modified'])}):") for fname, curr, exp in status['modified']: print(f" {fname}") print(f" Hash attuale : {curr}") print(f" Hash atteso : {exp}") if status['missing']: print(f"\n✗ File mancanti ({len(status['missing'])}):") for f in status['missing']: print(f" {f}") if status['extra']: print(f"\n? File non tracciati ({len(status['extra'])}):") for f in status['extra']: print(f" {f}") total_issues = len(status['modified']) + len(status['missing']) if total_issues == 0 and not status['extra']: print("\nTutti i file tracciati nel checksum sono presenti e integri.") elif total_issues == 0: print("\nTutti i file tracciati sono presenti e integri (ci sono file extra non tracciati).") else: print(f"\nRiepilogo: {len(status['ok'])} OK, {len(status['modified'])} modificati, {len(status['missing'])} mancanti, {len(status['extra'])} non tracciati.") def auto_mode(): """Modalità automatica senza argomenti.""" cwd = os.getcwd() # Cerca file con estensioni note known_exts = {ext for _, (_, ext) in ALGORITHMS.items()} found = [] for f in os.listdir(cwd): full = os.path.join(cwd, f) if os.path.isfile(full): ext = Path(f).suffix.lower() if ext in known_exts: found.append(full) if found: # Prende il primo trovato (si potrebbe scegliere il più recente, ma va bene) target = found[0] print(f"Trovato file di checksum: {target}") verify_checksums(target, parallel=False) # auto mode senza parallel per semplicità else: now = datetime.now().strftime("%Y%m%d_%H%M%S") output = os.path.join(cwd, f"checksum_{now}.sha256") print(f"Nessun file di checksum trovato. Generazione automatica SHA256 della directory corrente in:\n{output}") generate_checksums(cwd, output, algorithm='sha256', recursive=True) if __name__ == "__main__": if len(sys.argv) == 1: auto_mode() else: parser = argparse.ArgumentParser(description="Strumento di checksum multi-algoritmo.") subparsers = parser.add_subparsers(dest='command', required=True) # Genera gen = subparsers.add_parser('generate', help='Genera file di checksum') gen.add_argument('path', help='File o directory') gen.add_argument('-o', '--output', default=None, help='File di output (default: checksum. nella directory)') gen.add_argument('-a', '--algorithm', default='sha256', choices=ALGORITHMS.keys(), help='Algoritmo di hash (default: sha256)') gen.add_argument('-r', '--recursive', action='store_true', default=True, help='Ricorsivo (default: sì)') gen.add_argument('--no-recursive', dest='recursive', action='store_false', help='Non ricorsivo') gen.add_argument('-p', '--parallel', action='store_true', help='Elaborazione parallela (più veloce per molti file)') # Verifica ver = subparsers.add_parser('verify', help='Verifica file di checksum') ver.add_argument('checksum_file', help='File di checksum') ver.add_argument('-b', '--base-dir', default=None, help='Directory base per i percorsi relativi') ver.add_argument('-a', '--algorithm', default=None, choices=ALGORITHMS.keys(), help='Algoritmo (se non specificato, dedotto dall\'estensione)') ver.add_argument('-p', '--parallel', action='store_true', help='Verifica parallela') args = parser.parse_args() if args.command == 'generate': output = args.output if output is None: p = Path(args.path) ext = ALGORITHMS[args.algorithm][1] if p.is_dir(): output = p / f"checksum{ext}" else: output = p.with_suffix(ext) generate_checksums(args.path, output, args.algorithm, args.recursive, args.parallel) elif args.command == 'verify': verify_checksums(args.checksum_file, args.base_dir, args.algorithm, args.parallel)