diff --git a/.github/tests/setup-dockerfile.sh b/.github/tests/setup-dockerfile.sh index b8169909e..a8b8ba29e 100755 --- a/.github/tests/setup-dockerfile.sh +++ b/.github/tests/setup-dockerfile.sh @@ -11,7 +11,7 @@ RUN if [ -f /etc/debian_version ]; then \ export DEBIAN_FRONTEND=noninteractive NEEDRESTART_MODE=n && \ apt-get update && \ apt-get install -y gdb-multiarch python3-dev python3-pip python3-wheel python3-setuptools \ - git cmake gcc g++ pkg-config libglib2.0-dev gdbserver qemu-user file; \ + git cmake gcc g++ pkg-config libglib2.0-dev gdbserver qemu-user file curl wget unzip; \ fi # Install python3-full for Ubuntu 24.04 @@ -22,9 +22,10 @@ fi # Install dependencies for Fedora-based images RUN if [ -f /etc/fedora-release ]; then \ dnf install -y gdb gdb-gdbserver python3-devel python3-pip python3-wheel python3-setuptools python3-rpm \ - git cmake gcc gcc-c++ pkg-config glib2-devel qemu-user qemu-user-static file procps-ng && \ + git cmake gcc gcc-c++ pkg-config glib2-devel qemu-user qemu-user-static file procps-ng wget curl unzip && \ dnf --enablerepo='*debug*' install -y glibc-debuginfo && \ dnf clean all; \ + ln -s /usr/bin/gdb /usr/bin/gdb-multiarch; \ fi # Copy only requirements.txt for caching diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 0f5b68f26..45643851f 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -22,7 +22,7 @@ jobs: run: | export NEEDRESTART_MODE=n sudo apt-get update -qq - sudo apt-get install -qq -y gdb-multiarch python3-dev python3-pip python3-wheel python3-setuptools git cmake gcc g++ pkg-config libglib2.0-dev gdbserver qemu-user curl + sudo apt-get install -qq -y curl wget gdb-multiarch python3-dev python3-pip python3-wheel python3-setuptools git cmake gcc g++ pkg-config libglib2.0-dev gdbserver qemu-user sudo apt-get install -y python3-full - name: Run coverage @@ -34,8 +34,9 @@ jobs: echo PY_VER=`gdb -q -nx -ex "pi print('.'.join(map(str, sys.version_info[:2])))" -ex quit` >> $GITHUB_ENV echo GEF_CI_NB_CPU=`grep -c ^processor /proc/cpuinfo` >> $GITHUB_ENV echo GEF_CI_ARCH=`uname --processor` >> $GITHUB_ENV - python${{ env.PY_VER }} -m pip install --user --upgrade -r tests/requirements.txt --quiet + python${{ env.PY_VER }} -m pip install --user --upgrade -r tests/requirements.txt -r docs/requirements.txt --quiet current_score=$(curl --silent https://hugsy.github.io/gef/coverage/gef_py.html | grep pc_cov | sed 's?.*\([^%]*\)%?\1?g') + make -C tests/binaries bash scripts/generate-coverage-docs.sh new_score=$(cat docs/coverage/gef_py.html | grep pc_cov | sed 's?.*\([^%]*\)%?\1?g') score_diff=$(python -c "print(f'{${new_score} - ${current_score}:.04f}')") diff --git a/.gitignore b/.gitignore index 09f604883..2cc5a59d5 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ htmlcov .benchmarks site/ untracked/ +.zed/ +Dockerfile diff --git a/LICENSE b/LICENSE index 11030ec99..fdee1e53e 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ The MIT License (MIT) -Copyright (c) 2013-2025 crazy rabbidz +Copyright (c) 2013-2026 crazy rabbidz Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/docs/commands/gef-remote.md b/docs/commands/gef-remote.md index dcc2b091e..1e1fd5d0e 100644 --- a/docs/commands/gef-remote.md +++ b/docs/commands/gef-remote.md @@ -1,15 +1,8 @@ ## Command `gef-remote` -[`target remote`](https://sourceware.org/gdb/onlinedocs/gdb/Remote-Debugging.html#Remote-Debugging) -is the traditional GDB way of debugging process or system remotely. However this command by itself -does a limited job (80's bandwidth FTW) to collect more information about the target, making the -process of debugging more cumbersome. GEF greatly improves that state with the `gef-remote` command. - -📝 **Note**: If using GEF, `gef-remote` **must** be your way or debugging remote processes, never -`target remote`. Maintainers will provide minimal support or help if you decide to use the -traditional `target remote` command. For many reasons, you **should not** use `target remote` alone -with GEF. It is still important to note that the default `target remote` command has been -overwritten by a minimal copy `gef-remote`, in order to make most tools relying on this command work. +> [!IMPORTANT] +> `gef-remote` is deprecated since 2026.04 in favor of `target remote`. +> The command will be removed in a future release. Do not rely on it. `gef-remote` can function in 2 ways: diff --git a/docs/compat.md b/docs/compat.md index 334aed2e6..fe5fe1f34 100644 --- a/docs/compat.md +++ b/docs/compat.md @@ -2,9 +2,9 @@ This matrix indicates the version of Python and/or GDB -| GEF version | GDB Python compatibility | Python compatibility | -| :--: | :--: | :--: | -| [2018.02](https://github.com/hugsy/gef/releases/tag/2018.02) | 7.2 | Python 2.7, Python 3.4+ | -| [2020.03](https://github.com/hugsy/gef/releases/tag/2020.03) | 7.4 | Python 2.7, Python 3.4+ | -| [2022.01](https://github.com/hugsy/gef/releases/tag/2022.01) | 8.0 | Python 3.6+ | -| [2025.01](https://github.com/hugsy/gef/releases/tag/2025.01) | 10.0 | Python 3.10+ | +| GEF version | GDB Python compatibility | Python compatibility | +| :----------------------------------------------------------: | :----------------------: | :---------------------: | +| [2018.02](https://github.com/hugsy/gef/releases/tag/2018.02) | 7.2 | Python 2.7, Python 3.4+ | +| [2020.03](https://github.com/hugsy/gef/releases/tag/2020.03) | 7.4 | Python 2.7, Python 3.4+ | +| [2022.01](https://github.com/hugsy/gef/releases/tag/2022.01) | 8.0 | Python 3.6+ | +| [2025.01](https://github.com/hugsy/gef/releases/tag/2025.01) | 10.0 | Python 3.10+ | diff --git a/gef.py b/gef.py index 12bfda529..33cb0ef69 100644 --- a/gef.py +++ b/gef.py @@ -290,13 +290,15 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return wrapper +# +# Helpers +# class ValidationError(Exception): pass -# -# Helpers -# +class InitializationError(Exception): + pass class ObsoleteException(Exception): @@ -387,10 +389,10 @@ def is_alive() -> bool: return False -def calling_function() -> str | None: +def calling_function(frame: int = 3) -> str | None: """Return the name of the calling function""" try: - stack_info = traceback.extract_stack()[-3] + stack_info = traceback.extract_stack()[-frame] return stack_info.name except Exception as e: dbg(f"traceback failed with {str(e)}") @@ -4050,9 +4052,28 @@ def get_os() -> str: return gef.session.os -@lru_cache() -def is_qemu() -> bool: - if not is_remote_debug(): +def is_target_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `remote` only." + _conn = conn or gdb.selected_inferior().connection + return isinstance(_conn, gdb.RemoteTargetConnection) and _conn.type == "remote" + + +def is_target_extended_remote(conn: gdb.TargetConnection | None = None) -> bool: + "Returns True for `extended-remote` only." + _conn = conn or gdb.selected_inferior().connection + return ( + isinstance(_conn, gdb.RemoteTargetConnection) + and _conn.type == "extended-remote" + ) + + +def is_target_remote_or_extended(conn: gdb.TargetConnection | None = None) -> bool: + return is_target_remote(conn) or is_target_extended_remote(conn) + + +def is_running_in_qemu() -> bool: + "See https://www.qemu.org/docs/master/system/gdb.html" + if not is_target_remote(): return False response = ( gdb.execute( @@ -4063,9 +4084,8 @@ def is_qemu() -> bool: return "ENABLE=" in response -@lru_cache() -def is_qemu_usermode() -> bool: - if not is_qemu(): +def is_running_in_qemu_user() -> bool: + if not is_running_in_qemu(): return False response = ( gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" @@ -4073,9 +4093,8 @@ def is_qemu_usermode() -> bool: return "Text=" in response -@lru_cache() -def is_qemu_system() -> bool: - if not is_qemu(): +def is_running_in_qemu_system() -> bool: + if not is_running_in_qemu(): return False response = ( gdb.execute("maintenance packet qOffsets", to_string=True, from_tty=False) or "" @@ -4083,6 +4102,14 @@ def is_qemu_system() -> bool: return 'received: ""' in response +def is_running_in_gdbserver() -> bool: + return is_target_remote_or_extended() and not is_running_in_qemu() + + +def is_running_in_rr() -> bool: + return is_running_in_gdbserver() and os.environ.get("GDB_UNDER_RR", None) == "1" + + def is_target_coredump() -> bool: global gef if gef.session.coredump_mode is not None: @@ -4254,6 +4281,8 @@ def new_objfile_handler(evt: "gdb.NewObjFileEvent | None") -> None: path = progspace.filename or "" else: raise RuntimeError("Cannot determine file path") + + assert path try: if gef.session.root and path.startswith("target:"): # If the process is in a container, replace the "target:" prefix @@ -4308,7 +4337,7 @@ def exit_handler(_: "gdb.ExitedEvent") -> None: with bkp_fpath.open("w") as fd: for bp in list(gdb.breakpoints()): - if not bp.enabled or not bp.is_valid: + if not bp.enabled or not bp.is_valid(): continue fd.write(f"{'t' if bp.temporary else ''}break {bp.location}\n") return @@ -4526,6 +4555,7 @@ def is_in_x86_kernel(address: int) -> bool: return (address >> memalign) == 0xF +@deprecated("Use `is_target_remote()`") def is_remote_debug() -> bool: """ "Return True is the current debugging session is running through GDB remote session.""" return gef.session.remote_initializing or gef.session.remote is not None @@ -5908,7 +5938,7 @@ class PieRemoteCommand(GenericCommand): def do_invoke(self, argv: list[str]) -> None: try: - gdb.execute(f"gef-remote {' '.join(argv)}") + gdb.execute(f"target remote {' '.join(argv)}") except gdb.error as e: err(str(e)) return @@ -7118,7 +7148,12 @@ class RemoteCommand(GenericCommand): a local copy of the execution environment, including the target binary and its libraries in the local temporary directory (the value by default is in `gef.config.tempdir`). Additionally, it will fetch all the /proc/PID/maps and loads all its information. If procfs is not available remotely, the command - will likely fail. You can however still use the limited command provided by GDB `target remote`.""" + will likely fail. You can however still use the limited command provided by GDB `target remote`. + + **Important:** + As of 2026.04, the `gef-remote` is deprecated in favor of the native command `target remote` command. As it will be + removed in a future release, do not rely on it. + """ _cmdline_ = "gef-remote" _syntax_ = f"{_cmdline_} [OPTIONS] TARGET" @@ -7137,52 +7172,16 @@ def __init__(self) -> None: {"--pid": -1, "--qemu-user": False, "--qemu-binary": ""}, ) def do_invoke(self, _: list[str], **kwargs: Any) -> None: - if gef.session.remote is not None: - err( - "You already are in remote session. Close it first before opening a new one..." - ) - return - - # argument check + # for now, warn only and re-route to `target remote` + warn( + "`gef-remote` is now deprecated and will soon be removed. Use `target remote`" + ) args: argparse.Namespace = kwargs["arguments"] if not args.host or not args.port: - err("Missing parameters") + err("Missing host/port parameters") return - # qemu-user support - qemu_binary: pathlib.Path | None = None - if args.qemu_user: - try: - qemu_binary = ( - pathlib.Path(args.qemu_binary).expanduser().absolute() - if args.qemu_binary - else gef.session.file - ) - if not qemu_binary or not qemu_binary.exists(): - raise FileNotFoundError(f"{qemu_binary} does not exist") - except Exception as e: - err(f"Failed to initialize qemu-user mode, reason: {str(e)}") - return - - # Try to establish the remote session, throw on error - # Set `.remote_initializing` to True here - `GefRemoteSessionManager` invokes code which - # calls `is_remote_debug` which checks if `remote_initializing` is True or `.remote` is None - # This prevents some spurious errors being thrown during startup - gef.session.remote_initializing = True - session = GefRemoteSessionManager(args.host, args.port, args.pid, qemu_binary) - - dbg( - f"[remote] initializing remote session with {session.target} under {session.root}" - ) - if not session.connect(args.pid) or not session.setup(): - gef.session.remote = None - gef.session.remote_initializing = False - raise EnvironmentError("Failed to setup remote target") - - gef.session.remote_initializing = False - gef.session.remote = session - reset_all_caches() - gdb.execute("context") + gdb.execute(f"target remote {args.host}:{args.port}") return @@ -8419,7 +8418,7 @@ def __init__(self) -> None: def do_invoke(self, _: list[str], **kwargs: Any) -> None: args: argparse.Namespace = kwargs["arguments"] - if is_qemu_system(): + if is_running_in_qemu_system(): err("Unsupported") return @@ -12295,16 +12294,36 @@ def __parse_maps(self) -> list[Section] | None: try: return list(self.parse_gdb_info_proc_maps()) - except Exception: - pass + except Exception as e: + dbg(f"parse_gdb_info_proc_maps() failed, reason: {str(e)}") try: return list(self.parse_procfs_maps()) - except Exception: - pass + except Exception as e: + dbg(f"parse_procfs_maps() failed, reason: {str(e)}") try: return list(self.parse_monitor_info_mem()) + except Exception as e: + dbg(f"parse_monitor_info_mem() failed, reason: {str(e)}") + + try: + # as a very last resort, use a mock rwx memory layout only if a session is running + assert gef.binary and gef.session.pid + warn("Could not determine memory layout accurately, using mock layout") + fname = gef.binary.path + if is_32bit(): + page_start, page_end = 0x00000000, 0xFFFFFFFF + else: + page_start, page_end = 0x0000000000000000, 0xFFFFFFFFFFFFFFFF + return [ + Section( + page_start=page_start, + page_end=page_end, + permission=Permission.ALL, + path=str(fname), + ), + ] except Exception: pass @@ -12845,7 +12864,7 @@ def __repr__(self) -> str: def auxiliary_vector(self) -> dict[str, int] | None: if not is_alive(): return None - if is_qemu_system(): + if is_running_in_qemu_system(): return None if not self._auxiliary_vector: auxiliary_vector = {} @@ -12968,6 +12987,9 @@ class RemoteMode(enum.IntEnum): GDBSERVER = 0 QEMU = 1 RR = 2 + GDBSERVER_MULTI = 3 + QEMU_USER = 4 + QEMU_SYSTEM = 5 def __str__(self): return self.name @@ -12977,44 +12999,58 @@ def __repr__(self): def prompt_string(self) -> str: match self: - case GefRemoteSessionManager.RemoteMode.QEMU: + case ( + GefRemoteSessionManager.RemoteMode.QEMU + | GefRemoteSessionManager.RemoteMode.QEMU_USER + | GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + ): return Color.boldify("(qemu) ") case GefRemoteSessionManager.RemoteMode.RR: return Color.boldify("(rr) ") - case GefRemoteSessionManager.RemoteMode.GDBSERVER: + case ( + GefRemoteSessionManager.RemoteMode.GDBSERVER + | GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + ): return Color.boldify("(remote) ") raise AttributeError("Unknown value") - def __init__( - self, host: str, port: int, pid: int = -1, qemu: pathlib.Path | None = None - ) -> None: + @staticmethod + def init() -> "GefRemoteSessionManager.RemoteMode": + if is_running_in_qemu_system(): + return GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM + if is_running_in_qemu_user(): + return GefRemoteSessionManager.RemoteMode.QEMU_USER + if is_running_in_rr(): + return GefRemoteSessionManager.RemoteMode.RR + if is_running_in_gdbserver(): + if is_target_extended_remote(): + return GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI + return GefRemoteSessionManager.RemoteMode.GDBSERVER + raise AttributeError + + def __init__(self, conn: gdb.RemoteTargetConnection) -> None: super().__init__() - self.__host = host - self.__port = port + assert is_target_remote_or_extended() + remote_host = conn.details + assert remote_host + host, port = remote_host.split(":", 1) + self.__host = host or "localhost" + self.__port = int(port) self.__local_root_fd = tempfile.TemporaryDirectory() self.__local_root_path = pathlib.Path(self.__local_root_fd.name) - self.__qemu = qemu - if pid > 0: - self._pid = pid + self._mode = GefRemoteSessionManager.RemoteMode.init() - if self.__qemu is not None: - self._mode = GefRemoteSessionManager.RemoteMode.QEMU - elif os.environ.get("GDB_UNDER_RR", None) == "1": - self._mode = GefRemoteSessionManager.RemoteMode.RR - else: - self._mode = GefRemoteSessionManager.RemoteMode.GDBSERVER + self.setup() def close(self) -> None: self.__local_root_fd.cleanup() - try: - gef_on_new_unhook(self.remote_objfile_event_handler) - gef_on_new_hook(new_objfile_handler) - except Exception as e: - warn(f"Exception while restoring local context: {str(e)}") - raise def __str__(self) -> str: - return f"RemoteSession(target='{self.target}', local='{self.root}', pid={self.pid}, mode={self.mode})" + msg = f"RemoteSession(target='{self.target}', local='{self.root}', mode={self.mode}" + if self.mode == GefRemoteSessionManager.RemoteMode.GDBSERVER: + msg += f", pid={self.pid}" + msg += ")" + return msg def __repr__(self) -> str: return str(self) @@ -13057,59 +13093,25 @@ def mode(self) -> RemoteMode: return self._mode def sync(self, src: str, dst: str | None = None) -> bool: - """Copy the `src` into the temporary chroot. If `dst` is provided, that path will be - used instead of `src`.""" - if not dst: - dst = src - tgt = self.root / dst.lstrip("/") - if tgt.exists(): - return True - tgt.parent.mkdir(parents=True, exist_ok=True) - dbg(f"[remote] downloading '{src}' -> '{tgt}'") - gdb.execute(f"remote get '{src}' '{tgt.absolute()}'") - return tgt.exists() + raise DeprecationWarning def connect(self, pid: int) -> bool: - """Connect to remote target. If in extended mode, also attach to the given PID.""" - # before anything, register our new hook to download files from the remote target - dbg("[remote] Installing new objfile handlers") - try: - gef_on_new_unhook(new_objfile_handler) - except SystemError: - # the default objfile handler might already have been removed, ignore failure - pass - - gef_on_new_hook(self.remote_objfile_event_handler) - - # then attempt to connect - is_extended_mode = pid > -1 - dbg(f"[remote] Enabling extended remote: {bool(is_extended_mode)}") - try: - with DisableContextOutputContext(): - cmd = f"target {'extended-' if is_extended_mode else ''}remote {self.target}" - dbg(f"[remote] Executing '{cmd}'") - gdb.execute(cmd) - if is_extended_mode: - gdb.execute(f"attach {pid:d}") - return True - except Exception as e: - err(f"Failed to connect to {self.target}: {e}") - - # a failure will trigger the cleanup, deleting our hook anyway - return False + raise DeprecationWarning def setup(self) -> bool: # setup remote adequately depending on remote or qemu mode + info(f"Setting up remote session as '{self._mode}'") match self.mode: - case GefRemoteSessionManager.RemoteMode.QEMU: - dbg(f"Setting up as qemu session, target={self.__qemu}") - self.__setup_qemu() + case GefRemoteSessionManager.RemoteMode.QEMU_USER: + self.__setup_qemu_user() + case GefRemoteSessionManager.RemoteMode.QEMU_SYSTEM: + self.__setup_qemu_system() case GefRemoteSessionManager.RemoteMode.RR: - dbg("Setting up as rr session") self.__setup_rr() case GefRemoteSessionManager.RemoteMode.GDBSERVER: - dbg("Setting up as remote session") self.__setup_remote() + case GefRemoteSessionManager.RemoteMode.GDBSERVER_MULTI: + self.__setup_remote_multi() case _: raise ValueError @@ -13119,87 +13121,25 @@ def setup(self) -> bool: reset_architecture() return True - def __setup_qemu(self) -> bool: - # setup emulated file in the chroot - assert self.__qemu - target = self.root / str(self.__qemu.parent).lstrip("/") - target.mkdir(parents=True, exist_ok=False) - shutil.copy2(self.__qemu, target) - self._file = self.__qemu - assert self.lfile.exists() - - # create a procfs - procfs = self.root / f"proc/{self.pid}/" - procfs.mkdir(parents=True, exist_ok=True) - - ## /proc/pid/cmdline - cmdline = procfs / "cmdline" - if not cmdline.exists(): - with cmdline.open("w") as fd: - fd.write("") - - ## /proc/pid/environ - environ = procfs / "environ" - if not environ.exists(): - with environ.open("wb") as fd: - fd.write(b"PATH=/bin\x00HOME=/tmp\x00") - - ## /proc/pid/maps - maps = procfs / "maps" - if not maps.exists(): - with maps.open("w") as fd: - fname = self.file.absolute() - mem_range = ( - "00000000-ffffffff" - if is_32bit() - else "0000000000000000-ffffffffffffffff" - ) - fd.write( - f"{mem_range} rwxp 00000000 00:00 0 {fname}\n" - ) + def __setup_qemu_system(self) -> bool: + raise NotImplementedError("TODO") + + def __setup_qemu_user(self) -> bool: + self.__local_root_path = pathlib.Path("/") return True def __setup_remote(self) -> bool: - # get the file - fpath = f"/proc/{self.pid}/exe" - if not self.sync(fpath, str(self.file)): - err(f"'{fpath}' could not be fetched on the remote system.") - return False - - # pseudo procfs - for _file in ("maps", "environ", "cmdline"): - fpath = f"/proc/{self.pid}/{_file}" - if not self.sync(fpath): - err(f"'{fpath}' could not be fetched on the remote system.") - return False + self.__local_root_path = pathlib.Path("/") + return True + def __setup_remote_multi(self) -> bool: + self.__local_root_path = pathlib.Path("/") return True def __setup_rr(self) -> bool: - # - # Simply override the local root path, the binary must exist - # on the host. - # self.__local_root_path = pathlib.Path("/") return True - def remote_objfile_event_handler(self, evt: "gdb.NewObjFileEvent") -> None: - dbg( - f"[remote] in remote_objfile_handler({evt.new_objfile.filename if evt else 'None'}))" - ) - if not evt or not evt.new_objfile.filename: - return - if not evt.new_objfile.filename.startswith( - "target:" - ) and not evt.new_objfile.filename.startswith("/"): - warn(f"[remote] skipping '{evt.new_objfile.filename}'") - return - if evt.new_objfile.filename.startswith("target:"): - src: str = evt.new_objfile.filename[len("target:") :] - if not self.sync(src): - raise FileNotFoundError(f"Failed to sync '{src}'") - return - class GefUiManager(GefManager): """Class managing UI settings.""" @@ -13288,6 +13228,7 @@ class Gef: heap: GefHeapManager session: GefSessionManager gdb: GefCommand + temp: dict[str, Any] def __init__(self) -> None: self.binary: FileFormat | None = None @@ -13298,11 +13239,18 @@ def __init__(self) -> None: self.config = GefSettingsManager() self.ui = GefUiManager() self.libc = GefLibcManager() + self.temp = {} return def __str__(self) -> str: return f"Gef(binary='{self.binary or 'None'}', arch={self.arch})" + def __repr__(self) -> str: + binary = self.binary + arch = self.arch + session = self.session + return f"Gef({binary=:}, {arch=:}, {session=:})" + def reinitialize_managers(self) -> None: """Reinitialize the managers. Avoid calling this function directly, using `pi reset()` is preferred""" self.memory = GefMemoryManager() @@ -13321,20 +13269,30 @@ def setup(self) -> None: def reset_caches(self) -> None: """Recursively clean the cache of all the managers. Avoid calling this function directly, using `reset-cache` is preferred""" + self.temp.clear() for mgr in (self.memory, self.heap, self.session, self.arch): mgr.reset_caches() return +def target_remote_hook(): + # disable the context until the session has been fully established + gef.temp["context_old_value"] = gef.config["context.enable"] + gef.config["context.enable"] = False + + def target_remote_posthook(): - if gef.session.remote_initializing: - return + conn = gdb.selected_inferior().connection + if not isinstance(conn, gdb.RemoteTargetConnection): + raise TypeError("Expected type gdb.RemoteTargetConnection") + assert is_target_remote_or_extended(conn), "Target is not remote" + gef.session.remote = GefRemoteSessionManager(conn) - gef.session.remote = GefRemoteSessionManager("", 0) - if not gef.session.remote.setup(): - raise EnvironmentError( - f"Failed to create a proper environment for {gef.session.remote}" - ) + # switch back context to its old context + gef.config["context.enable"] = gef.temp.pop("context_old_value", True) + + # if here, no exception was thrown, print context + gdb.execute("context") if __name__ == "__main__": @@ -13418,36 +13376,18 @@ def target_remote_posthook(): GefTmuxSetup() - if GDB_VERSION > (9, 0): - disable_tr_overwrite_setting = "gef.disable_target_remote_overwrite" - - if not gef.config[disable_tr_overwrite_setting]: - warnmsg = ( - "Using `target remote` with GEF should work in most cases, " - "but use `gef-remote` if you can. You can disable the " - "overwrite of the `target remote` command by toggling " - f"`{disable_tr_overwrite_setting}` in the config." - ) - hook = f""" - define target hookpost-{{}} - pi target_remote_posthook() - context - pi if calling_function() != "connect": warn("{warnmsg}") - end - """ - - # Register a post-hook for `target remote` that initialize the remote session - gdb.execute(hook.format("remote")) - gdb.execute(hook.format("extended-remote")) - else: - errmsg = ( - "Using `target remote` does not work, use `gef-remote` " - f"instead. You can toggle `{disable_tr_overwrite_setting}` " - "if this is not desired." - ) - hook = f"""pi if calling_function() != "connect": err("{errmsg}")""" - gdb.execute(f"define target hook-remote\n{hook}\nend") - gdb.execute(f"define target hook-extended-remote\n{hook}\nend") + # Initialize `target *remote` pre/post hooks + hook = """ + define target hook{1}-{0} + pi target_remote_{1}hook() + end + """ + # pre-hooks + gdb.execute(hook.format("remote", "")) + gdb.execute(hook.format("extended-remote", "")) + # post-hooks + gdb.execute(hook.format("remote", "post")) + gdb.execute(hook.format("extended-remote", "post")) # restore saved breakpoints (if any) bkp_fpath = ( diff --git a/scripts/generate-coverage-docs.sh b/scripts/generate-coverage-docs.sh index f9c9120da..0e329b9f3 100644 --- a/scripts/generate-coverage-docs.sh +++ b/scripts/generate-coverage-docs.sh @@ -13,7 +13,7 @@ PY_VER=$(gdb -q -nx -ex 'pi print(f"{sys.version_info.major}.{sys.version_info.m rm -f -- "${GEF_DOCS_DIR}"/* echo "[+] Generating coverage report in '${TMPDIR_RUN}'" -COVERAGE_DIR="${TMPDIR_RUN}" python${PY_VER} -m pytest -n ${NB_CORES} "${GEF_TESTS_DIR}" -k "not benchmark" +COVERAGE_DIR="${TMPDIR_RUN}" python${PY_VER} -m pytest --forked -n ${NB_CORES} -v "${GEF_TESTS_DIR}" -m "not benchmark" echo "[+] Combining data to '${TMPDIR_COV}'" python${PY_VER} -m coverage combine --data-file=${TMPDIR_COV} "${TMPDIR_RUN}"/* diff --git a/tests/api/gef_memory.py b/tests/api/gef_memory.py index 8e490e619..e15317dfa 100644 --- a/tests/api/gef_memory.py +++ b/tests/api/gef_memory.py @@ -3,7 +3,6 @@ """ import pathlib -import random import pytest from tests.base import RemoteGefUnitTestGeneric @@ -13,8 +12,8 @@ IN_GITHUB_ACTIONS, debug_target, gdbserver_session, + get_random_port, qemuuser_session, - GDBSERVER_DEFAULT_HOST, ) @@ -122,18 +121,15 @@ def test_func_parse_maps_local_procfs(self): @pytest.mark.slow def test_func_parse_maps_remote_gdbserver(self): gef, gdb = self._gef, self._gdb - # When in a gef-remote session `parse_gdb_info_proc_maps` should work to + # When in a remote session `parse_gdb_info_proc_maps` should work to # query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with pytest.raises(Exception): - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") with gdbserver_session(port=port) as _: - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") sections = gef.memory.maps assert len(sections) > 0 @@ -142,15 +138,10 @@ def test_func_parse_maps_remote_gdbserver(self): ) def test_func_parse_maps_remote_qemu(self): gdb, gef = self._gdb, self._gef - # When in a gef-remote qemu-user session `parse_gdb_info_proc_maps` - # should work to query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with qemuuser_session(port=port) as _: - cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" + cmd = f"target remote :{port}" gdb.execute(cmd) sections = gef.memory.maps assert len(sections) > 0 @@ -160,15 +151,10 @@ def test_func_parse_maps_remote_qemu(self): ) def test_func_parse_maps_realpath(self): gef, gdb = self._gef, self._gdb - # When in a gef-remote session `parse_gdb_info_proc_maps` should work to - # query the memory maps - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with gdbserver_session(port=port) as _: - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") + gdb.execute(f"target remote :{port}") gdb.execute("b main") gdb.execute("continue") sections = gef.memory.maps diff --git a/tests/api/gef_remote.py b/tests/api/gef_remote.py new file mode 100644 index 000000000..f71a906c3 --- /dev/null +++ b/tests/api/gef_remote.py @@ -0,0 +1,119 @@ +""" +`target remote/extended-remote` test module. +""" + +import pytest + +from tests.base import RemoteGefUnitTestGeneric +from tests.utils import ( + ARCH, + OS, + debug_target, + gdbserver_multi_session, + gdbserver_session, + get_random_port, + qemuuser_session, +) + + +@pytest.mark.skipif( + OS != "ubuntu" or ARCH != "x86_64", + reason=f"Skipped for {OS} on CI", +) +class GefRemoteApi(RemoteGefUnitTestGeneric): + def setUp(self) -> None: + self._target = debug_target("default") + return super().setUp() + + def test_gef_remote_test_gdbserver(self): + """Test `gdbserver file`""" + _gdb = self._gdb + _gef = self._gef + _root = self._conn.root + port = get_random_port() + + with gdbserver_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + assert not _root.eval("is_running_in_qemu()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_in_gdbserver()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_in_qemu()") + assert not _root.eval("is_running_in_qemu_system()") + assert not _root.eval("is_running_in_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + assert hasattr(_gef.session, "remote") + assert "GDBSERVER" in str(_gef.session.remote) + assert "GDBSERVER_MULTI" not in str(_gef.session.remote) + + def test_gef_remote_test_gdbserver_multi(self): + """Test `gdbserver --multi file`""" + _gdb = self._gdb + _gef = self._gef + _root = self._conn.root + port = get_random_port() + + with gdbserver_multi_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + assert not _root.eval("is_running_in_qemu()") + + _gdb.execute(f"target extended-remote :{port}") + _gdb.execute(f"set remote exec-file {self._target}") + _gdb.execute(f"file {self._target}") + _gdb.execute(f"start {self._target}") + + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_target_extended_remote()") + assert _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_target_remote()") + + assert not _root.eval("is_running_in_qemu()") + assert not _root.eval("is_running_in_qemu_system()") + assert not _root.eval("is_running_in_qemu_user()") + assert not _root.eval("is_running_in_rr()") + + assert hasattr(_gef.session, "remote") + assert "GDBSERVER_MULTI" in str(_gef.session.remote) + + def test_gef_remote_test_qemuuser(self): + """Test `qemu-user -g`""" + _gdb = self._gdb + _gef = self._gef + _root = self._conn.root + port = get_random_port() + + with qemuuser_session(port=port): + assert not _root.eval("is_target_remote()") + assert not _root.eval("is_target_remote_or_extended()") + assert not _root.eval("is_running_in_gdbserver()") + + _gdb.execute(f"target remote :{port}") + + assert _root.eval("is_target_remote()") + assert _root.eval("is_target_remote_or_extended()") + assert _root.eval("is_running_in_qemu()") + assert _root.eval("is_running_in_qemu_user()") + + assert not _root.eval("is_target_extended_remote()") + assert not _root.eval("is_running_in_qemu_system()") + assert not _root.eval("is_running_in_gdbserver()") + assert not _root.eval("is_running_in_rr()") + + assert hasattr(_gef.session, "remote") + assert "QEMU_USER" in str(_gef.session.remote) + + # TODO add tests for + # - [ ] qemu-system + # - [ ] rr diff --git a/tests/api/gef_session.py b/tests/api/gef_session.py index a24c04370..87d090c7c 100644 --- a/tests/api/gef_session.py +++ b/tests/api/gef_session.py @@ -4,7 +4,6 @@ import os import pathlib -import random import re import pytest @@ -14,8 +13,8 @@ ARCH, debug_target, gdbserver_session, + get_random_port, qemuuser_session, - GDBSERVER_DEFAULT_HOST, ) @@ -63,12 +62,11 @@ def test_root_dir_local(self): def test_root_dir_remote(self): gdb = self._gdb gdb.execute("start") - expected = os.stat("/") - host = GDBSERVER_DEFAULT_HOST - port = random.randint(1025, 65535) + port = get_random_port() + with gdbserver_session(port=port): - gdb.execute(f"gef-remote {host} {port}") + gdb.execute(f"target remote :{port}") result = self._conn.root.eval("os.stat(gef.session.root)") assert (expected.st_dev == result.st_dev) and ( expected.st_ino == result.st_ino @@ -77,11 +75,8 @@ def test_root_dir_remote(self): @pytest.mark.skipif(ARCH not in ("x86_64",), reason=f"Skipped for {ARCH}") def test_root_dir_qemu(self): gdb, gef = self._gdb, self._gef + port = get_random_port() - host = GDBSERVER_DEFAULT_HOST - port = random.randint(1025, 65535) with qemuuser_session(port=port): - gdb.execute( - f"gef-remote --qemu-user --qemu-binary {self._target} {host} {port}" - ) + gdb.execute(f"target remote :{port}") assert re.search(r"\/proc\/[0-9]+/root", str(gef.session.root)) diff --git a/tests/base.py b/tests/base.py index b4003ecc5..e2dd18505 100644 --- a/tests/base.py +++ b/tests/base.py @@ -1,6 +1,5 @@ import os import pathlib -import random import re import subprocess import tempfile @@ -10,7 +9,7 @@ import rpyc -from .utils import debug_target +from .utils import debug_target, get_random_port, which COVERAGE_DIR = os.getenv("COVERAGE_DIR", "") GEF_PATH = pathlib.Path(os.getenv("GEF_PATH", "gef.py")).absolute() @@ -19,6 +18,8 @@ RPYC_PORT = 18812 RPYC_SPAWN_TIME = 1.0 RPYC_MAX_REMOTE_CONNECTION_ATTEMPTS = 5 +GDB_BINARY_PATH = which("gdb-multiarch") +RPYC_CONNECT_FAILURE_DELAY = 0.2 class RemoteGefUnitTestGeneric(unittest.TestCase): @@ -28,6 +29,7 @@ class RemoteGefUnitTestGeneric(unittest.TestCase): """ def setUp(self) -> None: + self._gdb_path = GDB_BINARY_PATH attempt = RPYC_MAX_REMOTE_CONNECTION_ATTEMPTS while True: try: @@ -41,7 +43,7 @@ def setUp(self) -> None: attempt -= 1 if attempt == 0: raise - time.sleep(0.2) + time.sleep(RPYC_CONNECT_FAILURE_DELAY) continue self._gdb = self._conn.root.gdb @@ -58,7 +60,7 @@ def __setup(self): # # Select a random tcp port for rpyc # - self._port = random.randint(1025, 65535) + self._port = get_random_port() self._commands = "" if COVERAGE_DIR: @@ -84,7 +86,7 @@ def __setup(self): self._initfile.write(self._commands) self._initfile.flush() self._command = [ - "gdb", + GDB_BINARY_PATH, "-q", "-nx", "-ex", @@ -92,6 +94,7 @@ def __setup(self): "--", str(self._target.absolute()), ] + self._process = subprocess.Popen(self._command) assert self._process.pid > 0 time.sleep(RPYC_SPAWN_TIME) diff --git a/tests/commands/gef_remote.py b/tests/commands/gef_remote.py index 734c0a8b1..561e638a6 100644 --- a/tests/commands/gef_remote.py +++ b/tests/commands/gef_remote.py @@ -2,17 +2,16 @@ `gef_remote` command test module """ -import random - import pytest from tests.base import RemoteGefUnitTestGeneric from tests.utils import ( ARCH, + GDBSERVER_DEFAULT_HOST, debug_target, gdbserver_session, + get_random_port, qemuuser_session, - GDBSERVER_DEFAULT_HOST, ) @@ -26,54 +25,44 @@ def setUp(self) -> None: def test_cmd_gef_remote_gdbserver(self): gdb = self._gdb gef = self._gef - root = self._conn.root + port = get_random_port() gdbserver_mode = "GDBSERVER" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break with gdbserver_session(port=port): - gdb.execute(f"gef-remote {GDBSERVER_DEFAULT_HOST} {port}") - res: str = root.eval("str(gef.session.remote)") + gdb.execute(f"target remote {GDBSERVER_DEFAULT_HOST}:{port}") + res: str = str(gef.session.remote) assert res.startswith( - f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/" + f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/" ) - assert res.endswith(f"pid={gef.session.pid}, mode={gdbserver_mode})") + assert res.endswith(f"mode={gdbserver_mode}, pid={gef.session.pid})") @pytest.mark.slow @pytest.mark.skipif(ARCH not in ("x86_64",), reason=f"Skipped for {ARCH}") def test_cmd_gef_remote_qemu_user(self): gdb = self._gdb gef = self._gef - root = self._conn.root - qemu_mode = "QEMU" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + qemu_mode = "QEMU_USER" + port = get_random_port() with qemuuser_session(port=port): - cmd = f"gef-remote --qemu-user --qemu-binary {self._target} {GDBSERVER_DEFAULT_HOST} {port}" + cmd = f"target remote {GDBSERVER_DEFAULT_HOST}:{port}" gdb.execute(cmd) - res = root.eval("str(gef.session.remote)") + res = str(gef.session.remote) assert res.startswith( - f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/tmp/" + f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/" ) - assert res.endswith(f"pid={gef.session.pid}, mode={qemu_mode})") + assert res.endswith(f"mode={qemu_mode})") def test_cmd_target_remote(self): gdb = self._gdb gef = self._gef - root = self._conn.root gdbserver_mode = "GDBSERVER" - while True: - port = random.randint(1025, 65535) - if port != self._port: - break + port = get_random_port() with gdbserver_session(port=port) as _: gdb.execute(f"target remote {GDBSERVER_DEFAULT_HOST}:{port}") - res: str = root.eval("str(gef.session.remote)") - assert res.startswith("RemoteSession(target=':0', local='/tmp/") - assert res.endswith(f"pid={gef.session.pid}, mode={gdbserver_mode})") + res: str = str(gef.session.remote) + assert res.startswith( + f"RemoteSession(target='{GDBSERVER_DEFAULT_HOST}:{port}', local='/" + ) + assert res.endswith(f"mode={gdbserver_mode}, pid={gef.session.pid})") diff --git a/tests/regressions/1131_target_remote_registers.py b/tests/regressions/1131_target_remote_registers.py new file mode 100644 index 000000000..fecf78888 --- /dev/null +++ b/tests/regressions/1131_target_remote_registers.py @@ -0,0 +1,75 @@ +import os +import pathlib +import tempfile + +import pytest + +from tests.base import RemoteGefUnitTestGeneric +from tests.utils import OS, ARCH, get_random_port, qemuuser_session + +URL = "https://github.com/user-attachments/files/16913262/repr.zip" + + +# +# gdb-multiarch acts weird on arm64, and simply doesn't exist on fedora (ubuntu only), so we skip this test on arm64 on CI. +# It cant either be run locally on arm64 if gdb-multiarch is available. We might want to revisit later, for now just skip. +# + + +@pytest.mark.slow +@pytest.mark.skipif( + OS != "ubuntu" or ARCH != "x86_64", + reason=f"Skipped for {OS} on CI", +) +class MissingTargetRemoteRegisters(RemoteGefUnitTestGeneric): + """@ref https://github.com/hugsy/gef/pull/1131""" + + def setUp(self) -> None: + repro_script = f""" + wget -O {{0}}/repr.zip {URL} + unzip {{0}}/repr.zip -d {{0}} + """ + + self._tempdir = tempfile.TemporaryDirectory(prefix="gef-tests-") + self._tempdir_path = pathlib.Path(self._tempdir.name) + os.system(repro_script.format(self._tempdir_path)) + self._current_dir = self._tempdir_path / "repr" + self._current_dir.mkdir(parents=True, exist_ok=True) + self._previous_cwd = os.getcwd() + os.chdir(self._current_dir) + self._target = self._current_dir / "chal" + # self._target.mkdir(exist_ok=True) + return super().setUp() + + def tearDown(self) -> None: + # Restore the original working directory if it was saved + previous_cwd = getattr(self, "_previous_cwd", None) + if previous_cwd is not None: + os.chdir(previous_cwd) + + # Ensure the temporary directory is cleaned up + tempdir = getattr(self, "_tempdir", None) + if tempdir is not None: + tempdir.cleanup() + + return super().tearDown() + + def test_target_remote_validate_post_hook_registers_display(self): + _gdb = self._gdb + _gef = self._gef + port = get_random_port() + + # cmd: ./qemu-mipsel-static -g 1234 -L ./target ./chal + with qemuuser_session( + exe=self._target, + port=port, + qemu_exe=self._current_dir / "qemu-mipsel-static", + args=["-L", str(self._current_dir / "target")], + ): + _gdb.execute(f"target remote :{port}") + + res = str(_gef.session.remote) + assert ( + f"RemoteSession(target='localhost:{port}', local='/', mode=QEMU_USER)" + in res + ) diff --git a/tests/regressions/gdbserver_connection.py b/tests/regressions/gdbserver_connection.py index 3b4200369..dd0ee558d 100644 --- a/tests/regressions/gdbserver_connection.py +++ b/tests/regressions/gdbserver_connection.py @@ -16,8 +16,8 @@ def test_can_establish_connection_to_gdbserver_again_after_disconnect(self): gdb = self._gdb with gdbserver_session(port=5001) as _, gdbserver_session(port=5002) as _: - gdb.execute("gef-remote 127.0.0.1 5001") + gdb.execute("target remote :5001") gdb.execute("detach") - gdb.execute("gef-remote 127.0.0.1 5002") + gdb.execute("target remote :5002") gdb.execute("continue") diff --git a/tests/utils.py b/tests/utils.py index 3325a7348..449aabaf0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,25 +8,40 @@ import os import pathlib import platform +import shutil import struct import subprocess import tempfile import time +import random + from typing import Iterable, List, Optional, Union from urllib.request import urlopen def which(program: str) -> pathlib.Path: - for path in os.environ["PATH"].split(os.pathsep): - dirname = pathlib.Path(path) - fpath = dirname / program - if os.access(fpath, os.X_OK): - return fpath - raise FileNotFoundError(f"Missing file `{program}`") + fpath = shutil.which(program) + if not fpath: + raise FileNotFoundError(f"Missing file `{program}`") + return pathlib.Path(fpath) + + +def os_release() -> str: + return ( + [ + x + for x in pathlib.Path("/etc/os-release").read_text().splitlines() + if x.startswith("ID=") + ][0] + .strip() + .replace("ID=", "") + .replace('"', "") + ) TMPDIR = pathlib.Path(tempfile.gettempdir()) ARCH = (os.getenv("GEF_CI_ARCH") or platform.machine()).lower() +OS = (os.getenv("GEF_CI_OS") or os_release()).lower() BIN_SH = pathlib.Path("/bin/sh") CI_VALID_ARCHITECTURES_32B = ("i686", "armv7l") CI_VALID_ARCHITECTURES_64B = ("x86_64", "aarch64", "mips64el", "ppc64le", "riscv64") @@ -40,7 +55,8 @@ def which(program: str) -> pathlib.Path: STRIP_ANSI_DEFAULT = True GDBSERVER_DEFAULT_HOST = "localhost" GDBSERVER_DEFAULT_PORT = 1234 -GDBSERVER_BINARY = which("gdbserver") +GDBSERVER_BINARY: pathlib.Path = which("gdbserver") +GDBSERVER_STARTUP_DELAY_SEC: float = 0.5 assert GDBSERVER_BINARY.exists() QEMU_USER_X64_BINARY = which("qemu-x86_64") @@ -122,6 +138,15 @@ def start_gdbserver( return subprocess.Popen(cmd) +def start_gdbserver_multi( + host: str = GDBSERVER_DEFAULT_HOST, + port: int = GDBSERVER_DEFAULT_PORT, +) -> subprocess.Popen: + cmd = [GDBSERVER_BINARY, "--multi", f"{host}:{port}"] + logging.debug(f"Starting {cmd}") + return subprocess.Popen(cmd) + + def stop_gdbserver(gdbserver: subprocess.Popen) -> None: """Stop the gdbserver and wait until it is terminated if it was still running. Needed to make the used port available again. @@ -142,7 +167,20 @@ def gdbserver_session( ): sess = start_gdbserver(exe, host, port) try: - time.sleep(1) # forced delay to allow gdbserver to start listening + time.sleep(GDBSERVER_STARTUP_DELAY_SEC) + yield sess + finally: + stop_gdbserver(sess) + + +@contextlib.contextmanager +def gdbserver_multi_session( + port: int = GDBSERVER_DEFAULT_PORT, + host: str = GDBSERVER_DEFAULT_HOST, +): + sess = start_gdbserver_multi(host, port) + try: + time.sleep(GDBSERVER_STARTUP_DELAY_SEC) yield sess finally: stop_gdbserver(sess) @@ -151,9 +189,16 @@ def gdbserver_session( def start_qemuuser( exe: Union[str, pathlib.Path] = debug_target("default"), port: int = GDBSERVER_DEFAULT_PORT, + qemu_exe: pathlib.Path = QEMU_USER_X64_BINARY, + args: list[str] | None = None, ) -> subprocess.Popen: + cmd = [qemu_exe, "-g", str(port)] + if args: + cmd.extend(args) + cmd.append(exe) + logging.info(f"Starting '{cmd}'") return subprocess.Popen( - [QEMU_USER_X64_BINARY, "-g", str(port), exe], + cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) @@ -168,9 +213,19 @@ def stop_qemuuser(process: subprocess.Popen) -> None: @contextlib.contextmanager def qemuuser_session(*args, **kwargs): exe = kwargs.get("exe", "") or debug_target("default") - port = kwargs.get("port", 0) or GDBSERVER_DEFAULT_PORT - sess = start_qemuuser(exe, port) + port = kwargs.get("port", GDBSERVER_DEFAULT_PORT) + qemu_exe = kwargs.get("qemu_exe", None) or QEMU_USER_X64_BINARY + args = kwargs.get("args", None) + if args: + # if specified, expect a list of strings + assert isinstance(args, list) + assert len(args) + for arg in args: + assert isinstance(arg, str) + + sess = start_qemuuser(exe, port=port, qemu_exe=qemu_exe, args=args) try: + time.sleep(GDBSERVER_STARTUP_DELAY_SEC) yield sess finally: stop_qemuuser(sess) @@ -310,3 +365,16 @@ def p32(x: int) -> bytes: def p64(x: int) -> bytes: return struct.pack(" int: + global __available_ports + if len(__available_ports) < 2: + __available_ports = list(range(1024, 65535)) + idx = random.choice(range(len(__available_ports))) + port = __available_ports[idx] + __available_ports.pop(idx) + return port