From 283ecc19ddb32d770b349330f25c444246252b65 Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 25 May 2026 22:19:06 +0800 Subject: [PATCH 1/4] Build pure Python reconnaissance workflow --- .gitignore | 7 + README.md | 61 +- requirements.txt | 2 - searchmap.py | 1401 ++++++++++++++++++++++++++++++++++------------ 4 files changed, 1099 insertions(+), 372 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d348640 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +__pycache__/ +*.py[cod] +*.log +findings*.csv +result*.json +searchmap-*.json +.DS_Store diff --git a/README.md b/README.md index 14064b9..d093f4e 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,27 @@ -**SearchMap_V1.0.3** +**SearchMap_V1.1.0** -searchmap是一款集**域名解析、IP反查域名、WHOIS查询、CDN检测、端口扫描、目录扫描、子域名挖掘**为一体的前渗透测试综合信息收集工具。新版本在原版基础上进行了**全面重构**,专注于提升**稳定性、性能和结果的可靠性**。它用更健壮的API和并发模型取代了原先脆弱的网页抓取逻辑,并增加了更丰富的信息展示,旨在成为您侦察阶段的得力助手。 +searchmap是一款集**域名解析、DNS记录枚举、WHOIS查询、CDN检测、纯Python端口扫描、TLS证书识别、HTTP指纹、目录扫描、子域名挖掘、结构化导出**为一体的前渗透测试综合信息收集工具。新版本继续强化**稳定性、性能和结果可靠性**,移除了对 `nmap`、IP归属地API、反查网页等外部工具/第三方数据接口的依赖,核心侦察能力尽量由本地Python网络能力完成,适合授权安全自查和资产梳理。 ![image](https://user-images.githubusercontent.com/67818638/133013451-1d3f8310-6c17-4985-b526-9d9af9e8302c.png) ## 一.功能特性 -- **域名/IP基础信息**: 快速解析域名,获取IP地址列表,并自动查询所有IP的地理位置。 +- **域名/IP基础信息**: 快速解析域名,获取IPv4/IPv6地址、PTR记录和公网/内网/保留地址分类。 +- **DNS记录枚举**: 自动收集A、AAAA、CNAME、NS、MX、TXT、SOA、CAA等关键记录。 - **WHOIS查询**: 获取域名的详细注册信息。 -- **多节点DNS检测 (CDN识别)**: 通过并行查询全球多个地区的公共DNS服务器,高效、稳定地判断目标是否使用CDN或负载均衡。 -- **IP归属地查询**: 所有展示IP地址的地方(基础信息、DNS检测)都会自动附带其物理归属地,信息更直观。 -- **Nmap端口扫描**: 集成Nmap,可对目标IP进行快速的端口和服务扫描。 -- **多线程目录与子域名爆破**: 高效的并发引擎,快速对目标进行目录和子域名探测。 +- **多解析器DNS检测 (CDN识别)**: 并行查询多个公共DNS解析器,通过IP差异和CDN CNAME特征判断目标是否使用CDN或负载均衡。 +- **HTTP/TLS指纹**: 自动识别网站标题、Server、X-Powered-By、常见技术栈、安全响应头、robots/sitemap/security.txt,以及TLS版本、证书主体、颁发者、有效期和SHA256指纹。 +- **纯Python端口扫描**: 使用TCP connect扫描和Banner探测替代Nmap,不需要安装外部二进制工具。 +- **多线程目录与子域名爆破**: 高效的并发引擎,目录扫描内置软404基线过滤,子域名扫描内置泛解析过滤。 - **批量处理**: 支持从文件读取多个目标进行批量扫描。 -- **日志记录**: 可将所有扫描结果输出到日志文件,方便归档和分析。 +- **日志与结构化输出**: 支持控制台日志、JSON结果和CSV发现项导出,方便归档和二次分析。 ## 二.安装说明 1.工具使用python3开发,请确保您的电脑上已经安装了python3环境。 -2.工具的端口扫描功能调用了nmap接口,请确保您的电脑已安装nmap。 +2.首次使用请使用 **python3 -m pip install -r requirements.txt** 命令,来安装必要的Python依赖包。端口扫描不再依赖nmap等外部二进制工具。 -3.首次使用请使用 **python3 -m pip install -r requirements.txt** 命令,来安装必要的外部依赖包。 - -4.本机未安装pip工具的请使用如下命令来进行安装: +3.本机未安装pip工具的请使用如下命令来进行安装: ``` $ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py # 下载安装脚本 @@ -50,7 +49,7 @@ $ python3 searchmap.py -u 123.123.123.123 image -**2.-p 使用nmap进行隐式端口扫描** +**2.-p 使用纯Python TCP connect进行端口扫描** ``` $ python3 searchmap.py -u https://www.baidu.com -p @@ -111,7 +110,28 @@ $ python3 searchmap.py -u https://www.baidu.com -o myscan.log $ python3 searchmap.py -u https://www.baidu.com -a -t 50 ``` -**10.组合用法** +**10.--ports 自定义端口集合** + +``` +# 支持 top100、web、单端口、逗号列表和端口范围 +$ python3 searchmap.py -u https://www.baidu.com -p --ports web +$ python3 searchmap.py -u https://www.baidu.com -p --ports 80,443,8000-8100 +``` + +**11.--json-out / --csv-out 导出结构化结果** + +``` +$ python3 searchmap.py -u https://www.baidu.com -a --json-out result.json --csv-out findings.csv +``` + +**12.--dict / --subdict 指定目录和子域名字典** + +``` +$ python3 searchmap.py -u https://www.baidu.com -d --dict dict/fuzz.txt +$ python3 searchmap.py -u https://www.baidu.com -s --subdict dict/subdomain.txt +``` + +**13.组合用法** ``` $ python3 searchmap.py -u https://www.baidu.com -p -n -d -s @@ -148,6 +168,19 @@ $ python3 searchmap.py -r myurl.txt -p -n -d -s ## 四.更新日志 +********* +**Version1.1.0_UpdateLog** +------------------------------------- +1. **外部工具依赖移除**: 端口扫描由Nmap改为纯Python TCP connect扫描,并加入Banner/TLS探测。 +2. **第三方数据API依赖移除**: 删除ipinfo.io归属地查询和ip138网页反查,改为本地可完成的IP分类、PTR、DNS、WHOIS、HTTP/TLS信息收集。 +3. **新增HTTP/TLS指纹**: 自动采集网站标题、响应头、技术栈、安全响应头、robots/sitemap/security.txt和TLS证书信息。 +4. **新增DNS记录枚举**: 支持A、AAAA、CNAME、NS、MX、TXT、SOA、CAA记录收集。 +5. **端口扫描增强**: 新增`--ports`参数,支持`top100`、`web`、端口列表和范围。 +6. **目录扫描增强**: 加入随机基线软404过滤,降低误报。 +7. **子域名扫描增强**: 加入泛解析识别和过滤。 +8. **结构化输出**: 新增`--json-out`和`--csv-out`,便于归档、比对和后续分析。 +9. **健壮性优化**: 重写目标解析、IPv6 URL处理、线程数限制、超时控制和错误收集。 + ********* **Version1.0.3_UpdateLog** ------------------------------------- diff --git a/requirements.txt b/requirements.txt index 556cc52..fc19319 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,5 @@ requests python-whois -python-nmap colorama tqdm dnspython -tldextract diff --git a/searchmap.py b/searchmap.py index 47c3617..f8279d5 100644 --- a/searchmap.py +++ b/searchmap.py @@ -2,59 +2,129 @@ # -*- coding: utf-8 -*- import argparse -import requests -import socket -import re -import whois -import nmap +import csv +import hashlib +import ipaddress import json -import zlib +import os import random -import string -import colorama +import re +import socket +import ssl import sys -import os +import tempfile +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Optional +from urllib.parse import urlparse, urlunparse + +import colorama +import dns.exception import dns.resolver -import tldextract +import requests +import whois from tqdm import tqdm -from concurrent.futures import ThreadPoolExecutor, as_completed -from datetime import datetime -# 禁用requests的InsecureRequestWarning警告 -requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) -# --- 工具信息 --- +requests.packages.urllib3.disable_warnings( + requests.packages.urllib3.exceptions.InsecureRequestWarning +) + +VERSION = "1.1.0" +LAST_UPDATED = "2026.05.25" + +DEFAULT_RESOLVERS = { + "Google": "8.8.8.8", + "Cloudflare": "1.1.1.1", + "Quad9": "9.9.9.9", + "OpenDNS": "208.67.222.222", + "AliDNS": "223.5.5.5", + "DNSPod": "119.29.29.29", + "NTT": "129.250.35.250", + "CleanBrowsing": "185.228.168.9", +} + +DNS_RECORD_TYPES = ("A", "AAAA", "CNAME", "NS", "MX", "TXT", "SOA", "CAA") + +TOP_100_PORTS = ( + 7, 9, 13, 21, 22, 23, 25, 26, 37, 53, + 79, 80, 81, 88, 106, 110, 111, 113, 119, 135, + 139, 143, 144, 179, 199, 389, 427, 443, 444, 445, + 465, 513, 514, 515, 543, 544, 548, 554, 587, 631, + 646, 873, 990, 993, 995, 1025, 1026, 1027, 1028, 1029, + 1110, 1433, 1720, 1723, 1755, 1900, 2000, 2001, 2049, 2121, + 2717, 3000, 3128, 3306, 3389, 3986, 4899, 5000, 5009, 5051, + 5060, 5101, 5190, 5357, 5432, 5631, 5666, 5800, 5900, 6000, + 6001, 6646, 7070, 8000, 8008, 8009, 8080, 8081, 8443, 8888, + 9100, 9999, 10000, 32768, 49152, 49153, 49154, 49155, 49156, 49157, +) + +WEB_PORTS = {80, 81, 443, 8000, 8008, 8080, 8081, 8443, 8888, 10000} +TLS_PORTS = {443, 465, 636, 990, 993, 995, 8443} +INTERESTING_DIR_STATUS = {200, 201, 204, 301, 302, 307, 308, 401, 403, 405} + +SECURITY_HEADERS = ( + "strict-transport-security", + "content-security-policy", + "x-frame-options", + "x-content-type-options", + "referrer-policy", + "permissions-policy", +) + +CDN_HINTS = ( + "akamai", "alicdn", "azureedge", "baiduyun", "cachefly", "cdn", + "cloudflare", "cloudfront", "dnsv1", "edgecast", "edgekey", + "edgesuite", "fastly", "incapdns", "kunlun", "qiniu", "tcdn", + "tencent", "yunjiasu", +) + +COMMON_MULTI_PART_SUFFIXES = { + "ac.cn", "ah.cn", "bj.cn", "com.cn", "cq.cn", "edu.cn", "fj.cn", + "gd.cn", "gov.cn", "gs.cn", "gx.cn", "gz.cn", "ha.cn", "hb.cn", + "he.cn", "hi.cn", "hk.cn", "hl.cn", "hn.cn", "jl.cn", "js.cn", + "jx.cn", "ln.cn", "mo.cn", "net.cn", "nm.cn", "nx.cn", "org.cn", + "qh.cn", "sc.cn", "sd.cn", "sh.cn", "sn.cn", "sx.cn", "tj.cn", + "tw.cn", "xj.cn", "xz.cn", "yn.cn", "zj.cn", "co.jp", "ne.jp", + "or.jp", "ac.jp", "go.jp", "co.kr", "ne.kr", "or.kr", "re.kr", + "co.uk", "org.uk", "ac.uk", "gov.uk", "net.uk", "com.au", + "net.au", "org.au", "edu.au", "gov.au", "co.nz", "org.nz", +} + + def banner(): - """打印Banner和版本信息""" colorama.init(autoreset=True) - print(colorama.Fore.CYAN + r""" - ____ _ __ __ -/ ___| ___ __ _ _ __ ___| |__ | \/ | __ _ _ __ -\___ \ / _ \/ _` | '__/ __| '_ \| |\/| |/ _` | '_ \ + print(colorama.Fore.CYAN + rf""" + ____ _ __ __ +/ ___| ___ __ _ _ __ ___| |__ | \/ | __ _ _ __ +\___ \ / _ \/ _` | '__/ __| '_ \| |\/| |/ _` | '_ \ ___) | __/ (_| | | | (__| | | | | | | (_| | |_) | -|____/ \___|\__,_|_| \___|_| |_|_| |_|\__,_| .__/ - |_| V1.0.3 +|____/ \___|\__,_|_| \___|_| |_|_| |_|\__,_| .__/ + |_| V{VERSION} """) print(colorama.Fore.GREEN + "# Coded by Asaotomo") - print(colorama.Fore.GREEN + "# Last Updated: 2025.07.22") + print(colorama.Fore.GREEN + f"# Last Updated: {LAST_UPDATED}") + print(colorama.Fore.YELLOW + "# Pure Python reconnaissance, no nmap or third-party data API required") -# --- 日志记录类 --- class Logger(object): - """将输出同时打印到控制台和文件""" + """将输出同时打印到控制台和文件,并在日志中移除 ANSI 颜色控制符。""" + def __init__(self, filename="Default.log"): self.terminal = sys.stdout - self.ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]') + self.ansi_escape = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]") try: - self.log = open(filename, "w", encoding='utf-8') - except IOError as e: - print(colorama.Fore.RED + f"[Error] Cannot open log file {filename}: {e}") + self.log = open(filename, "w", encoding="utf-8") + except IOError as exc: + print(colorama.Fore.RED + f"[Error] Cannot open log file {filename}: {exc}") self.log = None def write(self, message): self.terminal.write(message) if self.log: - self.log.write(self.ansi_escape.sub('', message)) + self.log.write(self.ansi_escape.sub("", message)) self.log.flush() def flush(self): @@ -62,361 +132,889 @@ def flush(self): if self.log: self.log.flush() -# --- 主扫描类 --- + +@dataclass +class TargetInfo: + raw: str + host: str + scheme: Optional[str] + port: Optional[int] + path: str + query: str + + class SearchMap: - def __init__(self, target, threads=20): - self.target_raw = target - self.target_url = self._normalize_url(target) # This will be used for domain targets - self.target_domain = self._get_domain_from_url(self.target_url) - self.threads = threads + def __init__( + self, + target, + threads=20, + timeout=5.0, + ports=None, + dir_dict="dict/fuzz.txt", + sub_dict="dict/subdomain.txt", + resolvers=None, + ): + self.target = self._parse_target(target) + self.threads = max(1, min(int(threads), 256)) + self.timeout = max(0.5, float(timeout)) + self.port_spec = ports or "top100" + self.dir_dict = dir_dict + self.sub_dict = sub_dict + self.resolvers = resolvers or DEFAULT_RESOLVERS self.headers = self._get_random_header() - self.session = requests.Session() - self.session.headers.update(self.headers) self.ip_list = [] - self.results = {} + self.working_web_url = None + self.results = { + "target": self.target.raw, + "host": self.target.host, + "scan_time": datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z"), + "basic": {}, + "dns_records": {}, + "http": [], + "tls": [], + "cdn": {}, + "ports": [], + "directories": [], + "subdomains": [], + "errors": [], + } - # --- 内部辅助方法 --- @staticmethod def _get_random_header(): - lib = ["Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.109 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:97.0) Gecko/20100101 Firefox/97.0"] - return {"User-Agent": random.choice(lib)} + user_agents = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/124.0 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0", + ] + return {"User-Agent": random.choice(user_agents), "Accept": "*/*"} @staticmethod - def _normalize_url(url): - if not re.match(r'http(s)?://', url): - # For IP addresses, this default might be overridden later by smart check - return "https://" + url - return url + def _parse_target(raw): + target = raw.strip() + if not target: + raise ValueError("empty target") - @staticmethod - def _get_domain_from_url(url): - netloc_part = url.split("://")[1].split("/")[0] - if ":" in netloc_part: - return netloc_part.split(":")[0] - else: - return netloc_part + has_scheme = bool(re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", target)) + parsed = urlparse(target if has_scheme else f"//{target}") + host = parsed.hostname + if not host: + raise ValueError(f"cannot parse host from target: {raw}") + + try: + port = parsed.port + except ValueError as exc: + raise ValueError(f"invalid port in target: {raw}") from exc + + return TargetInfo( + raw=target, + host=host.strip("[]").lower(), + scheme=parsed.scheme.lower() if has_scheme else None, + port=port, + path=parsed.path or "/", + query=parsed.query or "", + ) @staticmethod def _is_ip(address): try: - socket.inet_aton(address) + ipaddress.ip_address(address) return True - except socket.error: + except ValueError: return False - def _get_ip_location(self, ip): - """获取单个IP的地理位置 (使用ipinfo.io API)""" + @staticmethod + def _format_host_for_url(host): + if SearchMap._is_ip(host) and ":" in host: + return f"[{host}]" + return host + + @staticmethod + def _ip_profile(ip): try: - api_url = f"https://ipinfo.io/{ip}/json" - res = self.session.get(api_url, timeout=3) - res.raise_for_status() - data = res.json() - city = data.get('city', '') - region = data.get('region', '') - country = data.get('country', '') - location_parts = [part for part in [city, region, country] if part] - if location_parts: - return ", ".join(location_parts) - else: - return "Location data not found" - except (requests.RequestException, json.JSONDecodeError): - return "Lookup Failed" + obj = ipaddress.ip_address(ip) + except ValueError: + return "unknown" + flags = [] + if obj.is_private: + flags.append("private") + if obj.is_global: + flags.append("global") + if obj.is_loopback: + flags.append("loopback") + if obj.is_reserved: + flags.append("reserved") + if obj.is_multicast: + flags.append("multicast") + return ", ".join(flags) or "public" + + @staticmethod + def _registrable_domain(host): + if SearchMap._is_ip(host): + return None + parts = [part for part in host.strip(".").lower().split(".") if part] + if len(parts) < 2: + return None + suffix2 = ".".join(parts[-2:]) + if suffix2 in COMMON_MULTI_PART_SUFFIXES and len(parts) >= 3: + return ".".join(parts[-3:]) + return ".".join(parts[-2:]) + + @staticmethod + def _strip_control(value, max_len=220): + text = re.sub(r"[\x00-\x08\x0b-\x1f\x7f]+", " ", value or "") + text = re.sub(r"\s+", " ", text).strip() + return text[:max_len] + + @staticmethod + def _response_signature(response): + body = response.text or "" + title = SearchMap._extract_title(body) + normalized = re.sub(r"\d{4,}", "N", body) + normalized = re.sub(r"[a-f0-9]{16,}", "H", normalized, flags=re.I) + digest = hashlib.sha1(normalized[:8192].encode("utf-8", "ignore")).hexdigest() + return { + "status": response.status_code, + "length": len(response.content or b""), + "title": title, + "hash": digest, + } + + @staticmethod + def _extract_title(html): + match = re.search(r"]*>(.*?)", html or "", re.I | re.S) + if not match: + return "" + title = re.sub(r"\s+", " ", match.group(1)).strip() + return SearchMap._strip_control(title, 160) + + @staticmethod + def _extract_generator(html): + match = re.search( + r']+name=["\']generator["\'][^>]+content=["\']([^"\']+)', + html or "", + re.I, + ) + if match: + return SearchMap._strip_control(match.group(1), 120) + return "" def _print_info(self, key, value, color=colorama.Fore.CYAN, indent=0): + if value in (None, "", [], {}): + return indent_space = " " * indent - if value: - if isinstance(value, list): - if len(value) > 0: - if isinstance(value[0], datetime): - value_str = ", ".join([item.strftime('%Y-%m-%d %H:%M:%S') for item in value]) - else: - value_str = ", ".join(map(str, value)) - print(f"{indent_space}{colorama.Fore.GREEN}[{key}]: {color}{value_str}") - else: - print(f"{indent_space}{colorama.Fore.GREEN}[{key}]: {color}{value}") + if isinstance(value, list): + value = ", ".join(map(str, value)) + print(f"{indent_space}{colorama.Fore.GREEN}[{key}]: {color}{value}") - # --- 核心扫描功能 --- - def get_base_info(self): - print("\n" + "="*20 + " Basic Information " + "="*20) + def _build_url(self, scheme, path="/"): + host = self._format_host_for_url(self.target.host) + port = f":{self.target.port}" if self.target.port else "" + if not path.startswith("/"): + path = f"/{path}" + return f"{scheme}://{host}{port}{path}" + + def _target_full_url(self): + if not self.target.scheme: + return None + netloc = self._format_host_for_url(self.target.host) + if self.target.port: + netloc = f"{netloc}:{self.target.port}" + return urlunparse( + ( + self.target.scheme, + netloc, + self.target.path or "/", + "", + self.target.query, + "", + ) + ) + + def _web_candidates(self): + if self.target.scheme: + return [self._target_full_url()] + if self.target.port: + schemes = ["https", "http"] if self.target.port in TLS_PORTS else ["http", "https"] + return [self._build_url(scheme, self.target.path or "/") for scheme in schemes] + return [ + self._build_url("https", self.target.path or "/"), + self._build_url("http", self.target.path or "/"), + ] + + def _web_root_candidates(self): + roots = [] + for url in self._web_candidates(): + parsed = urlparse(url) + roots.append(urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))) + return list(dict.fromkeys(roots)) + + def _request(self, method, url, allow_redirects=True): + return requests.request( + method, + url, + headers=self.headers, + timeout=self.timeout, + verify=False, + allow_redirects=allow_redirects, + ) + + def _resolve_addresses(self, host=None): + host = host or self.target.host + if self._is_ip(host): + return [host] + addresses = set() try: - addrs = socket.getaddrinfo(self.target_domain, None) - self.ip_list = sorted(list(set(item[4][0] for item in addrs))) - except socket.gaierror as e: - self._print_info("Domain Resolution Error", str(e), color=colorama.Fore.RED) - return + for item in socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP): + addresses.add(item[4][0]) + except socket.gaierror as exc: + self.results["errors"].append(f"resolve {host}: {exc}") + return sorted(addresses, key=lambda value: (":" in value, value)) + + def _resolve_record(self, qtype, nameserver=None, host=None): + resolver = dns.resolver.Resolver() + resolver.timeout = self.timeout + resolver.lifetime = self.timeout + if nameserver: + resolver.nameservers = [nameserver] + try: + answers = resolver.resolve(host or self.target.host, qtype) + return sorted({answer.to_text().strip('"') for answer in answers}) + except (dns.exception.DNSException, OSError): + return [] + def get_base_info(self): + print("\n" + "=" * 20 + " Basic Information " + "=" * 20) + self._print_info("Target", self.target.raw) + self._print_info("Host", self.target.host) + self._print_info("Target Type", "IP address" if self._is_ip(self.target.host) else "Domain") + + self.ip_list = self._resolve_addresses() + self.results["basic"]["ips"] = self.ip_list if self.ip_list: - ip_to_location = {} - with ThreadPoolExecutor(max_workers=len(self.ip_list) or 1) as executor: - future_to_ip = {executor.submit(self._get_ip_location, ip): ip for ip in self.ip_list} - for future in as_completed(future_to_ip): - ip = future_to_ip[future] - try: - location = future.result() - ip_to_location[ip] = location - except Exception: - ip_to_location[ip] = "Lookup Failed" - - ips_with_location = [f"{ip}({ip_to_location.get(ip, 'N/A')})" for ip in self.ip_list] - if len(ips_with_location) > 1: - self._print_info("IP Addresses", ", ".join(ips_with_location)) - print(colorama.Fore.YELLOW + "[Ps] Multiple IPs found, CDN may be in use.") - elif ips_with_location: - self._print_info("IP Address", ips_with_location[0]) - - # --- 智能获取网站标题 --- - url_for_title = None - netloc = self.target_url.split("://")[1].split("/")[0] - - if self._is_ip(self.target_domain): - # The target is an IP. Check if a port was specified in the original input. - if ":" in netloc: - # A port was specified (e.g., "10.204.1.249:65000"). Use the full URL directly. - url_for_title = self.target_url - else: - # No port was specified (it was a pure IP). Check common web ports. - print(colorama.Fore.YELLOW + "[Info] Target is an IP, checking for web ports (80, 443)...") - s_443 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s_443.settimeout(1.0) - if s_443.connect_ex((self.target_domain, 443)) == 0: - url_for_title = f"https://{self.target_domain}" - s_443.close() - - if not url_for_title: - s_80 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s_80.settimeout(1.0) - if s_80.connect_ex((self.target_domain, 80)) == 0: - url_for_title = f"http://{self.target_domain}" - s_80.close() + enriched = [] + for ip in self.ip_list: + ptr = self._reverse_dns(ip) + label = f"{ip}({self._ip_profile(ip)})" + if ptr: + label = f"{label} PTR={ptr}" + enriched.append(label) + self._print_info("Resolved IPs", enriched) + if len(self.ip_list) > 1: + print(colorama.Fore.YELLOW + "[Ps] Multiple IPs found; CDN or load balancing may be in use.") else: - # For domain names, use the normalized URL from initialization. - url_for_title = self.target_url + self._print_info("Domain Resolution Error", "No address records found", colorama.Fore.RED) - # --- 获取标题 --- - if url_for_title: - try: - res = self.session.get(url_for_title, verify=False, timeout=5) - res.encoding = res.apparent_encoding - title_match = re.search("(.*?)", res.text, re.S) - title = title_match.group(1).strip() if title_match else "No Title Found" - self._print_info("Website Title", title) - except requests.RequestException as e: - self._print_info("Website Title", f"Failed to fetch title from {url_for_title}: {e}", color=colorama.Fore.RED) + if not self._is_ip(self.target.host): + self.dns_record_scan() + self._whois_lookup() else: - self._print_info("Website Title", "No web service found on common ports (80, 443)", color=colorama.Fore.YELLOW) + self._reverse_ip_ptrs() + + self.http_fingerprint() + self.tls_fingerprint() + + def _reverse_dns(self, ip): + try: + return socket.gethostbyaddr(ip)[0] + except (socket.herror, socket.gaierror, OSError): + return "" + + def _reverse_ip_ptrs(self): + print(colorama.Fore.GREEN + "\n[Reverse DNS]:") + for ip in self.ip_list or [self.target.host]: + ptr = self._reverse_dns(ip) + if ptr: + print(colorama.Fore.CYAN + f" - {ip} -> {ptr}") + else: + print(colorama.Fore.YELLOW + f" - {ip}: no PTR record") + + def _whois_lookup(self): + print(colorama.Fore.GREEN + "\n[WHOIS Information]:") + try: + whois_info = whois.whois(self.target.host) + compact = {} + for key, value in whois_info.items(): + if value in (None, "", [], {}): + continue + compact[key] = value + self._print_info(key.capitalize(), value, indent=2) + self.results["basic"]["whois"] = compact + except Exception as exc: + self._print_info("WHOIS Error", str(exc), colorama.Fore.RED, indent=2) + self.results["errors"].append(f"whois: {exc}") + + def dns_record_scan(self): + print("\n" + "=" * 20 + " DNS Records " + "=" * 20) + if self._is_ip(self.target.host): + print(colorama.Fore.YELLOW + "[Skip] DNS record scan is for domain targets.") + return + + records = {} + for qtype in DNS_RECORD_TYPES: + values = self._resolve_record(qtype) + if values: + records[qtype] = values + self._print_info(qtype, values) - - # --- IP反查或WHOIS --- - if self._is_ip(self.target_domain): - print(colorama.Fore.GREEN + "\n[Bound Domains on IP (Reverse IP Lookup)]:") + if not records: + print(colorama.Fore.YELLOW + "[Info] No DNS records returned by the default resolver.") + self.results["dns_records"] = records + + def http_fingerprint(self): + print("\n" + "=" * 20 + " HTTP Fingerprint " + "=" * 20) + findings = [] + + for url in self._web_candidates(): try: - rev_url = f"https://site.ip138.com/{self.target_domain}/" - res = self.session.get(rev_url, timeout=10) - domains = re.findall('(.*?)(.*?)', res.text, re.S) - if domains: - for date, domain, _ in domains: - print(colorama.Fore.CYAN + f" - {domain} ({date})") - else: - print(colorama.Fore.YELLOW + " No bound domains found.") - except requests.RequestException: - print(colorama.Fore.RED + " Failed to perform reverse IP lookup.") - else: - print(colorama.Fore.GREEN + "\n[WHOIS Information]:") + response = self._request("GET", url, allow_redirects=True) + except requests.RequestException as exc: + self._print_info("HTTP Probe Failed", f"{url} -> {exc}", colorama.Fore.YELLOW) + continue + + parsed_final = urlparse(response.url) + self.working_web_url = urlunparse((parsed_final.scheme, parsed_final.netloc, "/", "", "", "")) + headers = {key.lower(): value for key, value in response.headers.items()} + title = self._extract_title(response.text) + generator = self._extract_generator(response.text) + technologies = self._detect_technologies(headers, response.text) + present_security = [header for header in SECURITY_HEADERS if header in headers] + missing_security = [header for header in SECURITY_HEADERS if header not in headers] + + result = { + "url": url, + "final_url": response.url, + "status": response.status_code, + "title": title, + "server": response.headers.get("Server", ""), + "powered_by": response.headers.get("X-Powered-By", ""), + "content_type": response.headers.get("Content-Type", ""), + "content_length": len(response.content or b""), + "redirects": [item.status_code for item in response.history], + "generator": generator, + "technologies": technologies, + "security_headers_present": present_security, + "security_headers_missing": missing_security, + "well_known": self._probe_well_known(self.working_web_url), + } + findings.append(result) + + self._print_info("URL", response.url) + self._print_info("Status", response.status_code) + self._print_info("Title", title or "No Title Found") + self._print_info("Server", result["server"]) + self._print_info("X-Powered-By", result["powered_by"]) + self._print_info("Content-Type", result["content_type"]) + self._print_info("Content-Length", result["content_length"]) + self._print_info("Generator", generator) + self._print_info("Technologies", technologies) + self._print_info("Security Headers Present", present_security) + self._print_info("Security Headers Missing", missing_security, colorama.Fore.YELLOW) + for item in result["well_known"]: + print(colorama.Fore.BLUE + f" - {item['path']} -> {item['status']} {item['url']}") + break + + if not findings: + print(colorama.Fore.YELLOW + "[Info] No HTTP service responded on the candidate URL(s).") + self.results["http"] = findings + + def _detect_technologies(self, headers, html): + tech = set() + server = headers.get("server", "").lower() + powered_by = headers.get("x-powered-by", "").lower() + cookies = headers.get("set-cookie", "").lower() + body = (html or "").lower() + + header_map = { + "nginx": "nginx", + "openresty": "OpenResty", + "apache": "Apache", + "iis": "Microsoft IIS", + "cloudflare": "Cloudflare", + "tengine": "Tengine", + "gunicorn": "Gunicorn", + "werkzeug": "Werkzeug", + } + for needle, label in header_map.items(): + if needle in server: + tech.add(label) + + if powered_by: + tech.add(f"X-Powered-By: {self._strip_control(headers.get('x-powered-by', ''), 80)}") + if "phpsessid" in cookies or ".php" in body: + tech.add("PHP") + if "jsessionid" in cookies: + tech.add("Java") + if "asp.net_sessionid" in cookies or "x-aspnet-version" in headers: + tech.add("ASP.NET") + if "wp-content" in body or "wp-json" in body: + tech.add("WordPress") + if "drupal.settings" in body or "/sites/default/" in body: + tech.add("Drupal") + if "joomla" in body or "/media/system/js/" in body: + tech.add("Joomla") + if "__next_data__" in body: + tech.add("Next.js") + if "nuxt" in body: + tech.add("Nuxt") + if "vite" in body: + tech.add("Vite") + if "react" in body: + tech.add("React") + if "vue" in body: + tech.add("Vue") + return sorted(tech) + + def _probe_well_known(self, root_url): + findings = [] + for path in ("/robots.txt", "/sitemap.xml", "/.well-known/security.txt"): + url = root_url.rstrip("/") + path try: - whois_info = whois.whois(self.target_domain) - for key, value in whois_info.items(): - self._print_info(f"{key.capitalize()}", value, indent=2) - except Exception as e: - self._print_info("WHOIS Error", str(e), color=colorama.Fore.RED, indent=2) + response = self._request("GET", url, allow_redirects=False) + except requests.RequestException: + continue + if response.status_code in INTERESTING_DIR_STATUS: + findings.append({"path": path, "status": response.status_code, "url": url}) + return findings - def port_scan(self): - if not self.ip_list: - print(colorama.Fore.RED + "[Error] No IP addresses to scan. Run basic info scan first.") - return - - print("\n" + "="*20 + " Port Scan " + "="*20) - arguments = '-sS -T4 -Pn' - nm = nmap.PortScanner() - + def tls_fingerprint(self): + print("\n" + "=" * 20 + " TLS Certificate " + "=" * 20) + targets = [] + if self.target.scheme == "https": + targets.append((self.target.host, self.target.port or 443)) + elif self.target.port in TLS_PORTS: + targets.append((self.target.host, self.target.port)) + elif not self.target.port: + targets.append((self.target.host, 443)) + + seen = set() + results = [] + for host, port in targets: + if (host, port) in seen: + continue + seen.add((host, port)) + result = self._read_tls_certificate(host, port) + if result: + results.append(result) + self._print_info("Endpoint", f"{host}:{port}") + self._print_info("TLS Version", result.get("tls_version")) + self._print_info("Cipher", result.get("cipher")) + self._print_info("Subject", result.get("subject")) + self._print_info("Issuer", result.get("issuer")) + self._print_info("Not Before", result.get("not_before")) + self._print_info("Not After", result.get("not_after")) + self._print_info("SAN Count", result.get("san_count")) + self._print_info("SHA256", result.get("sha256")) + + if not results: + print(colorama.Fore.YELLOW + "[Info] No TLS certificate could be collected.") + self.results["tls"] = results + + def _read_tls_certificate(self, host, port): + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + sock = None try: - is_root = (os.getuid() == 0) - except AttributeError: - is_root = True - - if not is_root: - print(colorama.Fore.YELLOW + "[Warning] Not running as root. SYN scan (-sS) may fail or require password.") - print(colorama.Fore.YELLOW + " Falling back to TCP connect scan (-sT).") - arguments = '-sT -T4 -Pn' - - for ip in self.ip_list: - self._print_info("Scanning Ports for", ip) - try: - nm.scan(hosts=ip, arguments=arguments) - if ip not in nm.all_hosts(): - print(colorama.Fore.RED + f" Nmap scan failed for {ip}. Host might be down or blocking scans.") - continue - scan_info = nm[ip] - if 'tcp' in scan_info: - for port, port_info in scan_info['tcp'].items(): - service_info = f"{port_info['name']} {port_info.get('version', '')}" - print(f" - Port {port:<5} ({port_info['state']:<7}): {service_info.strip()}") - else: - print(colorama.Fore.YELLOW + " No open TCP ports found.") - except nmap.nmap.PortScannerError as e: - print(colorama.Fore.RED + f" Nmap error: {e}") - - def _dns_worker(self, resolver_ip): - resolver = dns.resolver.Resolver() - resolver.nameservers = [resolver_ip] - resolver.timeout = 5 - resolver.lifetime = 5 + sock = socket.create_connection((host, port), timeout=self.timeout) + server_name = None if self._is_ip(host) else host + with context.wrap_socket(sock, server_hostname=server_name) as tls_sock: + der_cert = tls_sock.getpeercert(binary_form=True) + tls_version = tls_sock.version() + cipher = tls_sock.cipher() + except (OSError, ssl.SSLError): + if sock: + sock.close() + return None + + sha256 = hashlib.sha256(der_cert).hexdigest() if der_cert else "" + decoded = self._decode_certificate(ssl.DER_cert_to_PEM_cert(der_cert)) if der_cert else {} + sans = [item[1] for item in decoded.get("subjectAltName", []) if item[0].lower() == "dns"] + return { + "host": host, + "port": port, + "tls_version": tls_version, + "cipher": cipher[0] if cipher else "", + "subject": self._format_cert_name(decoded.get("subject")), + "issuer": self._format_cert_name(decoded.get("issuer")), + "not_before": decoded.get("notBefore", ""), + "not_after": decoded.get("notAfter", ""), + "san_count": len(sans), + "sans": sans[:50], + "sha256": sha256, + } + + @staticmethod + def _decode_certificate(pem): + tmp_path = None try: - answers = resolver.resolve(self.target_domain, 'A') - results_with_location = [] - with ThreadPoolExecutor(max_workers=len(answers) or 1) as executor: - future_to_ip = {executor.submit(self._get_ip_location, answer.to_text()): answer.to_text() for answer in answers} - ip_to_location = {} - for future in as_completed(future_to_ip): - ip = future_to_ip[future] - try: - location = future.result() - ip_to_location[ip] = location - except Exception: - ip_to_location[ip] = "Lookup Failed" - for ip, loc in ip_to_location.items(): - results_with_location.append((ip, loc)) - return sorted(results_with_location) + with tempfile.NamedTemporaryFile("w", encoding="ascii", delete=False) as tmp: + tmp.write(pem) + tmp_path = tmp.name + return ssl._ssl._test_decode_cert(tmp_path) except Exception: - return None + return {} + finally: + if tmp_path: + try: + os.unlink(tmp_path) + except OSError: + pass + + @staticmethod + def _format_cert_name(value): + if not value: + return "" + parts = [] + for group in value: + for key, item in group: + parts.append(f"{key}={item}") + return ", ".join(parts) def multi_location_dns_check(self): - print("\n" + "="*20 + " Multi-Location DNS Check " + "="*20) - - resolvers = { - "Google (USA)": "8.8.8.8", - "Cloudflare (Global)": "1.1.1.1", - "OpenDNS (USA)": "208.67.222.222", - "Quad9 (Global)": "9.9.9.9", - "AliDNS (China)": "223.5.5.5", - "DNSPod (China)": "119.29.29.29", - "NTT (Japan)": "129.250.35.250", - "Comodo (Europe)": "8.26.56.26", - } - - all_found_ips = set() - with ThreadPoolExecutor(max_workers=len(resolvers)) as executor: - with tqdm(total=len(resolvers), desc="DNS Checking", ncols=100) as pbar: - future_to_resolver = {executor.submit(self._dns_worker, ip): name for name, ip in resolvers.items()} - for future in as_completed(future_to_resolver): - resolver_name = future_to_resolver[future] + print("\n" + "=" * 20 + " Multi-Resolver DNS/CDN Check " + "=" * 20) + if self._is_ip(self.target.host): + print(colorama.Fore.YELLOW + "[Skip] CDN check is for domain targets.") + return + + all_ips = set() + resolver_results = {} + cname_values = self._resolve_record("CNAME") + with ThreadPoolExecutor(max_workers=min(len(self.resolvers), self.threads)) as executor: + future_map = { + executor.submit(self._dns_resolver_worker, name, ip): name + for name, ip in self.resolvers.items() + } + with tqdm(total=len(future_map), desc="DNS Checking", ncols=100) as pbar: + for future in as_completed(future_map): + name = future_map[future] try: - result_tuples = future.result() - if result_tuples: - formatted_output = ", ".join([f"{ip}({loc})" for ip, loc in result_tuples]) - pbar.write(colorama.Fore.BLUE + f" - From {resolver_name:<20}: {formatted_output}") - all_found_ips.update([ip for ip, loc in result_tuples]) - else: - pbar.write(colorama.Fore.YELLOW + f" - From {resolver_name:<20}: No response or failed") - except Exception as e: - pbar.write(colorama.Fore.RED + f" - From {resolver_name:<20}: Error - {e}") + result = future.result() + except Exception as exc: + result = {"ips": [], "error": str(exc)} + resolver_results[name] = result + if result.get("ips"): + all_ips.update(result["ips"]) + pbar.write(colorama.Fore.BLUE + f" - {name:<14}: {', '.join(result['ips'])}") + else: + pbar.write(colorama.Fore.YELLOW + f" - {name:<14}: no response") pbar.update(1) - + + cname_hit = any(any(hint in cname.lower() for hint in CDN_HINTS) for cname in cname_values) + likely_cdn = len(all_ips) > 1 or cname_hit + conclusion = "LIKELY using CDN/load balancing" if likely_cdn else "LIKELY direct origin" + self.results["cdn"] = { + "unique_ips": sorted(all_ips), + "cname": cname_values, + "resolver_results": resolver_results, + "likely_cdn": likely_cdn, + "reason": "multiple resolver IPs or CDN-like CNAME" if likely_cdn else "single IP and no CDN-like CNAME", + } + print("\n" + colorama.Fore.GREEN + "[Conclusion]:") - self._print_info("Total Unique IPs Found", len(all_found_ips), indent=2) - if len(all_found_ips) > 1: - print(colorama.Fore.CYAN + " -> This domain is LIKELY using a CDN or load balancing.") - else: - print(colorama.Fore.CYAN + " -> This domain is LIKELY NOT using a CDN.") - - def _dir_worker(self, path): - # Determine the base URL for dir scan, which needs a scheme - base_url_for_dir = self.target_url - if self._is_ip(self.target_domain): - # If the main target was an IP, we must have determined a working scheme - if "http" not in base_url_for_dir: # Check if it was already fixed - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(0.5) - if s.connect_ex((self.target_domain, 443)) == 0: - base_url_for_dir = f"https://{self.target_domain}" - elif s.connect_ex((self.target_domain, 80)) == 0: - base_url_for_dir = f"http://{self.target_domain}" - else: - return None # No web service to scan dirs - s.close() - + self._print_info("Total Unique IPs Found", len(all_ips), indent=2) + self._print_info("CNAME", cname_values, indent=2) + print(colorama.Fore.CYAN + f" -> {conclusion}.") + + def _dns_resolver_worker(self, name, nameserver): + ips = [] + for qtype in ("A", "AAAA"): + ips.extend(self._resolve_record(qtype, nameserver=nameserver)) + return {"resolver": name, "nameserver": nameserver, "ips": sorted(set(ips))} + + def port_scan(self): + print("\n" + "=" * 20 + " Pure Python Port Scan " + "=" * 20) + if not self.ip_list: + self.ip_list = self._resolve_addresses() + if not self.ip_list: + print(colorama.Fore.RED + "[Error] No IP addresses to scan.") + return + + ports = self._parse_ports(self.port_spec) + self._print_info("Port Set", f"{len(ports)} ports") + self._print_info("Scanner", "TCP connect scan with banner probing") + + tasks = [] + with ThreadPoolExecutor(max_workers=self.threads) as executor: + for ip in self.ip_list: + for port in ports: + tasks.append(executor.submit(self._scan_one_port, ip, port)) + with tqdm(total=len(tasks), desc="Scanning Ports", ncols=100) as pbar: + for future in as_completed(tasks): + result = future.result() + if result: + self.results["ports"].append(result) + banner = f" | {result['banner']}" if result.get("banner") else "" + tls = f" | TLS {result['tls_version']}" if result.get("tls_version") else "" + pbar.write( + colorama.Fore.BLUE + + f"[Open] {result['ip']}:{result['port']} " + + f"({result['service']}){tls}{banner}" + ) + pbar.update(1) + + if not self.results["ports"]: + print(colorama.Fore.YELLOW + "[Info] No open TCP ports found in selected port set.") + + @staticmethod + def _parse_ports(spec): + if not spec or spec.lower() == "top100": + return list(TOP_100_PORTS) + if spec.lower() == "web": + return sorted(WEB_PORTS) + + ports = set() + for part in spec.split(","): + part = part.strip() + if not part: + continue + if "-" in part: + start, end = part.split("-", 1) + start, end = int(start), int(end) + if start > end: + start, end = end, start + ports.update(range(max(1, start), min(65535, end) + 1)) + else: + ports.add(int(part)) + return sorted(port for port in ports if 1 <= port <= 65535) + + def _scan_one_port(self, ip, port): + start = time.monotonic() try: - url_to_check = f"{base_url_for_dir.rstrip('/')}/{path.strip()}" - res = self.session.get(url_to_check, timeout=3, verify=False, allow_redirects=False) - if res.status_code == 200: - return f"[Found] {url_to_check} (Status: 200)" - except requests.RequestException: - pass - return None + with socket.create_connection((ip, port), timeout=self.timeout): + pass + except OSError: + return None - def dir_scan(self): - print("\n" + "="*20 + " Directory Scan " + "="*20) + latency_ms = int((time.monotonic() - start) * 1000) + service = self._service_name(port) + banner, tls_version = self._grab_banner(ip, port) + return { + "ip": ip, + "port": port, + "service": service, + "latency_ms": latency_ms, + "banner": banner, + "tls_version": tls_version, + } + + @staticmethod + def _service_name(port): try: - with open("dict/fuzz.txt", "r", encoding='utf-8') as f: - dir_dict = f.readlines() - except FileNotFoundError: - print(colorama.Fore.RED + "[Error] Dictionary file not found: dict/fuzz.txt") + return socket.getservbyport(port, "tcp") + except OSError: + return "unknown" + + def _grab_banner(self, ip, port): + use_tls_first = port in TLS_PORTS + for use_tls in (use_tls_first, not use_tls_first): + try: + return self._grab_banner_once(ip, port, use_tls) + except (OSError, ssl.SSLError, TimeoutError): + continue + return "", "" + + def _grab_banner_once(self, ip, port, use_tls): + tls_version = "" + with socket.create_connection((ip, port), timeout=self.timeout) as sock: + sock.settimeout(min(self.timeout, 2.0)) + conn = sock + if use_tls: + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + server_name = None if self._is_ip(self.target.host) else self.target.host + conn = context.wrap_socket(sock, server_hostname=server_name) + tls_version = conn.version() or "" + + try: + if port in WEB_PORTS or use_tls: + host_header = self.target.host + request = ( + f"HEAD / HTTP/1.1\r\nHost: {host_header}\r\n" + f"User-Agent: {self.headers['User-Agent']}\r\n" + "Connection: close\r\n\r\n" + ) + conn.sendall(request.encode("ascii", "ignore")) + else: + try: + data = conn.recv(512) + banner = self._strip_control(data.decode("utf-8", "ignore")) + if banner: + return banner, tls_version + except socket.timeout: + pass + conn.sendall(b"\r\n") + + data = conn.recv(1024) + banner = self._strip_control(data.decode("utf-8", "ignore")) + first_line = banner.split(" ")[0] if " " in banner else banner.splitlines()[0] if banner else "" + return first_line[:180], tls_version + finally: + if use_tls: + conn.close() + + def dir_scan(self): + print("\n" + "=" * 20 + " Directory Scan " + "=" * 20) + root_url = self._pick_web_root() + if not root_url: + print(colorama.Fore.RED + "[Error] No HTTP service available for directory scan.") + return + + paths = self._load_wordlist(self.dir_dict) + if not paths: + print(colorama.Fore.RED + f"[Error] Dictionary is empty or missing: {self.dir_dict}") return - + + baselines = self._build_soft404_baselines(root_url) + self._print_info("Base URL", root_url) + self._print_info("Dictionary Items", len(paths)) + if baselines: + self._print_info("Soft 404 Baselines", len(baselines)) + with ThreadPoolExecutor(max_workers=self.threads) as executor: - with tqdm(total=len(dir_dict), desc="Scanning Dirs", ncols=100) as pbar: - futures = [executor.submit(self._dir_worker, path) for path in dir_dict] + futures = [executor.submit(self._dir_worker, root_url, path, baselines) for path in paths] + with tqdm(total=len(futures), desc="Scanning Dirs", ncols=100) as pbar: for future in as_completed(futures): result = future.result() if result: - pbar.write(colorama.Fore.BLUE + result) + self.results["directories"].append(result) + pbar.write( + colorama.Fore.BLUE + + f"[Found] {result['url']} " + + f"(Status: {result['status']}, Length: {result['length']})" + ) pbar.update(1) - def _sub_worker(self, subname, base_domain): - subname = subname.strip() - if not subname: + def _pick_web_root(self): + if self.working_web_url: + return self.working_web_url + for url in self._web_root_candidates(): + try: + response = self._request("GET", url) + except requests.RequestException: + continue + if response.status_code < 500: + parsed = urlparse(response.url) + self.working_web_url = urlunparse((parsed.scheme, parsed.netloc, "/", "", "", "")) + return self.working_web_url + return None + + @staticmethod + def _load_wordlist(path): + try: + with open(path, "r", encoding="utf-8", errors="ignore") as handle: + items = [line.strip() for line in handle if line.strip() and not line.startswith("#")] + except FileNotFoundError: + return [] + return list(dict.fromkeys(items)) + + def _build_soft404_baselines(self, root_url): + baselines = [] + for _ in range(2): + token = "searchmap-" + "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(16)) + url = root_url.rstrip("/") + "/" + token + try: + response = self._request("GET", url, allow_redirects=False) + except requests.RequestException: + continue + baselines.append(self._response_signature(response)) + return baselines + + def _dir_worker(self, root_url, path, baselines): + clean_path = path.strip().lstrip("/") + if not clean_path: return None - - domain_to_check = f"{subname}.{base_domain}" + url = root_url.rstrip("/") + "/" + clean_path try: - socket.gethostbyname(domain_to_check) - return domain_to_check - except socket.gaierror: + response = self._request("GET", url, allow_redirects=False) + except requests.RequestException: + return None + if response.status_code not in INTERESTING_DIR_STATUS: return None + signature = self._response_signature(response) + if self._looks_like_soft404(signature, baselines): + return None + return { + "url": url, + "path": "/" + clean_path, + "status": response.status_code, + "length": len(response.content or b""), + "title": signature["title"], + "location": response.headers.get("Location", ""), + } + + @staticmethod + def _looks_like_soft404(signature, baselines): + for baseline in baselines: + if signature["status"] != baseline["status"]: + continue + if signature["hash"] == baseline["hash"]: + return True + base_len = max(baseline["length"], 1) + length_delta = abs(signature["length"] - baseline["length"]) / base_len + same_title = signature["title"] and signature["title"] == baseline["title"] + if length_delta < 0.05 or same_title: + return True + return False def sub_scan(self): - print("\n" + "="*20 + " Subdomain Scan " + "="*20) - extracted = tldextract.extract(self.target_domain) - base_domain = f"{extracted.domain}.{extracted.suffix}" - if not extracted.domain: - print(colorama.Fore.RED + "[Error] Subdomain scan can only be performed on a valid domain, not an IP address.") + print("\n" + "=" * 20 + " Subdomain Scan " + "=" * 20) + base_domain = self._registrable_domain(self.target.host) + if not base_domain: + print(colorama.Fore.RED + "[Error] Subdomain scan can only be performed on a valid domain.") return - print(colorama.Fore.YELLOW + f"[Info] Starting scan for base domain: {base_domain}") - try: - with open("dict/subdomain.txt", "r", encoding='utf-8') as f: - sub_dict = f.readlines() - except FileNotFoundError: - print(colorama.Fore.RED + "[Error] Dictionary file not found: dict/subdomain.txt") + names = self._load_wordlist(self.sub_dict) + if not names: + print(colorama.Fore.RED + f"[Error] Dictionary is empty or missing: {self.sub_dict}") return - + + wildcard_ips = self._detect_wildcard_dns(base_domain) + self._print_info("Base Domain", base_domain) + self._print_info("Dictionary Items", len(names)) + if wildcard_ips: + self._print_info("Wildcard DNS", sorted(wildcard_ips), colorama.Fore.YELLOW) + with ThreadPoolExecutor(max_workers=self.threads) as executor: - with tqdm(total=len(sub_dict), desc="Scanning Subs", ncols=100) as pbar: - futures = [executor.submit(self._sub_worker, subname, base_domain) for subname in sub_dict] + futures = [ + executor.submit(self._sub_worker, name, base_domain, wildcard_ips) + for name in names + ] + with tqdm(total=len(futures), desc="Scanning Subs", ncols=100) as pbar: for future in as_completed(futures): result = future.result() if result: - try: - ips = sorted(list(set(item[4][0] for item in socket.getaddrinfo(result, None)))) - pbar.write(colorama.Fore.BLUE + f"[Found] {result} -> IPs: {', '.join(ips)}") - except Exception: - pbar.write(colorama.Fore.BLUE + f"[Found] {result} (Could not resolve IP)") + self.results["subdomains"].append(result) + pbar.write( + colorama.Fore.BLUE + + f"[Found] {result['domain']} -> {', '.join(result['ips'])}" + ) pbar.update(1) + def _detect_wildcard_dns(self, base_domain): + wildcard_ips = set() + for _ in range(2): + label = "searchmap-" + "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(16)) + wildcard_ips.update(self._resolve_addresses(f"{label}.{base_domain}")) + return wildcard_ips + + def _sub_worker(self, subname, base_domain, wildcard_ips): + label = subname.strip().strip(".") + if not label: + return None + domain = f"{label}.{base_domain}" + ips = self._resolve_addresses(domain) + if not ips: + return None + if wildcard_ips and set(ips).issubset(wildcard_ips): + return None + return {"domain": domain, "ips": ips} + def run(self, do_port_scan, do_noping, do_dir_scan, do_sub_scan, do_full_scan): self.get_base_info() - + if do_full_scan: self.port_scan() self.multi_location_dns_check() @@ -433,55 +1031,146 @@ def run(self, do_port_scan, do_noping, do_dir_scan, do_sub_scan, do_full_scan): if do_sub_scan: self.sub_scan() -# --- 主程序入口 --- -def main(): - banner() + +def write_json(path, results): + with open(path, "w", encoding="utf-8") as handle: + json.dump(results, handle, ensure_ascii=False, indent=2, default=str) + + +def write_csv(path, results): + rows = [] + result_list = results if isinstance(results, list) else [results] + for item in result_list: + target = item.get("target", "") + for ip in item.get("basic", {}).get("ips", []): + rows.append({"target": target, "module": "basic", "key": "ip", "value": ip}) + for qtype, values in item.get("dns_records", {}).items(): + for value in values: + rows.append({"target": target, "module": "dns", "key": qtype, "value": value}) + for http in item.get("http", []): + rows.append({"target": target, "module": "http", "key": "url", "value": http.get("final_url", "")}) + rows.append({"target": target, "module": "http", "key": "title", "value": http.get("title", "")}) + for port in item.get("ports", []): + rows.append({ + "target": target, + "module": "port", + "key": f"{port.get('ip')}:{port.get('port')}", + "value": port.get("service", ""), + }) + for directory in item.get("directories", []): + rows.append({ + "target": target, + "module": "directory", + "key": str(directory.get("status", "")), + "value": directory.get("url", ""), + }) + for sub in item.get("subdomains", []): + rows.append({ + "target": target, + "module": "subdomain", + "key": sub.get("domain", ""), + "value": ", ".join(sub.get("ips", [])), + }) + + with open(path, "w", encoding="utf-8", newline="") as handle: + writer = csv.DictWriter(handle, fieldnames=["target", "module", "key", "value"]) + writer.writeheader() + writer.writerows(rows) + + +def build_parser(): parser = argparse.ArgumentParser( - description="SearchMap v1.0.3 - An automatic information collection tool for penetration testing.", - formatter_class=argparse.RawTextHelpFormatter) - + description=( + "SearchMap v1.1.0 - Pure Python information collection tool for " + "authorized security assessment." + ), + formatter_class=argparse.RawTextHelpFormatter, + ) + group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('-u', '--url', help='Scan a single target URL or IP (e.g., https://example.com or 8.8.8.8)') - group.add_argument('-r', '--read', help='Batch scan targets from a file') - - parser.add_argument('-p', '--port', help='Scan target port(s)', action='store_true') - parser.add_argument('-n', '--noping', help='Multi-location DNS check for CDN detection', action='store_true') - parser.add_argument('-d', '--dirscan', help='Scan target directory', action='store_true') - parser.add_argument('-s', '--subscan', help='Scan target subdomain', action='store_true') - parser.add_argument('-a', '--fullscan', help='Run all scan modules (port, dir, sub, noping)', action='store_true') - - parser.add_argument('-o', '--outlog', help='Output results to a log file') - parser.add_argument('-t', '--threads', help='Number of concurrent threads (default: 20)', type=int, default=20) - + group.add_argument("-u", "--url", help="Scan a single target URL, domain, or IP") + group.add_argument("-r", "--read", help="Batch scan targets from a file") + + parser.add_argument("-p", "--port", help="Run pure Python TCP port scan", action="store_true") + parser.add_argument("-n", "--noping", help="Multi-resolver DNS/CDN detection", action="store_true") + parser.add_argument("-d", "--dirscan", help="Scan web directories with soft-404 filtering", action="store_true") + parser.add_argument("-s", "--subscan", help="Bruteforce subdomains with wildcard DNS filtering", action="store_true") + parser.add_argument("-a", "--fullscan", help="Run all modules", action="store_true") + + parser.add_argument("-o", "--outlog", help="Output console results to a log file") + parser.add_argument("--json-out", help="Write structured JSON results") + parser.add_argument("--csv-out", help="Write flattened CSV findings") + parser.add_argument("-t", "--threads", help="Concurrent threads (default: 20, max: 256)", type=int, default=20) + parser.add_argument("--timeout", help="Network timeout in seconds (default: 5)", type=float, default=5.0) + parser.add_argument("--ports", help="Port set for -p: top100, web, 80,443,8000-8100", default="top100") + parser.add_argument("--dict", dest="dir_dict", help="Directory wordlist path", default="dict/fuzz.txt") + parser.add_argument("--subdict", help="Subdomain wordlist path", default="dict/subdomain.txt") + parser.add_argument( + "--resolver", + action="append", + default=[], + help="Custom DNS resolver IP. Can be used multiple times.", + ) + return parser + + +def run_single_target(target, args): + resolvers = DEFAULT_RESOLVERS + if args.resolver: + resolvers = {f"custom-{i + 1}": ip for i, ip in enumerate(args.resolver)} + scanner = SearchMap( + target, + threads=args.threads, + timeout=args.timeout, + ports=args.ports, + dir_dict=args.dir_dict, + sub_dict=args.subdict, + resolvers=resolvers, + ) + scanner.run(args.port, args.noping, args.dirscan, args.subscan, args.fullscan) + return scanner.results + + +def main(): + banner() + parser = build_parser() args = parser.parse_args() if args.outlog: sys.stdout = Logger(args.outlog) + all_results = [] if args.read: try: - with open(args.read, 'r', encoding='utf-8') as f: - urls = [line.strip() for line in f if line.strip()] - - print(colorama.Fore.GREEN + f"[Info] Total tasks: {len(urls)}") - for i, url in enumerate(urls): - print("\n" + "#"*20 + f" Task {i+1}/{len(urls)}: {url} " + "#"*20) - try: - scanner = SearchMap(url, args.threads) - scanner.run(args.port, args.noping, args.dirscan, args.subscan, args.fullscan) - except Exception as e: - print(colorama.Fore.RED + f"[Task Error] An unexpected error occurred while scanning {url}: {e}") - + with open(args.read, "r", encoding="utf-8") as handle: + targets = [line.strip() for line in handle if line.strip()] except FileNotFoundError: print(colorama.Fore.RED + f"[Error] Input file not found: {args.read}") - + return + + print(colorama.Fore.GREEN + f"[Info] Total tasks: {len(targets)}") + for index, target in enumerate(targets, 1): + print("\n" + "#" * 20 + f" Task {index}/{len(targets)}: {target} " + "#" * 20) + try: + all_results.append(run_single_target(target, args)) + except Exception as exc: + print(colorama.Fore.RED + f"[Task Error] {target}: {exc}") + all_results.append({"target": target, "errors": [str(exc)]}) else: try: print(colorama.Fore.GREEN + f"[Info] Starting scan for: {args.url}") - scanner = SearchMap(args.url, args.threads) - scanner.run(args.port, args.noping, args.dirscan, args.subscan, args.fullscan) - except Exception as e: - print(colorama.Fore.RED + f"[Task Error] An unexpected error occurred: {e}") + all_results = run_single_target(args.url, args) + except Exception as exc: + print(colorama.Fore.RED + f"[Task Error] An unexpected error occurred: {exc}") + all_results = {"target": args.url, "errors": [str(exc)]} + + if args.json_out: + write_json(args.json_out, all_results) + print(colorama.Fore.GREEN + f"[Info] JSON results written to: {args.json_out}") + if args.csv_out: + write_csv(args.csv_out, all_results) + print(colorama.Fore.GREEN + f"[Info] CSV findings written to: {args.csv_out}") + -if __name__ == '__main__': +if __name__ == "__main__": main() From b8662a7279967334ab0ec266eed4b67144905911 Mon Sep 17 00:00:00 2001 From: asaotomo <67818638+asaotomo@users.noreply.github.com> Date: Mon, 25 May 2026 22:27:43 +0800 Subject: [PATCH 2/4] Refresh README usage examples --- README.md | 82 ++++++++++++++++++++++++++----------------------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index d093f4e..0acc013 100644 --- a/README.md +++ b/README.md @@ -35,108 +35,102 @@ sudo apt-get install python-pip ## 三.使用方法 -**1.-u 获取网站基本信息** +> 说明:V1.1.0 输出内容会随目标、网络环境和解析器返回结果变化,旧版本终端截图已移除。以下示例以命令为准。 + +**1.-u 获取基础信息、DNS记录、HTTP/TLS指纹** ``` -$ python3 searchmap.py -u https://www.baidu.com +$ python3 searchmap.py -u https://www.baidu.com ``` -image - ``` -$ python3 searchmap.py -u 123.123.123.123 +$ python3 searchmap.py -u 123.123.123.123 ``` -image - **2.-p 使用纯Python TCP connect进行端口扫描** ``` -$ python3 searchmap.py -u https://www.baidu.com -p +$ python3 searchmap.py -u https://www.baidu.com -p ``` -image +默认扫描内置Top 100常见端口,并对开放端口进行服务名、Banner和TLS版本探测。 -**3.-r 批量扫描网站基本信息** +**3.--ports 自定义端口集合** ``` -$ python3 searchmap.py -r myurl.txt +# 支持 top100、web、单端口、逗号列表和端口范围 +$ python3 searchmap.py -u https://www.baidu.com -p --ports web +$ python3 searchmap.py -u https://www.baidu.com -p --ports 80,443,8000-8100 ``` -image +**4.-n 使用多解析器DNS检测CDN/负载均衡** -**4.-n 使用多节点DNS检测来判断目标是否使用cdn加速** +``` +$ python3 searchmap.py -u https://www.baidu.com -n +``` + +也可以指定自定义解析器: ``` -$ python3 searchmap.py -u https://www.baidu.com -n +$ python3 searchmap.py -u https://www.baidu.com -n --resolver 8.8.8.8 --resolver 1.1.1.1 ``` -image **5.-d 对网站目录进行多线程扫描探测,能够自动识别伪响应页面** -PS:程序使用的默认字典为dict/fuzz.txt,用户可自行替换字典内容进行FUZZ。 +PS: 程序使用的默认字典为`dict/fuzz.txt`,用户可自行替换字典内容进行FUZZ。 ``` -$ python3 searchmap.py -u https://www.baidu.com -d +$ python3 searchmap.py -u https://www.baidu.com -d +$ python3 searchmap.py -u https://www.baidu.com -d --dict dict/fuzz.txt ``` -image - **6.-s 对输入域名的进行子域名爆破** -PS:程序使用的默认字典为dict/subdomain.txt,用户可自行替换字典内容进行FUZZ。 +PS: 程序使用的默认字典为`dict/subdomain.txt`,用户可自行替换字典内容进行FUZZ。 ``` -$ python3 searchmap.py -u https://www.baidu.com -s +$ python3 searchmap.py -u https://www.baidu.com -s +$ python3 searchmap.py -u https://www.baidu.com -s --subdict dict/subdomain.txt ``` -image **7.-a 对目标域名进行全功能扫描** ``` -$ python3 searchmap.py -u https://www.baidu.com -a +$ python3 searchmap.py -u https://www.baidu.com -a ``` -**8.-o 将扫描内容保存为日志** +**8.-r 批量扫描目标** ``` -$ python3 searchmap.py -u https://www.baidu.com -o myscan.log +$ python3 searchmap.py -r myurl.txt +$ python3 searchmap.py -r myurl.txt -p -n -d -s ``` -**9.-t 自定义扫描线程数** +**9.-o 将控制台扫描内容保存为日志** ``` -# 使用50个线程进行全方位扫描,速度更快 -$ python3 searchmap.py -u https://www.baidu.com -a -t 50 +$ python3 searchmap.py -u https://www.baidu.com -o myscan.log ``` -**10.--ports 自定义端口集合** - -``` -# 支持 top100、web、单端口、逗号列表和端口范围 -$ python3 searchmap.py -u https://www.baidu.com -p --ports web -$ python3 searchmap.py -u https://www.baidu.com -p --ports 80,443,8000-8100 -``` - -**11.--json-out / --csv-out 导出结构化结果** +**10.--json-out / --csv-out 导出结构化结果** ``` $ python3 searchmap.py -u https://www.baidu.com -a --json-out result.json --csv-out findings.csv +$ python3 searchmap.py -r myurl.txt -p -n --json-out batch-result.json --csv-out batch-findings.csv ``` -**12.--dict / --subdict 指定目录和子域名字典** +**11.-t / --timeout 控制并发和超时** ``` -$ python3 searchmap.py -u https://www.baidu.com -d --dict dict/fuzz.txt -$ python3 searchmap.py -u https://www.baidu.com -s --subdict dict/subdomain.txt +# 使用50个线程进行全方位扫描,并将单次网络超时设为3秒 +$ python3 searchmap.py -u https://www.baidu.com -a -t 50 --timeout 3 ``` -**13.组合用法** +**12.组合用法** ``` -$ python3 searchmap.py -u https://www.baidu.com -p -n -d -s - -$ python3 searchmap.py -r myurl.txt -p -n -d -s +$ python3 searchmap.py -u https://www.baidu.com -p -n -d -s --ports web +$ python3 searchmap.py -r myurl.txt -a -t 50 --timeout 3 --json-out result.json ``` From 6c6ab1fe27470599f09224ff559fb896dfaab539 Mon Sep 17 00:00:00 2001 From: asaotomo <67818638+asaotomo@users.noreply.github.com> Date: Mon, 25 May 2026 22:58:12 +0800 Subject: [PATCH 3/4] Update print statement from 'Hello' to 'Goodbye' --- searchmap.py | 760 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 672 insertions(+), 88 deletions(-) diff --git a/searchmap.py b/searchmap.py index f8279d5..c7ecb16 100644 --- a/searchmap.py +++ b/searchmap.py @@ -13,6 +13,7 @@ import ssl import sys import tempfile +import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass @@ -25,6 +26,7 @@ import dns.resolver import requests import whois +from requests.adapters import HTTPAdapter from tqdm import tqdm @@ -32,7 +34,7 @@ requests.packages.urllib3.exceptions.InsecureRequestWarning ) -VERSION = "1.1.0" +VERSION = "1.2.0" LAST_UPDATED = "2026.05.25" DEFAULT_RESOLVERS = { @@ -48,7 +50,7 @@ DNS_RECORD_TYPES = ("A", "AAAA", "CNAME", "NS", "MX", "TXT", "SOA", "CAA") -TOP_100_PORTS = ( +FAST_PORTS = ( 7, 9, 13, 21, 22, 23, 25, 26, 37, 53, 79, 80, 81, 88, 106, 110, 111, 113, 119, 135, 139, 143, 144, 179, 199, 389, 427, 443, 444, 445, @@ -61,9 +63,72 @@ 9100, 9999, 10000, 32768, 49152, 49153, 49154, 49155, 49156, 49157, ) -WEB_PORTS = {80, 81, 443, 8000, 8008, 8080, 8081, 8443, 8888, 10000} -TLS_PORTS = {443, 465, 636, 990, 993, 995, 8443} +HIGH_VALUE_PORTS = ( + 1, 3, 5, 17, 19, 20, 24, 49, 69, 70, 82, 83, 84, 85, 89, 90, 99, + 109, 115, 118, 123, 137, 138, 161, 162, 177, 194, 264, 280, 311, + 389, 406, 407, 416, 417, 443, 445, 500, 512, 593, 623, 625, 636, + 666, 691, 700, 705, 711, 714, 720, 722, 749, 765, 777, 783, 787, + 800, 801, 808, 843, 880, 888, 898, 900, 901, 902, 903, 981, 987, + 992, 993, 995, 999, 1000, 1024, 1025, 1026, 1027, 1028, 1029, 1030, + 1031, 1032, 1033, 1034, 1035, 1080, 1099, 11211, 1194, 1214, 1241, + 1311, 1352, 1433, 1434, 1521, 1583, 1720, 1723, 1883, 1900, 2049, + 2082, 2083, 2086, 2087, 2095, 2096, 2181, 2375, 2376, 2483, 2484, + 2601, 2604, 3000, 3001, 3306, 3389, 3690, 4369, 4443, 4567, 5000, + 5001, 5005, 5009, 5044, 5060, 5061, 5432, 5601, 5672, 5900, 5901, + 5984, 5985, 5986, 6000, 6379, 6443, 6666, 6667, 7001, 7002, 7070, + 7199, 7474, 7547, 7676, 7777, 8000, 8001, 8008, 8009, 8010, 8069, + 8080, 8081, 8082, 8083, 8086, 8088, 8090, 8091, 8098, 8123, 8161, + 8200, 8222, 8333, 8443, 8500, 8530, 8531, 8834, 8880, 8888, 8983, + 9000, 9001, 9042, 9090, 9091, 9200, 9300, 9418, 9443, 9600, 9999, + 10000, 10250, 10255, 27017, 27018, 28017, 32768, 49152, 49153, 49154, + 49155, 49156, 49157, +) + +SMART_PORTS = tuple(sorted(set(FAST_PORTS) | set(HIGH_VALUE_PORTS))) +COMMON_PORTS = tuple(sorted(set(range(1, 1025)) | set(HIGH_VALUE_PORTS))) + +WEB_PORTS = { + 80, 81, 82, 83, 84, 85, 88, 89, 90, 443, 591, 593, 800, 801, 808, + 880, 888, 981, 1311, 2082, 2083, 2086, 2087, 2095, 2096, 3000, 3001, + 4567, 5000, 5001, 5601, 5984, 7001, 7002, 7070, 7474, 7547, 8000, + 8001, 8008, 8009, 8010, 8069, 8080, 8081, 8082, 8083, 8086, 8088, + 8090, 8091, 8123, 8161, 8200, 8333, 8443, 8500, 8834, 8880, 8888, + 8983, 9000, 9001, 9090, 9091, 9200, 9443, 10000, +} +TLS_PORTS = {443, 465, 563, 587, 636, 853, 989, 990, 992, 993, 995, 2083, 2087, 2096, 2376, 4443, 5001, 5986, 6443, 8443, 8834, 9443} INTERESTING_DIR_STATUS = {200, 201, 204, 301, 302, 307, 308, 401, 403, 405} +DEFAULT_DIR_EXTENSIONS = ("php", "asp", "aspx", "jsp", "jspx", "do", "action", "html", "htm", "js", "json", "txt", "bak", "zip") +DEFAULT_MAX_BODY = 65536 + +SMART_DIR_SEEDS = ( + "robots.txt", "sitemap.xml", ".well-known/security.txt", ".git/HEAD", + ".env", ".env.local", ".env.production", ".svn/entries", "WEB-INF/web.xml", + "swagger-ui/", "swagger.json", "api-docs", "v2/api-docs", "v3/api-docs", + "openapi.json", "graphql", "graphiql", "actuator", "actuator/health", + "server-status", "phpinfo.php", "info.php", "admin", "admin/", "login", + "login/", "manager/html", "jolokia", "console", "wp-login.php", + "wp-admin/", "phpmyadmin/", "pma/", "backup", "backup/", "db.sql", + "dump.sql", "config.php.bak", "config.yml", "config.json", +) + +BACKUP_SUFFIXES = (".bak", ".old", ".orig", ".save", ".swp", "~", ".zip", ".tar.gz") +RECURSIVE_DIR_WORDS = ( + "admin", "login", "upload", "uploads", "backup", "api", "config", + "assets", "static", "files", "manager", "console", "debug", +) + +BANNER_HINTS = ( + (r"^SSH-([\w.-]+)", "ssh", "SSH"), + (r"(?i)^220.*ftp", "ftp", "FTP"), + (r"(?i)^220.*smtp", "smtp", "SMTP"), + (r"(?i)mysql_native_password|mariadb", "mysql", "MySQL/MariaDB"), + (r"(?i)postgresql", "postgresql", "PostgreSQL"), + (r"(?i)^\+PONG|^-NOAUTH", "redis", "Redis"), + (r"(?i)mongodb", "mongodb", "MongoDB"), + (r"(?i)memcached", "memcached", "Memcached"), + (r"(?i)elasticsearch", "elasticsearch", "Elasticsearch"), + (r"(?i)HTTP/\d", "http", "HTTP"), +) SECURITY_HEADERS = ( "strict-transport-security", @@ -151,17 +216,32 @@ def __init__( timeout=5.0, ports=None, dir_dict="dict/fuzz.txt", + dir_mode="smart", + dir_extensions=None, + dir_statuses=None, + dir_depth=0, + dir_all_web=False, sub_dict="dict/subdomain.txt", resolvers=None, + max_body=DEFAULT_MAX_BODY, ): self.target = self._parse_target(target) self.threads = max(1, min(int(threads), 256)) self.timeout = max(0.5, float(timeout)) - self.port_spec = ports or "top100" + self.port_spec = ports or "smart" self.dir_dict = dir_dict + self.dir_mode = (dir_mode or "smart").lower() + self.dir_extensions = dir_extensions or DEFAULT_DIR_EXTENSIONS + self.dir_statuses = dir_statuses or INTERESTING_DIR_STATUS + if dir_depth is None: + dir_depth = 1 if self.dir_mode == "deep" else 0 + self.dir_depth = max(0, min(int(dir_depth), 3)) + self.dir_all_web = dir_all_web self.sub_dict = sub_dict self.resolvers = resolvers or DEFAULT_RESOLVERS + self.max_body = max(4096, int(max_body)) self.headers = self._get_random_header() + self._thread_local = threading.local() self.ip_list = [] self.working_web_url = None self.results = { @@ -176,6 +256,7 @@ def __init__( "ports": [], "directories": [], "subdomains": [], + "fingerprints": [], "errors": [], } @@ -308,6 +389,15 @@ def _print_info(self, key, value, color=colorama.Fore.CYAN, indent=0): value = ", ".join(map(str, value)) print(f"{indent_space}{colorama.Fore.GREEN}[{key}]: {color}{value}") + def _remember_fingerprint(self, source, data): + item = {"source": source} + item.update({key: value for key, value in data.items() if value not in (None, "", [], {})}) + signature = json.dumps(item, sort_keys=True, ensure_ascii=False, default=str) + for existing in self.results["fingerprints"]: + if json.dumps(existing, sort_keys=True, ensure_ascii=False, default=str) == signature: + return + self.results["fingerprints"].append(item) + def _build_url(self, scheme, path="/"): host = self._format_host_for_url(self.target.host) port = f":{self.target.port}" if self.target.port else "" @@ -350,15 +440,54 @@ def _web_root_candidates(self): roots.append(urlunparse((parsed.scheme, parsed.netloc, "/", "", "", ""))) return list(dict.fromkeys(roots)) - def _request(self, method, url, allow_redirects=True): - return requests.request( + def _get_session(self): + session = getattr(self._thread_local, "session", None) + if session is None: + session = requests.Session() + session.headers.update(self.headers) + adapter = HTTPAdapter( + pool_connections=self.threads, + pool_maxsize=self.threads, + max_retries=0, + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + self._thread_local.session = session + return session + + def _request(self, method, url, allow_redirects=True, max_body=None): + response = self._get_session().request( method, url, - headers=self.headers, timeout=self.timeout, verify=False, allow_redirects=allow_redirects, + stream=max_body is not None, ) + if max_body is not None: + response._content = self._read_limited_body(response, max_body) + response._content_consumed = True + response.close() + return response + + @staticmethod + def _read_limited_body(response, max_body): + chunks = [] + total = 0 + try: + for chunk in response.iter_content(chunk_size=8192): + if not chunk: + continue + remaining = max_body - total + if remaining <= 0: + break + chunks.append(chunk[:remaining]) + total += len(chunk[:remaining]) + if total >= max_body: + break + except requests.RequestException: + return b"" + return b"".join(chunks) def _resolve_addresses(self, host=None): host = host or self.target.host @@ -468,49 +597,36 @@ def http_fingerprint(self): for url in self._web_candidates(): try: - response = self._request("GET", url, allow_redirects=True) + result = self._fingerprint_http_url(url) except requests.RequestException as exc: self._print_info("HTTP Probe Failed", f"{url} -> {exc}", colorama.Fore.YELLOW) continue - parsed_final = urlparse(response.url) + parsed_final = urlparse(result["final_url"]) self.working_web_url = urlunparse((parsed_final.scheme, parsed_final.netloc, "/", "", "", "")) - headers = {key.lower(): value for key, value in response.headers.items()} - title = self._extract_title(response.text) - generator = self._extract_generator(response.text) - technologies = self._detect_technologies(headers, response.text) - present_security = [header for header in SECURITY_HEADERS if header in headers] - missing_security = [header for header in SECURITY_HEADERS if header not in headers] - - result = { - "url": url, - "final_url": response.url, - "status": response.status_code, - "title": title, - "server": response.headers.get("Server", ""), - "powered_by": response.headers.get("X-Powered-By", ""), - "content_type": response.headers.get("Content-Type", ""), - "content_length": len(response.content or b""), - "redirects": [item.status_code for item in response.history], - "generator": generator, - "technologies": technologies, - "security_headers_present": present_security, - "security_headers_missing": missing_security, - "well_known": self._probe_well_known(self.working_web_url), - } + result["well_known"] = self._probe_well_known(self.working_web_url) findings.append(result) + self._remember_fingerprint("http", { + "url": result["final_url"], + "status": result["status"], + "title": result["title"], + "server": result["server"], + "technologies": result["technologies"], + "favicon_hash": result.get("favicon_hash", ""), + }) - self._print_info("URL", response.url) - self._print_info("Status", response.status_code) - self._print_info("Title", title or "No Title Found") + self._print_info("URL", result["final_url"]) + self._print_info("Status", result["status"]) + self._print_info("Title", result["title"] or "No Title Found") self._print_info("Server", result["server"]) self._print_info("X-Powered-By", result["powered_by"]) self._print_info("Content-Type", result["content_type"]) self._print_info("Content-Length", result["content_length"]) - self._print_info("Generator", generator) - self._print_info("Technologies", technologies) - self._print_info("Security Headers Present", present_security) - self._print_info("Security Headers Missing", missing_security, colorama.Fore.YELLOW) + self._print_info("Generator", result["generator"]) + self._print_info("Technologies", result["technologies"]) + self._print_info("Favicon Hash", result.get("favicon_hash")) + self._print_info("Security Headers Present", result["security_headers_present"]) + self._print_info("Security Headers Missing", result["security_headers_missing"], colorama.Fore.YELLOW) for item in result["well_known"]: print(colorama.Fore.BLUE + f" - {item['path']} -> {item['status']} {item['url']}") break @@ -519,6 +635,44 @@ def http_fingerprint(self): print(colorama.Fore.YELLOW + "[Info] No HTTP service responded on the candidate URL(s).") self.results["http"] = findings + def _fingerprint_http_url(self, url): + response = self._request("GET", url, allow_redirects=True, max_body=self.max_body) + headers = {key.lower(): value for key, value in response.headers.items()} + html = response.text + title = self._extract_title(html) + generator = self._extract_generator(html) + technologies = self._detect_technologies(headers, html) + present_security = [header for header in SECURITY_HEADERS if header in headers] + missing_security = [header for header in SECURITY_HEADERS if header not in headers] + return { + "url": url, + "final_url": response.url, + "status": response.status_code, + "title": title, + "server": response.headers.get("Server", ""), + "powered_by": response.headers.get("X-Powered-By", ""), + "content_type": response.headers.get("Content-Type", ""), + "content_length": len(response.content or b""), + "redirects": [item.status_code for item in response.history], + "generator": generator, + "technologies": technologies, + "security_headers_present": present_security, + "security_headers_missing": missing_security, + "favicon_hash": self._favicon_hash(response.url), + } + + def _favicon_hash(self, final_url): + parsed = urlparse(final_url) + favicon_url = urlunparse((parsed.scheme, parsed.netloc, "/favicon.ico", "", "", "")) + try: + response = self._request("GET", favicon_url, allow_redirects=True, max_body=65536) + except requests.RequestException: + return "" + content_type = response.headers.get("Content-Type", "").lower() + if response.status_code >= 400 or ("image" not in content_type and len(response.content or b"") < 32): + return "" + return str(zlib.crc32(response.content or b"") & 0xFFFFFFFF) + def _detect_technologies(self, headers, html): tech = set() server = headers.get("server", "").lower() @@ -535,6 +689,8 @@ def _detect_technologies(self, headers, html): "tengine": "Tengine", "gunicorn": "Gunicorn", "werkzeug": "Werkzeug", + "envoy": "Envoy", + "varnish": "Varnish", } for needle, label in header_map.items(): if needle in server: @@ -542,6 +698,14 @@ def _detect_technologies(self, headers, html): if powered_by: tech.add(f"X-Powered-By: {self._strip_control(headers.get('x-powered-by', ''), 80)}") + if "cf-ray" in headers or "cf-cache-status" in headers: + tech.add("Cloudflare") + if "x-cache" in headers or "via" in headers: + tech.add("Reverse Proxy/Cache") + if "x-generator" in headers: + tech.add(self._strip_control(headers.get("x-generator", ""), 80)) + if "x-redirect-by" in headers: + tech.add(self._strip_control(headers.get("x-redirect-by", ""), 80)) if "phpsessid" in cookies or ".php" in body: tech.add("PHP") if "jsessionid" in cookies: @@ -554,16 +718,34 @@ def _detect_technologies(self, headers, html): tech.add("Drupal") if "joomla" in body or "/media/system/js/" in body: tech.add("Joomla") + if "laravel_session" in cookies or "csrf-token" in body: + tech.add("Laravel") + if "thinkphp" in body: + tech.add("ThinkPHP") + if "django" in cookies or "csrftoken" in cookies: + tech.add("Django") + if "rails" in cookies or "csrf-param" in body: + tech.add("Ruby on Rails") + if "swagger-ui" in body or "openapi" in body: + tech.add("Swagger/OpenAPI") + if "actuator" in body and "spring" in body: + tech.add("Spring Boot") if "__next_data__" in body: tech.add("Next.js") if "nuxt" in body: tech.add("Nuxt") if "vite" in body: tech.add("Vite") + if "angular" in body or "ng-version" in body: + tech.add("Angular") if "react" in body: tech.add("React") if "vue" in body: tech.add("Vue") + if "jquery" in body: + tech.add("jQuery") + if "bootstrap" in body: + tech.add("Bootstrap") return sorted(tech) def _probe_well_known(self, root_url): @@ -571,7 +753,7 @@ def _probe_well_known(self, root_url): for path in ("/robots.txt", "/sitemap.xml", "/.well-known/security.txt"): url = root_url.rstrip("/") + path try: - response = self._request("GET", url, allow_redirects=False) + response = self._request("GET", url, allow_redirects=False, max_body=self.max_body) except requests.RequestException: continue if response.status_code in INTERESTING_DIR_STATUS: @@ -611,15 +793,17 @@ def tls_fingerprint(self): print(colorama.Fore.YELLOW + "[Info] No TLS certificate could be collected.") self.results["tls"] = results - def _read_tls_certificate(self, host, port): + def _read_tls_certificate(self, host, port, server_name=None): context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE sock = None try: sock = socket.create_connection((host, port), timeout=self.timeout) - server_name = None if self._is_ip(host) else host - with context.wrap_socket(sock, server_hostname=server_name) as tls_sock: + sni_name = server_name if server_name and not self._is_ip(server_name) else None + if not sni_name and not self._is_ip(host): + sni_name = host + with context.wrap_socket(sock, server_hostname=sni_name) as tls_sock: der_cert = tls_sock.getpeercert(binary_form=True) tls_version = tls_sock.version() cipher = tls_sock.cipher() @@ -724,7 +908,7 @@ def _dns_resolver_worker(self, name, nameserver): return {"resolver": name, "nameserver": nameserver, "ips": sorted(set(ips))} def port_scan(self): - print("\n" + "=" * 20 + " Pure Python Port Scan " + "=" * 20) + print("\n" + "=" * 20 + " Smart Port Scan " + "=" * 20) if not self.ip_list: self.ip_list = self._resolve_addresses() if not self.ip_list: @@ -732,8 +916,8 @@ def port_scan(self): return ports = self._parse_ports(self.port_spec) - self._print_info("Port Set", f"{len(ports)} ports") - self._print_info("Scanner", "TCP connect scan with banner probing") + self._print_info("Port Set", f"{self.port_spec} ({len(ports)} ports)") + self._print_info("Scanner", "TCP connect scan with service, banner, HTTP and TLS fingerprinting") tasks = [] with ThreadPoolExecutor(max_workers=self.threads) as executor: @@ -745,24 +929,31 @@ def port_scan(self): result = future.result() if result: self.results["ports"].append(result) - banner = f" | {result['banner']}" if result.get("banner") else "" - tls = f" | TLS {result['tls_version']}" if result.get("tls_version") else "" - pbar.write( - colorama.Fore.BLUE - + f"[Open] {result['ip']}:{result['port']} " - + f"({result['service']}){tls}{banner}" - ) + pbar.write(colorama.Fore.BLUE + f"[Open] {result['ip']}:{result['port']} ({result['service']})") pbar.update(1) if not self.results["ports"]: print(colorama.Fore.YELLOW + "[Info] No open TCP ports found in selected port set.") + return + + self.results["ports"].sort(key=lambda item: (item["ip"], item["port"])) + self._print_port_table(self.results["ports"]) @staticmethod def _parse_ports(spec): - if not spec or spec.lower() == "top100": - return list(TOP_100_PORTS) - if spec.lower() == "web": + if not spec: + return list(SMART_PORTS) + lowered = spec.lower() + if lowered in ("fast", "top100"): + return list(FAST_PORTS) + if lowered in ("smart", "default"): + return list(SMART_PORTS) + if lowered in ("common", "top1000"): + return list(COMMON_PORTS) + if lowered == "web": return sorted(WEB_PORTS) + if lowered in ("full", "all"): + return list(range(1, 65536)) ports = set() for part in spec.split(","): @@ -790,6 +981,9 @@ def _scan_one_port(self, ip, port): latency_ms = int((time.monotonic() - start) * 1000) service = self._service_name(port) banner, tls_version = self._grab_banner(ip, port) + fingerprint = self._fingerprint_open_port(ip, port, service, banner, tls_version) + if fingerprint.get("service"): + service = fingerprint["service"] return { "ip": ip, "port": port, @@ -797,8 +991,24 @@ def _scan_one_port(self, ip, port): "latency_ms": latency_ms, "banner": banner, "tls_version": tls_version, + "fingerprint": fingerprint, } + def _print_port_table(self, ports): + print("\n" + colorama.Fore.GREEN + "[Open Ports]:") + header = f"{'IP':<22} {'PORT':<7} {'SERVICE':<14} {'RTT':<8} {'FINGERPRINT'}" + print(colorama.Fore.CYAN + header) + print(colorama.Fore.CYAN + "-" * min(len(header) + 40, 120)) + for item in ports: + fp = item.get("fingerprint", {}) + summary = fp.get("summary") or item.get("banner") or "" + tls = f"TLS {item['tls_version']} " if item.get("tls_version") else "" + summary = self._strip_control(f"{tls}{summary}", 90) + print( + f"{item['ip']:<22} {item['port']:<7} " + f"{item['service']:<14} {str(item['latency_ms']) + 'ms':<8} {summary}" + ) + @staticmethod def _service_name(port): try: @@ -855,44 +1065,275 @@ def _grab_banner_once(self, ip, port, use_tls): if use_tls: conn.close() + def _fingerprint_open_port(self, ip, port, service, banner, tls_version): + fingerprint = { + "service": service, + "banner": banner, + "tls_version": tls_version, + "summary": "", + } + + for pattern, service_name, product in BANNER_HINTS: + if banner and re.search(pattern, banner): + fingerprint["service"] = service_name + fingerprint["product"] = product + break + + should_http_probe = port in WEB_PORTS or tls_version or self._looks_like_http_banner(banner) + if should_http_probe: + for url in self._port_web_urls(port, tls_first=bool(tls_version or port in TLS_PORTS)): + try: + http_fp = self._fingerprint_http_url(url) + except requests.RequestException: + continue + fingerprint.update({ + "service": "https" if url.startswith("https://") else "http", + "url": http_fp["final_url"], + "status": http_fp["status"], + "title": http_fp["title"], + "server": http_fp["server"], + "technologies": http_fp["technologies"], + "favicon_hash": http_fp.get("favicon_hash", ""), + }) + parts = [ + str(http_fp["status"]), + http_fp["server"], + http_fp["title"], + ", ".join(http_fp["technologies"][:4]), + ] + fingerprint["summary"] = " | ".join(part for part in parts if part) + self._remember_fingerprint("port-http", { + "endpoint": f"{ip}:{port}", + "url": http_fp["final_url"], + "title": http_fp["title"], + "server": http_fp["server"], + "technologies": http_fp["technologies"], + "favicon_hash": http_fp.get("favicon_hash", ""), + }) + return fingerprint + + if tls_version: + cert = self._read_tls_certificate( + ip, + port, + server_name=self.target.host if not self._is_ip(self.target.host) else None, + ) + if cert: + fingerprint["cert_issuer"] = cert.get("issuer", "") + fingerprint["cert_subject"] = cert.get("subject", "") + fingerprint["cert_sha256"] = cert.get("sha256", "") + fingerprint["summary"] = f"TLS {tls_version} | {cert.get('subject', '')}" + self._remember_fingerprint("port-tls", { + "endpoint": f"{ip}:{port}", + "tls_version": tls_version, + "issuer": cert.get("issuer", ""), + "subject": cert.get("subject", ""), + }) + + if not fingerprint["summary"]: + fingerprint["summary"] = banner or fingerprint.get("product", "") + if fingerprint.get("product"): + self._remember_fingerprint("port", { + "endpoint": f"{ip}:{port}", + "service": fingerprint["service"], + "product": fingerprint["product"], + "banner": banner, + }) + return fingerprint + + @staticmethod + def _looks_like_http_banner(banner): + lowered = (banner or "").lower() + return lowered.startswith("http/") or "server:" in lowered or "\s*(.*?)\s*", response.text, re.I | re.S): + parsed = urlparse(loc.strip()) + if parsed.netloc and parsed.netloc != root_host: + continue + if parsed.path and parsed.path != "/": + seeds.append(parsed.path.lstrip("/")) + except requests.RequestException: + pass + + return seeds[:200] + + def _is_directory_candidate(self, result): + if result["status"] in (401, 403): + return False + if result["url"].endswith("/"): + return True + location = result.get("location", "") + return location.endswith("/") def _pick_web_root(self): if self.working_web_url: return self.working_web_url for url in self._web_root_candidates(): try: - response = self._request("GET", url) + response = self._request("GET", url, max_body=self.max_body) except requests.RequestException: continue if response.status_code < 500: @@ -916,7 +1357,7 @@ def _build_soft404_baselines(self, root_url): token = "searchmap-" + "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(16)) url = root_url.rstrip("/") + "/" + token try: - response = self._request("GET", url, allow_redirects=False) + response = self._request("GET", url, allow_redirects=False, max_body=self.max_body) except requests.RequestException: continue baselines.append(self._response_signature(response)) @@ -928,14 +1369,22 @@ def _dir_worker(self, root_url, path, baselines): return None url = root_url.rstrip("/") + "/" + clean_path try: - response = self._request("GET", url, allow_redirects=False) + response = self._request("GET", url, allow_redirects=False, max_body=self.max_body) except requests.RequestException: return None - if response.status_code not in INTERESTING_DIR_STATUS: + if response.status_code not in self.dir_statuses: return None signature = self._response_signature(response) if self._looks_like_soft404(signature, baselines): return None + tags = self._classify_directory_finding(clean_path, response, signature) + if tags: + self._remember_fingerprint("directory", { + "url": url, + "status": response.status_code, + "tags": tags, + "title": signature["title"], + }) return { "url": url, "path": "/" + clean_path, @@ -943,6 +1392,8 @@ def _dir_worker(self, root_url, path, baselines): "length": len(response.content or b""), "title": signature["title"], "location": response.headers.get("Location", ""), + "content_type": response.headers.get("Content-Type", ""), + "tags": tags, } @staticmethod @@ -955,10 +1406,46 @@ def _looks_like_soft404(signature, baselines): base_len = max(baseline["length"], 1) length_delta = abs(signature["length"] - baseline["length"]) / base_len same_title = signature["title"] and signature["title"] == baseline["title"] - if length_delta < 0.05 or same_title: + if same_title and length_delta < 0.25: + return True + if not signature["title"] and not baseline["title"] and length_delta < 0.02: return True return False + def _classify_directory_finding(self, path, response, signature): + lowered = path.lower() + body = (response.text or "").lower() + tags = [] + if response.status_code in (401, 403): + tags.append("access-controlled") + if lowered.startswith(".git/") or "ref: refs/heads" in body: + tags.append("git-exposure") + if ".env" in lowered or re.search(r"(?m)^(?:aws_|secret|password|token|api[_-]?key)", body): + tags.append("secret/config") + if lowered.endswith((".sql", ".db", ".mdb", ".sqlite")): + tags.append("database-dump") + if lowered.endswith((".zip", ".tar", ".tar.gz", ".tgz", ".rar", ".7z", ".bak", ".old", "~")): + tags.append("backup-file") + if "swagger" in lowered or "openapi" in lowered or "api-docs" in lowered: + tags.append("api-docs") + if "graphql" in lowered or "graphiql" in lowered: + tags.append("graphql") + if "phpmyadmin" in lowered or "/pma" in f"/{lowered}": + tags.append("db-admin") + if "wp-login" in lowered or "wp-admin" in lowered: + tags.append("wordpress") + if "admin" in lowered or "login" in lowered or "manager" in lowered: + tags.append("admin/auth") + if "actuator" in lowered or "jolokia" in lowered or "server-status" in lowered: + tags.append("ops-endpoint") + if signature["title"]: + tech = self._detect_technologies( + {key.lower(): value for key, value in response.headers.items()}, + response.text, + ) + tags.extend([f"tech:{item}" for item in tech[:3]]) + return list(dict.fromkeys(tags)) + def sub_scan(self): print("\n" + "=" * 20 + " Subdomain Scan " + "=" * 20) base_domain = self._registrable_domain(self.target.host) @@ -1016,10 +1503,12 @@ def run(self, do_port_scan, do_noping, do_dir_scan, do_sub_scan, do_full_scan): self.get_base_info() if do_full_scan: + self.dir_all_web = True self.port_scan() self.multi_location_dns_check() self.dir_scan() self.sub_scan() + self._print_fingerprint_summary() return if do_port_scan: @@ -1030,6 +1519,39 @@ def run(self, do_port_scan, do_noping, do_dir_scan, do_sub_scan, do_full_scan): self.dir_scan() if do_sub_scan: self.sub_scan() + self._print_fingerprint_summary() + + def _print_fingerprint_summary(self): + print("\n" + "=" * 20 + " Fingerprint Summary " + "=" * 20) + technologies = set() + web_targets = [] + sensitive_paths = [] + + for item in self.results.get("http", []): + web_targets.append(item.get("final_url", "")) + technologies.update(item.get("technologies", [])) + + for item in self.results.get("ports", []): + fp = item.get("fingerprint", {}) + technologies.update(fp.get("technologies", [])) + if fp.get("url"): + web_targets.append(fp["url"]) + + for item in self.results.get("directories", []): + tags = item.get("tags", []) + if tags: + sensitive_paths.append(f"{item['status']} {item['path']} [{', '.join(tags[:4])}]") + for tag in tags: + if tag.startswith("tech:"): + technologies.add(tag.split(":", 1)[1]) + + open_ports = [f"{item['ip']}:{item['port']}/{item['service']}" for item in self.results.get("ports", [])] + self._print_info("Open Ports", open_ports[:40]) + self._print_info("Web Targets", list(dict.fromkeys(web_targets))[:20]) + self._print_info("Technologies", sorted(technologies)) + self._print_info("Interesting Paths", sensitive_paths[:30], colorama.Fore.YELLOW) + if not any([open_ports, web_targets, technologies, sensitive_paths]): + print(colorama.Fore.YELLOW + "[Info] No fingerprint signals collected.") def write_json(path, results): @@ -1062,7 +1584,18 @@ def write_csv(path, results): "target": target, "module": "directory", "key": str(directory.get("status", "")), - "value": directory.get("url", ""), + "value": " | ".join(filter(None, [ + directory.get("url", ""), + ", ".join(directory.get("tags", [])), + directory.get("title", ""), + ])), + }) + for fp in item.get("fingerprints", []): + rows.append({ + "target": target, + "module": "fingerprint", + "key": fp.get("source", ""), + "value": json.dumps(fp, ensure_ascii=False, default=str), }) for sub in item.get("subdomains", []): rows.append({ @@ -1078,10 +1611,31 @@ def write_csv(path, results): writer.writerows(rows) +def parse_csv_values(value): + return tuple(item.strip().lstrip(".") for item in (value or "").split(",") if item.strip()) + + +def parse_status_values(value): + statuses = set() + for part in (value or "").split(","): + part = part.strip() + if not part: + continue + if "-" in part: + start, end = part.split("-", 1) + start, end = int(start), int(end) + if start > end: + start, end = end, start + statuses.update(range(start, end + 1)) + else: + statuses.add(int(part)) + return {status for status in statuses if 100 <= status <= 599} + + def build_parser(): parser = argparse.ArgumentParser( description=( - "SearchMap v1.1.0 - Pure Python information collection tool for " + "SearchMap v1.2.0 - Pure Python information collection tool for " "authorized security assessment." ), formatter_class=argparse.RawTextHelpFormatter, @@ -1102,8 +1656,31 @@ def build_parser(): parser.add_argument("--csv-out", help="Write flattened CSV findings") parser.add_argument("-t", "--threads", help="Concurrent threads (default: 20, max: 256)", type=int, default=20) parser.add_argument("--timeout", help="Network timeout in seconds (default: 5)", type=float, default=5.0) - parser.add_argument("--ports", help="Port set for -p: top100, web, 80,443,8000-8100", default="top100") + parser.add_argument( + "--ports", + help="Port set for -p: fast/top100, smart, common/top1000, web, full, 80,443,8000-8100", + default="smart", + ) parser.add_argument("--dict", dest="dir_dict", help="Directory wordlist path", default="dict/fuzz.txt") + parser.add_argument( + "--dir-mode", + choices=("fast", "smart", "deep"), + default="smart", + help="Directory scan strategy: fast, smart with expansion, or deep with backups/recursion", + ) + parser.add_argument( + "--dir-ext", + default=",".join(DEFAULT_DIR_EXTENSIONS), + help="Extensions used for %%EXT%% directory entries", + ) + parser.add_argument( + "--dir-status", + default="200,201,204,301,302,307,308,401,403,405", + help="Interesting directory HTTP statuses, e.g. 200,301,401,403 or 200-403", + ) + parser.add_argument("--dir-depth", type=int, default=None, help="Recursive directory depth (default: 0, deep: 1)") + parser.add_argument("--dir-all-web", action="store_true", help="Scan all web services found by port scan") + parser.add_argument("--max-body", type=int, default=DEFAULT_MAX_BODY, help="Max bytes to read per HTTP response") parser.add_argument("--subdict", help="Subdomain wordlist path", default="dict/subdomain.txt") parser.add_argument( "--resolver", @@ -1118,14 +1695,21 @@ def run_single_target(target, args): resolvers = DEFAULT_RESOLVERS if args.resolver: resolvers = {f"custom-{i + 1}": ip for i, ip in enumerate(args.resolver)} + dir_statuses = parse_status_values(args.dir_status) or INTERESTING_DIR_STATUS scanner = SearchMap( target, threads=args.threads, timeout=args.timeout, ports=args.ports, dir_dict=args.dir_dict, + dir_mode=args.dir_mode, + dir_extensions=parse_csv_values(args.dir_ext) or DEFAULT_DIR_EXTENSIONS, + dir_statuses=dir_statuses, + dir_depth=args.dir_depth, + dir_all_web=args.dir_all_web, sub_dict=args.subdict, resolvers=resolvers, + max_body=args.max_body, ) scanner.run(args.port, args.noping, args.dirscan, args.subscan, args.fullscan) return scanner.results From 443a29b4119f3a0efd56eb66acfe5532262a396c Mon Sep 17 00:00:00 2001 From: asaotomo <67818638+asaotomo@users.noreply.github.com> Date: Mon, 25 May 2026 22:58:43 +0800 Subject: [PATCH 4/4] Update README.md --- README.md | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 0acc013..94241f8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -**SearchMap_V1.1.0** +**SearchMap_V1.2.0** -searchmap是一款集**域名解析、DNS记录枚举、WHOIS查询、CDN检测、纯Python端口扫描、TLS证书识别、HTTP指纹、目录扫描、子域名挖掘、结构化导出**为一体的前渗透测试综合信息收集工具。新版本继续强化**稳定性、性能和结果可靠性**,移除了对 `nmap`、IP归属地API、反查网页等外部工具/第三方数据接口的依赖,核心侦察能力尽量由本地Python网络能力完成,适合授权安全自查和资产梳理。 +searchmap是一款集**域名解析、DNS记录枚举、WHOIS查询、CDN检测、纯Python智能端口扫描、TLS证书识别、HTTP/服务指纹、智能目录扫描、子域名挖掘、结构化导出**为一体的前渗透测试综合信息收集工具。新版本继续强化**稳定性、性能和结果可靠性**,移除了对 `nmap`、IP归属地API、反查网页等外部工具/第三方数据接口的依赖,核心侦察能力尽量由本地Python网络能力完成,适合授权安全自查和资产梳理。 ![image](https://user-images.githubusercontent.com/67818638/133013451-1d3f8310-6c17-4985-b526-9d9af9e8302c.png) ## 一.功能特性 @@ -9,9 +9,9 @@ searchmap是一款集**域名解析、DNS记录枚举、WHOIS查询、CDN检测 - **DNS记录枚举**: 自动收集A、AAAA、CNAME、NS、MX、TXT、SOA、CAA等关键记录。 - **WHOIS查询**: 获取域名的详细注册信息。 - **多解析器DNS检测 (CDN识别)**: 并行查询多个公共DNS解析器,通过IP差异和CDN CNAME特征判断目标是否使用CDN或负载均衡。 -- **HTTP/TLS指纹**: 自动识别网站标题、Server、X-Powered-By、常见技术栈、安全响应头、robots/sitemap/security.txt,以及TLS版本、证书主体、颁发者、有效期和SHA256指纹。 -- **纯Python端口扫描**: 使用TCP connect扫描和Banner探测替代Nmap,不需要安装外部二进制工具。 -- **多线程目录与子域名爆破**: 高效的并发引擎,目录扫描内置软404基线过滤,子域名扫描内置泛解析过滤。 +- **HTTP/TLS/服务指纹**: 自动识别网站标题、Server、X-Powered-By、常见技术栈、安全响应头、favicon hash、robots/sitemap/security.txt,以及TLS版本、证书主体、颁发者、有效期和SHA256指纹。 +- **纯Python智能端口扫描**: 使用TCP connect扫描和Banner探测替代Nmap,内置`fast`、`smart`、`common/top1000`、`web`、`full`等端口集,并对开放端口进行服务/Web/TLS指纹识别。 +- **智能目录与子域名爆破**: 目录扫描使用连接池和限量读取提升速度,支持路径扩展、`%EXT%`扩展、robots/sitemap种子、软404基线过滤、敏感路径标签、递归扫描;子域名扫描内置泛解析过滤。 - **批量处理**: 支持从文件读取多个目标进行批量扫描。 - **日志与结构化输出**: 支持控制台日志、JSON结果和CSV发现项导出,方便归档和二次分析。 @@ -35,7 +35,7 @@ sudo apt-get install python-pip ## 三.使用方法 -> 说明:V1.1.0 输出内容会随目标、网络环境和解析器返回结果变化,旧版本终端截图已移除。以下示例以命令为准。 +> 说明:V1.2.0 输出内容会随目标、网络环境和解析器返回结果变化,旧版本终端截图已移除。以下示例以命令为准。 **1.-u 获取基础信息、DNS记录、HTTP/TLS指纹** @@ -53,13 +53,14 @@ $ python3 searchmap.py -u 123.123.123.123 $ python3 searchmap.py -u https://www.baidu.com -p ``` -默认扫描内置Top 100常见端口,并对开放端口进行服务名、Banner和TLS版本探测。 +默认使用`smart`端口集,并对开放端口进行服务名、Banner、HTTP和TLS指纹探测,最后会输出指纹汇总。 **3.--ports 自定义端口集合** ``` -# 支持 top100、web、单端口、逗号列表和端口范围 +# 支持 fast/top100、smart、common/top1000、web、full、单端口、逗号列表和端口范围 $ python3 searchmap.py -u https://www.baidu.com -p --ports web +$ python3 searchmap.py -u https://www.baidu.com -p --ports common $ python3 searchmap.py -u https://www.baidu.com -p --ports 80,443,8000-8100 ``` @@ -84,6 +85,17 @@ $ python3 searchmap.py -u https://www.baidu.com -d $ python3 searchmap.py -u https://www.baidu.com -d --dict dict/fuzz.txt ``` +智能目录扫描参数: + +``` +# fast更快,smart默认更均衡,deep会增加备份文件变体和递归扫描 +$ python3 searchmap.py -u https://www.baidu.com -d --dir-mode fast +$ python3 searchmap.py -u https://www.baidu.com -d --dir-mode deep --dir-depth 1 + +# 自定义%EXT%扩展、关注状态码和单响应最大读取字节 +$ python3 searchmap.py -u https://www.baidu.com -d --dir-ext php,asp,aspx,jsp,html,js --dir-status 200,301,302,401,403 --max-body 32768 +``` + **6.-s 对输入域名的进行子域名爆破** PS: 程序使用的默认字典为`dict/subdomain.txt`,用户可自行替换字典内容进行FUZZ。 @@ -129,8 +141,8 @@ $ python3 searchmap.py -u https://www.baidu.com -a -t 50 --timeout 3 **12.组合用法** ``` -$ python3 searchmap.py -u https://www.baidu.com -p -n -d -s --ports web -$ python3 searchmap.py -r myurl.txt -a -t 50 --timeout 3 --json-out result.json +$ python3 searchmap.py -u https://www.baidu.com -p -n -d -s --ports smart +$ python3 searchmap.py -r myurl.txt -a -t 50 --timeout 3 --dir-all-web --json-out result.json ``` @@ -162,6 +174,18 @@ $ python3 searchmap.py -r myurl.txt -a -t 50 --timeout 3 --json-out result.json ## 四.更新日志 +********* +**Version1.2.0_UpdateLog** +------------------------------------- +1. **端口扫描增强**: 默认端口集升级为`smart`,新增`fast/top100`、`common/top1000`、`web`、`full`预设,保留端口列表和范围写法。 +2. **开放端口指纹**: 对开放端口自动进行Banner、HTTP、TLS、Server、Title、技术栈和favicon hash探测,并以表格展示。 +3. **目录扫描提速**: 使用线程内连接池、HTTP keep-alive和响应体限量读取,降低大页面下载造成的拖慢。 +4. **目录扫描智能化**: 新增`--dir-mode fast|smart|deep`、`--dir-ext`、`--dir-status`、`--dir-depth`、`--dir-all-web`、`--max-body`参数。 +5. **目录发现增强**: 支持`%EXT%`自动扩展、目录斜杠变体、内置敏感路径、robots.txt和sitemap.xml种子路径。 +6. **误报过滤增强**: 优化软404判断,降低真实页面被长度相近误杀的概率。 +7. **敏感路径标签**: 对`.git`、`.env`、备份文件、数据库备份、Swagger/OpenAPI、GraphQL、后台登录、运维端点等结果自动打标签。 +8. **最终指纹汇总**: 扫描结束统一输出开放端口、Web目标、技术栈和高价值目录发现,并写入JSON/CSV结果。 + ********* **Version1.1.0_UpdateLog** -------------------------------------