From 4cd5043f79859e9ab1ee9a64772210340ce2651f Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 08:35:23 -0700 Subject: [PATCH 1/8] Improve test_gevent_monkeypatch robustness. Previously the test could hang when the log fifo was closed by the gunicorn server before the port was written to the log; now it fails. Follow up to #2417 which introduced the test and led to observed hangs in CI. --- tests/integration/test_issue_2415.py | 60 +++++++++------------------- 1 file changed, 19 insertions(+), 41 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index de6ef27e8..b5092c218 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -5,9 +5,7 @@ import os.path import re import subprocess -import threading from textwrap import dedent -from threading import Event from typing import Optional import pytest @@ -20,10 +18,6 @@ if TYPE_CHECKING: from typing import Any - import attr # vendor:skip -else: - from pex.third_party import attr - @pytest.mark.skipif( IS_PYPY or PY_VER < (3, 8) or PY_VER >= (3, 13), @@ -96,48 +90,32 @@ def hello_world(): log = os.path.join(str(tmpdir), "log") os.mkfifo(log) - @attr.s - class LogScanner(object): - port_seen = attr.ib(factory=Event, init=False) # type: Event - _port = attr.ib(default=None) # type: Optional[int] - - def scan_log(self): - # type: () -> None - - with open(log) as log_fp: - for line in log_fp: - if self._port is None: - match = re.search(r"Listening at: http://127.0.0.1:(?P\d{1,5})", line) - if match: - self._port = int(match.group("port")) - self.port_seen.set() - - @property - def port(self): - # type: () -> int - self.port_seen.wait() - assert self._port is not None - return self._port - - log_scanner = LogScanner() - log_scan_thread = threading.Thread(target=log_scanner.scan_log) - log_scan_thread.daemon = True - log_scan_thread.start() - with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( args=[pex, "--bind", "127.0.0.1:0", "--log-file", log], stderr=stderr_fp ) atexit.register(gunicorn.kill) - with URLFetcher().get_body_stream( - "http://127.0.0.1:{port}".format(port=log_scanner.port) - ) as http_fp: - assert b"Hello, World!" == http_fp.read().strip() + # N.B.: Since the fifo is blocking, we can only open it now, after the server is launched + # in the background where it too is blocked waiting to write to the log. + with open(log) as log_fp: + port = None # type: Optional[int] + for line in log_fp: + match = re.search(r"Listening at: http://127.0.0.1:(?P\d{1,5})", line) + if match: + port = int(match.group("port")) + break + assert port is not None, "Failed to determine port from gunicorn log at {log}".format( + log=log + ) + + with URLFetcher().get_body_stream( + "http://127.0.0.1:{port}".format(port=port) + ) as http_fp: + assert b"Hello, World!" == http_fp.read().strip() + + gunicorn.kill() - gunicorn.kill() - log_scan_thread.join() - stderr_fp.flush() stderr_fp.seek(0) stderr = stderr_fp.read() assert b"MonkeyPatchWarning: Monkey-patching ssl after ssl " not in stderr, stderr.decode( From 76a710f062ab77ab4d6dfc2b3a055b8735747754 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 11:57:53 -0700 Subject: [PATCH 2/8] Switch gunicorn comms to a unix socket. Polling for a unix socket to be opened by gunicorn has proven to be both fully robust via extended local testing and helps the test run a few seconds faster. Also switch gevent monkey-patching to being handled by gunicorn, which appears to be how this is actually done in production setups. With lazy imports reverted, the test fails as expected; so the monkey patching by gunicorn is confirmed to be happening. --- pex/compatibility.py | 6 +++ pex/fetcher.py | 71 ++++++++++++++++++++++++- testing/data/locks/issue-2415.lock.json | 2 +- tests/integration/test_issue_2415.py | 57 +++++++++----------- 4 files changed, 103 insertions(+), 33 deletions(-) diff --git a/pex/compatibility.py b/pex/compatibility.py index d89c7391b..80b8610df 100644 --- a/pex/compatibility.py +++ b/pex/compatibility.py @@ -120,10 +120,13 @@ def exec_function(ast, globals_map): if PY3: + from http.client import HTTPConnection as HTTPConnection + from http.client import HTTPResponse as HTTPResponse from urllib import parse as _url_parse from urllib.error import HTTPError as HTTPError from urllib.parse import quote as _url_quote from urllib.parse import unquote as _url_unquote + from urllib.request import AbstractHTTPHandler as AbstractHTTPHandler from urllib.request import FileHandler as FileHandler from urllib.request import HTTPBasicAuthHandler as HTTPBasicAuthHandler from urllib.request import HTTPDigestAuthHandler as HTTPDigestAuthHandler @@ -137,6 +140,9 @@ def exec_function(ast, globals_map): from urllib import unquote as _url_unquote import urlparse as _url_parse + from httplib import HTTPConnection as HTTPConnection + from httplib import HTTPResponse as HTTPResponse + from urllib2 import AbstractHTTPHandler as AbstractHTTPHandler from urllib2 import FileHandler as FileHandler from urllib2 import HTTPBasicAuthHandler as HTTPBasicAuthHandler from urllib2 import HTTPDigestAuthHandler as HTTPDigestAuthHandler diff --git a/pex/fetcher.py b/pex/fetcher.py index b1614a6d2..97004fe4f 100644 --- a/pex/fetcher.py +++ b/pex/fetcher.py @@ -5,6 +5,7 @@ import contextlib import os +import socket import sys import threading import time @@ -13,16 +14,21 @@ from pex import asserts from pex.auth import PasswordDatabase, PasswordEntry from pex.compatibility import ( + PY2, + AbstractHTTPHandler, FileHandler, HTTPBasicAuthHandler, + HTTPConnection, HTTPDigestAuthHandler, HTTPError, HTTPPasswordMgrWithDefaultRealm, + HTTPResponse, HTTPSHandler, ProxyHandler, Request, build_opener, in_main_thread, + urlparse, ) from pex.network_configuration import NetworkConfiguration from pex.typing import TYPE_CHECKING, cast @@ -30,7 +36,7 @@ if TYPE_CHECKING: from ssl import SSLContext - from typing import BinaryIO, Dict, Iterable, Iterator, Mapping, Optional, Text + from typing import Any, BinaryIO, Dict, Iterable, Iterator, Mapping, Optional, Text import attr # vendor:skip else: @@ -148,6 +154,68 @@ def initialize_ssl_context(network_configuration=None): initialize_ssl_context() +class UnixHTTPConnection(HTTPConnection): + def __init__( + self, + *args, # type: Any + **kwargs # type: Any + ): + # type: (...) -> None + path = kwargs.pop("path") + super(UnixHTTPConnection, self).__init__(*args, **kwargs) + self.path = path + + def connect(self): + # type: () -> None + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(self.path) + self.sock = sock + + +class UnixHTTPHandler(AbstractHTTPHandler): + # N.B.: The naming scheme here is _; thus `unix` captures unix:// URLs and + # `open` captures the open event for unix:// URLs. + def unix_open(self, req): + # type: (Request) -> HTTPResponse + url_info = urlparse.urlparse(req.get_full_url()) + + path = "" + unix_socket_path = url_info.path + while not os.path.basename(unix_socket_path).endswith(".sock"): + path = os.path.join(path, os.path.basename(unix_socket_path)) + new_unix_socket_path = os.path.dirname(unix_socket_path) + if new_unix_socket_path == unix_socket_path: + # There was no *.sock component, so just use the full path. + path = "" + unix_socket_path = url_info.path + break + unix_socket_path = new_unix_socket_path + + # :///;?# + url = urlparse.urlunparse( + ("unix", "localhost", path, url_info.params, url_info.query, url_info.fragment) + ) + kwargs = {} if PY2 else {"method": req.method} + modified_req = Request( + url, + data=req.data, + headers=req.headers, + # N.B.: MyPy for Python 2.7 needs the cast. + origin_req_host=cast(str, req.origin_req_host), + unverifiable=req.unverifiable, + **kwargs + ) + + # The stdlib actually sets timeout this way - it is not a constructor argument in any + # Python version. + modified_req.timeout = req.timeout + + # N.B.: MyPy for Python 2.7 needs the cast. + return cast( + HTTPResponse, self.do_open(UnixHTTPConnection, modified_req, path=unix_socket_path) + ) + + class URLFetcher(object): USER_AGENT = "pex/{version}".format(version=__version__) @@ -171,6 +239,7 @@ def __init__( handlers = [ ProxyHandler(proxies), HTTPSHandler(context=get_ssl_context(network_configuration=network_configuration)), + UnixHTTPHandler(), ] if handle_file_urls: handlers.append(FileHandler()) diff --git a/testing/data/locks/issue-2415.lock.json b/testing/data/locks/issue-2415.lock.json index ce9d2d6c4..59fc6d3bc 100644 --- a/testing/data/locks/issue-2415.lock.json +++ b/testing/data/locks/issue-2415.lock.json @@ -1491,7 +1491,7 @@ "requirements": [ "flask", "gevent>=1.3.4", - "gunicorn" + "gunicorn[gevent]" ], "requires_python": [ "<3.13,>=3.8" diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index b5092c218..c4e074e7f 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -3,10 +3,9 @@ import atexit import os.path -import re import subprocess +import time from textwrap import dedent -from typing import Optional import pytest @@ -33,18 +32,15 @@ def test_gevent_monkeypatch(tmpdir): app_fp.write( dedent( """\ - from gevent import monkey - monkey.patch_all() - from flask import Flask app = Flask(__name__) - @app.route("/") - def hello_world(): - return "Hello, World!" + @app.route("/") + def hello_world(username): + return "Hello, {}!".format(username) """ ) ) @@ -61,10 +57,11 @@ def hello_world(): # --pip-version latest \ # --style universal \ # --interpreter-constraint ">=3.8,<3.13" \ + # --no-build \ # --indent 2 \ # flask \ # "gevent>=1.3.4" \ - # gunicorn + # gunicorn[gevent] lock = data.path("locks", "issue-2415.lock.json") run_pex_command( @@ -80,41 +77,39 @@ def hello_world(): "-c", "gunicorn", "--inject-args", - "app:app", + "--worker-class gevent app:app", "-o", pex, ], cwd=str(tmpdir), ).assert_success() - log = os.path.join(str(tmpdir), "log") - os.mkfifo(log) - + socket = os.path.join(str(tmpdir), "socket.sock") with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( - args=[pex, "--bind", "127.0.0.1:0", "--log-file", log], stderr=stderr_fp + args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp ) atexit.register(gunicorn.kill) - # N.B.: Since the fifo is blocking, we can only open it now, after the server is launched - # in the background where it too is blocked waiting to write to the log. - with open(log) as log_fp: - port = None # type: Optional[int] - for line in log_fp: - match = re.search(r"Listening at: http://127.0.0.1:(?P\d{1,5})", line) - if match: - port = int(match.group("port")) - break - assert port is not None, "Failed to determine port from gunicorn log at {log}".format( - log=log + start = time.time() + while not os.path.exists(socket): + if time.time() - start > 30: + break + # Local testing on an unloaded system shows gunicorn takes about a second to start up. + time.sleep(1.0) + assert os.path.exists(socket), ( + "Timed out after waiting {time:.3f}s for gunicorn to start and open a unix socket at " + "{socket}".format(time=time.time() - start, socket=socket) + ) + print( + "Waited {time:.3f}s for gunicorn to start and open a unix socket at {socket}".format( + time=time.time() - start, socket=socket ) + ) - with URLFetcher().get_body_stream( - "http://127.0.0.1:{port}".format(port=port) - ) as http_fp: - assert b"Hello, World!" == http_fp.read().strip() - - gunicorn.kill() + with URLFetcher().get_body_stream("unix://{socket}/World".format(socket=socket)) as http_fp: + assert b"Hello, World!" == http_fp.read().strip() + gunicorn.kill() stderr_fp.seek(0) stderr = stderr_fp.read() From 5c297795f97b3ef91b973e04ece73860ed913989 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 12:36:37 -0700 Subject: [PATCH 3/8] Request.method can be undefined, use getter. The stdlib is wild here. --- pex/fetcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pex/fetcher.py b/pex/fetcher.py index 97004fe4f..425a2f840 100644 --- a/pex/fetcher.py +++ b/pex/fetcher.py @@ -195,7 +195,7 @@ def unix_open(self, req): url = urlparse.urlunparse( ("unix", "localhost", path, url_info.params, url_info.query, url_info.fragment) ) - kwargs = {} if PY2 else {"method": req.method} + kwargs = {} if PY2 else {"method": req.get_method()} modified_req = Request( url, data=req.data, From 5d72544c3f688afa7d032301c2bb68c515720d94 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 15:16:14 -0700 Subject: [PATCH 4/8] I <3 macOS. --- tests/integration/test_issue_2415.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index c4e074e7f..b7263dba6 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -9,10 +9,10 @@ import pytest -from pex.common import safe_open +from pex.common import safe_mkdtemp, safe_open from pex.fetcher import URLFetcher from pex.typing import TYPE_CHECKING -from testing import IS_PYPY, PY_VER, data, run_pex_command +from testing import IS_MAC, IS_PYPY, PY_VER, data, run_pex_command if TYPE_CHECKING: from typing import Any @@ -84,7 +84,18 @@ def hello_world(username): cwd=str(tmpdir), ).assert_success() - socket = os.path.join(str(tmpdir), "socket.sock") + # N.B.: Simply using a path under tmpdir does not work on Mac; so we jump through some extra + # hoops here and use the proper directory for socket files for the operating system we're + # running under. + socket_dir = safe_mkdtemp( + dir=os.environ.get( + "XDG_RUNTIME_DIR", + os.path.expanduser("~/Library/Caches/TemporaryItems") + if IS_MAC + else os.path.join("/run/user", str(os.getuid())), + ) + ) + socket = os.path.join(socket_dir, "gunicorn.sock") with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp From 6a876f8c46e13a804241590363f5ce594613913d Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 16:10:31 -0700 Subject: [PATCH 5/8] I really, really <3 macOS. --- tests/integration/test_issue_2415.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index b7263dba6..ed46274b9 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -87,15 +87,16 @@ def hello_world(username): # N.B.: Simply using a path under tmpdir does not work on Mac; so we jump through some extra # hoops here and use the proper directory for socket files for the operating system we're # running under. - socket_dir = safe_mkdtemp( - dir=os.environ.get( - "XDG_RUNTIME_DIR", - os.path.expanduser("~/Library/Caches/TemporaryItems") - if IS_MAC - else os.path.join("/run/user", str(os.getuid())), - ) + socket_dir = os.environ.get( + "XDG_RUNTIME_DIR", + os.path.expanduser("~/Library/Caches/TemporaryItems") + if IS_MAC + else os.path.join("/run/user", str(os.getuid())), ) - socket = os.path.join(socket_dir, "gunicorn.sock") + # But if the proper directory does not exist, we fall back to the tmpdir. + socket_dir = safe_mkdtemp(dir=None if not os.path.isdir(socket_dir) else socket_dir) + socket = os.path.realpath(os.path.join(socket_dir, "gunicorn.sock")) + with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp From c1481d3f2e85e4abb3b6be86bb674d61ee3960fd Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 16:16:30 -0700 Subject: [PATCH 6/8] The depths of my love appear to be limitless. --- tests/integration/test_issue_2415.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index ed46274b9..6129fc676 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -9,7 +9,7 @@ import pytest -from pex.common import safe_mkdtemp, safe_open +from pex.common import safe_mkdtemp, safe_open, safe_mkdir from pex.fetcher import URLFetcher from pex.typing import TYPE_CHECKING from testing import IS_MAC, IS_PYPY, PY_VER, data, run_pex_command @@ -93,9 +93,13 @@ def hello_world(username): if IS_MAC else os.path.join("/run/user", str(os.getuid())), ) - # But if the proper directory does not exist, we fall back to the tmpdir. - socket_dir = safe_mkdtemp(dir=None if not os.path.isdir(socket_dir) else socket_dir) - socket = os.path.realpath(os.path.join(socket_dir, "gunicorn.sock")) + try: + safe_mkdir(socket_dir) + except OSError: + # But if the proper directory does not exist / can't be made with our perms, we fall back + # to the tmpdir. + socket_dir = str(tmpdir) + socket = os.path.realpath(os.path.join(safe_mkdtemp(dir=socket_dir), "gunicorn.sock")) with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( From 32ba48f59e1b3c467b8408257cde3fe27d69895b Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 16:50:41 -0700 Subject: [PATCH 7/8] Maybe it's just the standard mac thing? --- tests/integration/test_issue_2415.py | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index 6129fc676..805737768 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -9,10 +9,10 @@ import pytest -from pex.common import safe_mkdtemp, safe_open, safe_mkdir +from pex.common import safe_open from pex.fetcher import URLFetcher from pex.typing import TYPE_CHECKING -from testing import IS_MAC, IS_PYPY, PY_VER, data, run_pex_command +from testing import IS_PYPY, PY_VER, data, run_pex_command if TYPE_CHECKING: from typing import Any @@ -84,23 +84,7 @@ def hello_world(username): cwd=str(tmpdir), ).assert_success() - # N.B.: Simply using a path under tmpdir does not work on Mac; so we jump through some extra - # hoops here and use the proper directory for socket files for the operating system we're - # running under. - socket_dir = os.environ.get( - "XDG_RUNTIME_DIR", - os.path.expanduser("~/Library/Caches/TemporaryItems") - if IS_MAC - else os.path.join("/run/user", str(os.getuid())), - ) - try: - safe_mkdir(socket_dir) - except OSError: - # But if the proper directory does not exist / can't be made with our perms, we fall back - # to the tmpdir. - socket_dir = str(tmpdir) - socket = os.path.realpath(os.path.join(safe_mkdtemp(dir=socket_dir), "gunicorn.sock")) - + socket = os.path.realpath(os.path.join(str(tmpdir), "gunicorn.sock")) with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp From 2fb16582b428ef54a59db32fb65762faa183f848 Mon Sep 17 00:00:00 2001 From: John Sirois Date: Thu, 6 Jun 2024 17:59:31 -0700 Subject: [PATCH 8/8] Floundering. --- tests/integration/test_issue_2415.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_issue_2415.py b/tests/integration/test_issue_2415.py index 805737768..0b0dbf000 100644 --- a/tests/integration/test_issue_2415.py +++ b/tests/integration/test_issue_2415.py @@ -9,10 +9,10 @@ import pytest -from pex.common import safe_open +from pex.common import safe_mkdir, safe_open from pex.fetcher import URLFetcher from pex.typing import TYPE_CHECKING -from testing import IS_PYPY, PY_VER, data, run_pex_command +from testing import IS_MAC, IS_PYPY, PY_VER, data, run_pex_command if TYPE_CHECKING: from typing import Any @@ -84,7 +84,12 @@ def hello_world(username): cwd=str(tmpdir), ).assert_success() - socket = os.path.realpath(os.path.join(str(tmpdir), "gunicorn.sock")) + socket = os.path.join( + safe_mkdir(os.path.expanduser("~/Library/Caches/TemporaryItems")) + if IS_MAC + else str(tmpdir), + "gunicorn.sock", + ) with open(os.path.join(str(tmpdir), "stderr"), "wb+") as stderr_fp: gunicorn = subprocess.Popen( args=[pex, "--bind", "unix:{socket}".format(socket=socket)], stderr=stderr_fp @@ -93,7 +98,7 @@ def hello_world(username): start = time.time() while not os.path.exists(socket): - if time.time() - start > 30: + if time.time() - start > 60: break # Local testing on an unloaded system shows gunicorn takes about a second to start up. time.sleep(1.0)