From 3b262dd51c85244c19393ef80f6815f672230235 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Wed, 13 May 2026 11:25:29 +0200 Subject: [PATCH 01/10] Replace grand-central Ingress with HTTPRoute and Traefik Middlewares --- crate/operator/exposure.py | 23 + crate/operator/grand_central.py | 429 +++++++++++++++++- .../handlers/handle_create_grand_central.py | 3 + .../handlers/handle_update_allowed_cidrs.py | 46 +- crate/operator/operations.py | 66 ++- crate/operator/restore_backup.py | 10 +- .../charts/crate-operator/templates/rbac.yaml | 12 + deploy/rbac.yaml | 12 + 8 files changed, 568 insertions(+), 33 deletions(-) diff --git a/crate/operator/exposure.py b/crate/operator/exposure.py index 9fdb8b4c..bf4a6c1a 100644 --- a/crate/operator/exposure.py +++ b/crate/operator/exposure.py @@ -36,6 +36,12 @@ _lb_annotations_to_remove, get_owner_references, ) +from crate.operator.grand_central import ( + create_grand_central_exposure, + delete_grand_central_ingress, + delete_grand_central_traefik_resources, + read_grand_central_deployment, +) from crate.operator.utils import crate from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kopf import StateBasedSubHandler @@ -614,3 +620,20 @@ async def handle( postgres_port, logger, ) + + # grand-central: only if GC is deployed in this cluster + gc_deployment = await read_grand_central_deployment(namespace, name) + if gc_deployment: + if old_exposure == "traefik": + await delete_grand_central_traefik_resources(namespace, name, logger) + else: + await delete_grand_central_ingress(namespace, name, logger) + + await create_grand_central_exposure( + namespace=namespace, + name=name, + spec=spec, + meta=body["metadata"], + logger=logger, + use_traefik=(new_exposure == "traefik"), + ) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 3af71d71..89d1d8b1 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -24,8 +24,10 @@ import kopf from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, CoreV1Api, + CustomObjectsApi, NetworkingV1Api, V1Affinity, V1Container, @@ -90,6 +92,24 @@ from crate.operator.utils.secrets import get_image_pull_secrets from crate.operator.utils.typing import LabelType +# Gateway API / Traefik CRD constants +_GATEWAY_API_GROUP = "gateway.networking.k8s.io" +_GATEWAY_API_VERSION = "v1" +_HTTPROUTE_PLURAL = "httproutes" + +_GC_GATEWAY_NAME: str = "traefik" +_GC_GATEWAY_NAMESPACE: str = "traefik" +_GC_GATEWAY_SECTION_NAME: str = "websecure" + +_TRAEFIK_GROUP = "traefik.io" +_TRAEFIK_VERSION = "v1alpha1" +_MIDDLEWARE_PLURAL = "middlewares" + +_MIDDLEWARE_COMPRESS_JS = "compress-js" +_MIDDLEWARE_BUFFERING = "buffering" +_MIDDLEWARE_IP_ALLOWLIST = "ip-allowlist" +_OPEN_CIDR = ["0.0.0.0/0", "::/0"] + def get_grand_central_labels(name: str, meta: kopf.Meta) -> Dict[str, Any]: return build_cratedb_labels( @@ -270,6 +290,360 @@ def get_grand_central_service( ) +def _build_response_header_filter(cors_allow_origin: str) -> Dict[str, Any]: + """Return a ResponseHeaderModifier filter dict shared by all route rules.""" + return { + "type": "ResponseHeaderModifier", + "responseHeaderModifier": { + "set": [ + {"name": "X-Frame-Options", "value": "DENY"}, + {"name": "X-Content-Type-Options", "value": "nosniff"}, + { + "name": "Referrer-Policy", + "value": "strict-origin-when-cross-origin", + }, + { + "name": "Access-Control-Allow-Origin", + "value": cors_allow_origin, + }, + {"name": "Access-Control-Allow-Credentials", "value": "true"}, + { + "name": "Access-Control-Allow-Methods", + "value": "GET,POST,PUT,PATCH,OPTIONS,DELETE", + }, + { + "name": "Access-Control-Allow-Headers", + "value": "Content-Type,Authorization", + }, + {"name": "Access-Control-Max-Age", "value": "7200"}, + ] + }, + } + + +def _middleware_ref(name: str) -> Dict[str, Any]: + return { + "type": "ExtensionRef", + "extensionRef": { + "group": _TRAEFIK_GROUP, + "kind": "Middleware", + "name": name, + }, + } + + +def get_grand_central_httproute( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + hostname: str, + spec: kopf.Spec, +) -> Dict[str, Any]: + cors_allow_origin = ( + spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "*" + ) + header_filter = _build_response_header_filter(cors_allow_origin) + service_name = f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}" + + raw_owner_refs = [] + if owner_references: + for ref in owner_references: + raw_owner_refs.append( + { + "apiVersion": ref.api_version, + "blockOwnerDeletion": ref.block_owner_deletion, + "controller": ref.controller, + "kind": ref.kind, + "name": ref.name, + "uid": ref.uid, + } + ) + + return { + "apiVersion": f"{_GATEWAY_API_GROUP}/{_GATEWAY_API_VERSION}", + "kind": "HTTPRoute", + "metadata": { + "name": service_name, + "labels": labels, + "ownerReferences": raw_owner_refs, + }, + "spec": { + "hostnames": [hostname], + "parentRefs": [ + { + "name": _GC_GATEWAY_NAME, + "namespace": _GC_GATEWAY_NAMESPACE, + "sectionName": _GC_GATEWAY_SECTION_NAME, + } + ], + "rules": [ + { + "matches": [{"path": {"type": "PathPrefix", "value": "/api"}}], + "filters": [ + header_filter, + _middleware_ref(_MIDDLEWARE_COMPRESS_JS), + _middleware_ref(_MIDDLEWARE_BUFFERING), + _middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), + ], + "backendRefs": [ + {"name": service_name, "port": GRAND_CENTRAL_BACKEND_API_PORT} + ], + }, + { + "matches": [ + {"path": {"type": "PathPrefix", "value": "/socket.io"}} + ], + "filters": [ + header_filter, + _middleware_ref(_MIDDLEWARE_BUFFERING), + _middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), + ], + "backendRefs": [ + {"name": service_name, "port": GRAND_CENTRAL_BACKEND_API_PORT} + ], + }, + ], + }, + } + + +def _build_middleware_base( + name: str, + middleware_name: str, + labels: LabelType, + owner_references: Optional[List[V1OwnerReference]], +) -> Dict[str, Any]: + raw_owner_refs = [] + if owner_references: + for ref in owner_references: + raw_owner_refs.append( + { + "apiVersion": ref.api_version, + "blockOwnerDeletion": ref.block_owner_deletion, + "controller": ref.controller, + "kind": ref.kind, + "name": ref.name, + "uid": ref.uid, + } + ) + return { + "apiVersion": f"{_TRAEFIK_GROUP}/{_TRAEFIK_VERSION}", + "kind": "Middleware", + "metadata": { + "name": middleware_name, + "labels": labels, + "ownerReferences": raw_owner_refs, + }, + } + + +def get_grand_central_middleware_compress_js( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, +) -> Dict[str, Any]: + body = _build_middleware_base( + name, _MIDDLEWARE_COMPRESS_JS, labels, owner_references + ) + body["spec"] = { + "compress": { + "includedContentTypes": [ + "application/javascript", + "text/javascript", + ] + } + } + return body + + +def get_grand_central_middleware_buffering( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, +) -> Dict[str, Any]: + body = _build_middleware_base(name, _MIDDLEWARE_BUFFERING, labels, owner_references) + body["spec"] = { + "buffering": { + "maxRequestBodyBytes": 1_073_741_824, + "memRequestBodyBytes": 2_097_152, + } + } + return body + + +def get_grand_central_middleware_ip_allowlist( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + cidrs: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Traefik Middleware: IP allow-list (replaces nginx whitelist-source-range).""" + body = _build_middleware_base( + name, _MIDDLEWARE_IP_ALLOWLIST, labels, owner_references + ) + body["spec"] = { + "ipAllowList": { + "sourceRange": cidrs if cidrs else _OPEN_CIDR, + } + } + return body + + +async def update_grand_central_ip_allowlist( + namespace: str, + name: str, + cidrs: List[str], + logger: logging.Logger, +) -> None: + """Patch the ip-allowlist Middleware with a new set of CIDRs.""" + source_range = cidrs if cidrs else _OPEN_CIDR + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + await custom.patch_namespaced_custom_object( + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + name=_MIDDLEWARE_IP_ALLOWLIST, + body={"spec": {"ipAllowList": {"sourceRange": source_range}}}, + _content_type="application/merge-patch+json", + ) + + +async def delete_grand_central_ingress( + namespace: str, name: str, logger: logging.Logger +) -> None: + async with GlobalApiClient() as api_client: + networking = NetworkingV1Api(api_client) + try: + await networking.delete_namespaced_ingress( + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + namespace=namespace, + ) + logger.info(f"Deleted Ingress {GRAND_CENTRAL_RESOURCE_PREFIX}-{name}") + except ApiException as e: + if e.status != 404: + raise + + +async def delete_grand_central_traefik_resources( + namespace: str, name: str, logger: logging.Logger +) -> None: + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + + # HTTPRoute + try: + await custom.delete_namespaced_custom_object( + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + except ApiException as e: + if e.status != 404: + raise + + # Middlewares + for mw_name in ( + _MIDDLEWARE_COMPRESS_JS, + _MIDDLEWARE_BUFFERING, + _MIDDLEWARE_IP_ALLOWLIST, + ): + try: + await custom.delete_namespaced_custom_object( + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + name=mw_name, + ) + except ApiException as e: + if e.status != 404: + raise + logger.info(f"Deleted GC HTTPRoute + Middlewares for {name}") + + +async def read_grand_central_httproute( + namespace: str, name: str +) -> Optional[Dict[str, Any]]: + async with GlobalApiClient() as api_client: + custom = CustomObjectsApi(api_client) + try: + return await custom.get_namespaced_custom_object( + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + except Exception: + return None + + +async def create_grand_central_exposure( + namespace: str, + name: str, + spec: kopf.Spec, + meta: kopf.Meta, + logger: logging.Logger, + use_traefik: bool = False, +) -> None: + owner_references = get_owner_references(name, meta) + cluster_name = spec["cluster"]["name"] + external_dns = spec["cluster"]["externalDNS"] + hostname = external_dns.replace(cluster_name, f"{cluster_name}.gc").rstrip(".") + labels = get_grand_central_labels(name, meta) + cidrs = spec["cluster"].get("allowedCIDRs", None) + + async with GlobalApiClient() as api_client: + if use_traefik: + custom = CustomObjectsApi(api_client) + for mw_body in ( + get_grand_central_middleware_compress_js( + owner_references, name, labels + ), + get_grand_central_middleware_buffering(owner_references, name, labels), + get_grand_central_middleware_ip_allowlist( + owner_references, name, labels, cidrs + ), + ): + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + body=mw_body, + ) + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + body=get_grand_central_httproute( + owner_references, name, labels, hostname, spec + ), + ) + else: + networking = NetworkingV1Api(api_client) + await call_kubeapi( + networking.create_namespaced_ingress, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_grand_central_ingress( + owner_references, name, labels, hostname, spec + ), + ) + + async def read_grand_central_ingress(namespace: str, name: str) -> Optional[V1Ingress]: async with GlobalApiClient() as api_client: networking = NetworkingV1Api(api_client) @@ -398,6 +772,7 @@ async def create_grand_central_backend( spec: kopf.Spec, meta: kopf.Meta, logger: logging.Logger, + use_traefik: bool = False, ) -> None: image_pull_secrets = get_image_pull_secrets() owner_references = get_owner_references(name, meta) @@ -405,6 +780,7 @@ async def create_grand_central_backend( external_dns = spec["cluster"]["externalDNS"] hostname = external_dns.replace(cluster_name, f"{cluster_name}.gc").rstrip(".") labels = get_grand_central_labels(name, meta) + cidrs = spec["cluster"].get("allowedCIDRs", None) async with GlobalApiClient() as api_client: apps = AppsV1Api(api_client) @@ -431,15 +807,50 @@ async def create_grand_central_backend( namespace=namespace, body=get_grand_central_service(owner_references, name, labels), ) - await call_kubeapi( - networking.create_namespaced_ingress, - logger, - continue_on_conflict=True, - namespace=namespace, - body=get_grand_central_ingress( - owner_references, name, labels, hostname, spec - ), - ) + + if use_traefik: + custom = CustomObjectsApi(api_client) + for mw_body in ( + get_grand_central_middleware_compress_js( + owner_references, name, labels + ), + get_grand_central_middleware_buffering(owner_references, name, labels), + get_grand_central_middleware_ip_allowlist( + owner_references, name, labels, cidrs + ), + ): + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_TRAEFIK_GROUP, + version=_TRAEFIK_VERSION, + namespace=namespace, + plural=_MIDDLEWARE_PLURAL, + body=mw_body, + ) + await call_kubeapi( + custom.create_namespaced_custom_object, + logger, + continue_on_conflict=True, + group=_GATEWAY_API_GROUP, + version=_GATEWAY_API_VERSION, + namespace=namespace, + plural=_HTTPROUTE_PLURAL, + body=get_grand_central_httproute( + owner_references, name, labels, hostname, spec + ), + ) + else: + await call_kubeapi( + networking.create_namespaced_ingress, + logger, + continue_on_conflict=True, + namespace=namespace, + body=get_grand_central_ingress( + owner_references, name, labels, hostname, spec + ), + ) async def create_grand_central_user( diff --git a/crate/operator/handlers/handle_create_grand_central.py b/crate/operator/handlers/handle_create_grand_central.py index 48b14608..1b294935 100644 --- a/crate/operator/handlers/handle_create_grand_central.py +++ b/crate/operator/handlers/handle_create_grand_central.py @@ -45,6 +45,8 @@ async def create_grand_central( new is not None and new.get("backendEnabled") ): cratedb = await get_cratedb_resource(namespace, name) + exposure = cratedb["spec"]["cluster"].get("exposure", "loadbalancer") + use_traefik = exposure == "traefik" kopf.register( fn=subhandler_partial( create_grand_central_backend, @@ -53,6 +55,7 @@ async def create_grand_central( cratedb["spec"], cratedb["metadata"], logger, + use_traefik=use_traefik, ), id="create_grand_central", backoff=config.BOOTSTRAP_RETRY_DELAY, diff --git a/crate/operator/handlers/handle_update_allowed_cidrs.py b/crate/operator/handlers/handle_update_allowed_cidrs.py index 04a0bbba..b10ea982 100644 --- a/crate/operator/handlers/handle_update_allowed_cidrs.py +++ b/crate/operator/handlers/handle_update_allowed_cidrs.py @@ -27,7 +27,11 @@ from crate.operator.constants import GRAND_CENTRAL_RESOURCE_PREFIX from crate.operator.exposure import update_traefik_ip_restriction -from crate.operator.grand_central import read_grand_central_ingress +from crate.operator.grand_central import ( + read_grand_central_httproute, + read_grand_central_ingress, + update_grand_central_ip_allowlist, +) from crate.operator.utils.k8s_api_client import GlobalApiClient from crate.operator.utils.kubeapi import get_cratedb_resource from crate.operator.utils.notifications import send_operation_progress_notification @@ -88,22 +92,34 @@ async def update_service_allowed_cidrs( if exposure == "traefik": await update_traefik_ip_restriction(namespace, name, new_cidrs, logger) - ingress = await read_grand_central_ingress(namespace=namespace, name=name) + if exposure == "traefik": + httproute = await read_grand_central_httproute( + namespace=namespace, name=name + ) + if httproute: + await update_grand_central_ip_allowlist( + namespace=namespace, + name=name, + cidrs=new_cidrs, + logger=logger, + ) + else: + ingress = await read_grand_central_ingress(namespace=namespace, name=name) - if ingress: - await networking.patch_namespaced_ingress( - name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", - namespace=namespace, - body={ - "metadata": { - "annotations": { - "nginx.ingress.kubernetes.io/whitelist-source-range": ",".join( # noqa - new_cidrs - ) + if ingress: + await networking.patch_namespaced_ingress( + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + namespace=namespace, + body={ + "metadata": { + "annotations": { + "nginx.ingress.kubernetes.io/whitelist-source-range": ",".join( # noqa + new_cidrs + ) + } } - } - }, - ) + }, + ) await send_operation_progress_notification( namespace=namespace, diff --git a/crate/operator/operations.py b/crate/operator/operations.py index 0293e83e..017c3d86 100644 --- a/crate/operator/operations.py +++ b/crate/operator/operations.py @@ -77,7 +77,14 @@ delete_service as delete_clusterip_service, delete_traefik_resources, ) -from crate.operator.grand_central import read_grand_central_deployment +from crate.operator.grand_central import ( + create_grand_central_exposure, + delete_grand_central_ingress, + delete_grand_central_traefik_resources, + read_grand_central_deployment, + read_grand_central_httproute, + read_grand_central_ingress, +) from crate.operator.sql import execute_sql from crate.operator.utils import crate from crate.operator.utils.jwt import crate_version_supports_jwt @@ -642,6 +649,18 @@ async def are_traefik_resources_present(namespace: str, name: str) -> bool: return True +async def is_grand_central_exposed( + namespace: str, name: str, use_traefik: bool +) -> bool: + """ + Return True if the active exposure resource already exists. + """ + if use_traefik: + return await read_grand_central_httproute(namespace, name) is not None + else: + return await read_grand_central_ingress(namespace, name) is not None + + async def _recreate_traefik_resources( namespace: str, name: str, logger: logging.Logger ): @@ -760,7 +779,7 @@ async def suspend_or_start_cluster( ) # scale grand central deployment back up if it exists await suspend_or_start_grand_central( - apps, namespace, name, suspend=False + apps, namespace, name, suspend=False, logger=logger ) await send_operation_progress_notification( namespace=namespace, @@ -815,7 +834,7 @@ async def suspend_or_start_cluster( ) # scale grand central deployment down if it exists await suspend_or_start_grand_central( - apps, namespace, name, suspend=True + apps, namespace, name, suspend=True, logger=logger ) await send_operation_progress_notification( namespace=namespace, @@ -850,16 +869,51 @@ async def suspend_or_start_cluster( async def suspend_or_start_grand_central( - apps: AppsV1Api, namespace: str, name: str, suspend: bool + apps: AppsV1Api, + namespace: str, + name: str, + suspend: bool, + logger: logging.Logger, ): deployment = await read_grand_central_deployment(namespace=namespace, name=name) + if not deployment: + return - if deployment: + # Determine which exposure mode is active + cratedb = await get_cratedb_resource(namespace, name) + spec = cratedb["spec"] + meta = cratedb["metadata"] + exposure = spec.get("cluster", {}).get("exposure", "loadbalancer") + use_traefik = exposure == "traefik" + + if suspend: + # Scale the deployment to 0 first, then remove the routing resource + await update_deployment_replicas( + apps, + namespace, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + 0, + ) + if use_traefik: + await delete_grand_central_traefik_resources(namespace, name, logger) + else: + await delete_grand_central_ingress(namespace, name, logger) + else: + # Recreate the routing resource, then scale back up + if not await is_grand_central_exposed(namespace, name, use_traefik): + await create_grand_central_exposure( + namespace=namespace, + name=name, + spec=spec, + meta=meta, + logger=logger, + use_traefik=use_traefik, + ) await update_deployment_replicas( apps, namespace, f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", - 0 if suspend else 1, + 1, ) diff --git a/crate/operator/restore_backup.py b/crate/operator/restore_backup.py index d8807781..f39f1433 100644 --- a/crate/operator/restore_backup.py +++ b/crate/operator/restore_backup.py @@ -283,7 +283,7 @@ async def shards_recovery_in_progress( if not tables or (len(tables) == 1 and tables[0].lower() == "all"): tables = await get_snapshot_tables(conn_factory, snapshot, logger) for t in tables: - (schema, table_name) = t.rsplit(".", 1) + schema, table_name = t.rsplit(".", 1) try: # If there is at least one shard, the table is not empty. # We need to check that to ensure the operation does not fail while @@ -1145,7 +1145,9 @@ async def _restart_grand_central( ): async with GlobalApiClient() as api_client: apps = AppsV1Api(api_client) - await suspend_or_start_grand_central(apps, namespace, name, suspend=False) + await suspend_or_start_grand_central( + apps, namespace, name, suspend=False, logger=logger + ) class SendSuccessNotificationSubHandler(StateBasedSubHandler): @@ -1274,7 +1276,9 @@ async def restore_internal_tables_context( await internal_tables.set_gc_tables(restore_type, tables) if internal_tables.has_tables_to_process(): logger.info("Suspending GC operations before restoring internal tables") - await suspend_or_start_grand_central(apps, namespace, name, suspend=True) + await suspend_or_start_grand_central( + apps, namespace, name, suspend=True, logger=logger + ) yield internal_tables diff --git a/deploy/charts/crate-operator/templates/rbac.yaml b/deploy/charts/crate-operator/templates/rbac.yaml index 97761db8..283227d9 100644 --- a/deploy/charts/crate-operator/templates/rbac.yaml +++ b/deploy/charts/crate-operator/templates/rbac.yaml @@ -89,6 +89,18 @@ rules: resources: - middlewaretcps - ingressroutetcps + - middlewares + verbs: + - create + - get + - list + - watch + - patch + - delete +- apiGroups: + - gateway.networking.k8s.io + resources: + - httproutes verbs: - create - get diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 74dfe42b..5dba8b36 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -114,6 +114,18 @@ rules: resources: - middlewaretcps - ingressroutetcps + - middlewares + verbs: + - create + - get + - list + - watch + - patch + - delete +- apiGroups: + - gateway.networking.k8s.io + resources: + - httproutes verbs: - create - get From a321a8c740392e579c3dba05d9cf7a84c67d4a80 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Fri, 15 May 2026 11:01:43 +0200 Subject: [PATCH 02/10] Fix IP whitelist annotation when switching exposure --- crate/operator/grand_central.py | 68 ++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 89d1d8b1..52100d17 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -639,7 +639,7 @@ async def create_grand_central_exposure( continue_on_conflict=True, namespace=namespace, body=get_grand_central_ingress( - owner_references, name, labels, hostname, spec + owner_references, name, labels, hostname, spec, cidrs ), ) @@ -681,45 +681,51 @@ def get_grand_central_ingress( labels: LabelType, hostname: str, spec: kopf.Spec, + cidrs: Optional[List[str]] = None, ) -> V1Ingress: allow_origin = ( spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "$http_origin" ) + annotations = { + "external-dns.alpha.kubernetes.io/hostname": hostname, + "nginx.ingress.kubernetes.io/proxy-body-size": "1G", + "nginx.ingress.kubernetes.io/configuration-snippet": ( + """ + gzip on; + gzip_types + application/javascript + text/javascript; + more_set_headers "X-XSS-Protection: 1;mode=block" + "X-Frame-Options: DENY" + "X-Content-Type-Options: nosniff" + "Referrer-Policy: strict-origin-when-cross-origin" + ; + """ # noqa + ), + "nginx.ingress.kubernetes.io/proxy-buffer-size": "64k", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", + "nginx.ingress.kubernetes.io/enable-cors": "true", + "nginx.ingress.kubernetes.io/cors-allow-credentials": "true", + "nginx.ingress.kubernetes.io/cors-allow-origin": allow_origin, + "nginx.ingress.kubernetes.io/cors-allow-methods": ( + "GET,POST,PUT,PATCH,OPTIONS,DELETE" + ), + "nginx.ingress.kubernetes.io/cors-allow-headers": ( + "Content-Type,Authorization" + ), + "nginx.ingress.kubernetes.io/cors-max-age": "7200", + } + if cidrs: # ← add + annotations["nginx.ingress.kubernetes.io/whitelist-source-range"] = ",".join( + cidrs + ) return V1Ingress( metadata=V1ObjectMeta( name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", labels=labels, owner_references=owner_references, - annotations={ - "external-dns.alpha.kubernetes.io/hostname": hostname, - "nginx.ingress.kubernetes.io/proxy-body-size": "1G", - "nginx.ingress.kubernetes.io/configuration-snippet": ( - """ - gzip on; - gzip_types - application/javascript - text/javascript; - more_set_headers "X-XSS-Protection: 1;mode=block" - "X-Frame-Options: DENY" - "X-Content-Type-Options: nosniff" - "Referrer-Policy: strict-origin-when-cross-origin" - ; - """ # noqa - ), - "nginx.ingress.kubernetes.io/proxy-buffer-size": "64k", - "nginx.ingress.kubernetes.io/ssl-redirect": "true", - "nginx.ingress.kubernetes.io/enable-cors": "true", - "nginx.ingress.kubernetes.io/cors-allow-credentials": "true", - "nginx.ingress.kubernetes.io/cors-allow-origin": allow_origin, - "nginx.ingress.kubernetes.io/cors-allow-methods": ( - "GET,POST,PUT,PATCH,OPTIONS,DELETE" - ), - "nginx.ingress.kubernetes.io/cors-allow-headers": ( - "Content-Type,Authorization" - ), - "nginx.ingress.kubernetes.io/cors-max-age": "7200", - }, + annotations=annotations, ), spec=V1IngressSpec( ingress_class_name="nginx", @@ -848,7 +854,7 @@ async def create_grand_central_backend( continue_on_conflict=True, namespace=namespace, body=get_grand_central_ingress( - owner_references, name, labels, hostname, spec + owner_references, name, labels, hostname, spec, cidrs ), ) From 694633979da35eea56b13edf27aa518c3cabdd24 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Mon, 18 May 2026 10:57:08 +0200 Subject: [PATCH 03/10] Add tests --- CHANGES.rst | 3 + crate/operator/grand_central.py | 137 ++++- .../handlers/handle_update_allowed_cidrs.py | 1 - crate/operator/operations.py | 28 +- tests/test_create_grand_central.py | 504 ++++++++++++++++++ 5 files changed, 665 insertions(+), 8 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 16a196df..b057f738 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,9 @@ Unreleased ``traefik``, the operator creates a ClusterIP service with Traefik IngressRouteTCP and MiddlewareTCP resources instead of a cloud LoadBalancer. +* Replaced grand-central Ingress with HTTPRoute and Traefik Middlewares when + ``spec.cluster.exposure`` is set to ``traefik``. + 2.60.0 (2026-04-22) ------------------- diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 52100d17..0ce6bc82 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -105,9 +105,9 @@ _TRAEFIK_VERSION = "v1alpha1" _MIDDLEWARE_PLURAL = "middlewares" -_MIDDLEWARE_COMPRESS_JS = "compress-js" -_MIDDLEWARE_BUFFERING = "buffering" -_MIDDLEWARE_IP_ALLOWLIST = "ip-allowlist" +_MIDDLEWARE_COMPRESS_JS = "grand-central-compress-js" +_MIDDLEWARE_BUFFERING = "grand-central-buffering" +_MIDDLEWARE_IP_ALLOWLIST = "grand-central-ip-allowlist" _OPEN_CIDR = ["0.0.0.0/0", "::/0"] @@ -291,7 +291,12 @@ def get_grand_central_service( def _build_response_header_filter(cors_allow_origin: str) -> Dict[str, Any]: - """Return a ResponseHeaderModifier filter dict shared by all route rules.""" + """ + Build a Gateway API ResponseHeaderModifier filter that sets security and + CORS headers on all grand-central HTTPRoute rules. + + :param cors_allow_origin: Value for the ``Access-Control-Allow-Origin`` header. + """ return { "type": "ResponseHeaderModifier", "responseHeaderModifier": { @@ -322,6 +327,11 @@ def _build_response_header_filter(cors_allow_origin: str) -> Dict[str, Any]: def _middleware_ref(name: str) -> Dict[str, Any]: + """ + Build a Gateway API ExtensionRef filter pointing to a Traefik Middleware. + + :param name: The name of the Traefik Middleware resource to reference. + """ return { "type": "ExtensionRef", "extensionRef": { @@ -339,6 +349,20 @@ def get_grand_central_httproute( hostname: str, spec: kopf.Spec, ) -> Dict[str, Any]: + """ + Build the HTTPRoute manifest for grand-central. + + Creates two rules: one matching ``/api`` (with JS compression, buffering, + and IP allowlist middlewares) and one matching ``/socket.io`` (with + buffering and IP allowlist only). Both rules set security and CORS response + headers via a ResponseHeaderModifier filter. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param hostname: The external hostname the HTTPRoute should match. + :param spec: The ``spec`` section of the CrateDB custom resource. + """ cors_allow_origin = ( spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "*" ) @@ -413,6 +437,14 @@ def _build_middleware_base( labels: LabelType, owner_references: Optional[List[V1OwnerReference]], ) -> Dict[str, Any]: + """ + Build the common metadata scaffold for a Traefik Middleware resource. + + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param middleware_name: The Kubernetes resource name for this Middleware. + :param labels: Kubernetes labels to apply to the resource. + :param owner_references: Owner references to set on the resource. + """ raw_owner_refs = [] if owner_references: for ref in owner_references: @@ -442,6 +474,17 @@ def get_grand_central_middleware_compress_js( name: str, labels: LabelType, ) -> Dict[str, Any]: + """ + Build the ``grand-central-compress-js`` Traefik Middleware manifest. + + Configures Traefik to gzip-compress ``application/javascript`` and + ``text/javascript`` responses, replacing the nginx ``gzip`` configuration + snippet from the legacy Ingress. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + """ body = _build_middleware_base( name, _MIDDLEWARE_COMPRESS_JS, labels, owner_references ) @@ -461,6 +504,17 @@ def get_grand_central_middleware_buffering( name: str, labels: LabelType, ) -> Dict[str, Any]: + """ + Build the ``grand-central-buffering`` Traefik Middleware manifest. + + Configures request/response buffering with a 1 GiB maximum request body, + replacing the nginx ``proxy-body-size: 1G`` annotation from the legacy + Ingress. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + """ body = _build_middleware_base(name, _MIDDLEWARE_BUFFERING, labels, owner_references) body["spec"] = { "buffering": { @@ -477,7 +531,20 @@ def get_grand_central_middleware_ip_allowlist( labels: LabelType, cidrs: Optional[List[str]] = None, ) -> Dict[str, Any]: - """Traefik Middleware: IP allow-list (replaces nginx whitelist-source-range).""" + """ + Build the ``grand-central-ip-allowlist`` Traefik Middleware manifest. + + Restricts access to the listed CIDR ranges, replacing the nginx + ``whitelist-source-range`` annotation from the legacy Ingress. When + ``cidrs`` is empty or ``None``, all traffic is allowed (``0.0.0.0/0`` + and ``::/0``). + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param cidrs: Optional list of CIDR ranges to allow. Defaults to open + access when not provided. + """ body = _build_middleware_base( name, _MIDDLEWARE_IP_ALLOWLIST, labels, owner_references ) @@ -495,7 +562,19 @@ async def update_grand_central_ip_allowlist( cidrs: List[str], logger: logging.Logger, ) -> None: - """Patch the ip-allowlist Middleware with a new set of CIDRs.""" + """ + Patch the ``grand-central-ip-allowlist`` Middleware in place with a new + set of CIDR ranges. + + When ``cidrs`` is empty, the middleware is set to allow all traffic + (``0.0.0.0/0`` and ``::/0``) rather than blocking everything. + + :param namespace: The Kubernetes namespace where the Middleware resides. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param cidrs: New list of CIDR ranges. Pass an empty list to remove all + restrictions. + :param logger: Logger for operation tracking. + """ source_range = cidrs if cidrs else _OPEN_CIDR async with GlobalApiClient() as api_client: custom = CustomObjectsApi(api_client) @@ -513,6 +592,16 @@ async def update_grand_central_ip_allowlist( async def delete_grand_central_ingress( namespace: str, name: str, logger: logging.Logger ) -> None: + """ + Delete the nginx Ingress resource for grand-central. + + A 404 response is treated as success so that the function is safe to call + even when the Ingress has already been removed. + + :param namespace: The Kubernetes namespace where the Ingress resides. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param logger: Logger for operation tracking. + """ async with GlobalApiClient() as api_client: networking = NetworkingV1Api(api_client) try: @@ -529,6 +618,18 @@ async def delete_grand_central_ingress( async def delete_grand_central_traefik_resources( namespace: str, name: str, logger: logging.Logger ) -> None: + """ + Delete the HTTPRoute and all three Traefik Middlewares for grand-central. + + Resources are deleted in order: HTTPRoute first, then + ``grand-central-compress-js``, ``grand-central-buffering``, and + ``grand-central-ip-allowlist``. A 404 response for any resource is + treated as success. + + :param namespace: The Kubernetes namespace where the resources reside. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param logger: Logger for operation tracking. + """ async with GlobalApiClient() as api_client: custom = CustomObjectsApi(api_client) @@ -568,6 +669,13 @@ async def delete_grand_central_traefik_resources( async def read_grand_central_httproute( namespace: str, name: str ) -> Optional[Dict[str, Any]]: + """ + Return the HTTPRoute object for grand-central, or ``None`` if it does not + exist or cannot be read. + + :param namespace: The Kubernetes namespace to look up the HTTPRoute in. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + """ async with GlobalApiClient() as api_client: custom = CustomObjectsApi(api_client) try: @@ -590,6 +698,23 @@ async def create_grand_central_exposure( logger: logging.Logger, use_traefik: bool = False, ) -> None: + """ + Create only the routing resources for grand-central (HTTPRoute and + Traefik Middlewares, or nginx Ingress), without touching the Deployment + or Service. + + This is used when resuming a suspended cluster or when switching the + ``spec.cluster.exposure`` field, where the Deployment and Service already + exist and only the routing layer needs to be (re-)created. + + :param namespace: The Kubernetes namespace to create resources in. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param spec: The ``spec`` section of the CrateDB custom resource. + :param meta: The ``metadata`` section of the CrateDB custom resource. + :param logger: Logger for operation tracking. + :param use_traefik: When ``True``, create an HTTPRoute and Traefik + Middlewares. When ``False`` (default), create an nginx Ingress. + """ owner_references = get_owner_references(name, meta) cluster_name = spec["cluster"]["name"] external_dns = spec["cluster"]["externalDNS"] diff --git a/crate/operator/handlers/handle_update_allowed_cidrs.py b/crate/operator/handlers/handle_update_allowed_cidrs.py index b10ea982..708e1ea4 100644 --- a/crate/operator/handlers/handle_update_allowed_cidrs.py +++ b/crate/operator/handlers/handle_update_allowed_cidrs.py @@ -92,7 +92,6 @@ async def update_service_allowed_cidrs( if exposure == "traefik": await update_traefik_ip_restriction(namespace, name, new_cidrs, logger) - if exposure == "traefik": httproute = await read_grand_central_httproute( namespace=namespace, name=name ) diff --git a/crate/operator/operations.py b/crate/operator/operations.py index 017c3d86..f0ee2202 100644 --- a/crate/operator/operations.py +++ b/crate/operator/operations.py @@ -653,7 +653,15 @@ async def is_grand_central_exposed( namespace: str, name: str, use_traefik: bool ) -> bool: """ - Return True if the active exposure resource already exists. + Return ``True`` if the active grand-central routing resource already exists. + + Checks for an HTTPRoute when ``use_traefik`` is ``True``, or for an nginx + Ingress otherwise. + + :param namespace: The Kubernetes namespace to check. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param use_traefik: When ``True``, check for an HTTPRoute. When ``False``, + check for an nginx Ingress. """ if use_traefik: return await read_grand_central_httproute(namespace, name) is not None @@ -875,6 +883,24 @@ async def suspend_or_start_grand_central( suspend: bool, logger: logging.Logger, ): + """ + Scale the grand-central Deployment to 0 (suspend) or 1 (start) and + manage its routing resources accordingly. + + On suspend, the Deployment is scaled to 0 and the active routing resource + (HTTPRoute + Middlewares or nginx Ingress) is deleted so it no longer + routes traffic. On start, the routing resource is recreated if absent + before the Deployment is scaled back up. + + Does nothing if no grand-central Deployment exists for this cluster. + + :param apps: An instance of the Kubernetes Apps V1 API. + :param namespace: The Kubernetes namespace for the CrateDB cluster. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param suspend: When ``True``, scale down and delete routing resources. + When ``False``, recreate routing resources and scale back up. + :param logger: Logger for operation tracking. + """ deployment = await read_grand_central_deployment(namespace=namespace, name=name) if not deployment: return diff --git a/tests/test_create_grand_central.py b/tests/test_create_grand_central.py index ec9ea904..41f7c6a9 100644 --- a/tests/test_create_grand_central.py +++ b/tests/test_create_grand_central.py @@ -19,9 +19,11 @@ # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. from typing import Set +from unittest import mock import pytest from kubernetes_asyncio.client import ( + ApiException, AppsV1Api, CoreV1Api, CustomObjectsApi, @@ -34,6 +36,7 @@ GC_USERNAME, GRAND_CENTRAL_PROMETHEUS_PORT, GRAND_CENTRAL_RESOURCE_PREFIX, + KOPF_STATE_STORE_PREFIX, RESOURCE_CRATEDB, ) from crate.operator.cratedb import connection_factory @@ -48,9 +51,39 @@ is_kopf_handler_finished, require_connection, start_cluster, + wait_for_kopf_handler, +) + +_GC_TRAEFIK_MIDDLEWARES = ( + "grand-central-compress-js", + "grand-central-buffering", + "grand-central-ip-allowlist", ) +async def _start_gc_on_cluster(coapi, name, namespace_name, mock_bootstrap=None): + """Patch grandCentral onto a running cluster and wait for the handler.""" + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace_name, + name=name, + body=[ + { + "op": "add", + "path": "/spec/grandCentral", + "value": { + "backendEnabled": True, + "backendImage": "cloud.registry.cr8.net/crate/grand-central:latest", + "apiUrl": "https://my-cratedb-api.cloud/", + "jwkUrl": "https://my-cratedb-api.cloud/api/v2/meta/jwk/", + }, + } + ], + ) + + @pytest.mark.k8s @pytest.mark.asyncio async def test_create_grand_central(faker, namespace, kopf_runner, api_client): @@ -249,6 +282,413 @@ async def test_create_grand_central(faker, namespace, kopf_runner, api_client): assert svc_prometheus_port.target_port == GRAND_CENTRAL_PROMETHEUS_PORT +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_create_grand_central_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Creating a cluster with exposure=traefik and then enabling grandCentral + should produce an HTTPRoute and all three Traefik Middlewares - no Ingress. + """ + apps = AppsV1Api(api_client) + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # Deployment and Service are always created regardless of exposure + await assert_wait_for( + True, + does_deployment_exist, + apps, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + await assert_wait_for( + True, + do_services_exist, + core, + namespace.metadata.name, + {f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}"}, + ) + + # HTTPRoute must exist + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="GC HTTPRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # All three Middlewares must exist + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # No nginx Ingress should be created + await assert_wait_for( + False, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Unexpected Ingress was created for traefik exposure", + timeout=DEFAULT_TIMEOUT, + ) + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_gc_change_exposure_loadbalancer_to_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Switching a cluster with GC from loadbalancer to traefik should delete + the Ingress and create the HTTPRoute + Middlewares. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + additional_cluster_spec={ + "externalDNS": f"{name}.example.com", + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # Ingress exists before the switch + await assert_wait_for( + True, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Initial Ingress was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Switch to traefik + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[{"op": "replace", "path": "/spec/cluster/exposure", "value": "traefik"}], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_update", + err_msg="Exposure change handler did not finish", + timeout=DEFAULT_TIMEOUT, + ) + + # Ingress gone, HTTPRoute + Middlewares present + await assert_wait_for( + False, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Ingress was not deleted after switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="GC HTTPRoute was not created after switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not created after switching to traefik", + timeout=DEFAULT_TIMEOUT, + ) + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +@mock.patch("crate.operator.grand_central.bootstrap_gc_admin_user") +async def test_gc_change_exposure_traefik_to_loadbalancer( + mock_bootstrap, + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Switching a cluster with GC from traefik to loadbalancer should delete + the HTTPRoute + Middlewares and create the Ingress. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + networking = NetworkingV1Api(api_client) + name = faker.domain_word() + cidrs = ["10.0.0.0/8", "192.168.1.0/24"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": cidrs, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # HTTPRoute exists before the switch + await assert_wait_for( + True, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="Initial HTTPRoute was not created", + timeout=DEFAULT_TIMEOUT, + ) + + # Switch to loadbalancer + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + {"op": "replace", "path": "/spec/cluster/exposure", "value": "loadbalancer"} + ], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_update", + err_msg="Exposure change handler did not finish", + timeout=DEFAULT_TIMEOUT, + ) + + # HTTPRoute + Middlewares gone, Ingress present + await assert_wait_for( + False, + does_gc_httproute_exist, + coapi, + namespace.metadata.name, + name, + err_msg="HTTPRoute was not deleted after switching to loadbalancer", + timeout=DEFAULT_TIMEOUT, + ) + for mw in _GC_TRAEFIK_MIDDLEWARES: + await assert_wait_for( + False, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + mw, + err_msg=f"Middleware '{mw}' was not deleted after switching to loadbalancer", # noqa + timeout=DEFAULT_TIMEOUT, + ) + await assert_wait_for( + True, + does_ingress_exist, + networking, + namespace.metadata.name, + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + err_msg="Ingress was not created after switching to loadbalancer", + timeout=DEFAULT_TIMEOUT, + ) + ingress = await networking.read_namespaced_ingress( + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", namespace.metadata.name + ) + whitelist = ingress.metadata.annotations.get( + "nginx.ingress.kubernetes.io/whitelist-source-range" + ) + assert whitelist is not None, "whitelist-source-range annotation is missing" + assert set(whitelist.split(",")) == set( + cidrs + ), f"Expected CIDRs {cidrs}, got {whitelist}" + + +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_gc_cidr_update_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + Updating allowedCIDRs on a traefik cluster with GC should patch the + ip-allowlist Middleware's sourceRange accordingly. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + initial_cidrs = ["10.0.0.0/8"] + updated_cidrs = ["10.0.0.0/8", "192.168.1.0/24"] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "allowedCIDRs": initial_cidrs, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + # ip-allowlist starts with initial_cidrs + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + "grand-central-ip-allowlist", + err_msg="ip-allowlist Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + cidrs = await get_gc_ip_allowlist_cidrs(coapi, namespace.metadata.name) + assert set(cidrs) == set(initial_cidrs), f"Expected {initial_cidrs}, got {cidrs}" + + # Update CIDRs + await coapi.patch_namespaced_custom_object( + group=API_GROUP, + version="v1", + plural=RESOURCE_CRATEDB, + namespace=namespace.metadata.name, + name=name, + body=[ + { + "op": "replace", + "path": "/spec/cluster/allowedCIDRs", + "value": updated_cidrs, + } + ], + ) + await assert_wait_for( + True, + is_kopf_handler_finished, + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/service_cidr_changes/spec.cluster.allowedCIDRs", + err_msg="CIDR update handler did not finish", + timeout=DEFAULT_TIMEOUT, + ) + + # ip-allowlist now reflects updated_cidrs + async def _cidrs_updated(coapi, namespace): + cidrs = await get_gc_ip_allowlist_cidrs(coapi, namespace) + return set(cidrs) == set(updated_cidrs) + + await assert_wait_for( + True, + _cidrs_updated, + coapi, + namespace.metadata.name, + err_msg=f"ip-allowlist sourceRange was not updated to {updated_cidrs}", + timeout=DEFAULT_TIMEOUT, + ) + + async def does_deployment_exist(apps: AppsV1Api, namespace: str, name: str) -> bool: deployments = await apps.list_namespaced_deployment(namespace=namespace) return name in (d.metadata.name for d in deployments.items) @@ -271,3 +711,67 @@ async def does_ingress_exist( async def does_secret_exist(core: CoreV1Api, namespace: str, name: str) -> bool: secrets = await core.list_namespaced_secret(namespace) return name in (s.metadata.name for s in secrets.items) + + +async def does_gc_httproute_exist( + coapi: CustomObjectsApi, namespace: str, name: str +) -> bool: + try: + await coapi.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1", + namespace=namespace, + plural="httproutes", + name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def does_gc_middleware_exist( + coapi: CustomObjectsApi, namespace: str, middleware_name: str +) -> bool: + try: + await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name=middleware_name, + ) + return True + except ApiException as e: + if e.status == 404: + return False + raise + + +async def get_gc_ip_allowlist_cidrs(coapi: CustomObjectsApi, namespace: str) -> list: + try: + mw = await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name="grand-central-ip-allowlist", + ) + return mw.get("spec", {}).get("ipAllowList", {}).get("sourceRange", []) + except ApiException as e: + if e.status == 404: + return [] + raise + + +async def gc_deployment_replicas(apps: AppsV1Api, namespace: str, name: str) -> int: + try: + d = await apps.read_namespaced_deployment( + f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", namespace + ) + return d.spec.replicas + except ApiException as e: + if e.status == 404: + return -1 + raise From 244e45e9df33d35549d68501cce637fcbd4f8a9c Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Thu, 21 May 2026 14:48:03 +0200 Subject: [PATCH 04/10] Create separate Middleware for CORS headers --- crate/operator/grand_central.py | 79 ++++++++++++++------- tests/test_create_grand_central.py | 110 ++++++++++++++++++++++++++++- 2 files changed, 163 insertions(+), 26 deletions(-) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 0ce6bc82..7b2edae7 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -108,6 +108,7 @@ _MIDDLEWARE_COMPRESS_JS = "grand-central-compress-js" _MIDDLEWARE_BUFFERING = "grand-central-buffering" _MIDDLEWARE_IP_ALLOWLIST = "grand-central-ip-allowlist" +_MIDDLEWARE_CORS = "grand-central-cors" _OPEN_CIDR = ["0.0.0.0/0", "::/0"] @@ -290,12 +291,10 @@ def get_grand_central_service( ) -def _build_response_header_filter(cors_allow_origin: str) -> Dict[str, Any]: +def _build_response_header_filter() -> Dict[str, Any]: """ - Build a Gateway API ResponseHeaderModifier filter that sets security and - CORS headers on all grand-central HTTPRoute rules. - - :param cors_allow_origin: Value for the ``Access-Control-Allow-Origin`` header. + Build a Gateway API ResponseHeaderModifier filter that sets security + headers on all grand-central HTTPRoute rules. """ return { "type": "ResponseHeaderModifier", @@ -307,20 +306,6 @@ def _build_response_header_filter(cors_allow_origin: str) -> Dict[str, Any]: "name": "Referrer-Policy", "value": "strict-origin-when-cross-origin", }, - { - "name": "Access-Control-Allow-Origin", - "value": cors_allow_origin, - }, - {"name": "Access-Control-Allow-Credentials", "value": "true"}, - { - "name": "Access-Control-Allow-Methods", - "value": "GET,POST,PUT,PATCH,OPTIONS,DELETE", - }, - { - "name": "Access-Control-Allow-Headers", - "value": "Content-Type,Authorization", - }, - {"name": "Access-Control-Max-Age", "value": "7200"}, ] }, } @@ -363,10 +348,7 @@ def get_grand_central_httproute( :param hostname: The external hostname the HTTPRoute should match. :param spec: The ``spec`` section of the CrateDB custom resource. """ - cors_allow_origin = ( - spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "*" - ) - header_filter = _build_response_header_filter(cors_allow_origin) + header_filter = _build_response_header_filter() service_name = f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}" raw_owner_refs = [] @@ -405,6 +387,7 @@ def get_grand_central_httproute( "matches": [{"path": {"type": "PathPrefix", "value": "/api"}}], "filters": [ header_filter, + _middleware_ref(_MIDDLEWARE_CORS), _middleware_ref(_MIDDLEWARE_COMPRESS_JS), _middleware_ref(_MIDDLEWARE_BUFFERING), _middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), @@ -419,6 +402,7 @@ def get_grand_central_httproute( ], "filters": [ header_filter, + _middleware_ref(_MIDDLEWARE_CORS), _middleware_ref(_MIDDLEWARE_BUFFERING), _middleware_ref(_MIDDLEWARE_IP_ALLOWLIST), ], @@ -556,6 +540,50 @@ def get_grand_central_middleware_ip_allowlist( return body +def get_grand_central_middleware_cors( + owner_references: Optional[List[V1OwnerReference]], + name: str, + labels: LabelType, + spec: kopf.Spec, +) -> Dict[str, Any]: + """ + Build the ``grand-central-cors`` Traefik Middleware manifest. + + The CrateDB setting accepts a comma-separated list of origins; these are + split into individual entries. Falls back to ``["*"]`` when the setting + is absent. + + :param owner_references: Owner references to set on the resource. + :param name: The CrateDB custom resource name defining the CrateDB cluster. + :param labels: Kubernetes labels to apply to the resource. + :param spec: The ``spec`` section of the CrateDB custom resource, used to + read ``cluster.settings.http.cors.allow-origin``. + """ + raw_origin = ( + spec["cluster"].get("settings", {}).get("http.cors.allow-origin") or "*" + ) + origin_list = [o.strip() for o in raw_origin.split(",") if o.strip()] + + body = _build_middleware_base(name, _MIDDLEWARE_CORS, labels, owner_references) + body["spec"] = { + "headers": { + "accessControlAllowOriginList": origin_list, + "accessControlAllowCredentials": True, + "accessControlAllowMethods": [ + "GET", + "POST", + "PUT", + "PATCH", + "OPTIONS", + "DELETE", + ], + "accessControlAllowHeaders": ["Content-Type", "Authorization"], + "accessControlMaxAge": 7200, + } + } + return body + + async def update_grand_central_ip_allowlist( namespace: str, name: str, @@ -621,7 +649,7 @@ async def delete_grand_central_traefik_resources( """ Delete the HTTPRoute and all three Traefik Middlewares for grand-central. - Resources are deleted in order: HTTPRoute first, then + Resources are deleted in order: HTTPRoute first, then ``grand-central-cors``, ``grand-central-compress-js``, ``grand-central-buffering``, and ``grand-central-ip-allowlist``. A 404 response for any resource is treated as success. @@ -648,6 +676,7 @@ async def delete_grand_central_traefik_resources( # Middlewares for mw_name in ( + _MIDDLEWARE_CORS, _MIDDLEWARE_COMPRESS_JS, _MIDDLEWARE_BUFFERING, _MIDDLEWARE_IP_ALLOWLIST, @@ -726,6 +755,7 @@ async def create_grand_central_exposure( if use_traefik: custom = CustomObjectsApi(api_client) for mw_body in ( + get_grand_central_middleware_cors(owner_references, name, labels, spec), get_grand_central_middleware_compress_js( owner_references, name, labels ), @@ -942,6 +972,7 @@ async def create_grand_central_backend( if use_traefik: custom = CustomObjectsApi(api_client) for mw_body in ( + get_grand_central_middleware_cors(owner_references, name, labels, spec), get_grand_central_middleware_compress_js( owner_references, name, labels ), diff --git a/tests/test_create_grand_central.py b/tests/test_create_grand_central.py index 41f7c6a9..1ce4f282 100644 --- a/tests/test_create_grand_central.py +++ b/tests/test_create_grand_central.py @@ -55,6 +55,7 @@ ) _GC_TRAEFIK_MIDDLEWARES = ( + "grand-central-cors", "grand-central-compress-js", "grand-central-buffering", "grand-central-ip-allowlist", @@ -294,7 +295,7 @@ async def test_create_grand_central_traefik( ): """ Creating a cluster with exposure=traefik and then enabling grandCentral - should produce an HTTPRoute and all three Traefik Middlewares - no Ingress. + should produce an HTTPRoute and all four Traefik Middlewares - no Ingress. """ apps = AppsV1Api(api_client) coapi = CustomObjectsApi(api_client) @@ -351,7 +352,7 @@ async def test_create_grand_central_traefik( timeout=DEFAULT_TIMEOUT, ) - # All three Middlewares must exist + # All four Middlewares must exist for mw in _GC_TRAEFIK_MIDDLEWARES: await assert_wait_for( True, @@ -363,6 +364,29 @@ async def test_create_grand_central_traefik( timeout=DEFAULT_TIMEOUT, ) + cors_headers = await get_gc_cors_headers( + coapi, + namespace.metadata.name, + ) + + assert cors_headers["accessControlAllowMethods"] == [ + "GET", + "POST", + "PUT", + "PATCH", + "OPTIONS", + "DELETE", + ] + assert cors_headers["accessControlAllowHeaders"] == [ + "Content-Type", + "Authorization", + ] + assert cors_headers["accessControlAllowCredentials"] is True + assert cors_headers["accessControlMaxAge"] == 7200 + assert cors_headers["accessControlAllowOriginList"] == [ + "*" + ], "Expected default origin list ['*'] when no cors setting is configured" + # No nginx Ingress should be created await assert_wait_for( False, @@ -375,6 +399,72 @@ async def test_create_grand_central_traefik( ) +@pytest.mark.k8s +@pytest.mark.asyncio +@mock.patch("crate.operator.webhooks.webhook_client.send_notification") +async def test_gc_cors_origin_list_traefik( + mock_send_notification, + faker, + namespace, + kopf_runner, + api_client, +): + """ + When spec.cluster.settings.http.cors.allow-origin contains a + comma-separated list of origins, the grand-central-cors Middleware must + split them into individual accessControlAllowOriginList entries rather + than setting a single comma-joined string, which browsers reject. + """ + coapi = CustomObjectsApi(api_client) + core = CoreV1Api(api_client) + name = faker.domain_word() + allowed_origins = [ + "https://console.cratedb-dev.cloud", + "http://localhost:8000", + ] + + await start_cluster( + name, + namespace, + core, + coapi, + 1, + wait_for_healthy=False, + wait_for_lb=False, + additional_cluster_spec={ + "exposure": "traefik", + "externalDNS": f"{name}.example.com", + "settings": { + "http.cors.allow-origin": ",".join(allowed_origins), + }, + }, + ) + await wait_for_kopf_handler( + coapi, + name, + namespace.metadata.name, + f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + ) + await _start_gc_on_cluster(coapi, name, namespace.metadata.name) + + await assert_wait_for( + True, + does_gc_middleware_exist, + coapi, + namespace.metadata.name, + "grand-central-cors", + err_msg="grand-central-cors Middleware was not created", + timeout=DEFAULT_TIMEOUT, + ) + + cors_headers = await get_gc_cors_headers(coapi, namespace.metadata.name) + + assert ( + cors_headers["accessControlAllowOriginList"] == allowed_origins + ), "Multi-origin CORS setting was not split into individual list entries" + assert cors_headers["accessControlAllowCredentials"] is True + + @pytest.mark.k8s @pytest.mark.asyncio @mock.patch("crate.operator.webhooks.webhook_client.send_notification") @@ -749,6 +839,22 @@ async def does_gc_middleware_exist( raise +async def get_gc_cors_headers(coapi: CustomObjectsApi, namespace: str) -> dict: + try: + mw = await coapi.get_namespaced_custom_object( + group="traefik.io", + version="v1alpha1", + namespace=namespace, + plural="middlewares", + name="grand-central-cors", + ) + return mw.get("spec", {}).get("headers", {}) + except ApiException as e: + if e.status == 404: + return {} + raise + + async def get_gc_ip_allowlist_cidrs(coapi: CustomObjectsApi, namespace: str) -> list: try: mw = await coapi.get_namespaced_custom_object( From 6747dc2ef8bdf43418b56dc14e32fa3a22fabfc1 Mon Sep 17 00:00:00 2001 From: Shyukri Shyukriev Date: Thu, 21 May 2026 22:13:37 +0300 Subject: [PATCH 05/10] fix: Add missing logger=mock.ANY to suspend_or_start_grand_central --- tests/test_restore_backup.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/test_restore_backup.py b/tests/test_restore_backup.py index 99f12312..04eab817 100644 --- a/tests/test_restore_backup.py +++ b/tests/test_restore_backup.py @@ -302,7 +302,9 @@ async def test_restore_backup_aws( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=True), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=True, logger=mock.ANY + ), err_msg="Did not call pause grand central.", timeout=DEFAULT_TIMEOUT, ) @@ -335,7 +337,9 @@ async def test_restore_backup_aws( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=False), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=False, logger=mock.ANY + ), err_msg="Did not call resume grand central.", timeout=DEFAULT_TIMEOUT, ) @@ -590,7 +594,9 @@ async def test_restore_backup_azure_blob( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=True), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=True, logger=mock.ANY + ), err_msg="Did not call pause grand central.", timeout=DEFAULT_TIMEOUT, ) @@ -623,7 +629,9 @@ async def test_restore_backup_azure_blob( True, mocked_coro_func_called_with, mock_suspend_gc, - mock.call(mock.ANY, namespace.metadata.name, name, suspend=False), + mock.call( + mock.ANY, namespace.metadata.name, name, suspend=False, logger=mock.ANY + ), err_msg="Did not call resume grand central.", timeout=DEFAULT_TIMEOUT, ) From b4041f12c03161f10086e5bf1c2e54bb8641e540 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Fri, 22 May 2026 10:30:34 +0200 Subject: [PATCH 06/10] Try to fix gc test --- tests/test_create_grand_central.py | 7 +++++-- tests/test_exposure.py | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_create_grand_central.py b/tests/test_create_grand_central.py index 1ce4f282..334baf51 100644 --- a/tests/test_create_grand_central.py +++ b/tests/test_create_grand_central.py @@ -468,7 +468,9 @@ async def test_gc_cors_origin_list_traefik( @pytest.mark.k8s @pytest.mark.asyncio @mock.patch("crate.operator.webhooks.webhook_client.send_notification") +@mock.patch("crate.operator.grand_central.bootstrap_gc_admin_user") async def test_gc_change_exposure_loadbalancer_to_traefik( + mock_bootstrap, mock_send_notification, faker, namespace, @@ -500,6 +502,7 @@ async def test_gc_change_exposure_loadbalancer_to_traefik( name, namespace.metadata.name, f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + timeout=DEFAULT_TIMEOUT * 3, ) await _start_gc_on_cluster(coapi, name, namespace.metadata.name) @@ -531,7 +534,7 @@ async def test_gc_change_exposure_loadbalancer_to_traefik( namespace.metadata.name, f"{KOPF_STATE_STORE_PREFIX}/cluster_update", err_msg="Exposure change handler did not finish", - timeout=DEFAULT_TIMEOUT, + timeout=DEFAULT_TIMEOUT * 2, ) # Ingress gone, HTTPRoute + Middlewares present @@ -639,7 +642,7 @@ async def test_gc_change_exposure_traefik_to_loadbalancer( namespace.metadata.name, f"{KOPF_STATE_STORE_PREFIX}/cluster_update", err_msg="Exposure change handler did not finish", - timeout=DEFAULT_TIMEOUT, + timeout=DEFAULT_TIMEOUT * 2, ) # HTTPRoute + Middlewares gone, Ingress present diff --git a/tests/test_exposure.py b/tests/test_exposure.py index 9549f252..bf511ac2 100644 --- a/tests/test_exposure.py +++ b/tests/test_exposure.py @@ -557,7 +557,8 @@ async def test_change_exposure_loadbalancer_to_traefik( coapi, name, namespace.metadata.name, - f"{KOPF_STATE_STORE_PREFIX}/cluster_create", + f"{KOPF_STATE_STORE_PREFIX}/cluster_create.bootstrap", + timeout=DEFAULT_TIMEOUT * 3, ) # Initially service is LoadBalancer, no Traefik resources From 6a99c3488fd24fc552855f9fe0f2a951bad4993a Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Tue, 26 May 2026 10:55:26 +0200 Subject: [PATCH 07/10] Code review fixes --- crate/operator/grand_central.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 7b2edae7..0a269497 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -871,7 +871,7 @@ def get_grand_central_ingress( ), "nginx.ingress.kubernetes.io/cors-max-age": "7200", } - if cidrs: # ← add + if cidrs: annotations["nginx.ingress.kubernetes.io/whitelist-source-range"] = ",".join( cidrs ) From ab7f3a0d5d6051e79a77777d346ac4deecf07ef7 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Tue, 26 May 2026 11:15:39 +0200 Subject: [PATCH 08/10] fixup! Code review fixes --- crate/operator/exposure.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crate/operator/exposure.py b/crate/operator/exposure.py index bf4a6c1a..98597ab9 100644 --- a/crate/operator/exposure.py +++ b/crate/operator/exposure.py @@ -624,11 +624,6 @@ async def handle( # grand-central: only if GC is deployed in this cluster gc_deployment = await read_grand_central_deployment(namespace, name) if gc_deployment: - if old_exposure == "traefik": - await delete_grand_central_traefik_resources(namespace, name, logger) - else: - await delete_grand_central_ingress(namespace, name, logger) - await create_grand_central_exposure( namespace=namespace, name=name, @@ -637,3 +632,8 @@ async def handle( logger=logger, use_traefik=(new_exposure == "traefik"), ) + + if old_exposure == "traefik": + await delete_grand_central_traefik_resources(namespace, name, logger) + else: + await delete_grand_central_ingress(namespace, name, logger) From 8b722cea10010d2c9842ac629be9fc8b3b1870d7 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Tue, 26 May 2026 16:09:25 +0200 Subject: [PATCH 09/10] fixup! fixup! Code review fixes --- crate/operator/grand_central.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 0a269497..922f9ea1 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -639,7 +639,12 @@ async def delete_grand_central_ingress( ) logger.info(f"Deleted Ingress {GRAND_CENTRAL_RESOURCE_PREFIX}-{name}") except ApiException as e: - if e.status != 404: + if e.status == 404: + logger.info( + f"Ingress {GRAND_CENTRAL_RESOURCE_PREFIX}-{name} not found " + "during deletion - already absent, continuing." + ) + else: raise @@ -671,7 +676,12 @@ async def delete_grand_central_traefik_resources( name=f"{GRAND_CENTRAL_RESOURCE_PREFIX}-{name}", ) except ApiException as e: - if e.status != 404: + if e.status == 404: + logger.info( + f"HTTPRoute {GRAND_CENTRAL_RESOURCE_PREFIX}-{name} not found " + "during deletion - already absent, continuing." + ) + else: raise # Middlewares @@ -690,7 +700,12 @@ async def delete_grand_central_traefik_resources( name=mw_name, ) except ApiException as e: - if e.status != 404: + if e.status == 404: + logger.info( + f"Middleware {mw_name} not found " + "during deletion - already absent, continuing." + ) + else: raise logger.info(f"Deleted GC HTTPRoute + Middlewares for {name}") From 6f7be821ee2eb187070d29bcf87a38bb4cac88e7 Mon Sep 17 00:00:00 2001 From: Thomas Achatz Date: Wed, 27 May 2026 10:26:37 +0200 Subject: [PATCH 10/10] fixup! fixup! fixup! Code review fixes --- crate/operator/grand_central.py | 57 +++++---------------------------- 1 file changed, 8 insertions(+), 49 deletions(-) diff --git a/crate/operator/grand_central.py b/crate/operator/grand_central.py index 922f9ea1..a8d65d51 100644 --- a/crate/operator/grand_central.py +++ b/crate/operator/grand_central.py @@ -952,16 +952,11 @@ async def create_grand_central_backend( ) -> None: image_pull_secrets = get_image_pull_secrets() owner_references = get_owner_references(name, meta) - cluster_name = spec["cluster"]["name"] - external_dns = spec["cluster"]["externalDNS"] - hostname = external_dns.replace(cluster_name, f"{cluster_name}.gc").rstrip(".") labels = get_grand_central_labels(name, meta) - cidrs = spec["cluster"].get("allowedCIDRs", None) async with GlobalApiClient() as api_client: apps = AppsV1Api(api_client) core = CoreV1Api(api_client) - networking = NetworkingV1Api(api_client) await call_kubeapi( apps.create_namespaced_deployment, @@ -984,50 +979,14 @@ async def create_grand_central_backend( body=get_grand_central_service(owner_references, name, labels), ) - if use_traefik: - custom = CustomObjectsApi(api_client) - for mw_body in ( - get_grand_central_middleware_cors(owner_references, name, labels, spec), - get_grand_central_middleware_compress_js( - owner_references, name, labels - ), - get_grand_central_middleware_buffering(owner_references, name, labels), - get_grand_central_middleware_ip_allowlist( - owner_references, name, labels, cidrs - ), - ): - await call_kubeapi( - custom.create_namespaced_custom_object, - logger, - continue_on_conflict=True, - group=_TRAEFIK_GROUP, - version=_TRAEFIK_VERSION, - namespace=namespace, - plural=_MIDDLEWARE_PLURAL, - body=mw_body, - ) - await call_kubeapi( - custom.create_namespaced_custom_object, - logger, - continue_on_conflict=True, - group=_GATEWAY_API_GROUP, - version=_GATEWAY_API_VERSION, - namespace=namespace, - plural=_HTTPROUTE_PLURAL, - body=get_grand_central_httproute( - owner_references, name, labels, hostname, spec - ), - ) - else: - await call_kubeapi( - networking.create_namespaced_ingress, - logger, - continue_on_conflict=True, - namespace=namespace, - body=get_grand_central_ingress( - owner_references, name, labels, hostname, spec, cidrs - ), - ) + await create_grand_central_exposure( + namespace=namespace, + name=name, + spec=spec, + meta=meta, + logger=logger, + use_traefik=use_traefik, + ) async def create_grand_central_user(