#!/usr/bin/env python3 """ Netcat-like tool in Python 3 Ora con IPv6, timeout, keep‑listening, source address, ecc. rrw.it Utilizzo: Cliente: netcat.py [opzioni] Server: netcat.py -l [opzioni] [-s indirizzo] -p Opzioni comuni: -u Usa UDP invece di TCP -4 / -6 Forza IPv4 o IPv6 -n Non risolvere i nomi (solo numerico) -w Timeout per connessione e I/O -c Chiudi la connessione quando stdin finisce (solo client) -e Esegui il comando e collega I/O al socket -v Output verboso (su stderr) Modalità server: -l Modalità ascolto -s Indirizzo su cui mettersi in ascolto (default: 0.0.0.0 o ::) -p Porta in ascolto -k Accetta più connessioni (solo TCP) Esempi: # Shell remota (server) python netcat.py -l -p 4444 -e /bin/bash -v # Connessione interattiva (client) python netcat.py -w 5 example.com 80 # Server IPv6 con keep‑listening python netcat.py -l -6 -s :: -p 8080 -k """ import os import sys import socket import threading import subprocess import argparse import time # ------------------------------------------------------------ # Gestore di una singola connessione (flusso bidirezionale) # ------------------------------------------------------------ class ConnectionHandler: def __init__(self, sock, addr, verbose=False, exec_cmd=None, close_on_eof=False, timeout=None): self.sock = sock self.addr = addr self.verbose = verbose self.exec_cmd = exec_cmd self.close_on_eof = close_on_eof self.timeout = timeout self.stop_event = threading.Event() if self.timeout: self.sock.settimeout(self.timeout) def log(self, msg): if self.verbose: print(f"[*] {msg}", file=sys.stderr) def start(self): """Avvia il trasferimento: normale o con esecuzione comando.""" if self.exec_cmd: self._handle_exec() else: self._handle_stdin_stdout() def _handle_stdin_stdout(self): """Scambio dati tra socket e stdin/stdout.""" def receive(): while not self.stop_event.is_set(): try: data = self.sock.recv(4096) if not data: break sys.stdout.buffer.write(data) sys.stdout.buffer.flush() except socket.timeout: continue except Exception as e: if self.verbose: self.log(f"Errore in ricezione: {e}") break self.stop_event.set() def send(): while not self.stop_event.is_set(): try: data = sys.stdin.buffer.read1(4096) if not data: if self.close_on_eof: self.sock.shutdown(socket.SHUT_WR) break self.sock.sendall(data) except socket.timeout: continue except Exception as e: if self.verbose: self.log(f"Errore in invio: {e}") break self.stop_event.set() t_recv = threading.Thread(target=receive, daemon=True) t_send = threading.Thread(target=send, daemon=True) t_recv.start() t_send.start() try: while not self.stop_event.is_set(): self.stop_event.wait(0.5) except KeyboardInterrupt: self.log("Interruzione da tastiera") finally: self.sock.close() # In modalità client, usciamo. In modalità server (con -k) il main rimane in ascolto. if not (args.listen and args.keep): os._exit(0) def _handle_exec(self): """Collega il socket a stdin/stdout/stderr di un comando.""" try: proc = subprocess.Popen( self.exec_cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) except Exception as e: self.log(f"Impossibile avviare il comando: {e}") return def socket_to_process(): while not self.stop_event.is_set(): try: data = self.sock.recv(4096) if not data: break proc.stdin.write(data) proc.stdin.flush() except socket.timeout: continue except Exception: break proc.stdin.close() self.stop_event.set() def process_to_socket(): while not self.stop_event.is_set(): try: data = proc.stdout.read(4096) if not data: break self.sock.sendall(data) except socket.timeout: continue except Exception: break proc.terminate() self.stop_event.set() t_s2p = threading.Thread(target=socket_to_process, daemon=True) t_p2s = threading.Thread(target=process_to_socket, daemon=True) t_s2p.start() t_p2s.start() try: while not self.stop_event.is_set(): self.stop_event.wait(0.5) except KeyboardInterrupt: self.log("Interruzione da tastiera") finally: proc.terminate() self.sock.close() if not (args.listen and args.keep): os._exit(0) # ------------------------------------------------------------ # Funzioni di connessione (client) # ------------------------------------------------------------ def create_client_socket(host, port, family=socket.AF_UNSPEC, udp=False, source=None, timeout=None, numeric=False): """Crea e connette un socket (TCP o UDP) con risoluzione DNS e indirizzo di origine.""" sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM proto = 0 # IPPROTO_IP, ma getaddrinfo lo imposta correttamente flags = socket.AI_ADDRCONFIG if numeric: flags |= socket.AI_NUMERICHOST try: addrinfo = socket.getaddrinfo(host, port, family, sock_type, proto, flags) except socket.gaierror as e: print(f"Errore risoluzione host: {e}", file=sys.stderr) sys.exit(1) # Seleziona il primo risultato fam, socktype, proto, canonname, sa = addrinfo[0] sock = socket.socket(fam, socktype, proto) if timeout: sock.settimeout(timeout) # Indirizzo di origine (bind locale) if source: try: # Risolvi anche l'indirizzo di origine src_addrinfo = socket.getaddrinfo(source, 0, fam, socktype, proto, flags) src_fam, src_socktype, src_proto, _, src_sa = src_addrinfo[0] sock.bind(src_sa) except Exception as e: print(f"Errore bind su {source}: {e}", file=sys.stderr) sys.exit(1) if udp: # Per UDP, connect imposta l'indirizzo di default per send/recv sock.connect(sa) else: try: sock.connect(sa) except Exception as e: print(f"Errore di connessione: {e}", file=sys.stderr) sys.exit(1) return sock, sa # ------------------------------------------------------------ # Funzioni server # ------------------------------------------------------------ def create_server_socket(bind_addr, port, family=socket.AF_UNSPEC, udp=False, numeric=False): """Crea un socket in ascolto (TCP) o pronto per ricevere (UDP).""" sock_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM flags = socket.AI_PASSIVE if numeric: flags |= socket.AI_NUMERICHOST # Se non è specificato un indirizzo, usiamo '' per tutte le interfacce, # ma getaddrinfo con AI_PASSIVE e '' restituirà 0.0.0.0 e :: a seconda della famiglia. if bind_addr is None: bind_addr = '' try: addrinfo = socket.getaddrinfo(bind_addr, port, family, sock_type, 0, flags) except socket.gaierror as e: print(f"Errore risoluzione indirizzo di bind: {e}", file=sys.stderr) sys.exit(1) # Potrebbero esserci più risultati (es. IPv4 e IPv6). Selezioniamo in base a -4/-6 se forzato. # Qui per semplicità prendiamo il primo, ma potremmo creare socket multipli. # Nella pratica, l'utente può specificare un indirizzo concreto. fam, socktype, proto, _, sa = addrinfo[0] sock = socket.socket(fam, socktype, proto) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Per IPv6, disabilitiamo IPV6_V6ONLY per avere dual-stack? Di default è True su molti sistemi. # Non lo cambiamo per semplicità, l'utente può forzare -6. sock.bind(sa) if not udp: sock.listen(5) return sock, sa # ------------------------------------------------------------ # Main # ------------------------------------------------------------ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Netcat-like tool con IPv6") parser.add_argument("-l", "--listen", action="store_true", help="Modalità ascolto (server)") parser.add_argument("-p", "--port", type=int, required=True, help="Porta su cui ascoltare o a cui connettersi") parser.add_argument("-u", "--udp", action="store_true", help="Usa UDP invece di TCP") parser.add_argument("-e", "--execute", metavar="COMANDO", help="Esegue il comando e collega I/O al socket") parser.add_argument("-v", "--verbose", action="store_true", help="Output verboso") parser.add_argument("-4", "--ipv4", action="store_true", help="Forza IPv4") parser.add_argument("-6", "--ipv6", action="store_true", help="Forza IPv6") parser.add_argument("-n", "--numeric", action="store_true", help="Non risolvere i nomi (solo numerico)") parser.add_argument("-s", "--source", metavar="INDIRIZZO", help="Indirizzo di origine (client) o di bind (server)") parser.add_argument("-w", "--timeout", type=float, metavar="SEC", help="Timeout per connessione e I/O (secondi)") parser.add_argument("-c", "--close-on-eof", action="store_true", help="Chiudi la connessione quando stdin finisce") parser.add_argument("-k", "--keep", action="store_true", help="In ascolto, accetta più connessioni (solo TCP)") parser.add_argument("host", nargs="?", help="Host remoto (obbligatorio in modalità client)") args = parser.parse_args() # Determina la famiglia di indirizzi if args.ipv4 and args.ipv6: parser.error("Non puoi usare -4 e -6 insieme") family = socket.AF_INET6 if args.ipv6 else socket.AF_INET if args.ipv4 else socket.AF_UNSPEC # Modalità server if args.listen: # Se server, host non richiesto if args.host: parser.error("In modalità server non si specifica host (usa -s per l'indirizzo di bind)") if args.udp and args.keep: parser.error("-k non supportato con UDP") if args.close_on_eof: parser.error("-c ha senso solo in modalità client") # Crea socket server sock, sa = create_server_socket(args.source, args.port, family, args.udp, args.numeric) if args.verbose: proto = "UDP" if args.udp else "TCP" print(f"[*] In ascolto {proto} su {sa[0]}:{sa[1]}", file=sys.stderr) if args.udp: # UDP: ricevi il primo datagramma per conoscere il client # Nota: in UDP, -k non ha senso, gestiamo una sola connessione data, client_addr = sock.recvfrom(4096) if args.verbose: print(f"[*] Primo datagramma da {client_addr[0]}:{client_addr[1]}", file=sys.stderr) # Connetto il socket a quel client sock.connect(client_addr) # Creo un wrapper per iniettare il primo dato class UDPSocketWithFirst: def __init__(self, sock, first_data): self.sock = sock self.first_data = first_data self.first_sent = False def recv(self, bufsize): if not self.first_sent and self.first_data is not None: self.first_sent = True return self.first_data return self.sock.recv(bufsize) def sendall(self, data): return self.sock.send(data) def close(self): self.sock.close() wrapped = UDPSocketWithFirst(sock, data) handler = ConnectionHandler(wrapped, client_addr, args.verbose, args.execute, close_on_eof=False, timeout=args.timeout) handler.start() else: # TCP if args.keep: # Loop accettazione while True: conn, client_addr = sock.accept() if args.verbose: print(f"[*] Connessione da {client_addr[0]}:{client_addr[1]}", file=sys.stderr) handler = ConnectionHandler(conn, client_addr, args.verbose, args.execute, close_on_eof=False, timeout=args.timeout) # Ogni connessione in un thread separato t = threading.Thread(target=handler.start, daemon=True) t.start() else: conn, client_addr = sock.accept() if args.verbose: print(f"[*] Connessione da {client_addr[0]}:{client_addr[1]}", file=sys.stderr) handler = ConnectionHandler(conn, client_addr, args.verbose, args.execute, close_on_eof=False, timeout=args.timeout) handler.start() else: # Modalità client if not args.host: parser.error("In modalità client è richiesto l'host") # Crea socket connesso sock, sa = create_client_socket(args.host, args.port, family, args.udp, args.source, args.timeout, args.numeric) if args.verbose: proto = "UDP" if args.udp else "TCP" print(f"[*] Connesso {proto} a {sa[0]}:{sa[1]}", file=sys.stderr) handler = ConnectionHandler(sock, sa, args.verbose, args.execute, args.close_on_eof, args.timeout) handler.start()