diff --git a/.sqlx/query-6b980602f82b52e0f186d532fd5f93141c03a7f704c461194f45047636867855.json b/.sqlx/query-7a7c1fcde15de78b7ee3dabaa92353b2a17d7a3ec2e600896f4291bbd6eeae77.json similarity index 91% rename from .sqlx/query-6b980602f82b52e0f186d532fd5f93141c03a7f704c461194f45047636867855.json rename to .sqlx/query-7a7c1fcde15de78b7ee3dabaa92353b2a17d7a3ec2e600896f4291bbd6eeae77.json index ddb4940baa4..799fec0bfd7 100644 --- a/.sqlx/query-6b980602f82b52e0f186d532fd5f93141c03a7f704c461194f45047636867855.json +++ b/.sqlx/query-7a7c1fcde15de78b7ee3dabaa92353b2a17d7a3ec2e600896f4291bbd6eeae77.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n il.token,\n il.catalog_prefix AS \"catalog_prefix!: String\",\n il.capability AS \"capability!: models::Capability\",\n il.single_use AS \"single_use!: bool\",\n il.detail,\n il.created_at AS \"created_at!: chrono::DateTime\",\n t.sso_provider_id\n FROM internal.invite_links il\n LEFT JOIN tenants t ON il.catalog_prefix::text ^@ t.tenant\n WHERE il.catalog_prefix::text ^@ ANY($1)\n AND ($5::text IS NULL OR il.catalog_prefix::text ^@ $5)\n AND ($4::bool IS NULL OR il.single_use = $4)\n AND ($2::timestamptz IS NULL OR il.created_at < $2)\n ORDER BY il.created_at DESC\n LIMIT $3 + 1\n ", + "query": "\n SELECT\n il.token,\n il.catalog_prefix AS \"catalog_prefix!: String\",\n il.capability AS \"capability!: models::Capability\",\n il.single_use AS \"single_use!: bool\",\n il.detail,\n il.created_at AS \"created_at!: chrono::DateTime\",\n t.sso_provider_id\n FROM internal.invite_links il\n LEFT JOIN tenants t ON il.catalog_prefix::text ^@ t.tenant\n WHERE il.catalog_prefix::text ^@ ANY($1)\n AND ($5::text IS NULL OR il.catalog_prefix::text ^@ $5)\n AND ($4::bool IS NULL OR il.single_use = $4)\n AND ($2::timestamptz IS NULL OR il.created_at < $2)\n AND NOT (il.catalog_prefix::text ^@ ANY($6::text[]))\n ORDER BY il.created_at DESC\n LIMIT $3 + 1\n ", "describe": { "columns": [ { @@ -84,7 +84,8 @@ "Timestamptz", "Int4", "Bool", - "Text" + "Text", + "TextArray" ] }, "nullable": [ @@ -97,5 +98,5 @@ true ] }, - "hash": "6b980602f82b52e0f186d532fd5f93141c03a7f704c461194f45047636867855" + "hash": "7a7c1fcde15de78b7ee3dabaa92353b2a17d7a3ec2e600896f4291bbd6eeae77" } diff --git a/.sqlx/query-f81979ab766a313baf27bb7b132e90e506c2b130dccd4496df39ac55dbbf0be0.json b/.sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json similarity index 50% rename from .sqlx/query-f81979ab766a313baf27bb7b132e90e506c2b130dccd4496df39ac55dbbf0be0.json rename to .sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json index c5ceea2b76f..f7578f37405 100644 --- a/.sqlx/query-f81979ab766a313baf27bb7b132e90e506c2b130dccd4496df39ac55dbbf0be0.json +++ b/.sqlx/query-a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "select\n dp.data_plane_name,\n dp.cidr_blocks::text[] as \"cidr_blocks!: Vec\",\n dp.gcp_service_account_email,\n dp.aws_iam_user_arn,\n dp.azure_application_name,\n dp.azure_application_client_id\n from unnest($1::text[]) as input(name)\n join data_planes dp on dp.data_plane_name = input.name\n ", + "query": "select\n dp.data_plane_name,\n dp.cidr_blocks::text[] as \"cidr_blocks!: Vec\",\n dp.gcp_service_account_email,\n dp.aws_iam_user_arn,\n dp.azure_application_name,\n dp.azure_application_client_id,\n dp.private_links as \"private_links!: Vec\",\n dp.aws_link_endpoints as \"aws_link_endpoints: Vec\",\n dp.azure_link_endpoints as \"azure_link_endpoints: Vec\",\n dp.gcp_psc_endpoints as \"gcp_psc_endpoints: Vec\"\n from unnest($1::text[]) as input(name)\n join data_planes dp on dp.data_plane_name = input.name\n ", "describe": { "columns": [ { @@ -32,6 +32,26 @@ "ordinal": 5, "name": "azure_application_client_id", "type_info": "Text" + }, + { + "ordinal": 6, + "name": "private_links!: Vec", + "type_info": "JsonArray" + }, + { + "ordinal": 7, + "name": "aws_link_endpoints: Vec", + "type_info": "JsonArray" + }, + { + "ordinal": 8, + "name": "azure_link_endpoints: Vec", + "type_info": "JsonArray" + }, + { + "ordinal": 9, + "name": "gcp_psc_endpoints: Vec", + "type_info": "JsonArray" } ], "parameters": { @@ -45,8 +65,12 @@ true, true, true, + true, + false, + true, + true, true ] }, - "hash": "f81979ab766a313baf27bb7b132e90e506c2b130dccd4496df39ac55dbbf0be0" + "hash": "a93ae7bf9e1f6b70def0680845e783a097cab9a98b20f17a5f125f993c295062" } diff --git a/.sqlx/query-f971d873ba7d5b4945c4dbf56172a25c999916da259a87463162e0b654feea98.json b/.sqlx/query-f971d873ba7d5b4945c4dbf56172a25c999916da259a87463162e0b654feea98.json new file mode 100644 index 00000000000..1c6e4dd71ec --- /dev/null +++ b/.sqlx/query-f971d873ba7d5b4945c4dbf56172a25c999916da259a87463162e0b654feea98.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE data_planes\n SET private_links = $2, updated_at = now()\n WHERE data_plane_name = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "JsonArray" + ] + }, + "nullable": [] + }, + "hash": "f971d873ba7d5b4945c4dbf56172a25c999916da259a87463162e0b654feea98" +} diff --git a/crates/control-plane-api/src/fixtures/private_links.sql b/crates/control-plane-api/src/fixtures/private_links.sql new file mode 100644 index 00000000000..af9c30a1599 --- /dev/null +++ b/crates/control-plane-api/src/fixtures/private_links.sql @@ -0,0 +1,79 @@ +-- Adds a private data plane with populated private-link config and one AWS +-- provisioning result row, plus a `read` grant for Alice on the private +-- prefix so the GraphQL authorization layer can surface it. +-- +-- Loaded alongside `data_planes` and `alice` in tests that exercise the +-- typed `privateLinks` field on the dataPlanes query and the +-- `updateDataPlanePrivateLinks` mutation. Kept separate from the shared +-- `data_planes.sql` so its presence does not change every other GraphQL +-- test. +do $$ +declare + alice_private_dp_id flowid := '444444444444'; + +begin + + -- `hmac_keys` is populated directly (rather than via `encrypted_hmac_keys`) + -- so the snapshot loader treats this row as live without exercising the + -- SOPS decrypt path, which other fixtures in this directory already cover. + insert into public.data_planes ( + id, + data_plane_name, + data_plane_fqdn, + hmac_keys, + encrypted_hmac_keys, + broker_address, + reactor_address, + ops_logs_name, + ops_stats_name, + ops_l1_events_name, + ops_l1_inferred_name, + ops_l1_stats_name, + ops_l2_events_transform, + ops_l2_inferred_transform, + ops_l2_stats_transform, + enable_l2, + cidr_blocks, + aws_iam_user_arn, + gcp_service_account_email, + azure_application_name, + azure_application_client_id, + private_links, + aws_link_endpoints + ) values ( + alice_private_dp_id, + 'ops/dp/private/aliceCo/aws-us-east-1-c1', + 'dp.private.aliceCo', + '{c2VjcmV0,b3RoZXI=}', + '{}', + 'broker.dp.private.aliceCo', + 'reactor.dp.private.aliceCo', + 'ops/tasks/private/aliceCo/logs', + 'ops/tasks/private/aliceCo/stats', + 'ops/rollups/L1/private/aliceCo/events', + 'ops/rollups/L1/private/aliceCo/inferred', + 'ops/rollups/L1/private/aliceCo/stats', + 'from.dp.private.aliceCo', + 'from.dp.private.aliceCo', + 'from.dp.private.aliceCo', + false, + '{10.10.0.0/16}', + 'arn:aws:iam::444555666:user/test', + 'test-gcp-private@estuary-test.iam.gserviceaccount.com', + 'estuary-test-app-private', + '44444444-4444-4444-4444-444444444444', + array[ + '{"region":"us-east-1","az_ids":["use1-az1","use1-az2"],"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc123"}'::json, + '{"service_name":"/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc","location":"eastus","dns_name":"privatelink.database.windows.net"}'::json, + '{"service_attachment":"projects/p/regions/us-central1/serviceAttachments/sa","region":"us-central1","dns_zone_name":"z","dns_record_names":["r1","r2"],"all_ports":true}'::json + ], + array[ + '{"endpoint_id":"vpce-0123456789abcdef0","state":"available"}'::json + ] + ); + + insert into public.role_grants (subject_role, object_role, capability) values + ('aliceCo/', 'ops/dp/private/aliceCo/', 'read'); + +end +$$; diff --git a/crates/control-plane-api/src/server/public/graphql/data_planes.rs b/crates/control-plane-api/src/server/public/graphql/data_planes.rs index cce4a277922..315b0a3486c 100644 --- a/crates/control-plane-api/src/server/public/graphql/data_planes.rs +++ b/crates/control-plane-api/src/server/public/graphql/data_planes.rs @@ -1,5 +1,5 @@ use async_graphql::{ - Context, SimpleObject, + ComplexObject, Context, SimpleObject, types::connection::{self, Connection}, }; use std::collections::HashMap; @@ -17,6 +17,7 @@ pub enum DataPlaneCloudProvider { /// A data plane where tasks execute and collections are stored. #[derive(Debug, Clone, SimpleObject)] +#[graphql(complex)] pub struct DataPlane { /// Name of this data-plane under the catalog namespace. pub name: String, @@ -45,6 +46,37 @@ pub struct DataPlane { pub azure_application_name: Option, /// Azure application client ID for this data-plane. pub azure_application_client_id: Option, + /// AWS PrivateLink endpoint provisioning results, opaque JSON exported by + /// the data-plane controller. Empty when no AWS endpoints are provisioned. + pub aws_link_endpoints: Vec>, + /// Azure Private Link endpoint provisioning results, opaque JSON. + pub azure_link_endpoints: Vec>, + /// GCP Private Service Connect endpoint provisioning results, opaque JSON. + pub gcp_psc_endpoints: Vec>, + + #[graphql(skip)] + raw_private_links: Vec, +} + +#[ComplexObject] +impl DataPlane { + /// Configured private link endpoints for this data-plane. Replacing this + /// list (via `updateDataPlanePrivateLinks`) triggers reconvergence by the + /// data-plane controller on its next poll. + async fn private_links(&self) -> async_graphql::Result> { + self.raw_private_links + .iter() + .enumerate() + .map(|(idx, raw)| { + serde_json::from_value::(raw.clone()).map_err(|err| { + async_graphql::Error::new(format!( + "failed to parse private_links[{idx}] for data plane {}: {err}", + self.name, + )) + }) + }) + .collect() + } } /// Fetches detail fields for the given data plane names from the database. @@ -64,7 +96,11 @@ async fn fetch_data_plane_details( dp.gcp_service_account_email, dp.aws_iam_user_arn, dp.azure_application_name, - dp.azure_application_client_id + dp.azure_application_client_id, + dp.private_links as "private_links!: Vec", + dp.aws_link_endpoints as "aws_link_endpoints: Vec", + dp.azure_link_endpoints as "azure_link_endpoints: Vec", + dp.gcp_psc_endpoints as "gcp_psc_endpoints: Vec" from unnest($1::text[]) as input(name) join data_planes dp on dp.data_plane_name = input.name "#, @@ -84,6 +120,10 @@ async fn fetch_data_plane_details( aws_iam_user_arn: row.aws_iam_user_arn, azure_application_name: row.azure_application_name, azure_application_client_id: row.azure_application_client_id, + private_links: row.private_links, + aws_link_endpoints: row.aws_link_endpoints.unwrap_or_default(), + azure_link_endpoints: row.azure_link_endpoints.unwrap_or_default(), + gcp_psc_endpoints: row.gcp_psc_endpoints.unwrap_or_default(), }, ) }) @@ -96,6 +136,10 @@ struct DataPlaneDetails { aws_iam_user_arn: Option, azure_application_name: Option, azure_application_client_id: Option, + private_links: Vec, + aws_link_endpoints: Vec, + azure_link_endpoints: Vec, + gcp_psc_endpoints: Vec, } /// Parses a data plane name into its component parts. @@ -278,31 +322,40 @@ impl DataPlanesQuery { names.into_iter(), |data_plane_name, user_capability| { let dp = row_data.get(&data_plane_name)?; + let details = details_map.get(&data_plane_name); let (cloud_provider, region, tag, is_public) = parse_data_plane_name(&data_plane_name).expect("name validated by pre-filter"); - let details = details_map.get(&data_plane_name); - Some(connection::Edge::new( - data_plane_name.clone(), - DataPlane { - name: data_plane_name, - fqdn: dp.data_plane_fqdn.clone(), - reactor_address: dp.reactor_address.clone(), - user_capability: user_capability - .expect("capability guaranteed by pre-filter"), - cloud_provider, - region, - tag, - is_public, - cidr_blocks: details.map(|d| d.cidr_blocks.clone()).unwrap_or_default(), - gcp_service_account_email: details - .and_then(|d| d.gcp_service_account_email.clone()), - aws_iam_user_arn: details.and_then(|d| d.aws_iam_user_arn.clone()), - azure_application_name: details - .and_then(|d| d.azure_application_name.clone()), - azure_application_client_id: details - .and_then(|d| d.azure_application_client_id.clone()), - }, - )) + let node = DataPlane { + name: data_plane_name.clone(), + fqdn: dp.data_plane_fqdn.clone(), + reactor_address: dp.reactor_address.clone(), + user_capability: user_capability + .expect("capability guaranteed by pre-filter"), + cloud_provider, + region, + tag, + is_public, + cidr_blocks: details.map(|d| d.cidr_blocks.clone()).unwrap_or_default(), + gcp_service_account_email: details + .and_then(|d| d.gcp_service_account_email.clone()), + aws_iam_user_arn: details.and_then(|d| d.aws_iam_user_arn.clone()), + azure_application_name: details.and_then(|d| d.azure_application_name.clone()), + azure_application_client_id: details + .and_then(|d| d.azure_application_client_id.clone()), + aws_link_endpoints: details + .map(|d| d.aws_link_endpoints.iter().cloned().map(async_graphql::Json).collect()) + .unwrap_or_default(), + azure_link_endpoints: details + .map(|d| d.azure_link_endpoints.iter().cloned().map(async_graphql::Json).collect()) + .unwrap_or_default(), + gcp_psc_endpoints: details + .map(|d| d.gcp_psc_endpoints.iter().cloned().map(async_graphql::Json).collect()) + .unwrap_or_default(), + raw_private_links: details + .map(|d| d.private_links.clone()) + .unwrap_or_default(), + }; + Some(connection::Edge::new(data_plane_name, node)) }, ); @@ -312,6 +365,75 @@ impl DataPlanesQuery { } } +#[derive(Debug, Default)] +pub struct DataPlanesMutation; + +#[async_graphql::Object] +impl DataPlanesMutation { + /// Replaces the configured private link endpoints on a private data plane. + /// + /// The provided list overwrites the entire `private_links` column; partial + /// updates are intentionally not supported. The data-plane controller + /// converges to the new configuration on its next poll. + /// + /// Interim authorization: requires `read` on the private data-plane name. + /// This matches the existing data-plane deployment authorization shape and + /// will be replaced with `manage_dataplane` once the orthogonal capability + /// model lands. + pub async fn update_data_plane_private_links( + &self, + ctx: &Context<'_>, + data_plane_name: String, + private_links: Vec, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + // Structural check only: the name must sit under `ops/dp/private/` and + // have at least one path segment beyond it. Anything more specific + // (cluster suffix shape, owning prefix shape) is the data plane's + // problem, not the mutation's; an unknown name falls out as "not + // found" when the UPDATE matches zero rows. + if data_plane_name + .strip_prefix("ops/dp/private/") + .map_or(true, |rest| !rest.contains('/') || rest.starts_with('/')) + { + return Err(async_graphql::Error::new(format!( + "{data_plane_name} is not a private data-plane name" + ))); + } + + let policy_result = crate::server::evaluate_names_authorization( + env.snapshot(), + claims, + models::Capability::Read, + [data_plane_name.as_str()], + ); + env.authorization_outcome(policy_result).await?; + + let bound: Vec> = + private_links.iter().map(sqlx::types::Json).collect(); + let result = sqlx::query!( + r#"UPDATE data_planes + SET private_links = $2, updated_at = now() + WHERE data_plane_name = $1 + "#, + data_plane_name, + &bound as &[sqlx::types::Json<&models::PrivateLink>], + ) + .execute(&env.pg_pool) + .await?; + + if result.rows_affected() == 0 { + return Err(async_graphql::Error::new(format!( + "data plane '{data_plane_name}' not found" + ))); + } + + Ok(true) + } +} + #[cfg(test)] mod tests { use super::*; @@ -351,6 +473,10 @@ mod tests { awsIamUserArn azureApplicationName azureApplicationClientId + awsLinkEndpoints + azureLinkEndpoints + gcpPscEndpoints + privateLinks { __typename } } } } @@ -364,6 +490,386 @@ mod tests { insta::assert_json_snapshot!(response); } + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_graphql_data_planes_with_private_links(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + + let token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + // The private fixture grants Alice `read` on + // ops/dp/private/aliceCo/aws-us-east-1-c1 and populates one entry of + // each private-link variant plus a single AWS provisioning result. + let response: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + dataPlanes { + edges { + node { + name + awsLinkEndpoints + azureLinkEndpoints + gcpPscEndpoints + privateLinks { + __typename + ... on AWSPrivateLink { + region + azIds + serviceName + } + ... on AzurePrivateLink { + serviceName + location + dnsName + resourceType + } + ... on GCPPrivateServiceConnect { + serviceAttachment + region + dnsZoneName + dnsRecordNames + allPorts + } + } + } + } + } + } + "# + }), + Some(&token), + ) + .await; + + insta::assert_json_snapshot!("data_planes_with_private_links", response); + } + + // A malformed `private_links` row should produce a field-level error that + // names the data plane and the failing index, without breaking the rest + // of the `dataPlanes` query selection. + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_graphql_data_planes_malformed_private_link(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + // Corrupt the private_links column for the private dp before snapshot. + sqlx::query( + r#"UPDATE data_planes + SET private_links = array['{"not":"a private link"}'::json] + WHERE data_plane_name = 'ops/dp/private/aliceCo/aws-us-east-1-c1'"#, + ) + .execute(&pool) + .await + .unwrap(); + + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + + let token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + let response: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + dataPlanes { + edges { + node { + name + privateLinks { __typename } + } + } + } + } + "# + }), + Some(&token), + ) + .await; + + insta::assert_json_snapshot!("data_planes_malformed_private_link", response); + } + + // ===== updateDataPlanePrivateLinks mutation tests ===== + + const VALID_AWS_INPUT: &str = r#"{ + "aws": { + "region": "us-east-1", + "azIds": ["use1-az1", "use1-az2"], + "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" + } + }"#; + const VALID_AZURE_INPUT: &str = r#"{ + "azure": { + "serviceName": "/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc", + "location": "eastus", + "dnsName": "privatelink.database.windows.net", + "resourceType": "" + } + }"#; + const VALID_GCP_INPUT: &str = r#"{ + "gcp": { + "serviceAttachment": "projects/p/regions/us-central1/serviceAttachments/sa", + "region": "us-central1", + "dnsZoneName": "z", + "dnsRecordNames": ["r1"], + "allPorts": true + } + }"#; + + fn update_mutation( + name: &str, + links_json: &str, + ) -> serde_json::Value { + serde_json::json!({ + "query": r#" + mutation($name: String!, $links: [PrivateLinkInput!]!) { + updateDataPlanePrivateLinks(dataPlaneName: $name, privateLinks: $links) + }"#, + "variables": { + "name": name, + "links": serde_json::from_str::(&format!("[{links_json}]")).unwrap(), + } + }) + } + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_update_private_links_happy_path(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool.clone(), false).await) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + let dp = "ops/dp/private/aliceCo/aws-us-east-1-c1"; + let links = format!("{VALID_AWS_INPUT},{VALID_AZURE_INPUT},{VALID_GCP_INPUT}"); + + let updated_at_before: chrono::DateTime = sqlx::query_scalar( + "SELECT updated_at FROM data_planes WHERE data_plane_name = $1", + ) + .bind(dp) + .fetch_one(&pool) + .await + .unwrap(); + + let response: serde_json::Value = server + .graphql(&update_mutation(dp, &links), Some(&alice_token)) + .await; + assert_eq!(response["data"]["updateDataPlanePrivateLinks"], true); + + // Postgres `now()` is `transaction_timestamp()` at microsecond + // precision, so two distinct transactions return distinct values. + let updated_at_after: chrono::DateTime = sqlx::query_scalar( + "SELECT updated_at FROM data_planes WHERE data_plane_name = $1", + ) + .bind(dp) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + updated_at_after > updated_at_before, + "updated_at must advance on a successful mutation" + ); + + // Calling again with a single AWS link replaces the entire array + // rather than merging. + let response: serde_json::Value = server + .graphql( + &update_mutation(dp, VALID_AWS_INPUT), + Some(&alice_token), + ) + .await; + assert_eq!(response["data"]["updateDataPlanePrivateLinks"], true); + + // Confirm the second call replaced (rather than merged) the array. + let stored_count: i64 = sqlx::query_scalar( + "SELECT array_length(private_links, 1)::bigint FROM data_planes WHERE data_plane_name = $1", + ) + .bind(dp) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(stored_count, 1); + } + + /// Extracts the first error message from a GraphQL response, or panics + /// if the response did not return an error. + fn first_error_message(response: &serde_json::Value) -> &str { + response["errors"][0]["message"] + .as_str() + .unwrap_or_else(|| panic!("expected an error, got: {response}")) + } + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_update_private_links_authorization(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + // Create a bob who has no grants on the private dp. + sqlx::query( + "INSERT INTO auth.users (id, email) VALUES \ + ('22222222-2222-2222-2222-222222222222', 'bob@example.test')", + ) + .execute(&pool) + .await + .unwrap(); + + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + let bob_token = server.make_access_token( + uuid::Uuid::from_bytes([0x22; 16]), + Some("bob@example.test"), + ); + + let dp = "ops/dp/private/aliceCo/aws-us-east-1-c1"; + + // Alice has read on the private dp via the aliceCo/ -> ops/dp/private/aliceCo/ + // role grant installed by the private_links fixture. + let alice_ok: serde_json::Value = server + .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&alice_token)) + .await; + assert_eq!( + alice_ok["data"]["updateDataPlanePrivateLinks"], true, + "alice with `read` should succeed: {alice_ok}", + ); + + // Bob has no grants and should be rejected. + let bob_denied: serde_json::Value = server + .graphql(&update_mutation(dp, VALID_AWS_INPUT), Some(&bob_token)) + .await; + assert_eq!( + first_error_message(&bob_denied), + "PermissionDenied: bob@example.test is not authorized to access prefix or name 'ops/dp/private/aliceCo/aws-us-east-1-c1' with required capability read", + ); + } + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures(path = "../../../fixtures", scripts("data_planes", "alice")) + )] + async fn test_update_private_links_name_gates(pool: sqlx::PgPool) { + let _guard = test_server::init(); + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + // Names outside `ops/dp/private//...` are rejected by the + // structural check before any auth or DB work. + let cases: &[&str] = &[ + "ops/dp/public/aws-us-west-2-c1", + "ops/dp/private/aws-us-east-1-c1", + ]; + for name in cases { + let response: serde_json::Value = server + .graphql(&update_mutation(name, VALID_AWS_INPUT), Some(&alice_token)) + .await; + assert_eq!( + first_error_message(&response), + format!("{name} is not a private data-plane name"), + "case: {name}", + ); + } + } + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_update_private_links_oneof_multi_branch(pool: sqlx::PgPool) { + let _guard = test_server::init(); + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + // PrivateLinkInput is a @oneOf input; supplying two branches at once + // must be rejected by GraphQL validation before reaching the handler. + let response: serde_json::Value = server + .graphql( + &update_mutation( + "ops/dp/private/aliceCo/aws-us-east-1-c1", + r#"{ + "aws": {"region":"us-east-1","azIds":["a"],"serviceName":"com.amazonaws.vpce.us-east-1.vpce-svc-abc"}, + "azure": {"serviceName":"svc","location":"eastus","dnsName":null,"resourceType":null} + }"#, + ), + Some(&alice_token), + ) + .await; + assert_eq!( + first_error_message(&response), + "Invalid value for argument \"privateLinks.0\", Oneof input objects requires have exactly one field", + ); + } + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures( + path = "../../../fixtures", + scripts("data_planes", "alice", "private_links") + ) + )] + async fn test_update_private_links_nonexistent_dp(pool: sqlx::PgPool) { + let _guard = test_server::init(); + let server = + test_server::TestServer::start(pool.clone(), test_server::snapshot(pool, false).await) + .await; + let alice_token = server.make_access_token(uuid::Uuid::from_bytes([0x11; 16]), None); + + // Alice has `read` via aliceCo/ -> ops/dp/private/aliceCo/ (the + // fixture's role grant covers any sub-prefix), the structural check + // passes, the auth check passes, and the UPDATE matches zero rows + // because the data plane does not exist. + let response: serde_json::Value = server + .graphql( + &update_mutation( + "ops/dp/private/aliceCo/aws-us-east-2-c9", + VALID_AWS_INPUT, + ), + Some(&alice_token), + ) + .await; + assert_eq!( + first_error_message(&response), + "data plane 'ops/dp/private/aliceCo/aws-us-east-2-c9' not found", + ); + } + #[test] fn parses_aws_public() { let (provider, region, tag, is_public) = diff --git a/crates/control-plane-api/src/server/public/graphql/invite_links.rs b/crates/control-plane-api/src/server/public/graphql/invite_links.rs index b4cd95f90c3..24359bcd79e 100644 --- a/crates/control-plane-api/src/server/public/graphql/invite_links.rs +++ b/crates/control-plane-api/src/server/public/graphql/invite_links.rs @@ -53,6 +53,28 @@ pub struct InviteLinksQuery; const DEFAULT_PAGE_SIZE: usize = 25; const MAX_PREFIXES: usize = 20; +/// Internal prefixes that are off-limits for generic user-facing delegation. +/// +/// `ops/dp/private//` and `ops/tasks/private//` are owned by +/// the platform: they back private data-plane infrastructure and the +/// associated ops collections, and their grants are derived from the +/// customer's catalog prefix at data-plane provisioning time. Allowing direct +/// invite links or grants on these prefixes would bypass that ownership +/// relationship. +const INTERNAL_PRIVATE_PREFIXES: &[&str] = &["ops/dp/private/", "ops/tasks/private/"]; + +fn reject_internal_private_prefix(catalog_prefix: &str) -> async_graphql::Result<()> { + if INTERNAL_PRIVATE_PREFIXES + .iter() + .any(|p| catalog_prefix.starts_with(p)) + { + return Err(async_graphql::Error::new(format!( + "{catalog_prefix} is an internal prefix and cannot be managed via invite links" + ))); + } + Ok(()) +} + #[async_graphql::Object] impl InviteLinksQuery { /// List invite links the caller has admin access to. @@ -118,6 +140,7 @@ impl InviteLinksQuery { AND ($5::text IS NULL OR il.catalog_prefix::text ^@ $5) AND ($4::bool IS NULL OR il.single_use = $4) AND ($2::timestamptz IS NULL OR il.created_at < $2) + AND NOT (il.catalog_prefix::text ^@ ANY($6::text[])) ORDER BY il.created_at DESC LIMIT $3 + 1 "#, @@ -126,6 +149,10 @@ impl InviteLinksQuery { limit as i64, single_use_eq, prefix_starts_with.as_deref(), + &INTERNAL_PRIVATE_PREFIXES + .iter() + .map(|p| p.to_string()) + .collect::>(), ) .fetch_all(&env.pg_pool) .await?; @@ -186,6 +213,8 @@ impl InviteLinksMutation { ))); } + reject_internal_private_prefix(catalog_prefix.as_str())?; + verify_authorization(env, &catalog_prefix).await?; let row = sqlx::query!( @@ -375,6 +404,8 @@ impl InviteLinksMutation { None => return Err(async_graphql::Error::new("Invalid invite link")), }; + reject_internal_private_prefix(&invite.catalog_prefix)?; + verify_authorization(env, &invite.catalog_prefix).await?; sqlx::query!("DELETE FROM internal.invite_links WHERE token = $1", token,) @@ -1408,6 +1439,167 @@ mod test { ); } + // Generic invite-link APIs must not create, delete, or expose links under + // the internal `ops/dp/private/` and `ops/tasks/private/` prefixes. Those + // are owned by the platform and managed via data-plane provisioning, not + // user-driven delegation. + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures(path = "../../../fixtures", scripts("data_planes", "alice")) + )] + async fn test_invite_link_internal_prefix_guard(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), true).await, + ) + .await; + + let alice_token = server.make_access_token( + uuid::Uuid::from_bytes([0x11; 16]), + Some("alice@example.test"), + ); + + // createInviteLink rejects ops/dp/private/ prefix. + let create_dp: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createInviteLink( + catalogPrefix: "ops/dp/private/aliceCo/" + capability: read + ) { token } + }"# + }), + Some(&alice_token), + ) + .await; + + insta::assert_json_snapshot!("create_internal_dp_prefix_rejected", create_dp); + + // createInviteLink rejects ops/tasks/private/ prefix. + let create_tasks: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createInviteLink( + catalogPrefix: "ops/tasks/private/aliceCo/" + capability: read + ) { token } + }"# + }), + Some(&alice_token), + ) + .await; + + insta::assert_json_snapshot!("create_internal_tasks_prefix_rejected", create_tasks); + + // Defense-in-depth: directly insert invite links under the internal + // prefixes (bypassing the create guard) and grant Alice admin on those + // prefixes so she would otherwise be authorized to manage and list them. + let dp_token: uuid::Uuid = sqlx::query_scalar( + "INSERT INTO internal.invite_links (catalog_prefix, capability, single_use) \ + VALUES ('ops/dp/private/aliceCo/', 'read', false) RETURNING token", + ) + .fetch_one(&pool) + .await + .unwrap(); + + let tasks_token: uuid::Uuid = sqlx::query_scalar( + "INSERT INTO internal.invite_links (catalog_prefix, capability, single_use) \ + VALUES ('ops/tasks/private/aliceCo/', 'read', false) RETURNING token", + ) + .fetch_one(&pool) + .await + .unwrap(); + + sqlx::query( + "INSERT INTO user_grants (user_id, object_role, capability) VALUES \ + ($1, 'ops/dp/private/aliceCo/', 'admin'), \ + ($1, 'ops/tasks/private/aliceCo/', 'admin')", + ) + .bind(uuid::Uuid::from_bytes([0x11; 16])) + .execute(&pool) + .await + .unwrap(); + + // Refresh the snapshot so the new user grants are visible to authorization. + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), true).await, + ) + .await; + + // deleteInviteLink rejects internal-prefix links. + let delete_dp: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($token: UUID!) { + deleteInviteLink(token: $token) + }"#, + "variables": { "token": dp_token } + }), + Some(&alice_token), + ) + .await; + + insta::assert_json_snapshot!("delete_internal_dp_prefix_rejected", delete_dp); + + let delete_tasks: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($token: UUID!) { + deleteInviteLink(token: $token) + }"#, + "variables": { "token": tasks_token } + }), + Some(&alice_token), + ) + .await; + + insta::assert_json_snapshot!("delete_internal_tasks_prefix_rejected", delete_tasks); + + // inviteLinks listing filters internal-prefix links out even when the + // caller has admin on them. + let listing: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + inviteLinks(filter: { catalogPrefix: { startsWith: "ops/" } }) { + edges { node { catalogPrefix } } + } + }"# + }), + Some(&alice_token), + ) + .await; + + let edges = listing["data"]["inviteLinks"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!( + edges.len(), + 0, + "internal-prefix invite links must not appear in the listing" + ); + + // The directly-inserted rows are still present in the DB; the guard + // only suppresses them from the generic user-facing APIs. + let row_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM internal.invite_links WHERE token = ANY($1)") + .bind(&[dp_token, tasks_token]) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row_count, 2); + } + #[sqlx::test( migrations = "../../supabase/migrations", fixtures(path = "../../../fixtures", scripts("data_planes", "sso_tenant")) diff --git a/crates/control-plane-api/src/server/public/graphql/mod.rs b/crates/control-plane-api/src/server/public/graphql/mod.rs index be94746c1fc..928ae9aa5d0 100644 --- a/crates/control-plane-api/src/server/public/graphql/mod.rs +++ b/crates/control-plane-api/src/server/public/graphql/mod.rs @@ -73,6 +73,7 @@ pub struct MutationRoot( alert_configs::AlertConfigsMutation, alert_subscriptions::AlertSubscriptionsMutation, invite_links::InviteLinksMutation, + data_planes::DataPlanesMutation, ); pub fn create_schema(alert_config_defaults: models::AlertConfig) -> GraphQLSchema { diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap new file mode 100644 index 00000000000..7ab623c0ddc --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_malformed_private_link.snap @@ -0,0 +1,23 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/data_planes.rs +expression: response +--- +{ + "data": null, + "errors": [ + { + "locations": [ + { + "column": 37, + "line": 7 + } + ], + "message": "failed to parse private_links[0] for data plane ops/dp/private/aliceCo/aws-us-east-1-c1: data did not match any variant of untagged enum PrivateLink", + "path": [ + "dataPlanes", + "edges", + 0 + ] + } + ] +} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap new file mode 100644 index 00000000000..b13edcd151f --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__data_planes_with_private_links.snap @@ -0,0 +1,72 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/data_planes.rs +expression: response +--- +{ + "data": { + "dataPlanes": { + "edges": [ + { + "node": { + "awsLinkEndpoints": [ + { + "endpoint_id": "vpce-0123456789abcdef0", + "state": "available" + } + ], + "azureLinkEndpoints": [], + "gcpPscEndpoints": [], + "name": "ops/dp/private/aliceCo/aws-us-east-1-c1", + "privateLinks": [ + { + "__typename": "AWSPrivateLink", + "azIds": [ + "use1-az1", + "use1-az2" + ], + "region": "us-east-1", + "serviceName": "com.amazonaws.vpce.us-east-1.vpce-svc-abc123" + }, + { + "__typename": "AzurePrivateLink", + "dnsName": "privatelink.database.windows.net", + "location": "eastus", + "resourceType": null, + "serviceName": "/subscriptions/x/resourceGroups/rg/providers/Microsoft.Network/privateLinkServices/svc" + }, + { + "__typename": "GCPPrivateServiceConnect", + "allPorts": true, + "dnsRecordNames": [ + "r1", + "r2" + ], + "dnsZoneName": "z", + "region": "us-central1", + "serviceAttachment": "projects/p/regions/us-central1/serviceAttachments/sa" + } + ] + } + }, + { + "node": { + "awsLinkEndpoints": [], + "azureLinkEndpoints": [], + "gcpPscEndpoints": [], + "name": "ops/dp/public/aws-us-west-2-c1", + "privateLinks": [] + } + }, + { + "node": { + "awsLinkEndpoints": [], + "azureLinkEndpoints": [], + "gcpPscEndpoints": [], + "name": "ops/dp/public/gcp-us-central1-c2", + "privateLinks": [] + } + } + ] + } + } +} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__graphql_data_planes.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__graphql_data_planes.snap index 3d6d0336ed9..c05e37ed97d 100644 --- a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__graphql_data_planes.snap +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__data_planes__tests__graphql_data_planes.snap @@ -1,6 +1,6 @@ --- source: crates/control-plane-api/src/server/public/graphql/data_planes.rs -assertion_line: 364 +assertion_line: 659 expression: response --- { @@ -10,17 +10,21 @@ expression: response { "node": { "awsIamUserArn": "arn:aws:iam::123456789:user/test", + "awsLinkEndpoints": [], "azureApplicationClientId": "11111111-1111-1111-1111-111111111111", "azureApplicationName": "estuary-test-app-one", + "azureLinkEndpoints": [], "cidrBlocks": [ "10.0.0.0/16", "192.168.1.0/24" ], "cloudProvider": "AWS", "fqdn": "dp.one", + "gcpPscEndpoints": [], "gcpServiceAccountEmail": "test-gcp-one@estuary-test.iam.gserviceaccount.com", "isPublic": true, "name": "ops/dp/public/aws-us-west-2-c1", + "privateLinks": [], "reactorAddress": "reactor.dp.one", "region": "us-west-2", "tag": "c1", @@ -30,16 +34,20 @@ expression: response { "node": { "awsIamUserArn": "arn:aws:iam::987654321:user/test", + "awsLinkEndpoints": [], "azureApplicationClientId": "22222222-2222-2222-2222-222222222222", "azureApplicationName": "estuary-test-app-two", + "azureLinkEndpoints": [], "cidrBlocks": [ "172.16.0.0/12" ], "cloudProvider": "GCP", "fqdn": "dp.two", + "gcpPscEndpoints": [], "gcpServiceAccountEmail": "test-gcp-two@estuary-test.iam.gserviceaccount.com", "isPublic": true, "name": "ops/dp/public/gcp-us-central1-c2", + "privateLinks": [], "reactorAddress": "reactor.dp.two", "region": "us-central1", "tag": "c2", diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_dp_prefix_rejected.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_dp_prefix_rejected.snap new file mode 100644 index 00000000000..bc9f88a6659 --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_dp_prefix_rejected.snap @@ -0,0 +1,21 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/invite_links.rs +expression: create_dp +--- +{ + "data": null, + "errors": [ + { + "locations": [ + { + "column": 25, + "line": 3 + } + ], + "message": "ops/dp/private/aliceCo/ is an internal prefix and cannot be managed via invite links", + "path": [ + "createInviteLink" + ] + } + ] +} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_tasks_prefix_rejected.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_tasks_prefix_rejected.snap new file mode 100644 index 00000000000..3f633cecf5f --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__create_internal_tasks_prefix_rejected.snap @@ -0,0 +1,21 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/invite_links.rs +expression: create_tasks +--- +{ + "data": null, + "errors": [ + { + "locations": [ + { + "column": 25, + "line": 3 + } + ], + "message": "ops/tasks/private/aliceCo/ is an internal prefix and cannot be managed via invite links", + "path": [ + "createInviteLink" + ] + } + ] +} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_dp_prefix_rejected.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_dp_prefix_rejected.snap new file mode 100644 index 00000000000..8ae4882e1ff --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_dp_prefix_rejected.snap @@ -0,0 +1,21 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/invite_links.rs +expression: delete_dp +--- +{ + "data": null, + "errors": [ + { + "locations": [ + { + "column": 25, + "line": 3 + } + ], + "message": "ops/dp/private/aliceCo/ is an internal prefix and cannot be managed via invite links", + "path": [ + "deleteInviteLink" + ] + } + ] +} diff --git a/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_tasks_prefix_rejected.snap b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_tasks_prefix_rejected.snap new file mode 100644 index 00000000000..94f71111ea4 --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/snapshots/control_plane_api__server__public__graphql__invite_links__test__delete_internal_tasks_prefix_rejected.snap @@ -0,0 +1,21 @@ +--- +source: crates/control-plane-api/src/server/public/graphql/invite_links.rs +expression: delete_tasks +--- +{ + "data": null, + "errors": [ + { + "locations": [ + { + "column": 25, + "line": 3 + } + ], + "message": "ops/tasks/private/aliceCo/ is an internal prefix and cannot be managed via invite links", + "path": [ + "deleteInviteLink" + ] + } + ] +} diff --git a/crates/data-plane-controller/src/shared/stack.rs b/crates/data-plane-controller/src/shared/stack.rs index 564a2c70814..9c78124ee5a 100644 --- a/crates/data-plane-controller/src/shared/stack.rs +++ b/crates/data-plane-controller/src/shared/stack.rs @@ -145,40 +145,10 @@ pub struct GCPBYOC { pub project_id: String, } -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -#[serde(untagged)] -pub enum PrivateLink { - AWS(AWSPrivateLink), - Azure(AzurePrivateLink), - GCP(GCPPrivateServiceConnect), -} - -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct AWSPrivateLink { - pub region: String, - pub az_ids: Vec, - pub service_name: String, -} - -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct AzurePrivateLink { - pub service_name: String, - pub location: String, - #[serde(default, skip_serializing_if = "String::is_empty")] - pub dns_name: String, - #[serde(default, skip_serializing_if = "String::is_empty")] - pub resource_type: String, -} - -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub struct GCPPrivateServiceConnect { - pub service_attachment: String, - pub region: String, - pub dns_zone_name: String, - pub dns_record_names: Vec, - #[serde(default, skip_serializing_if = "is_false")] - pub all_ports: bool, -} +// Private-link config types live in the shared `models` crate so the GraphQL +// API and the data-plane controller can speak the same shape. The DPC still +// references them through this module, so re-export them here. +pub use models::{AWSPrivateLink, AzurePrivateLink, GCPPrivateServiceConnect, PrivateLink}; #[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "lowercase")] @@ -587,6 +557,7 @@ mod test { az_ids: vec!["a".to_string(), "b".to_string()], region: "us-west-2".to_string(), service_name: "service".to_string(), + service_region: None, }), ); @@ -599,8 +570,8 @@ mod test { PrivateLink::Azure(AzurePrivateLink { location: "eastus".to_string(), service_name: "service".to_string(), - resource_type: "managedInstance".to_string(), - dns_name: "privatelink.database.windows.net".to_string(), + resource_type: Some("managedInstance".to_string()), + dns_name: Some("privatelink.database.windows.net".to_string()), }), ); assert_eq!( diff --git a/crates/data-plane-controller/tests/snapshots/test_private_links__private_links.snap b/crates/data-plane-controller/tests/snapshots/test_private_links__private_links.snap index 2750b8e9bfb..ad616798d30 100644 --- a/crates/data-plane-controller/tests/snapshots/test_private_links__private_links.snap +++ b/crates/data-plane-controller/tests/snapshots/test_private_links__private_links.snap @@ -1,5 +1,6 @@ --- source: crates/data-plane-controller/tests/test_private_links.rs +assertion_line: 113 expression: trace.lock().unwrap().as_slice() --- [ @@ -70,7 +71,8 @@ expression: trace.lock().unwrap().as_slice() "a", "b" ], - "service_name": "service" + "service_name": "service", + "service_region": null } ], "deployments": [ @@ -422,7 +424,8 @@ expression: trace.lock().unwrap().as_slice() "a", "b" ], - "service_name": "service" + "service_name": "service", + "service_region": null } ], "deployments": [ @@ -735,7 +738,8 @@ expression: trace.lock().unwrap().as_slice() "a", "b" ], - "service_name": "service" + "service_name": "service", + "service_region": null }, { "region": "us-west-2", @@ -743,7 +747,8 @@ expression: trace.lock().unwrap().as_slice() "b", "c" ], - "service_name": "service-2" + "service_name": "service-2", + "service_region": null } ], "deployments": [ diff --git a/crates/data-plane-controller/tests/test_private_links.rs b/crates/data-plane-controller/tests/test_private_links.rs index bdd1689ee94..f901a02261a 100644 --- a/crates/data-plane-controller/tests/test_private_links.rs +++ b/crates/data-plane-controller/tests/test_private_links.rs @@ -31,6 +31,7 @@ async fn test_private_links() { az_ids: vec!["a".to_string(), "b".to_string()], region: "us-west-2".to_string(), service_name: "service".to_string(), + service_region: None, })]; inbox.push_back(( @@ -76,6 +77,7 @@ async fn test_private_links() { az_ids: vec!["b".to_string(), "c".to_string()], region: "us-west-2".to_string(), service_name: "service-2".to_string(), + service_region: None, })); inbox.push_back((models::Id::zero(), Some(Message::Converge))); diff --git a/crates/flow-client/control-plane-api.graphql b/crates/flow-client/control-plane-api.graphql index 5258ac8a3d7..0dd65175376 100644 --- a/crates/flow-client/control-plane-api.graphql +++ b/crates/flow-client/control-plane-api.graphql @@ -1,3 +1,17 @@ +type AWSPrivateLink { + region: String! + azIds: [String!]! + serviceName: String! + serviceRegion: String +} + +input AWSPrivateLinkInput { + region: String! + azIds: [String!]! + serviceName: String! + serviceRegion: String +} + """ Status of the abandonment evaluation for a task. """ @@ -312,6 +326,20 @@ type AutoDiscoverStatus { failure: AutoDiscoverFailure } +type AzurePrivateLink { + serviceName: String! + location: String! + dnsName: String + resourceType: String +} + +input AzurePrivateLinkInput { + serviceName: String! + location: String! + dnsName: String + resourceType: String +} + input BoolFilter { eq: Boolean } @@ -639,6 +667,25 @@ type DataPlane { Azure application client ID for this data-plane. """ azureApplicationClientId: String + """ + AWS PrivateLink endpoint provisioning results, opaque JSON exported by + the data-plane controller. Empty when no AWS endpoints are provisioned. + """ + awsLinkEndpoints: [JSON!]! + """ + Azure Private Link endpoint provisioning results, opaque JSON. + """ + azureLinkEndpoints: [JSON!]! + """ + GCP Private Service Connect endpoint provisioning results, opaque JSON. + """ + gcpPscEndpoints: [JSON!]! + """ + Configured private link endpoints for this data-plane. Replacing this + list (via `updateDataPlanePrivateLinks`) triggers reconvergence by the + data-plane controller on its next poll. + """ + privateLinks: [PrivateLink!]! } """ @@ -720,6 +767,22 @@ type FieldProvenance { source: String } +type GCPPrivateServiceConnect { + serviceAttachment: String! + region: String! + dnsZoneName: String! + dnsRecordNames: [String!]! + allPorts: Boolean! +} + +input GCPPrivateServiceConnectInput { + serviceAttachment: String! + region: String! + dnsZoneName: String! + dnsRecordNames: [String!]! + allPorts: Boolean! = false +} + scalar Id """ @@ -1077,6 +1140,19 @@ type MutationRoot { The caller must have admin capability on the invite link's catalog prefix. """ deleteInviteLink(token: UUID!): Boolean! + """ + Replaces the configured private link endpoints on a private data plane. + + The provided list overwrites the entire `private_links` column; partial + updates are intentionally not supported. The data-plane controller + converges to the new configuration on its next poll. + + Interim authorization: requires `read` on the private data-plane name. + This matches the existing data-plane deployment authorization shape and + will be replaced with `manage_dataplane` once the orthogonal capability + model lands. + """ + updateDataPlanePrivateLinks(dataPlaneName: String!, privateLinks: [PrivateLinkInput!]!): Boolean! } scalar Name @@ -1167,6 +1243,22 @@ input PrefixesBy { minCapability: Capability! } +""" +Private link configuration for a customer-owned data plane: AWS +PrivateLink, Azure Private Link, or GCP Private Service Connect. +""" +union PrivateLink = AWSPrivateLink | AzurePrivateLink | GCPPrivateServiceConnect + +""" +Private link configuration for a customer-owned data plane: AWS +PrivateLink, Azure Private Link, or GCP Private Service Connect. +""" +input PrivateLinkInput @oneOf { + aws: AWSPrivateLinkInput + azure: AzurePrivateLinkInput + gcp: GCPPrivateServiceConnectInput +} + """ Filter connectors by their protocol (capture or materialization). """ @@ -1761,6 +1853,10 @@ Directs the executor to include this field or fragment only when the `if` argume """ directive @include(if: Boolean!) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT """ +Indicates that an Input Object is a OneOf Input Object (and thus requires exactly one of its field be provided) +""" +directive @oneOf on INPUT_OBJECT +""" Directs the executor to skip this field or fragment when the `if` argument is true. """ directive @skip(if: Boolean!) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 337365f419e..04c0a278a0f 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -18,6 +18,7 @@ mod id; mod journals; mod labels; mod materializations; +mod private_links; pub mod publications; mod raw_value; mod references; @@ -57,6 +58,9 @@ pub use materializations::{ MaterializationBinding, MaterializationDef, MaterializationEndpoint, MaterializationFields, RecommendedDepth, TargetNamingStrategy, }; +pub use private_links::{ + AWSPrivateLink, AzurePrivateLink, GCPPrivateServiceConnect, PrivateLink, +}; pub use raw_value::RawValue; pub use references::{ CATALOG_PREFIX_RE, Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, diff --git a/crates/models/src/private_links.rs b/crates/models/src/private_links.rs new file mode 100644 index 00000000000..c7213507338 --- /dev/null +++ b/crates/models/src/private_links.rs @@ -0,0 +1,168 @@ +use serde::{Deserialize, Serialize}; + +/// Private link configuration for a customer-owned data plane: AWS +/// PrivateLink, Azure Private Link, or GCP Private Service Connect. +// `#[serde(untagged)]` matches each variant by its required fields, preserving +// the `private_links json[]` column shape consumed by the data-plane +// controller. The types previously lived in `data-plane-controller::shared::stack` +// and are re-exported there for existing DPC callers. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[cfg_attr( + feature = "async-graphql", + derive(async_graphql::Union, async_graphql::OneofObject), + graphql(name = "PrivateLink", input_name = "PrivateLinkInput") +)] +#[serde(untagged)] +pub enum PrivateLink { + AWS(AWSPrivateLink), + Azure(AzurePrivateLink), + GCP(GCPPrivateServiceConnect), +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[cfg_attr( + feature = "async-graphql", + derive(async_graphql::SimpleObject, async_graphql::InputObject), + graphql(name = "AWSPrivateLink", input_name = "AWSPrivateLinkInput") +)] +pub struct AWSPrivateLink { + pub region: String, + pub az_ids: Vec, + pub service_name: String, + // AWS region of the PrivateLink service when it differs from the endpoint's + // region (cross-region PrivateLink). When unset, est-dry-dock defaults to + // `region`. Mirrors `service_region` in the est-dry-dock Pydantic model. + #[serde(default)] + pub service_region: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[cfg_attr( + feature = "async-graphql", + derive(async_graphql::SimpleObject, async_graphql::InputObject), + graphql(name = "AzurePrivateLink", input_name = "AzurePrivateLinkInput") +)] +pub struct AzurePrivateLink { + pub service_name: String, + pub location: String, + // `dns_name` and `resource_type` are optional. On the wire they round-trip + // as "field absent" for None and as the string value for Some; an incoming + // empty string is normalized to None on deserialize and on serialize, which + // preserves byte-for-byte compatibility with historical rows that wrote + // either a missing field or `""`. + #[serde( + default, + deserialize_with = "deserialize_empty_string_as_none", + skip_serializing_if = "is_none_or_empty" + )] + pub dns_name: Option, + #[serde( + default, + deserialize_with = "deserialize_empty_string_as_none", + skip_serializing_if = "is_none_or_empty" + )] + pub resource_type: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +#[cfg_attr( + feature = "async-graphql", + derive(async_graphql::SimpleObject, async_graphql::InputObject), + graphql( + name = "GCPPrivateServiceConnect", + input_name = "GCPPrivateServiceConnectInput" + ) +)] +pub struct GCPPrivateServiceConnect { + pub service_attachment: String, + pub region: String, + pub dns_zone_name: String, + pub dns_record_names: Vec, + #[serde(default, skip_serializing_if = "is_false")] + #[cfg_attr(feature = "async-graphql", graphql(default))] + pub all_ports: bool, +} + +fn is_false(b: &bool) -> bool { + !b +} + +fn is_none_or_empty(opt: &Option) -> bool { + opt.as_deref().map_or(true, str::is_empty) +} + +fn deserialize_empty_string_as_none<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + let opt = Option::::deserialize(deserializer)?; + Ok(opt.filter(|s| !s.is_empty())) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn azure_optional_fields_round_trip() { + // Three historical shapes for the optional fields all parse to the + // same Azure variant, and serializing omits them entirely. + let absent = r#"{"service_name":"svc","location":"eastus"}"#; + let empty = r#"{"service_name":"svc","location":"eastus","dns_name":"","resource_type":""}"#; + let some = + r#"{"service_name":"svc","location":"eastus","dns_name":"d","resource_type":"t"}"#; + + for raw in [absent, empty] { + let link: PrivateLink = serde_json::from_str(raw).unwrap(); + let PrivateLink::Azure(azure) = &link else { + panic!("expected Azure variant for {raw}"); + }; + assert_eq!(azure.dns_name, None); + assert_eq!(azure.resource_type, None); + // Round-trip serializes to the canonical absent shape. + assert_eq!(serde_json::to_string(&link).unwrap(), absent); + } + + let link: PrivateLink = serde_json::from_str(some).unwrap(); + let PrivateLink::Azure(azure) = &link else { + panic!("expected Azure variant"); + }; + assert_eq!(azure.dns_name.as_deref(), Some("d")); + assert_eq!(azure.resource_type.as_deref(), Some("t")); + assert_eq!(serde_json::to_string(&link).unwrap(), some); + } + + #[test] + fn untagged_dispatch_order() { + // Variant dispatch is determined by required-field presence in the + // declared AWS, Azure, GCP order. Each provider matches only on its + // unique required field set. + let aws: PrivateLink = serde_json::from_str( + r#"{"region":"us-east-1","az_ids":["use1-az1"],"service_name":"com.amazonaws.vpce.us-east-1.vpce-svc-abc"}"#, + ).unwrap(); + assert!(matches!(aws, PrivateLink::AWS(_))); + + let azure: PrivateLink = + serde_json::from_str(r#"{"service_name":"svc","location":"eastus"}"#).unwrap(); + assert!(matches!(azure, PrivateLink::Azure(_))); + + let gcp: PrivateLink = serde_json::from_str( + r#"{"service_attachment":"projects/p/regions/r/serviceAttachments/sa","region":"r","dns_zone_name":"z","dns_record_names":["n"]}"#, + ).unwrap(); + assert!(matches!(gcp, PrivateLink::GCP(_))); + } + + #[test] + fn gcp_all_ports_default_omitted() { + let gcp: PrivateLink = serde_json::from_str( + r#"{"service_attachment":"projects/p/regions/r/serviceAttachments/sa","region":"r","dns_zone_name":"z","dns_record_names":["n"]}"#, + ).unwrap(); + let PrivateLink::GCP(g) = &gcp else { unreachable!() }; + assert!(!g.all_ports); + // False default is skipped on serialize. + assert!(!serde_json::to_string(&gcp).unwrap().contains("all_ports")); + } +}