diff --git a/CHANGES.rst b/CHANGES.rst index 16a196df..b057f738 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,9 @@ Unreleased ``traefik``, the operator creates a ClusterIP service with Traefik IngressRouteTCP and MiddlewareTCP resources instead of a cloud LoadBalancer. +* Replaced grand-central Ingress with HTTPRoute and Traefik Middlewares when + ``spec.cluster.exposure`` is set to ``traefik``. + 2.60.0 (2026-04-22) ------------------- diff --git a/crate/operator/exposure.py b/crate/operator/exposure.py index 9fdb8b4c..98597ab9 100644 --- a/crate/operator/exposure.py +++ b/crate/operator/exposure.py @@ -36,6 +36,12 @@ _lb_annotations_to_remove, get_owner_references, ) +from crate.operator.grand_central import ( + create_grand_central_exposure, + delete_grand_central_ingress, + delete_grand_central_traefik_resources, + read_grand_central_deployment, +) from crate.operator.utils import crate from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kopf import StateBasedSubHandler @@ -614,3 +620,20 @@ async def handle( postgres_port, logger, ) + + # grand-central: only if GC is deployed in this cluster + gc_deployment = await read_grand_central_deployment(namespace, name) + if gc_deployment: + await create_grand_central_exposure( + namespace=namespace, + name=name, + spec=spec, + meta=body["metadata"], + logger=logger, + use_traefik=(new_exposure == "traefik"), + ) + + if old_exposure == "traefik": + await delete_grand_central_traefik_resources(namespace, name, logger) + else: + await delete_grand_central_ingress(namespace, name, logger) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 3af71d71..922f9ea1 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -24,8 +24,10 @@ import kopf from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, CoreV1Api, + CustomObjectsApi, NetworkingV1Api, V1Affinity, V1Container, @@ -90,6 +92,25 @@ from 
crate.operator.utils.secrets import get_image_pull_secrets from crate.operator.utils.typing import LabelType +# Gateway API / Traefik CRD constants +_GATEWAY_API_GROUP = "gateway.networking.k8s.io" +_GATEWAY_API_VERSION = "v1" +_HTTPROUTE_PLURAL = "httproutes" + +_GC_GATEWAY_NAME: str = "traefik" +_GC_GATEWAY_NAMESPACE: str = "traefik" +_GC_GATEWAY_SECTION_NAME: str = "websecure" + +_TRAEFIK_GROUP = "traefik.io" +_TRAEFIK_VERSION = "v1alpha1" +_MIDDLEWARE_PLURAL = "middlewares" + +_MIDDLEWARE_COMPRESS_JS = "grand-central-compress-js" +_MIDDLEWARE_BUFFERING = "grand-central-buffering" +_MIDDLEWARE_IP_ALLOWLIST = "grand-central-ip-allowlist" +_MIDDLEWARE_CORS = "grand-central-cors" +_OPEN_CIDR = ["0.0.0.0/0", "::/0"] + def get_grand_central_labels(name: str, meta: kopf.Meta) -> Dict[str, Any]: return build_cratedb_labels( @@ -270,6 +291,529 @@ def get_grand_central_service( ) +def _build_response_header_filter() -> Dict[str, Any]: + """ + Build a Gateway API ResponseHeaderModifier filter that sets security + headers on all grand-central HTTPRoute rules. + """ + return { + "type": "ResponseHeaderModifier", + "responseHeaderModifier": { + "set": [ + {"name": "X-Frame-Options", "value": "DENY"}, + {"name": "X-Content-Type-Options", "value": "nosniff"}, + { + "name": "Referrer-Policy", + "value": "strict-origin-when-cross-origin", + }, + ] + }, + } + + +def _middleware_ref(name: str) -> Dict[str, Any]: + """ + Build a Gateway API ExtensionRef filter pointing to a Traefik Middleware. + + :param name: The name of the Traefik Middleware resource to reference. + """ + return { + "type": "ExtensionRef", + "extensionRef": { + "group": _TRAEFIK_GROUP, + "kind": "Middleware", + "name": name, + }, + } + + +def get_grand_central_httproute( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + hostname: str, + spec: kopf.Spec, +) -> Dict[str, Any]: + """ + Build the HTTPRoute manifest for grand-central. 
+ + Creates two rules: one matching ``/api`` (with CORS, JS compression, + buffering, and IP allowlist middlewares) and one matching ``/socket.io`` + (with CORS, buffering, and IP allowlist only). Both rules set security + response headers via a ResponseHeaderModifier filter. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param hostname: The external hostname the HTTPRoute should match. + :param spec: The ``spec`` section of the CrateDB custom resource. + """ + header_filter = _build_response_header_filter() + service_name = f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}" + + raw_owner_refs = [] + if owner_references: + for ref in owner_references: + raw_owner_refs.append( + { + "apiVersion": ref.api_version, + "blockOwnerDeletion": ref.block_owner_deletion, + "controller": ref.controller, + "kind": ref.kind, + "name": ref.name, + "uid": ref.uid, + } + ) + + return { + "apiVersion": f"{_GATEWAY_API_GROUP}/{_GATEWAY_API_VERSION}", + "kind": "HTTPRoute", + "metadata": { + "name": service_name, + "labels": labels, + "ownerReferences": raw_owner_refs, + }, + "spec": { + "hostnames": [hostname], + "parentRefs": [ + { + "name": _GC_GATEWAY_NAME, + "namespace": _GC_GATEWAY_NAMESPACE, + "sectionName": _GC_GATEWAY_SECTION_NAME, + } + ], + "rules": [ + { + "matches": [{"path": {"type": "PathPrefix", "value": "/api"}}], + "filters": [ + header_filter, + _middleware_ref(_MIDDLEWARE_CORS), + _middleware_ref(_MIDDLEWARE_COMPRESS_JS), + _middleware_ref(_MIDDLEWARE_BUFFERING), + _middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), + ], + "backendRefs": [ + {"name": service_name, "port": GRAND_CENTRAL_BACKEND_API_PORT} + ], + }, + { + "matches": [ + {"path": {"type": "PathPrefix", "value": "/socket.io"}} + ], + "filters": [ + header_filter, + _middleware_ref(_MIDDLEWARE_CORS), + _middleware_ref(_MIDDLEWARE_BUFFERING), +
_middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), + ], + "backendRefs": [ + {"name": service_name, "port": GRAND_CENTRAL_BACKEND_API_PORT} + ], + }, + ], + }, + } + + +def _build_middleware_base( + name: str, + middleware_name: str, + labels: LabelType, + owner_references: Optional[List[V1OwnerReference]], +) -> Dict[str, Any]: + """ + Build the common metadata scaffold for a Traefik Middleware resource. + + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param middleware_name: The Kubernetes resource name for this Middleware. + :param labels: Kubernetes labels to apply to the resource. + :param owner_references: Owner references to set on the resource. + """ + raw_owner_refs = [] + if owner_references: + for ref in owner_references: + raw_owner_refs.append( + { + "apiVersion": ref.api_version, + "blockOwnerDeletion": ref.block_owner_deletion, + "controller": ref.controller, + "kind": ref.kind, + "name": ref.name, + "uid": ref.uid, + } + ) + return { + "apiVersion": f"{_TRAEFIK_GROUP}/{_TRAEFIK_VERSION}", + "kind": "Middleware", + "metadata": { + "name": middleware_name, + "labels": labels, + "ownerReferences": raw_owner_refs, + }, + } + + +def get_grand_central_middleware_compress_js( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, +) -> Dict[str, Any]: + """ + Build the ``grand-central-compress-js`` Traefik Middleware manifest. + + Configures Traefik to gzip-compress ``application/javascript`` and + ``text/javascript`` responses, replacing the nginx ``gzip`` configuration + snippet from the legacy Ingress. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. 
+ """ + body = _build_middleware_base( + name, _MIDDLEWARE_COMPRESS_JS, labels, owner_references + ) + body["spec"] = { + "compress": { + "includedContentTypes": [ + "application/javascript", + "text/javascript", + ] + } + } + return body + + +def get_grand_central_middleware_buffering( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, +) -> Dict[str, Any]: + """ + Build the ``grand-central-buffering`` Traefik Middleware manifest. + + Configures request/response buffering with a 1 GiB maximum request body, + replacing the nginx ``proxy-body-size: 1G`` annotation from the legacy + Ingress. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + """ + body = _build_middleware_base(name, _MIDDLEWARE_BUFFERING, labels, owner_references) + body["spec"] = { + "buffering": { + "maxRequestBodyBytes": 1_073_741_824, + "memRequestBodyBytes": 2_097_152, + } + } + return body + + +def get_grand_central_middleware_ip_allowlist( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + cidrs: Optional[List[str]] = None, +) -> Dict[str, Any]: + """ + Build the ``grand-central-ip-allowlist`` Traefik Middleware manifest. + + Restricts access to the listed CIDR ranges, replacing the nginx + ``whitelist-source-range`` annotation from the legacy Ingress. When + ``cidrs`` is empty or ``None``, all traffic is allowed (``0.0.0.0/0`` + and ``::/0``). + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param cidrs: Optional list of CIDR ranges to allow. Defaults to open + access when not provided. 
+ """ + body = _build_middleware_base( + name, _MIDDLEWARE_IP_ALLOWLIST, labels, owner_references + ) + body["spec"] = { + "ipAllowList": { + "sourceRange": cidrs if cidrs else _OPEN_CIDR, + } + } + return body + + +def get_grand_central_middleware_cors( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + spec: kopf.Spec, +) -> Dict[str, Any]: + """ + Build the ``grand-central-cors`` Traefik Middleware manifest. + + The CrateDB setting accepts a comma-separated list of origins; these are + split into individual entries. Falls back to ``["*"]`` when the setting + is absent. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param spec: The ``spec`` section of the CrateDB custom resource, used to + read ``cluster.settings.http.cors.allow-origin``. + """ + raw_origin = ( + spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "*" + ) + origin_list = [o.strip() for o in raw_origin.split(",") if o.strip()] + + body = _build_middleware_base(name, _MIDDLEWARE_CORS, labels, owner_references) + body["spec"] = { + "headers": { + "accessControlAllowOriginList": origin_list, + "accessControlAllowCredentials": True, + "accessControlAllowMethods": [ + "GET", + "POST", + "PUT", + "PATCH", + "OPTIONS", + "DELETE", + ], + "accessControlAllowHeaders": ["Content-Type", "Authorization"], + "accessControlMaxAge": 7200, + } + } + return body + + +async def update_grand_central_ip_allowlist( + namespace: str, + name: str, + cidrs: List[str], + logger: logging.Logger, +) -> None: + """ + Patch the ``grand-central-ip-allowlist`` Middleware in place with a new + set of CIDR ranges. + + When ``cidrs`` is empty, the middleware is set to allow all traffic + (``0.0.0.0/0`` and ``::/0``) rather than blocking everything. 
+ + :param namespace: The Kubernetes namespace where the Middleware resides. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param cidrs: New list of CIDR ranges. Pass an empty list to remove all + restrictions. + :param logger: Logger for operation tracking. + """ + source_range = cidrs if cidrs else _OPEN_CIDR + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + await custom.patch_namespaced_custom_object( + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + name=_MIDDLEWARE_IP_ALLOWLIST, + body={"spec": {"ipAllowList": {"sourceRange": source_range}}}, + _content_type="application/merge-patch+json", + ) + + +async def delete_grand_central_ingress( + namespace: str, name: str, logger: logging.Logger +) -> None: + """ + Delete the nginx Ingress resource for grand-central. + + A 404 response is treated as success so that the function is safe to call + even when the Ingress has already been removed. + + :param namespace: The Kubernetes namespace where the Ingress resides. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param logger: Logger for operation tracking. + """ + async with GlobalApiClient() as api_client: + networking = NetworkingV1Api(api_client) + try: + await networking.delete_namespaced_ingress( + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + namespace=namespace, + ) + logger.info(f"Deleted Ingress {GRAND_CENTRAL_RESOURCE_PREFIX}-{name}") + except ApiException as e: + if e.status == 404: + logger.info( + f"Ingress {GRAND_CENTRAL_RESOURCE_PREFIX}-{name} not found " + "during deletion - already absent, continuing." + ) + else: + raise + + +async def delete_grand_central_traefik_resources( + namespace: str, name: str, logger: logging.Logger +) -> None: + """ + Delete the HTTPRoute and all four Traefik Middlewares for grand-central.
+ + Resources are deleted in order: HTTPRoute first, then ``grand-central-cors``, + ``grand-central-compress-js``, ``grand-central-buffering``, and + ``grand-central-ip-allowlist``. A 404 response for any resource is + treated as success. + + :param namespace: The Kubernetes namespace where the resources reside. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param logger: Logger for operation tracking. + """ + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + + # HTTPRoute + try: + await custom.delete_namespaced_custom_object( + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + except ApiException as e: + if e.status == 404: + logger.info( + f"HTTPRoute {GRAND_CENTRAL_RESOURCE_PREFIX}-{name} not found " + "during deletion - already absent, continuing." + ) + else: + raise + + # Middlewares + for mw_name in ( + _MIDDLEWARE_CORS, + _MIDDLEWARE_COMPRESS_JS, + _MIDDLEWARE_BUFFERING, + _MIDDLEWARE_IP_ALLOWLIST, + ): + try: + await custom.delete_namespaced_custom_object( + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + name=mw_name, + ) + except ApiException as e: + if e.status == 404: + logger.info( + f"Middleware {mw_name} not found " + "during deletion - already absent, continuing." + ) + else: + raise + logger.info(f"Deleted GC HTTPRoute + Middlewares for {name}") + + +async def read_grand_central_httproute( + namespace: str, name: str +) -> Optional[Dict[str, Any]]: + """ + Return the HTTPRoute object for grand-central, or ``None`` if it does not + exist or cannot be read. + + :param namespace: The Kubernetes namespace to look up the HTTPRoute in. + :param name: The CrateDB custom resource name defining the CrateDB cluster. 
+ """ + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + try: + return await custom.get_namespaced_custom_object( + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + except Exception: + return None + + +async def create_grand_central_exposure( + namespace: str, + name: str, + spec: kopf.Spec, + meta: kopf.Meta, + logger: logging.Logger, + use_traefik: bool = False, +) -> None: + """ + Create only the routing resources for grand-central (HTTPRoute and + Traefik Middlewares, or nginx Ingress), without touching the Deployment + or Service. + + This is used when resuming a suspended cluster or when switching the + ``spec.cluster.exposure`` field, where the Deployment and Service already + exist and only the routing layer needs to be (re-)created. + + :param namespace: The Kubernetes namespace to create resources in. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param spec: The ``spec`` section of the CrateDB custom resource. + :param meta: The ``metadata`` section of the CrateDB custom resource. + :param logger: Logger for operation tracking. + :param use_traefik: When ``True``, create an HTTPRoute and Traefik + Middlewares. When ``False`` (default), create an nginx Ingress. 
+ """ + owner_references = get_owner_references(name, meta) + cluster_name = spec["cluster"]["name"] + external_dns = spec["cluster"]["externalDNS"] + hostname = external_dns.replace(cluster_name, f"{cluster_name}.gc").rstrip(".") + labels = get_grand_central_labels(name, meta) + cidrs = spec["cluster"].get("allowedCIDRs", None) + + async with GlobalApiClient() as api_client: + if use_traefik: + custom = CustomObjectsApi(api_client) + for mw_body in ( + get_grand_central_middleware_cors(owner_references, name, labels, spec), + get_grand_central_middleware_compress_js( + owner_references, name, labels + ), + get_grand_central_middleware_buffering(owner_references, name, labels), + get_grand_central_middleware_ip_allowlist( + owner_references, name, labels, cidrs + ), + ): + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + body=mw_body, + ) + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + body=get_grand_central_httproute( + owner_references, name, labels, hostname, spec + ), + ) + else: + networking = NetworkingV1Api(api_client) + await call_kubeapi( + networking.create_namespaced_ingress, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_grand_central_ingress( + owner_references, name, labels, hostname, spec, cidrs + ), + ) + + async def read_grand_central_ingress(namespace: str, name: str) -> Optional[V1Ingress]: async with GlobalApiClient() as api_client: networking = NetworkingV1Api(api_client) @@ -307,45 +851,51 @@ def get_grand_central_ingress( labels: LabelType, hostname: str, spec: kopf.Spec, + cidrs: Optional[List[str]] = None, ) -> V1Ingress: allow_origin = ( spec["cluster"].get("settings", 
{}).get("http.cors.allow-origin") or "$http_origin" ) + annotations = { + "external-dns.alpha.kubernetes.io/hostname": hostname, + "nginx.ingress.kubernetes.io/proxy-body-size": "1G", + "nginx.ingress.kubernetes.io/configuration-snippet": ( + """ + gzip on; + gzip_types + application/javascript + text/javascript; + more_set_headers "X-XSS-Protection: 1;mode=block" + "X-Frame-Options: DENY" + "X-Content-Type-Options: nosniff" + "Referrer-Policy: strict-origin-when-cross-origin" + ; + """ # noqa + ), + "nginx.ingress.kubernetes.io/proxy-buffer-size": "64k", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", + "nginx.ingress.kubernetes.io/enable-cors": "true", + "nginx.ingress.kubernetes.io/cors-allow-credentials": "true", + "nginx.ingress.kubernetes.io/cors-allow-origin": allow_origin, + "nginx.ingress.kubernetes.io/cors-allow-methods": ( + "GET,POST,PUT,PATCH,OPTIONS,DELETE" + ), + "nginx.ingress.kubernetes.io/cors-allow-headers": ( + "Content-Type,Authorization" + ), + "nginx.ingress.kubernetes.io/cors-max-age": "7200", + } + if cidrs: + annotations["nginx.ingress.kubernetes.io/whitelist-source-range"] = ",".join( + cidrs + ) return V1Ingress( metadata=V1ObjectMeta( name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", labels=labels, owner_references=owner_references, - annotations={ - "external-dns.alpha.kubernetes.io/hostname": hostname, - "nginx.ingress.kubernetes.io/proxy-body-size": "1G", - "nginx.ingress.kubernetes.io/configuration-snippet": ( - """ - gzip on; - gzip_types - application/javascript - text/javascript; - more_set_headers "X-XSS-Protection: 1;mode=block" - "X-Frame-Options: DENY" - "X-Content-Type-Options: nosniff" - "Referrer-Policy: strict-origin-when-cross-origin" - ; - """ # noqa - ), - "nginx.ingress.kubernetes.io/proxy-buffer-size": "64k", - "nginx.ingress.kubernetes.io/ssl-redirect": "true", - "nginx.ingress.kubernetes.io/enable-cors": "true", - "nginx.ingress.kubernetes.io/cors-allow-credentials": "true", - 
"nginx.ingress.kubernetes.io/cors-allow-origin": allow_origin, - "nginx.ingress.kubernetes.io/cors-allow-methods": ( - "GET,POST,PUT,PATCH,OPTIONS,DELETE" - ), - "nginx.ingress.kubernetes.io/cors-allow-headers": ( - "Content-Type,Authorization" - ), - "nginx.ingress.kubernetes.io/cors-max-age": "7200", - }, + annotations=annotations, ), spec=V1IngressSpec( ingress_class_name="nginx", @@ -398,6 +948,7 @@ async def create_grand_central_backend( spec: kopf.Spec, meta: kopf.Meta, logger: logging.Logger, + use_traefik: bool = False, ) -> None: image_pull_secrets = get_image_pull_secrets() owner_references = get_owner_references(name, meta) @@ -405,6 +956,7 @@ async def create_grand_central_backend( external_dns = spec["cluster"]["externalDNS"] hostname = external_dns.replace(cluster_name, f"{cluster_name}.gc").rstrip(".") labels = get_grand_central_labels(name, meta) + cidrs = spec["cluster"].get("allowedCIDRs", None) async with GlobalApiClient() as api_client: apps = AppsV1Api(api_client) @@ -431,15 +983,51 @@ async def create_grand_central_backend( namespace=namespace, body=get_grand_central_service(owner_references, name, labels), ) - await call_kubeapi( - networking.create_namespaced_ingress, - logger, - continue_on_conflict=True, - namespace=namespace, - body=get_grand_central_ingress( - owner_references, name, labels, hostname, spec - ), - ) + + if use_traefik: + custom = CustomObjectsApi(api_client) + for mw_body in ( + get_grand_central_middleware_cors(owner_references, name, labels, spec), + get_grand_central_middleware_compress_js( + owner_references, name, labels + ), + get_grand_central_middleware_buffering(owner_references, name, labels), + get_grand_central_middleware_ip_allowlist( + owner_references, name, labels, cidrs + ), + ): + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + body=mw_body, + ) + 
await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + body=get_grand_central_httproute( + owner_references, name, labels, hostname, spec + ), + ) + else: + await call_kubeapi( + networking.create_namespaced_ingress, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_grand_central_ingress( + owner_references, name, labels, hostname, spec, cidrs + ), + ) async def create_grand_central_user( diff --git a/crate/operator/handlers/handle_create_grand_central.py b/crate/operator/handlers/handle_create_grand_central.py index 48b14608..1b294935 100644 --- a/crate/operator/handlers/handle_create_grand_central.py +++ b/crate/operator/handlers/handle_create_grand_central.py @@ -45,6 +45,8 @@ async def create_grand_central( new is not None and new.get("backendEnabled") ): cratedb = await get_cratedb_resource(namespace, name) + exposure = cratedb["spec"]["cluster"].get("exposure", "loadbalancer") + use_traefik = exposure == "traefik" kopf.register( fn=subhandler_partial( create_grand_central_backend, @@ -53,6 +55,7 @@ async def create_grand_central( cratedb["spec"], cratedb["metadata"], logger, + use_traefik=use_traefik, ), id="create_grand_central", backoff=config.BOOTSTRAP_RETRY_DELAY, diff --git a/crate/operator/handlers/handle_update_allowed_cidrs.py b/crate/operator/handlers/handle_update_allowed_cidrs.py index 04a0bbba..708e1ea4 100644 --- a/crate/operator/handlers/handle_update_allowed_cidrs.py +++ b/crate/operator/handlers/handle_update_allowed_cidrs.py @@ -27,7 +27,11 @@ from crate.operator.constants import GRAND_CENTRAL_RESOURCE_PREFIX from crate.operator.exposure import update_traefik_ip_restriction -from crate.operator.grand_central import read_grand_central_ingress +from crate.operator.grand_central import ( + read_grand_central_httproute, + read_grand_central_ingress, + 
update_grand_central_ip_allowlist, +) from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kubeapi import get_cratedb_resource from crate.operator.utils.notifications import send_operation_progress_notification @@ -88,22 +92,33 @@ async def update_service_allowed_cidrs( if exposure == "traefik": await update_traefik_ip_restriction(namespace, name, new_cidrs, logger) - ingress = await read_grand_central_ingress(namespace=namespace, name=name) + httproute = await read_grand_central_httproute( + namespace=namespace, name=name + ) + if httproute: + await update_grand_central_ip_allowlist( + namespace=namespace, + name=name, + cidrs=new_cidrs, + logger=logger, + ) + else: + ingress = await read_grand_central_ingress(namespace=namespace, name=name) - if ingress: - await networking.patch_namespaced_ingress( - name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", - namespace=namespace, - body={ - "metadata": { - "annotations": { - "nginx.ingress.kubernetes.io/whitelist-source-range": ",".join( # noqa - new_cidrs - ) + if ingress: + await networking.patch_namespaced_ingress( + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + namespace=namespace, + body={ + "metadata": { + "annotations": { + "nginx.ingress.kubernetes.io/whitelist-source-range": ",".join( # noqa + new_cidrs + ) + } } - } - }, - ) + }, + ) await send_operation_progress_notification( namespace=namespace, diff --git a/crate/operator/operations.py b/crate/operator/operations.py index 0293e83e..f0ee2202 100644 --- a/crate/operator/operations.py +++ b/crate/operator/operations.py @@ -77,7 +77,14 @@ delete_service as delete_clusterip_service, delete_traefik_resources, ) -from crate.operator.grand_central import read_grand_central_deployment +from crate.operator.grand_central import ( + create_grand_central_exposure, + delete_grand_central_ingress, + delete_grand_central_traefik_resources, + read_grand_central_deployment, + read_grand_central_httproute, + read_grand_central_ingress, +) 
from crate.operator.sql import execute_sql from crate.operator.utils import crate from crate.operator.utils.jwt import crate_version_supports_jwt @@ -642,6 +649,26 @@ async def are_traefik_resources_present(namespace: str, name: str) -> bool: return True +async def is_grand_central_exposed( + namespace: str, name: str, use_traefik: bool +) -> bool: + """ + Return ``True`` if the active grand-central routing resource already exists. + + Checks for an HTTPRoute when ``use_traefik`` is ``True``, or for an nginx + Ingress otherwise. + + :param namespace: The Kubernetes namespace to check. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param use_traefik: When ``True``, check for an HTTPRoute. When ``False``, + check for an nginx Ingress. + """ + if use_traefik: + return await read_grand_central_httproute(namespace, name) is not None + else: + return await read_grand_central_ingress(namespace, name) is not None + + async def _recreate_traefik_resources( namespace: str, name: str, logger: logging.Logger ): @@ -760,7 +787,7 @@ async def suspend_or_start_cluster( ) # scale grand central deployment back up if it exists await suspend_or_start_grand_central( - apps, namespace, name, suspend=False + apps, namespace, name, suspend=False, logger=logger ) await send_operation_progress_notification( namespace=namespace, @@ -815,7 +842,7 @@ async def suspend_or_start_cluster( ) # scale grand central deployment down if it exists await suspend_or_start_grand_central( - apps, namespace, name, suspend=True + apps, namespace, name, suspend=True, logger=logger ) await send_operation_progress_notification( namespace=namespace, @@ -850,16 +877,69 @@ async def suspend_or_start_cluster( async def suspend_or_start_grand_central( - apps: AppsV1Api, namespace: str, name: str, suspend: bool + apps: AppsV1Api, + namespace: str, + name: str, + suspend: bool, + logger: logging.Logger, ): + """ + Scale the grand-central Deployment to 0 (suspend) or 1 (start) and + 
manage its routing resources accordingly. + + On suspend, the Deployment is scaled to 0 and the active routing resource + (HTTPRoute + Middlewares or nginx Ingress) is deleted so it no longer + routes traffic. On start, the routing resource is recreated if absent + before the Deployment is scaled back up. + + Does nothing if no grand-central Deployment exists for this cluster. + + :param apps: An instance of the Kubernetes Apps V1 API. + :param namespace: The Kubernetes namespace for the CrateDB cluster. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param suspend: When ``True``, scale down and delete routing resources. + When ``False``, recreate routing resources and scale back up. + :param logger: Logger for operation tracking. + """ deployment = await read_grand_central_deployment(namespace=namespace, name=name) + if not deployment: + return + + # Determine which exposure mode is active + cratedb = await get_cratedb_resource(namespace, name) + spec = cratedb["spec"] + meta = cratedb["metadata"] + exposure = spec.get("cluster", {}).get("exposure", "loadbalancer") + use_traefik = exposure == "traefik" - if deployment: + if suspend: + # Scale the deployment to 0 first, then remove the routing resource + await update_deployment_replicas( + apps, + namespace, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + 0, + ) + if use_traefik: + await delete_grand_central_traefik_resources(namespace, name, logger) + else: + await delete_grand_central_ingress(namespace, name, logger) + else: + # Recreate the routing resource, then scale back up + if not await is_grand_central_exposed(namespace, name, use_traefik): + await create_grand_central_exposure( + namespace=namespace, + name=name, + spec=spec, + meta=meta, + logger=logger, + use_traefik=use_traefik, + ) await update_deployment_replicas( apps, namespace, f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", - 0 if suspend else 1, + 1, ) diff --git a/crate/operator/restore_backup.py 
b/crate/operator/restore_backup.py index d8807781..f39f1433 100644 --- a/crate/operator/restore_backup.py +++ b/crate/operator/restore_backup.py @@ -283,7 +283,7 @@ async def shards_recovery_in_progress( if not tables or (len(tables) == 1 and tables[0].lower() == "all"): tables = await get_snapshot_tables(conn_factory, snapshot, logger) for t in tables: - (schema, table_name) = t.rsplit(".", 1) + schema, table_name = t.rsplit(".", 1) try: # If there is at least one shard, the table is not empty. # We need to check that to ensure the operation does not fail while @@ -1145,7 +1145,9 @@ async def _restart_grand_central( ): async with GlobalApiClient() as api_client: apps = AppsV1Api(api_client) - await suspend_or_start_grand_central(apps, namespace, name, suspend=False) + await suspend_or_start_grand_central( + apps, namespace, name, suspend=False, logger=logger + ) class SendSuccessNotificationSubHandler(StateBasedSubHandler): @@ -1274,7 +1276,9 @@ async def restore_internal_tables_context( await internal_tables.set_gc_tables(restore_type, tables) if internal_tables.has_tables_to_process(): logger.info("Suspending GC operations before restoring internal tables") - await suspend_or_start_grand_central(apps, namespace, name, suspend=True) + await suspend_or_start_grand_central( + apps, namespace, name, suspend=True, logger=logger + ) yield internal_tables diff --git a/deploy/charts/crate-operator/templates/rbac.yaml b/deploy/charts/crate-operator/templates/rbac.yaml index 97761db8..283227d9 100644 --- a/deploy/charts/crate-operator/templates/rbac.yaml +++ b/deploy/charts/crate-operator/templates/rbac.yaml @@ -89,6 +89,18 @@ rules: resources: - middlewaretcps - ingressroutetcps + - middlewares + verbs: + - create + - get + - list + - watch + - patch + - delete +- apiGroups: + - gateway.networking.k8s.io + resources: + - httproutes verbs: - create - get diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 74dfe42b..5dba8b36 100644 --- a/deploy/rbac.yaml +++ 
b/deploy/rbac.yaml @@ -114,6 +114,18 @@ rules: resources: - middlewaretcps - ingressroutetcps + - middlewares + verbs: + - create + - get + - list + - watch + - patch + - delete +- apiGroups: + - gateway.networking.k8s.io + resources: + - httproutes verbs: - create - get diff --git a/tests/test_create_grand_central.py b/tests/test_create_grand_central.py index ec9ea904..334baf51 100644 --- a/tests/test_create_grand_central.py +++ b/tests/test_create_grand_central.py @@ -19,9 +19,11 @@ # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. from typing import Set +from unittest import mock import pytest from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, CoreV1Api, CustomObjectsApi, @@ -34,6 +36,7 @@ GC_USERNAME, GRAND_CENTRAL_PROMETHEUS_PORT, GRAND_CENTRAL_RESOURCE_PREFIX, + KOPF_STATE_STORE_PREFIX, RESOURCE_CRATEDB, ) from crate.operator.cratedb import connection_factory @@ -48,8 +51,39 @@ is_kopf_handler_finished, require_connection, start_cluster, + wait_for_kopf_handler, ) +_GC_TRAEFIK_MIDDLEWARES = ( + "grand-central-cors", + "grand-central-compress-js", + "grand-central-buffering", + "grand-central-ip-allowlist", +) + + +async def _start_gc_on_cluster(coapi, name, namespace_name, mock_bootstrap=None): + """Patch grandCentral onto a running cluster and wait for the handler.""" + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace_name, + name=name, + body=[ + { + "op": "add", + "path": "/spec/grandCentral", + "value": { + "backendEnabled": True, + "backendImage": "cloud.registry.cr8.net/crate/grand-central:latest", + "apiUrl": "https://my-cratedb-api.cloud/", + "jwkUrl": "https://my-cratedb-api.cloud/api/v2/meta/jwk/", + }, + } + ], + ) + @pytest.mark.k8s @pytest.mark.asyncio @@ -249,6 +283,505 @@ async def test_create_grand_central(faker, namespace, kopf_runner, api_client): 
assert svc_prometheus_port.target_port == GRAND_CENTRAL_PROMETHEUS_PORT +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_create_grand_central_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Creating a cluster with exposure=traefik and then enabling grandCentral + should produce an HTTPRoute and all four Traefik Middlewares - no Ingress. + """ + apps = AppsV1Api(api_client) + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # Deployment and Service are always created regardless of exposure + await assert_wait_for( + True, + does_deployment_exist, + apps, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + await assert_wait_for( + True, + do_services_exist, + core, + namespace.metadata.name, + {f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}"}, + ) + + # HTTPRoute must exist + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="GC HTTPRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # All four Middlewares must exist + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not created", + timeout=DEFAULT_TIMEOUT, + ) + + cors_headers = await get_gc_cors_headers( + coapi, + namespace.metadata.name, + ) + + assert 
cors_headers["accessControlAllowMethods"] == [ + "GET", + "POST", + "PUT", + "PATCH", + "OPTIONS", + "DELETE", + ] + assert cors_headers["accessControlAllowHeaders"] == [ + "Content-Type", + "Authorization", + ] + assert cors_headers["accessControlAllowCredentials"] is True + assert cors_headers["accessControlMaxAge"] == 7200 + assert cors_headers["accessControlAllowOriginList"] == [ + "*" + ], "Expected default origin list ['*'] when no cors setting is configured" + + # No nginx Ingress should be created + await assert_wait_for( + False, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Unexpected Ingress was created for traefik exposure", + timeout=DEFAULT_TIMEOUT, + ) + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_gc_cors_origin_list_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + When spec.cluster.settings.http.cors.allow-origin contains a + comma-separated list of origins, the grand-central-cors Middleware must + split them into individual accessControlAllowOriginList entries rather + than setting a single comma-joined string, which browsers reject. 
+ """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + allowed_origins = [ + "https://console.cratedb-dev.cloud", + "http://localhost:8000", + ] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "settings": { + "http.cors.allow-origin": ",".join(allowed_origins), + }, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + "grand-central-cors", + err_msg="grand-central-cors Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + + cors_headers = await get_gc_cors_headers(coapi, namespace.metadata.name) + + assert ( + cors_headers["accessControlAllowOriginList"] == allowed_origins + ), "Multi-origin CORS setting was not split into individual list entries" + assert cors_headers["accessControlAllowCredentials"] is True + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +@mock.patch("crate.operator.grand_central.bootstrap_gc_admin_user") +async def test_gc_change_exposure_loadbalancer_to_traefik( + mock_bootstrap, + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Switching a cluster with GC from loadbalancer to traefik should delete + the Ingress and create the HTTPRoute + Middlewares. 
+ """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + additional_cluster_spec={ + "externalDNS": f"{name}.example.com", + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + timeout=DEFAULT_TIMEOUT * 3, + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # Ingress exists before the switch + await assert_wait_for( + True, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Initial Ingress was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Switch to traefik + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[{"op": "replace", "path": "/spec/cluster/exposure", "value": "traefik"}], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_update", + err_msg="Exposure change handler did not finish", + timeout=DEFAULT_TIMEOUT * 2, + ) + + # Ingress gone, HTTPRoute + Middlewares present + await assert_wait_for( + False, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Ingress was not deleted after switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="GC HTTPRoute was not created after switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not created after 
switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +@mock.patch("crate.operator.grand_central.bootstrap_gc_admin_user") +async def test_gc_change_exposure_traefik_to_loadbalancer( + mock_bootstrap, + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Switching a cluster with GC from traefik to loadbalancer should delete + the HTTPRoute + Middlewares and create the Ingress. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + cidrs = ["10.0.0.0/8", "192.168.1.0/24"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": cidrs, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # HTTPRoute exists before the switch + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="Initial HTTPRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Switch to loadbalancer + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + {"op": "replace", "path": "/spec/cluster/exposure", "value": "loadbalancer"} + ], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_update", + err_msg="Exposure change handler did not finish", + timeout=DEFAULT_TIMEOUT * 2, + ) + + # HTTPRoute + Middlewares gone, Ingress present + await assert_wait_for( + False, + 
does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="HTTPRoute was not deleted after switching to loadbalancer", + timeout=DEFAULT_TIMEOUT, + ) + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + False, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not deleted after switching to loadbalancer", # noqa + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Ingress was not created after switching to loadbalancer", + timeout=DEFAULT_TIMEOUT, + ) + ingress = await networking.read_namespaced_ingress( + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", namespace.metadata.name + ) + whitelist = ingress.metadata.annotations.get( + "nginx.ingress.kubernetes.io/whitelist-source-range" + ) + assert whitelist is not None, "whitelist-source-range annotation is missing" + assert set(whitelist.split(",")) == set( + cidrs + ), f"Expected CIDRs {cidrs}, got {whitelist}" + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_gc_cidr_update_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Updating allowedCIDRs on a traefik cluster with GC should patch the + ip-allowlist Middleware's sourceRange accordingly. 
+ """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + initial_cidrs = ["10.0.0.0/8"] + updated_cidrs = ["10.0.0.0/8", "192.168.1.0/24"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": initial_cidrs, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # ip-allowlist starts with initial_cidrs + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + "grand-central-ip-allowlist", + err_msg="ip-allowlist Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + cidrs = await get_gc_ip_allowlist_cidrs(coapi, namespace.metadata.name) + assert set(cidrs) == set(initial_cidrs), f"Expected {initial_cidrs}, got {cidrs}" + + # Update CIDRs + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/allowedCIDRs", + "value": updated_cidrs, + } + ], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/service_cidr_changes/spec.cluster.allowedCIDRs", + err_msg="CIDR update handler did not finish", + timeout=DEFAULT_TIMEOUT, + ) + + # ip-allowlist now reflects updated_cidrs + async def _cidrs_updated(coapi, namespace): + cidrs = await get_gc_ip_allowlist_cidrs(coapi, namespace) + return set(cidrs) == set(updated_cidrs) + + await assert_wait_for( + True, + _cidrs_updated, + coapi, + namespace.metadata.name, + err_msg=f"ip-allowlist sourceRange was not updated to {updated_cidrs}", + timeout=DEFAULT_TIMEOUT, + ) + + async 
def does_deployment_exist(apps: AppsV1Api, namespace: str, name: str) -> bool: deployments = await apps.list_namespaced_deployment(namespace=namespace) return name in (d.metadata.name for d in deployments.items) @@ -271,3 +804,83 @@ async def does_ingress_exist( async def does_secret_exist(core: CoreV1Api, namespace: str, name: str) -> bool: secrets = await core.list_namespaced_secret(namespace) return name in (s.metadata.name for s in secrets.items) + + +async def does_gc_httproute_exist( + coapi: CustomObjectsApi, namespace: str, name: str +) -> bool: + try: + await coapi.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1", + namespace=namespace, + plural="httproutes", + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def does_gc_middleware_exist( + coapi: CustomObjectsApi, namespace: str, middleware_name: str +) -> bool: + try: + await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name=middleware_name, + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def get_gc_cors_headers(coapi: CustomObjectsApi, namespace: str) -> dict: + try: + mw = await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name="grand-central-cors", + ) + return mw.get("spec", {}).get("headers", {}) + except ApiException as e: + if e.status == 404: + return {} + raise + + +async def get_gc_ip_allowlist_cidrs(coapi: CustomObjectsApi, namespace: str) -> list: + try: + mw = await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name="grand-central-ip-allowlist", + ) + return mw.get("spec", {}).get("ipAllowList", {}).get("sourceRange", []) + except ApiException as e: + if 
e.status == 404: + return [] + raise + + +async def gc_deployment_replicas(apps: AppsV1Api, namespace: str, name: str) -> int: + try: + d = await apps.read_namespaced_deployment( + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", namespace + ) + return d.spec.replicas + except ApiException as e: + if e.status == 404: + return -1 + raise diff --git a/tests/test_exposure.py b/tests/test_exposure.py index 9549f252..bf511ac2 100644 --- a/tests/test_exposure.py +++ b/tests/test_exposure.py @@ -557,7 +557,8 @@ async def test_change_exposure_loadbalancer_to_traefik( coapi, name, namespace.metadata.name, - f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + f"{KOPF_STATE_STORE_PREFIX}/cluster_create.bootstrap", + timeout=DEFAULT_TIMEOUT * 3, ) # Initially service is LoadBalancer, no Traefik resources diff --git a/tests/test_restore_backup.py b/tests/test_restore_backup.py index 99f12312..04eab817 100644 --- a/tests/test_restore_backup.py +++ b/tests/test_restore_backup.py @@ -302,7 +302,9 @@ async def test_restore_backup_aws( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=True), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=True, logger=mock.ANY + ), err_msg="Did not call pause grand central.", timeout=DEFAULT_TIMEOUT, ) @@ -335,7 +337,9 @@ async def test_restore_backup_aws( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=False), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=False, logger=mock.ANY + ), err_msg="Did not call resume grand central.", timeout=DEFAULT_TIMEOUT, ) @@ -590,7 +594,9 @@ async def test_restore_backup_azure_blob( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=True), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=True, logger=mock.ANY + ), err_msg="Did not call pause grand central.", timeout=DEFAULT_TIMEOUT, ) @@ 
-623,7 +629,9 @@ async def test_restore_backup_azure_blob( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=False), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=False, logger=mock.ANY + ), err_msg="Did not call resume grand central.", timeout=DEFAULT_TIMEOUT, )