Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
- BREAKING: `configOverrides` now only accepts `bootstrap.conf`, `nifi.properties` and `security.properties`.
Previously, arbitrary keys were silently accepted but ignored ([#921]).
- Bump `stackable-operator` to 0.110.1 and `kube` to 3.1.0 ([#921]).
- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#935]).

### Fixed

Expand All @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file.
[#922]: https://github.com/stackabletech/nifi-operator/pull/922
[#924]: https://github.com/stackabletech/nifi-operator/pull/924
[#928]: https://github.com/stackabletech/nifi-operator/pull/928
[#935]: https://github.com/stackabletech/nifi-operator/pull/935

## [26.3.0] - 2026-03-16

Expand Down
18 changes: 9 additions & 9 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 35 additions & 111 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
sync::Arc,
};

Expand All @@ -27,10 +27,7 @@ use stackable_operator::{
cli::OperatorEnvironmentOptions,
client::Client,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::{
product_image_selection::{self, ResolvedProductImage},
rbac::build_rbac_resources,
},
commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources},
constants::RESTART_CONTROLLER_ENABLED_LABEL,
crd::{authentication::oidc::v1alpha1::AuthenticationProvider, git_sync},
k8s_openapi::{
Expand Down Expand Up @@ -75,20 +72,21 @@ use stackable_operator::{
use strum::{EnumDiscriminants, IntoStaticStr};
use tracing::Instrument;

mod dereference;
mod validate;

use crate::{
OPERATOR_NAME,
config::{
self, JVM_SECURITY_PROPERTIES_FILE, NIFI_BOOTSTRAP_CONF, NIFI_CONFIG_DIRECTORY,
NIFI_PROPERTIES, NIFI_PYTHON_WORKING_DIRECTORY, NIFI_STATE_MANAGEMENT_XML, NifiRepository,
build_bootstrap_conf, build_nifi_properties, build_state_management_xml,
validated_product_config,
},
crd::{
APP_NAME, BALANCE_PORT, BALANCE_PORT_NAME, Container, HTTPS_PORT, HTTPS_PORT_NAME,
METRICS_PORT, METRICS_PORT_NAME, NifiConfig, NifiNodeRoleConfig, NifiRole, NifiRoleType,
NifiStatus, PROTOCOL_PORT, PROTOCOL_PORT_NAME, STACKABLE_LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
authentication::AuthenticationClassResolved, authorization::NifiAccessPolicyProvider,
v1alpha1,
authorization::NifiAccessPolicyProvider, v1alpha1,
},
listener::{
LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc,
Expand All @@ -100,7 +98,7 @@ use crate::{
upgrade::{self, ClusterVersionUpdateState},
},
product_logging::extend_role_group_config_map,
reporting_task::{self, build_maybe_reporting_task, build_reporting_task_service_name},
reporting_task::{build_maybe_reporting_task, build_reporting_task_service_name},
security::{
authentication::{
AUTHORIZERS_XML_FILE_NAME, LOGIN_IDENTITY_PROVIDERS_XML_FILE_NAME,
Expand Down Expand Up @@ -143,6 +141,12 @@ pub enum Error {
#[snafu(display("object defines no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to dereference resources"))]
Dereference { source: dereference::Error },

#[snafu(display("failed to validate cluster"))]
ValidateCluster { source: validate::Error },

#[snafu(display("failed to create cluster resources"))]
CreateClusterResources {
source: stackable_operator::cluster_resources::Error,
Expand Down Expand Up @@ -200,12 +204,6 @@ pub enum Error {
source: stackable_operator::builder::meta::Error,
},

#[snafu(display("Failed to load product config"))]
ProductConfigLoadFailed {
#[snafu(source(from(config::Error, Box::new)))]
source: Box<config::Error>,
},

#[snafu(display("Failed to find information about file [{}] in product config", kind))]
ProductConfigKindNotSpecified { kind: String },

Expand Down Expand Up @@ -277,11 +275,6 @@ pub enum Error {
source: crate::security::authorization::Error,
},

#[snafu(display("Failed to resolve NiFi Authentication Configuration"))]
FailedResolveNifiAuthenticationConfig {
source: crate::crd::authentication::Error,
},

#[snafu(display("failed to create PodDisruptionBudget"))]
FailedToCreatePdb {
source: crate::operations::pdb::Error,
Expand Down Expand Up @@ -348,11 +341,6 @@ pub enum Error {

#[snafu(display("failed to build authorization configuration"))]
AuthorizationConfiguration { source: authorization::Error },

#[snafu(display("failed to resolve product image"))]
ResolveProductImage {
source: product_image_selection::Error,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -376,15 +364,25 @@ pub async fn reconcile_nifi(

let client = &ctx.client;

let resolved_product_image = nifi
.spec
.image
.resolve(
CONTAINER_IMAGE_BASE_NAME,
&ctx.operator_environment.image_repository,
crate::built_info::PKG_VERSION,
)
.context(ResolveProductImageSnafu)?;
// dereference (client required)
let dereferenced_objects = dereference::dereference(client, nifi)
.await
.context(DereferenceSnafu)?;

// validate (no client required)
let validated = validate::validate(
nifi,
&dereferenced_objects,
&ctx.operator_environment,
&ctx.product_config,
)
.context(ValidateClusterSnafu)?;

let resolved_product_image = validated.image;
let authentication_config = validated.authentication_config;
let authorization_config = validated.authorization_config;
let validated_config = validated.validated_role_config;
let proxy_hosts = dereferenced_objects.proxy_hosts;
Comment thread
adwk67 marked this conversation as resolved.
Outdated

tracing::info!("Checking for sensitive key configuration");
check_or_generate_sensitive_key(client, nifi)
Expand Down Expand Up @@ -419,14 +417,6 @@ pub async fn reconcile_nifi(
}
// end todo

let validated_config = validated_product_config(
nifi,
&resolved_product_image.product_version,
nifi.spec.nodes.as_ref().context(NoNodesDefinedSnafu)?,
&ctx.product_config,
)
.context(ProductConfigLoadFailedSnafu)?;

let mut cluster_resources = ClusterResources::new(
APP_NAME,
OPERATOR_NAME,
Expand All @@ -442,30 +432,12 @@ pub async fn reconcile_nifi(
.map(Cow::Borrowed)
.unwrap_or_default();

let authentication_config = NifiAuthenticationConfig::try_from(
AuthenticationClassResolved::from(nifi, client)
.await
.context(FailedResolveNifiAuthenticationConfigSnafu)?,
)
.context(InvalidNifiAuthenticationConfigSnafu)?;

if let NifiAuthenticationConfig::Oidc { .. } = authentication_config {
check_or_generate_oidc_admin_password(client, nifi)
.await
.context(SecuritySnafu)?;
}

let authorization_config = ResolvedNifiAuthorizationConfig::from(
&nifi.spec.cluster_config.authorization,
client,
nifi.metadata
.namespace
.as_deref()
.context(ObjectHasNoNamespaceSnafu)?,
)
.await
.context(InvalidNifiAuthorizationConfigSnafu)?;

let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
nifi,
APP_NAME,
Expand Down Expand Up @@ -530,13 +502,9 @@ pub async fn reconcile_nifi(

let role = nifi.spec.nodes.as_ref().context(NoNodesDefinedSnafu)?;

// This is due to the fact that users might access NiFi via these addresses, if they try to
// connect from an external machine (not inside the k8s overlay network).
// Since we cannot predict which of the addresses a user might decide to use we will simply
// add all of them to the setting for now.
// The proxy hosts allow-list lets external users access NiFi via addresses we cannot
// predict, so all of them are added to the setting.
// For more information see <https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#proxy_configuration>
let proxy_hosts = get_proxy_hosts(client, nifi, &resolved_product_image).await?;

let rg_configmap = build_node_rolegroup_config_map(
nifi,
&resolved_product_image,
Expand Down Expand Up @@ -738,9 +706,7 @@ async fn build_node_rolegroup_config_map(
.get_authentication_config()
.context(InvalidNifiAuthenticationConfigSnafu)?;

let authorizers_xml = authorization_config
.get_authorizers_config(nifi)
.context(InvalidNifiAuthorizationConfigSnafu)?;
let authorizers_xml = authorization_config.get_authorizers_config(nifi);

let jvm_sec_props: BTreeMap<String, Option<String>> = rolegroup_config
.get(&PropertyNameKind::File(
Expand Down Expand Up @@ -1534,48 +1500,6 @@ fn get_volume_claim_templates(
Ok(pvcs)
}

async fn get_proxy_hosts(
client: &Client,
nifi: &v1alpha1::NifiCluster,
resolved_product_image: &ResolvedProductImage,
) -> Result<String> {
let host_header_check = nifi.spec.cluster_config.host_header_check.clone();

if host_header_check.allow_all {
tracing::info!(
"spec.clusterConfig.hostHeaderCheck.allowAll is set to true. All proxy hosts will be allowed."
);
if !host_header_check.additional_allowed_hosts.is_empty() {
tracing::info!(
"spec.clusterConfig.hostHeaderCheck.additionalAllowedHosts is ignored and only '*' is added to the allow-list."
)
}
return Ok("*".to_string());
}

// Address and port are injected from the listener volume during the prepare container
let mut proxy_hosts = HashSet::from([
"${env:LISTENER_DEFAULT_ADDRESS}:${env:LISTENER_DEFAULT_PORT_HTTPS}".to_string(),
]);
proxy_hosts.extend(host_header_check.additional_allowed_hosts);

// Reporting task only exists for NiFi 1.x
if resolved_product_image.product_version.starts_with("1.") {
let reporting_task_service_name = reporting_task::build_reporting_task_fqdn_service_name(
nifi,
&client.kubernetes_cluster_info,
)
.context(ReportingTaskSnafu)?;

proxy_hosts.insert(format!("{reporting_task_service_name}:{HTTPS_PORT}"));
}

let mut proxy_hosts = Vec::from_iter(proxy_hosts);
proxy_hosts.sort();

Ok(proxy_hosts.join(","))
}

pub fn error_policy(
_obj: Arc<DeserializeGuard<v1alpha1::NifiCluster>>,
error: &Error,
Expand Down
Loading
Loading