Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 136 additions & 39 deletions pathwaysutils/experimental/shared_pathways_service/gke_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,26 @@ def fetch_cluster_credentials(
raise


def deploy_gke_yaml(yaml: str) -> None:
def deploy_gke_yaml(yaml: str, action: str = "apply") -> None:
"""Deploys the given YAML to the GKE cluster.

Args:
yaml: The GKE YAML to deploy.
action: The kubectl action to perform ("apply" or "create"). Create is
equivalent to "apply" but does not support "replacing" the resource if it
already exists.

Raises:
subprocess.CalledProcessError: If the kubectl command fails.
ValueError: If action is not "apply" or "create".
"""
_logger.info("Deploying GKE YAML: %s", yaml)
kubectl_apply_command = ["kubectl", "apply", "-f", "-"]
if action not in ("apply", "create"):
raise ValueError(f"Invalid kubectl action: {action}")
_logger.info("Deploying GKE YAML with action %s: %s", action, yaml)
kubectl_command = ["kubectl", action, "-f", "-"]
try:
proxy_result = subprocess.run(
kubectl_apply_command,
kubectl_command,
input=yaml,
check=True,
capture_output=True,
Expand All @@ -93,6 +99,49 @@ def deploy_gke_yaml(yaml: str) -> None:
)


def delete_gke_resource(
    resource_type: str, name: str, namespace: str = "default"
) -> None:
  """Removes a named resource from the GKE cluster with `kubectl delete`.

  The command is run with `--ignore-not-found`, so asking to delete a resource
  that does not exist is not reported as a failure by kubectl.

  Args:
    resource_type: The kind of resource to remove (e.g. "deployment",
      "service", "job").
    name: The name of the resource to remove.
    namespace: The namespace that holds the resource.

  Raises:
    subprocess.CalledProcessError: If the kubectl command exits with a
      non-zero status. The error is logged before being re-raised.
  """
  # Every value that ends up on the kubectl command line is checked first, in
  # the same order as the arguments.
  for identifier in (resource_type, name, namespace):
    _validate_k8s_name(identifier)

  _logger.info(
      "Deleting %s: %s in namespace: %s", resource_type, name, namespace
  )

  kubectl_args = ["kubectl", "delete", resource_type, "-n", namespace]
  # "--" ends option parsing so the resource name is never read as a flag.
  kubectl_args += ["--ignore-not-found", "--", name]

  try:
    completed = subprocess.run(
        kubectl_args,
        check=True,
        capture_output=True,
        text=True,
    )
  except subprocess.CalledProcessError as err:
    _logger.exception(
        "Failed to delete %s. kubectl output:\n%r", resource_type, err.stderr
    )
    raise
  else:
    _logger.info(
        "Successfully deleted %s. %s", resource_type, completed.stdout
    )



def get_pod_from_job(job_name: str) -> str:
"""Returns the pod name for the given job.

Expand Down Expand Up @@ -224,7 +273,7 @@ def wait_for_pod(job_name: str) -> str:
return check_pod_ready(pod_name)


def __test_pod_connection(port: int) -> None:
def _test_remote_connection(port: int) -> None:
"""Tests the connection to the pod.

Args:
Expand All @@ -233,20 +282,22 @@ def __test_pod_connection(port: int) -> None:
_logger.info("Connecting to localhost:%d", port)
try:
with socket.create_connection(("localhost", port), timeout=30):
_logger.info("Pod is ready.")
_logger.info("Connection to localhost:%d is ready.", port)
except (socket.timeout, ConnectionRefusedError) as exc:
raise RuntimeError("Could not connect to the pod.") from exc


def enable_port_forwarding(
pod_name: str,
remote_server: str,
server_port: int,
namespace: str = "default",
) -> tuple[int, subprocess.Popen[str]]:
"""Enables port forwarding for the given pod.

Args:
pod_name: The name of the pod.
remote_server: The name of the pod or service.
server_port: The port of the server to forward to.
namespace: The namespace of the pod.

Returns:
A tuple containing the pod port and the port forwarding process.
Expand All @@ -255,28 +306,36 @@ def enable_port_forwarding(
cannot be established.
"""
try:
port_available = portpicker.pick_unused_port()
local_port = portpicker.pick_unused_port()
except Exception as e:
_logger.exception("Error finding free local port: %r", e)
raise

_logger.info("Found free local port: %d", port_available)
_logger.info("Found free local port: %d", local_port)
_logger.info(
"Starting port forwarding from local port %d to %s:%d",
port_available,
pod_name,
local_port,
remote_server,
server_port,
)

_validate_k8s_name(pod_name)
if "/" in remote_server:
parts = remote_server.split("/", 1)
_validate_k8s_name(parts[0])
_validate_k8s_name(parts[1])
else:
_validate_k8s_name(remote_server)
_validate_k8s_name(namespace)
port_forward_command = [
"kubectl",
"port-forward",
"-n",
namespace,
"--address",
"localhost",
"--",
f"pod/{pod_name}",
f"{port_available}:{server_port}",
f"{remote_server}",
f"{local_port}:{server_port}",
]
try:
# Start port forwarding in the background.
Expand Down Expand Up @@ -316,12 +375,12 @@ def enable_port_forwarding(
)

try:
__test_pod_connection(port_available)
_test_remote_connection(local_port)
except Exception:
port_forward_process.terminate()
raise

return (port_available, port_forward_process)
return (local_port, port_forward_process)


def stream_pod_logs(pod_name: str) -> subprocess.Popen[str]:
Expand Down Expand Up @@ -351,30 +410,68 @@ def stream_pod_logs(pod_name: str) -> subprocess.Popen[str]:
raise


def delete_gke_job(job_name: str) -> None:
"""Deletes the given job from the GKE cluster.

Args:
job_name: The name of the job.
"""
_validate_k8s_name(job_name)
_logger.info("Deleting job: %s", job_name)
delete_job_command = [
def wait_for_deployment(
name: str, namespace: str = "default", timeout: int = 300
) -> None:
"""Waits for deployment to be ready."""
_validate_k8s_name(name)
_validate_k8s_name(namespace)
_logger.info("Waiting for deployment %s to be ready...", name)
command = [
"kubectl",
"delete",
"job",
"--ignore-not-found",
"--",
job_name,
"rollout",
"status",
f"deployment/{name}",
"-n",
namespace,
f"--timeout={timeout}s",
]
try:
result = subprocess.run(
delete_job_command,
check=True,
capture_output=True,
text=True,
)
subprocess.run(command, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
_logger.exception("Failed to delete job. kubectl output:\\n%r", e.stderr)
raise
_logger.info("Successfully deleted job. %s", result.stdout)
_logger.exception("Deployment failed to become ready: %r", e)
raise RuntimeError(f"Deployment did not become ready: {e.stderr}") from e
_logger.info("Deployment %s is ready.", name)


def wait_for_service_ip(
    name: str, namespace: str = "default", timeout: int = 300
) -> str:
  """Waits for a service to get an external IP and returns it.

  Runs `kubectl get svc` repeatedly, sleeping two seconds between attempts,
  until the service reports a load-balancer ingress IP or `timeout` seconds
  have elapsed. A failing kubectl call is logged as a warning and retried.

  Args:
    name: The name of the service.
    namespace: The namespace of the service.
    timeout: The maximum number of seconds to keep polling.

  Returns:
    The IP of the first load-balancer ingress entry of the service.

  Raises:
    RuntimeError: If no IP was reported before the timeout elapsed.
  """
  _validate_k8s_name(name)
  # The namespace is passed to kubectl as well, so it is validated the same
  # way the other helpers in this module validate it.
  _validate_k8s_name(namespace)

  # The command does not change between polls, so build it once.
  command = [
      "kubectl",
      "get",
      "svc",
      name,
      "-n",
      namespace,
      "-o",
      "jsonpath={.status.loadBalancer.ingress[0].ip}",
  ]

  # A monotonic clock keeps the timeout correct even if the wall clock is
  # adjusted while polling.
  start_time = time.monotonic()
  while time.monotonic() - start_time < timeout:
    try:
      result = subprocess.run(
          command, check=True, capture_output=True, text=True
      )
    except subprocess.CalledProcessError as e:
      _logger.warning("Failed to get service IP: %r", e)
    else:
      # Empty output means no ingress IP has been assigned yet.
      ip = result.stdout.strip()
      if ip:
        _logger.info("Service IP assigned: %s", ip)
        return ip
    time.sleep(2)
  raise RuntimeError(f"Timeout waiting for service IP for {name}")


def pick_unused_local_port() -> int:
  """Returns a local port number obtained from `portpicker.pick_unused_port`."""
  free_port = portpicker.pick_unused_port()
  return free_port


def is_local_port_free(port: int) -> bool:
  """Reports whether `portpicker.is_port_free` considers the port free.

  Args:
    port: The local port number to check.

  Returns:
    The result of `portpicker.is_port_free` for the given port.
  """
  port_is_free = portpicker.is_port_free(port)
  return port_is_free

Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def __enter__(self):
self.proxy_pod_name = gke_utils.wait_for_pod(self._proxy_job_name)
self._proxy_port, self._port_forward_process = (
gke_utils.enable_port_forwarding(
self.proxy_pod_name, PROXY_SERVER_PORT
f"pod/{self.proxy_pod_name}", PROXY_SERVER_PORT
)
)

Expand Down Expand Up @@ -394,7 +394,7 @@ def _cleanup(self) -> None:

# Delete the proxy GKE job.
_logger.info("Deleting Pathways proxy...")
gke_utils.delete_gke_job(self._proxy_job_name)
gke_utils.delete_gke_resource("job", self._proxy_job_name)
_logger.info("Pathways proxy GKE job deletion complete.")

# Restore JAX variables.
Expand Down
Loading
Loading