Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Add `KafkaSchema` field `references[].kafkaSchemaRef`, type `object`: Reference to another
`KafkaSchema` resource in the same namespace. The subject and version are resolved from the
referenced resource's spec and status, dependents pick up new versions automatically.
Mutually exclusive with `subject` + `version` on the same entry.
- Change `KafkaSchema` field `references`: maxItems `100`
- **BREAKING**: Change `KafkaSchema` deletion to perform a hard delete instead of soft delete only.
The subject is no longer visible in the registry's listing after deletion,
and re-applying a `KafkaSchema` with the same `subjectName` after deletion starts at version 1.

## v0.38.0 - 2026-05-18

- Add `MySQL` and `PostgreSQL` field `migrationSecretSource`, type `object`: Reference to a Secret containing migration
Expand Down
70 changes: 62 additions & 8 deletions api/v1alpha1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package v1alpha1
import (
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

// KafkaSchemaSpec defines the desired state of KafkaSchema
Expand All @@ -16,7 +17,7 @@ type KafkaSchemaSpec struct {
// Kafka Schema Subject name
SubjectName string `json:"subjectName"`

// Kafka Schema configuration should be a valid Avro Schema JSON format
// Kafka Schema definition. Format depends on schemaType (AVRO/JSON/PROTOBUF)
Schema string `json:"schema"`

// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
Expand All @@ -28,23 +29,49 @@ type KafkaSchemaSpec struct {
// Kafka Schemas compatibility level
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`

// Schema references for Protobuf or JSON schemas that import other schemas
// +kubebuilder:validation:MaxItems=100
// +listType=map
// +listMapKey=name
// Schema references for Protobuf or JSON schemas that import other schemas.
// References must form a directed acyclic graph (DAG); cycles are not allowed.
References []SchemaReference `json:"references,omitempty"`
}

// SchemaReference is a reference to another schema in the registry
// SchemaReference is a reference to another schema in the registry.
// Exactly one of {subject+version} or kafkaSchemaRef must be set.
// +kubebuilder:validation:XValidation:rule="(has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef)) || (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef))",message="set both subject and version, or set kafkaSchemaRef, but not both"
type SchemaReference struct {
// +kubebuilder:validation:MinLength=1
// +kubebuilder:validation:MaxLength=512
// Name used to reference the schema (e.g., the import path in Protobuf)
Name string `json:"name"`

// +kubebuilder:validation:MinLength=1
// Subject name of the referenced schema in the registry
Subject string `json:"subject"`
// +kubebuilder:validation:MaxLength=512
// Subject name of the referenced schema in the registry. Mutually exclusive with kafkaSchemaRef.
// +optional
Subject string `json:"subject,omitempty"`

// +kubebuilder:validation:Minimum=1
// Version of the referenced schema
Version int `json:"version"`
// Version of the referenced schema. Mutually exclusive with kafkaSchemaRef.
// +optional
Version int `json:"version,omitempty"`

// Reference to another KafkaSchema resource in the same namespace.
// Mutually exclusive with subject/version.
//
// Cleanup order matters: delete the dependent before the referent.
// +optional
KafkaSchemaRef *LocalKafkaSchemaRef `json:"kafkaSchemaRef,omitempty"`
}

// LocalKafkaSchemaRef references another KafkaSchema in the same namespace as the owner.
// Cross-namespace references are not supported to avoid confused-deputy situations in multi-tenant clusters.
type LocalKafkaSchemaRef struct {
// +kubebuilder:validation:MinLength=1
// +kubebuilder:validation:MaxLength=253
// Name of the KafkaSchema resource in the same namespace.
Name string `json:"name"`
}

// KafkaSchemaStatus defines the observed state of KafkaSchema
Expand All @@ -62,12 +89,20 @@ type KafkaSchemaStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// KafkaSchema is the Schema for the kafkaschemas API
// KafkaSchema is the Schema for the kafkaschemas API.
//
// Self-references (A -> A) are blocked at admission; transitive cycles
// (A -> B -> A) are not detected at admission time.
//
// Deletion: the operator performs a soft delete followed by a hard delete on
// the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName
// after deletion starts a brand-new subject at version 1.
// +kubebuilder:printcolumn:name="Service Name",type="string",JSONPath=".spec.serviceName"
// +kubebuilder:printcolumn:name="Project",type="string",JSONPath=".spec.project"
// +kubebuilder:printcolumn:name="Subject",type="string",JSONPath=".spec.subjectName"
// +kubebuilder:printcolumn:name="Compatibility Level",type="string",JSONPath=".spec.compatibilityLevel"
// +kubebuilder:printcolumn:name="Version",type="number",JSONPath=".status.version"
// +kubebuilder:validation:XValidation:rule="!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef) || r.kafkaSchemaRef.name != self.metadata.name)",message="kafkaSchemaRef cannot point to the KafkaSchema itself"
type KafkaSchema struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand All @@ -94,6 +129,25 @@ func (in *KafkaSchema) GetObjectMeta() *metav1.ObjectMeta {
return &in.ObjectMeta
}

// GetRefs returns ResourceReferenceObjects for any kafkaSchemaRef entries in Spec.References.
// The namespace is always the owner's namespace; refs are same-namespace only by design.
func (in *KafkaSchema) GetRefs() []*ResourceReferenceObject {
refs := make([]*ResourceReferenceObject, 0, len(in.Spec.References))
for _, ref := range in.Spec.References {
if ref.KafkaSchemaRef == nil {
continue
}
refs = append(refs, &ResourceReferenceObject{
GroupVersionKind: GroupVersion.WithKind("KafkaSchema"),
NamespacedName: types.NamespacedName{
Namespace: in.GetNamespace(),
Name: ref.KafkaSchemaRef.Name,
},
})
}
return refs
}

// +kubebuilder:object:root=true

// KafkaSchemaList contains a list of KafkaSchema
Expand Down
24 changes: 23 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 58 additions & 13 deletions charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ spec:
name: v1alpha1
schema:
openAPIV3Schema:
description: KafkaSchema is the Schema for the kafkaschemas API
description: |-
KafkaSchema is the Schema for the kafkaschemas API.

Self-references (A -> A) are blocked at admission; transitive cycles
(A -> B -> A) are not detected at admission time.

Deletion: the operator performs a soft delete followed by a hard delete on
the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName
after deletion starts a brand-new subject at version 1.
properties:
apiVersion:
description: |-
Expand Down Expand Up @@ -88,38 +96,70 @@ spec:
- message: Value is immutable
rule: self == oldSelf
references:
description:
Schema references for Protobuf or JSON schemas that import
other schemas
description: |-
Schema references for Protobuf or JSON schemas that import other schemas.
References must form a directed acyclic graph (DAG); cycles are not allowed.
items:
description:
SchemaReference is a reference to another schema in
the registry
description: |-
SchemaReference is a reference to another schema in the registry.
Exactly one of {subject+version} or kafkaSchemaRef must be set.
properties:
kafkaSchemaRef:
description: |-
Reference to another KafkaSchema resource in the same namespace.
Mutually exclusive with subject/version.

Cleanup order matters: delete the dependent before the referent.
properties:
name:
description:
Name of the KafkaSchema resource in the same
namespace.
maxLength: 253
minLength: 1
type: string
required:
- name
type: object
name:
description:
Name used to reference the schema (e.g., the import
path in Protobuf)
maxLength: 512
minLength: 1
type: string
subject:
description: Subject name of the referenced schema in the registry
description:
Subject name of the referenced schema in the registry.
Mutually exclusive with kafkaSchemaRef.
maxLength: 512
minLength: 1
type: string
version:
description: Version of the referenced schema
description:
Version of the referenced schema. Mutually exclusive
with kafkaSchemaRef.
minimum: 1
type: integer
required:
- name
- subject
- version
type: object
x-kubernetes-validations:
- message:
set both subject and version, or set kafkaSchemaRef,
but not both
rule:
(has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef))
|| (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef))
maxItems: 100
type: array
x-kubernetes-list-map-keys:
- name
x-kubernetes-list-type: map
schema:
description:
Kafka Schema configuration should be a valid Avro Schema
JSON format
Kafka Schema definition. Format depends on schemaType
(AVRO/JSON/PROTOBUF)
type: string
schemaType:
description: Schema type
Expand Down Expand Up @@ -235,6 +275,11 @@ spec:
- version
type: object
type: object
x-kubernetes-validations:
- message: kafkaSchemaRef cannot point to the KafkaSchema itself
rule:
"!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef)
|| r.kafkaSchemaRef.name != self.metadata.name)"
served: true
storage: true
subresources:
Expand Down
Loading
Loading