diff --git a/CHANGELOG.md b/CHANGELOG.md index 143a840b..d3493a60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Add `KafkaSchema` field `references[].kafkaSchemaRef`, type `object`: Reference to another + `KafkaSchema` resource in the same namespace. The subject and version are resolved from the + referenced resource's spec and status, dependents pick up new versions automatically. + Mutually exclusive with `subject` + `version` on the same entry. +- Change `KafkaSchema` field `references`: maxItems `100` +- **BREAKING**: Change `KafkaSchema` deletion to perform a hard delete instead of soft delete only. + The subject is no longer visible in the registry's listing after deletion, + and re-applying a `KafkaSchema` with the same `subjectName` after deletion starts at version 1. + ## v0.38.0 - 2026-05-18 - Add `MySQL` and `PostgreSQL` field `migrationSecretSource`, type `object`: Reference to a Secret containing migration diff --git a/api/v1alpha1/kafkaschema_types.go b/api/v1alpha1/kafkaschema_types.go index acf7f4f8..7a8182f6 100644 --- a/api/v1alpha1/kafkaschema_types.go +++ b/api/v1alpha1/kafkaschema_types.go @@ -5,6 +5,7 @@ package v1alpha1 import ( "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // KafkaSchemaSpec defines the desired state of KafkaSchema @@ -16,7 +17,7 @@ type KafkaSchemaSpec struct { // Kafka Schema Subject name SubjectName string `json:"subjectName"` - // Kafka Schema configuration should be a valid Avro Schema JSON format + // Kafka Schema definition. Format depends on schemaType (AVRO/JSON/PROTOBUF) Schema string `json:"schema"` // +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF @@ -28,23 +29,49 @@ type KafkaSchemaSpec struct { // Kafka Schemas compatibility level CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"` - // Schema references for Protobuf or JSON schemas that import other schemas + // +kubebuilder:validation:MaxItems=100 + // +listType=map + // +listMapKey=name + // Schema references for Protobuf or JSON schemas that import other schemas. + // References must form a directed acyclic graph (DAG); cycles are not allowed. References []SchemaReference `json:"references,omitempty"` } -// SchemaReference is a reference to another schema in the registry +// SchemaReference is a reference to another schema in the registry. +// Exactly one of {subject+version} or kafkaSchemaRef must be set. +// +kubebuilder:validation:XValidation:rule="(has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef)) || (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef))",message="set both subject and version, or set kafkaSchemaRef, but not both" type SchemaReference struct { // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=512 // Name used to reference the schema (e.g., the import path in Protobuf) Name string `json:"name"` // +kubebuilder:validation:MinLength=1 - // Subject name of the referenced schema in the registry - Subject string `json:"subject"` + // +kubebuilder:validation:MaxLength=512 + // Subject name of the referenced schema in the registry. Mutually exclusive with kafkaSchemaRef. + // +optional + Subject string `json:"subject,omitempty"` // +kubebuilder:validation:Minimum=1 - // Version of the referenced schema - Version int `json:"version"` + // Version of the referenced schema. Mutually exclusive with kafkaSchemaRef. + // +optional + Version int `json:"version,omitempty"` + + // Reference to another KafkaSchema resource in the same namespace. + // Mutually exclusive with subject/version. + // + // Cleanup order matters: delete the dependent before the referent. + // +optional + KafkaSchemaRef *LocalKafkaSchemaRef `json:"kafkaSchemaRef,omitempty"` +} + +// LocalKafkaSchemaRef references another KafkaSchema in the same namespace as the owner. +// Cross-namespace references are not supported to avoid confused-deputy situations in multi-tenant clusters. +type LocalKafkaSchemaRef struct { + // +kubebuilder:validation:MinLength=1 + // +kubebuilder:validation:MaxLength=253 + // Name of the KafkaSchema resource in the same namespace. + Name string `json:"name"` } // KafkaSchemaStatus defines the observed state of KafkaSchema @@ -62,12 +89,20 @@ type KafkaSchemaStatus struct { // +kubebuilder:object:root=true // +kubebuilder:subresource:status -// KafkaSchema is the Schema for the kafkaschemas API +// KafkaSchema is the Schema for the kafkaschemas API. +// +// Self-references (A -> A) are blocked at admission; transitive cycles +// (A -> B -> A) are not detected at admission time. +// +// Deletion: the operator performs a soft delete followed by a hard delete on +// the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName +// after deletion starts a brand-new subject at version 1. // +kubebuilder:printcolumn:name="Service Name",type="string",JSONPath=".spec.serviceName" // +kubebuilder:printcolumn:name="Project",type="string",JSONPath=".spec.project" // +kubebuilder:printcolumn:name="Subject",type="string",JSONPath=".spec.subjectName" // +kubebuilder:printcolumn:name="Compatibility Level",type="string",JSONPath=".spec.compatibilityLevel" // +kubebuilder:printcolumn:name="Version",type="number",JSONPath=".status.version" +// +kubebuilder:validation:XValidation:rule="!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef) || r.kafkaSchemaRef.name != self.metadata.name)",message="kafkaSchemaRef cannot point to the KafkaSchema itself" type KafkaSchema struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -94,6 +129,25 @@ func (in *KafkaSchema) GetObjectMeta() *metav1.ObjectMeta { return &in.ObjectMeta } +// GetRefs returns ResourceReferenceObjects for any kafkaSchemaRef entries in Spec.References. +// The namespace is always the owner's namespace; refs are same-namespace only by design. +func (in *KafkaSchema) GetRefs() []*ResourceReferenceObject { + refs := make([]*ResourceReferenceObject, 0, len(in.Spec.References)) + for _, ref := range in.Spec.References { + if ref.KafkaSchemaRef == nil { + continue + } + refs = append(refs, &ResourceReferenceObject{ + GroupVersionKind: GroupVersion.WithKind("KafkaSchema"), + NamespacedName: types.NamespacedName{ + Namespace: in.GetNamespace(), + Name: ref.KafkaSchemaRef.Name, + }, + }) + } + return refs +} + // +kubebuilder:object:root=true // KafkaSchemaList contains a list of KafkaSchema diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 68e16dea..a9e85eef 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1709,7 +1709,9 @@ func (in *KafkaSchemaSpec) DeepCopyInto(out *KafkaSchemaSpec) { if in.References != nil { in, out := &in.References, &out.References *out = make([]SchemaReference, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } } @@ -2033,6 +2035,21 @@ func (in *KafkaTopicTag) DeepCopy() *KafkaTopicTag { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LocalKafkaSchemaRef) DeepCopyInto(out *LocalKafkaSchemaRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalKafkaSchemaRef. +func (in *LocalKafkaSchemaRef) DeepCopy() *LocalKafkaSchemaRef { + if in == nil { + return nil + } + out := new(LocalKafkaSchemaRef) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MigrationSecretSource) DeepCopyInto(out *MigrationSecretSource) { *out = *in @@ -2771,6 +2788,11 @@ func (in *RoleGrant) DeepCopy() *RoleGrant { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SchemaReference) DeepCopyInto(out *SchemaReference) { *out = *in + if in.KafkaSchemaRef != nil { + in, out := &in.KafkaSchemaRef, &out.KafkaSchemaRef + *out = new(LocalKafkaSchemaRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SchemaReference. diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml index 927894a6..ae55f8dd 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml @@ -33,7 +33,15 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: KafkaSchema is the Schema for the kafkaschemas API + description: |- + KafkaSchema is the Schema for the kafkaschemas API. + + Self-references (A -> A) are blocked at admission; transitive cycles + (A -> B -> A) are not detected at admission time. + + Deletion: the operator performs a soft delete followed by a hard delete on + the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName + after deletion starts a brand-new subject at version 1. properties: apiVersion: description: |- @@ -88,38 +96,70 @@ spec: - message: Value is immutable rule: self == oldSelf references: - description: - Schema references for Protobuf or JSON schemas that import - other schemas + description: |- + Schema references for Protobuf or JSON schemas that import other schemas. + References must form a directed acyclic graph (DAG); cycles are not allowed. items: - description: - SchemaReference is a reference to another schema in - the registry + description: |- + SchemaReference is a reference to another schema in the registry. + Exactly one of {subject+version} or kafkaSchemaRef must be set. properties: + kafkaSchemaRef: + description: |- + Reference to another KafkaSchema resource in the same namespace. + Mutually exclusive with subject/version. + + Cleanup order matters: delete the dependent before the referent. + properties: + name: + description: + Name of the KafkaSchema resource in the same + namespace. + maxLength: 253 + minLength: 1 + type: string + required: + - name + type: object name: description: Name used to reference the schema (e.g., the import path in Protobuf) + maxLength: 512 minLength: 1 type: string subject: - description: Subject name of the referenced schema in the registry + description: + Subject name of the referenced schema in the registry. + Mutually exclusive with kafkaSchemaRef. + maxLength: 512 minLength: 1 type: string version: - description: Version of the referenced schema + description: + Version of the referenced schema. Mutually exclusive + with kafkaSchemaRef. minimum: 1 type: integer required: - name - - subject - - version type: object + x-kubernetes-validations: + - message: + set both subject and version, or set kafkaSchemaRef, + but not both + rule: + (has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef)) + || (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef)) + maxItems: 100 type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map schema: description: - Kafka Schema configuration should be a valid Avro Schema - JSON format + Kafka Schema definition. Format depends on schemaType + (AVRO/JSON/PROTOBUF) type: string schemaType: description: Schema type @@ -235,6 +275,11 @@ spec: - version type: object type: object + x-kubernetes-validations: + - message: kafkaSchemaRef cannot point to the KafkaSchema itself + rule: + "!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef) + || r.kafkaSchemaRef.name != self.metadata.name)" served: true storage: true subresources: diff --git a/config/crd/bases/aiven.io_kafkaschemas.yaml b/config/crd/bases/aiven.io_kafkaschemas.yaml index 927894a6..ae55f8dd 100644 --- a/config/crd/bases/aiven.io_kafkaschemas.yaml +++ b/config/crd/bases/aiven.io_kafkaschemas.yaml @@ -33,7 +33,15 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: KafkaSchema is the Schema for the kafkaschemas API + description: |- + KafkaSchema is the Schema for the kafkaschemas API. + + Self-references (A -> A) are blocked at admission; transitive cycles + (A -> B -> A) are not detected at admission time. + + Deletion: the operator performs a soft delete followed by a hard delete on + the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName + after deletion starts a brand-new subject at version 1. properties: apiVersion: description: |- @@ -88,38 +96,70 @@ spec: - message: Value is immutable rule: self == oldSelf references: - description: - Schema references for Protobuf or JSON schemas that import - other schemas + description: |- + Schema references for Protobuf or JSON schemas that import other schemas. + References must form a directed acyclic graph (DAG); cycles are not allowed. items: - description: - SchemaReference is a reference to another schema in - the registry + description: |- + SchemaReference is a reference to another schema in the registry. + Exactly one of {subject+version} or kafkaSchemaRef must be set. properties: + kafkaSchemaRef: + description: |- + Reference to another KafkaSchema resource in the same namespace. + Mutually exclusive with subject/version. + + Cleanup order matters: delete the dependent before the referent. + properties: + name: + description: + Name of the KafkaSchema resource in the same + namespace. + maxLength: 253 + minLength: 1 + type: string + required: + - name + type: object name: description: Name used to reference the schema (e.g., the import path in Protobuf) + maxLength: 512 minLength: 1 type: string subject: - description: Subject name of the referenced schema in the registry + description: + Subject name of the referenced schema in the registry. + Mutually exclusive with kafkaSchemaRef. + maxLength: 512 minLength: 1 type: string version: - description: Version of the referenced schema + description: + Version of the referenced schema. Mutually exclusive + with kafkaSchemaRef. minimum: 1 type: integer required: - name - - subject - - version type: object + x-kubernetes-validations: + - message: + set both subject and version, or set kafkaSchemaRef, + but not both + rule: + (has(self.subject) && has(self.version) && !has(self.kafkaSchemaRef)) + || (!has(self.subject) && !has(self.version) && has(self.kafkaSchemaRef)) + maxItems: 100 type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map schema: description: - Kafka Schema configuration should be a valid Avro Schema - JSON format + Kafka Schema definition. Format depends on schemaType + (AVRO/JSON/PROTOBUF) type: string schemaType: description: Schema type @@ -235,6 +275,11 @@ spec: - version type: object type: object + x-kubernetes-validations: + - message: kafkaSchemaRef cannot point to the KafkaSchema itself + rule: + "!has(self.spec.references) || self.spec.references.all(r, !has(r.kafkaSchemaRef) + || r.kafkaSchemaRef.name != self.metadata.name)" served: true storage: true subresources: diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index 430597d9..dfc617c3 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -4,18 +4,31 @@ package controllers import ( "context" + "errors" "fmt" + "sort" + "strings" avngen "github.com/aiven/go-client-codegen" "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/aiven/aiven-operator/api/v1alpha1" ) +// kafkaSchemaRefIndex is the cache index key for finding KafkaSchemas that +// reference another KafkaSchema by name. +const kafkaSchemaRefIndex = "spec.references.kafkaSchemaRef.name" + //+kubebuilder:rbac:groups=aiven.io,resources=kafkaschemas,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aiven.io,resources=kafkaschemas/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aiven.io,resources=kafkaschemas/finalizers,verbs=get;create;update @@ -33,7 +46,76 @@ func newKafkaSchemaReconciler(c Controller) reconcilerType { return &KafkaSchemaController{Client: c.Client, avnGen: avnGen} }, nil, - ) + ).WithIndexes(registerKafkaSchemaRefIndex). + WithWatches(func(b *builder.Builder) *builder.Builder { + return b.Watches( + &v1alpha1.KafkaSchema{}, + handler.EnqueueRequestsFromMapFunc(findKafkaSchemasReferencing(c.Client)), + builder.WithPredicates(kafkaSchemaVersionChangedPredicate()), + ) + }) +} + +// registerKafkaSchemaRefIndex indexes KafkaSchemas by the referent names in Spec.References[*].KafkaSchemaRef.Name. +func registerKafkaSchemaRefIndex(ctx context.Context, mgr ctrl.Manager) error { + return mgr.GetFieldIndexer().IndexField(ctx, &v1alpha1.KafkaSchema{}, kafkaSchemaRefIndex, kafkaSchemaRefIndexValues) +} + +// kafkaSchemaRefIndexValues extracts referent names from a KafkaSchema for the index. +func kafkaSchemaRefIndexValues(obj client.Object) []string { + s, ok := obj.(*v1alpha1.KafkaSchema) + if !ok { + return nil + } + names := make([]string, 0, len(s.Spec.References)) + for _, ref := range s.Spec.References { + if ref.KafkaSchemaRef != nil { + names = append(names, ref.KafkaSchemaRef.Name) + } + } + + return names +} + +// findKafkaSchemasReferencing enqueues every KafkaSchema in the same namespace that has a kafkaSchemaRef pointing at. +func findKafkaSchemasReferencing(k client.Client) handler.MapFunc { + return func(ctx context.Context, obj client.Object) []reconcile.Request { + target, ok := obj.(*v1alpha1.KafkaSchema) + if !ok { + return nil + } + var list v1alpha1.KafkaSchemaList + if err := k.List(ctx, &list, + client.InNamespace(target.GetNamespace()), + client.MatchingFields{kafkaSchemaRefIndex: target.GetName()}, + ); err != nil { + return nil + } + + out := make([]reconcile.Request, 0, len(list.Items)) + for i := range list.Items { + out = append(out, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&list.Items[i])}) + } + + return out + } +} + +// kafkaSchemaVersionChangedPredicate enqueues dependents only when a referent changes. +func kafkaSchemaVersionChangedPredicate() predicate.Predicate { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + o, okO := e.ObjectOld.(*v1alpha1.KafkaSchema) + n, okN := e.ObjectNew.(*v1alpha1.KafkaSchema) + if !okO || !okN { + return true + } + return o.Status.Version != n.Status.Version || o.Generation != n.Generation + }, + CreateFunc: func(event.CreateEvent) bool { return true }, + DeleteFunc: func(event.DeleteEvent) bool { return false }, + GenericFunc: func(event.GenericEvent) bool { return false }, + } } func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.KafkaSchema) (Observation, error) { @@ -50,7 +132,6 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka switch { case isServerError(err): // The service is operational but the schema registry may not yet be ready. - // Surface this as a precondition miss so the reconciler does a soft requeue. return Observation{}, fmt.Errorf("%w: schema registry not ready", errPreconditionNotMet) case isNotFound(err): // Subject is not registered yet @@ -81,6 +162,21 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka } schema.Status.Version = got.Version + + // A kafkaSchemaRef referent can advance without spec changing. + // Compare what spec.references currently resolves to. + desired, err := r.resolveReferences(ctx, schema) + switch { + case errors.Is(err, errPreconditionNotMet): + // Referent exists but its Status.Version is still 0 — soft-requeue. + return Observation{}, err + case err != nil: + return Observation{}, fmt.Errorf("resolving desired references: %w", err) + } + if !referencesEqual(desired, got.References) { + return Observation{ResourceExists: true, ResourceUpToDate: false}, nil + } + meta.SetStatusCondition(&schema.Status.Conditions, getRunningCondition(metav1.ConditionTrue, "CheckRunning", "Instance is running on Aiven side")) metav1.SetMetaDataAnnotation(&schema.ObjectMeta, instanceIsRunningAnnotation, "true") @@ -91,7 +187,7 @@ func (r *KafkaSchemaController) Observe(ctx context.Context, schema *v1alpha1.Ka }, nil } - // Tracked version is not visible, maybe eventual-consistency lag after POST + // Tracked version is not visible yet. return Observation{}, fmt.Errorf("%w: tracked schema ID %d not visible in registry", errPreconditionNotMet, schema.Status.ID) } @@ -137,13 +233,9 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha } if len(schema.Spec.References) > 0 { - refs := make([]kafkaschemaregistry.ReferenceIn, len(schema.Spec.References)) - for i, ref := range schema.Spec.References { - refs[i] = kafkaschemaregistry.ReferenceIn{ - Name: ref.Name, - Subject: ref.Subject, - Version: ref.Version, - } + refs, err := r.resolveReferences(ctx, schema) + if err != nil { + return err } postIn.References = &refs } @@ -182,6 +274,67 @@ func (r *KafkaSchemaController) applySchema(ctx context.Context, schema *v1alpha return nil } +// resolveReferences turns Spec.References into the ReferenceIn slice. +func (r *KafkaSchemaController) resolveReferences( + ctx context.Context, + schema *v1alpha1.KafkaSchema, +) ([]kafkaschemaregistry.ReferenceIn, error) { + refs := make([]kafkaschemaregistry.ReferenceIn, 0, len(schema.Spec.References)) + for _, ref := range schema.Spec.References { + subject, version := ref.Subject, ref.Version + if ref.KafkaSchemaRef != nil { + target := &v1alpha1.KafkaSchema{} + key := client.ObjectKey{ + Namespace: schema.GetNamespace(), + Name: ref.KafkaSchemaRef.Name, + } + if err := r.Get(ctx, key, target); err != nil { + return nil, fmt.Errorf("resolving kafkaSchemaRef %s: %w", key, err) + } + if target.Status.Version == 0 { + return nil, fmt.Errorf("%w: referenced KafkaSchema %s has no version yet", errPreconditionNotMet, key) + } + subject = target.Spec.SubjectName + version = target.Status.Version + } + + refs = append(refs, kafkaschemaregistry.ReferenceIn{ + Name: ref.Name, + Subject: subject, + Version: version, + }) + } + + return refs, nil +} + +// referencesEqual compares desired vs. registry references by name (path / $ref key). +// Order is ignored. Names are enforced to be unique per KafkaSchema. +func referencesEqual(desired []kafkaschemaregistry.ReferenceIn, got []kafkaschemaregistry.ReferenceOut) bool { + if len(desired) != len(got) { + return false + } + + byName := make(map[string]kafkaschemaregistry.ReferenceOut, len(got)) + for _, r := range got { + byName[r.Name] = r + } + + // If the registry ever returned duplicate Names. + if len(byName) != len(got) { + return false + } + + for _, d := range desired { + g, ok := byName[d.Name] + if !ok || g.Subject != d.Subject || g.Version != d.Version { + return false + } + } + + return true +} + // persistStatusID writes schema.Status.ID to the API server in its own status // subresource update, retrying on optimistic-concurrency conflicts. // @@ -203,10 +356,66 @@ func (r *KafkaSchemaController) persistStatusID(ctx context.Context, schema *v1a } func (r *KafkaSchemaController) Delete(ctx context.Context, schema *v1alpha1.KafkaSchema) error { - // Soft delete: the schema is still accessible via ?deleted=true. - err := r.avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) - if err != nil && !isNotFound(err) { - return err + // Block delete if any KafkaSchema in this namespace still imports us via kafkaSchemaRef. + // Only catches kafkaSchemaRef dependents in the same namespace. + dependents, err := r.findKafkaSchemaRefDependents(ctx, schema) + if err != nil { + return fmt.Errorf("checking for kafkaSchemaRef dependents: %w", err) + } + + if len(dependents) > 0 { + return fmt.Errorf("%w: still referenced by %s", + v1alpha1.ErrDeleteDependencies, strings.Join(dependents, ", ")) + } + + // Two-step delete: soft-delete first, then hard-delete. + // The schema registry requires this ordering — a hard-delete is only + // allowed after a soft-delete on the same subject. + // + // Soft-delete leaves the subject's references attached in the registry's metadata, + // which keeps any *referent* of this subject pinned. + if err = r.avnGen.ServiceSchemaRegistrySubjectDelete( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + ); err != nil && !isNotFound(err) { + return fmt.Errorf("soft-deleting Kafka Schema Subject: %w", err) + } + + if err = r.avnGen.ServiceSchemaRegistrySubjectDelete( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + kafkaschemaregistry.ServiceSchemaRegistrySubjectDeletePermanent(true), + ); err != nil && !isNotFound(err) { + return fmt.Errorf("hard-deleting Kafka Schema Subject: %w", err) } return nil } + +// findKafkaSchemaRefDependents returns the sorted list with names of KafkaSchemas in the same +// namespace that reference schema via kafkaSchemaRef. +func (r *KafkaSchemaController) findKafkaSchemaRefDependents( + ctx context.Context, + schema *v1alpha1.KafkaSchema, +) ([]string, error) { + var list v1alpha1.KafkaSchemaList + if err := r.List(ctx, &list, + client.InNamespace(schema.GetNamespace()), + client.MatchingFields{kafkaSchemaRefIndex: schema.GetName()}, + ); err != nil { + return nil, err + } + names := make([]string, 0, len(list.Items)) + for i := range list.Items { + if list.Items[i].GetName() == schema.GetName() { + continue + } + names = append(names, list.Items[i].GetName()) + } + sort.Strings(names) + + return names, nil +} diff --git a/controllers/kafkaschema_controller_test.go b/controllers/kafkaschema_controller_test.go index 439dc01e..cd11331e 100644 --- a/controllers/kafkaschema_controller_test.go +++ b/controllers/kafkaschema_controller_test.go @@ -2,12 +2,14 @@ package controllers import ( "testing" + "time" avngen "github.com/aiven/go-client-codegen" "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" "github.com/aiven/go-client-codegen/handler/service" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -17,6 +19,7 @@ import ( ctrlruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" "github.com/aiven/aiven-operator/api/v1alpha1" ) @@ -36,53 +39,6 @@ spec: {"type":"record","name":"Test","fields":[{"name":"id","type":"string"}]} ` -func runKafkaSchemaScenario( - t *testing.T, - schema *v1alpha1.KafkaSchema, - avn avngen.Client, - additionalObjects ...client.Object, -) (*Reconciler[*v1alpha1.KafkaSchema], ctrlruntime.Result, error) { - t.Helper() - - scheme := runtime.NewScheme() - require.NoError(t, clientgoscheme.AddToScheme(scheme)) - require.NoError(t, v1alpha1.AddToScheme(scheme)) - - objects := append([]client.Object{schema}, additionalObjects...) - - r := newKafkaSchemaReconciler(Controller{ - Client: fake.NewClientBuilder(). - WithScheme(scheme). - WithStatusSubresource(&v1alpha1.KafkaSchema{}). - WithObjects(objects...). - Build(), - Scheme: scheme, - Recorder: record.NewFakeRecorder(10), - DefaultToken: "test-token", - PollInterval: testPollInterval, - }).(*Reconciler[*v1alpha1.KafkaSchema]) - r.newAivenGeneratedClient = func(_, _, _ string) (avngen.Client, error) { - return avn, nil - } - - res, err := r.Reconcile(t.Context(), ctrlruntime.Request{ - NamespacedName: types.NamespacedName{ - Name: schema.Name, - Namespace: schema.Namespace, - }, - }) - return r, res, err -} - -func runningService() *service.ServiceGetOut { - return &service.ServiceGetOut{ - State: service.ServiceStateTypeRunning, - NodeStates: []service.NodeStateOut{ - {State: service.NodeStateTypeRunning}, - }, - } -} - func TestKafkaSchemaReconciler(t *testing.T) { t.Parallel() @@ -316,6 +272,7 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) }) + // Soft-delete followed by hard-delete t.Run("Deletes KafkaSchema and removes finalizer on deletion", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 @@ -324,9 +281,15 @@ func TestKafkaSchemaReconciler(t *testing.T) { schema.DeletionTimestamp = &now avn := avngen.NewMockClient(t) + // Soft-delete: no query options avn.EXPECT(). ServiceSchemaRegistrySubjectDelete(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return(nil).Once() + // Hard-delete: must carry permanent=true + avn.EXPECT(). + ServiceSchemaRegistrySubjectDelete(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + [][2]string{kafkaschemaregistry.ServiceSchemaRegistrySubjectDeletePermanent(true)}). + Return(nil).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -337,6 +300,417 @@ func TestKafkaSchemaReconciler(t *testing.T) { require.True(t, apierrors.IsNotFound(err)) }) + t.Run("Resolves kafkaSchemaRef from referent spec and status", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "referent", + Namespace: "default", + Generation: 1, + Annotations: map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + }, + }, + Spec: func() v1alpha1.KafkaSchemaSpec { + s := v1alpha1.KafkaSchemaSpec{ + SubjectName: "resolved-subject", + SchemaType: kafkaschemaregistry.SchemaTypeProtobuf, + Schema: "syntax = \"proto3\"; message X {}", + } + s.Project = "test-project" + s.ServiceName = "test-service" + return s + }(), + Status: v1alpha1.KafkaSchemaStatus{ + ID: 100, + Version: 7, + }, + } + + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + schema.Spec.References = []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + } + + avn := avngen.NewMockClient(t) + avn.EXPECT(). + ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return(nil, newAivenError(404, "not found")).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + mock.MatchedBy(func(in *kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn) bool { + if in.References == nil || len(*in.References) != 1 { + return false + } + ref := (*in.References)[0] + return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 7 + }), + ).Return(11, nil).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.Equal(t, 11, got.Status.ID) + }) + + t.Run("Resolves explicit and kafkaSchemaRef entries in one list", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "referent", + Namespace: "default", + Generation: 1, + Annotations: map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + }, + }, + Spec: func() v1alpha1.KafkaSchemaSpec { + s := v1alpha1.KafkaSchemaSpec{ + SubjectName: "shared-subject", + } + s.Project = "test-project" + s.ServiceName = "test-service" + return s + }(), + Status: v1alpha1.KafkaSchemaStatus{ID: 1, Version: 3}, + } + + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + schema.Spec.References = []v1alpha1.SchemaReference{ + {Name: "legacy.proto", Subject: "legacy-subject", Version: 2}, + {Name: "shared.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + } + + avn := avngen.NewMockClient(t) + avn.EXPECT(). + ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return(nil, newAivenError(404, "not found")).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + mock.MatchedBy(func(in *kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn) bool { + if in.References == nil || len(*in.References) != 2 { + return false + } + first := (*in.References)[0] + second := (*in.References)[1] + return first.Name == "legacy.proto" && first.Subject == "legacy-subject" && first.Version == 2 && + second.Name == "shared.proto" && second.Subject == "shared-subject" && second.Version == 3 + }), + ).Return(55, nil).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.Equal(t, 55, got.Status.ID) + }) + + t.Run("Requeues without calling Aiven when kafkaSchemaRef target is missing", func(t *testing.T) { + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + schema.Spec.References = []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "not-there"}}, + } + + // the reconciler must block on the missing referent before call to Aiven + avn := avngen.NewMockClient(t) + + r, res, err := runKafkaSchemaScenario(t, schema, avn) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) + require.NotContains(t, got.Annotations, processedGenerationAnnotation) + }) + + // resolveK8sRefs sees the referent as Ready, but its Status.Version is still 0. + // The reconciler must soft-requeue. + t.Run("Requeues from Create when referent is Ready but its Status.Version is still zero", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "referent", + Namespace: "default", + Generation: 1, + Annotations: map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + }, + }, + Spec: func() v1alpha1.KafkaSchemaSpec { + s := v1alpha1.KafkaSchemaSpec{ + SubjectName: "resolved-subject", + } + s.Project = "test-project" + s.ServiceName = "test-service" + return s + }(), + // Status.Version intentionally zero: the referent is "Ready" by + // annotations but the registry-assigned version hasn't landed yet. + Status: v1alpha1.KafkaSchemaStatus{ID: 100, Version: 0}, + } + + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + schema.Spec.References = []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + } + + avn := avngen.NewMockClient(t) + // Service must be operational. + avn.EXPECT(). + ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + // Subject is not registered yet → Create. The reconciler must + // fail-fast on the version-zero precondition before reaching POST. + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return(nil, newAivenError(404, "not found")).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.NotContains(t, got.Annotations, instanceIsRunningAnnotation) + require.NotContains(t, got.Annotations, processedGenerationAnnotation) + }) + + // When a user applies a referent and a dependent in the same kubectl apply, the dependent's + // first reconcile typically lands before the referent has finished its own. + // The dependent must soft-requeue waiting for the referent to become Ready. + t.Run("Multi-apply: dependent waits for referent", func(t *testing.T) { + // Referent: exists in k8s but not yet processed. + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "referent", + Namespace: "default", + Generation: 1, + }, + Spec: func() v1alpha1.KafkaSchemaSpec { + s := v1alpha1.KafkaSchemaSpec{ + SubjectName: "resolved-subject", + } + s.Project = "test-project" + s.ServiceName = "test-service" + return s + }(), + } + + dependent := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + dependent.Generation = 1 + dependent.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + dependent.Spec.References = []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + } + + // Any call during pass 1 fails the test (no expectation registered). + avn := avngen.NewMockClient(t) + + r := setupKafkaSchemaReconciler(t, avn, dependent, referent) + + // dependent reconciles before the referent is Ready. + res, err := r.Reconcile(t.Context(), ctrlruntime.Request{ + NamespacedName: types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace}, + }) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res, "dependent must soft-requeue while referent is not Ready") + + afterPass1 := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace}, afterPass1)) + require.NotContains(t, afterPass1.Annotations, instanceIsRunningAnnotation, "dependent must not be marked Ready before its first apply") + require.NotContains(t, afterPass1.Annotations, processedGenerationAnnotation, "dependent must not have processed any generation yet") + require.Equal(t, 0, afterPass1.Status.ID, "dependent must not have a registry ID after pass 1") + + // Simulate the referent's successful reconcile by patching status and annotations directly. + latestReferent := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: referent.Name, Namespace: referent.Namespace}, latestReferent)) + latestReferent.Status.Version = 7 + require.NoError(t, r.Status().Update(t.Context(), latestReferent)) + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: referent.Name, Namespace: referent.Namespace}, latestReferent)) + latestReferent.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + } + require.NoError(t, r.Update(t.Context(), latestReferent)) + + // Dependent reconciles after the referent advanced. + avn.EXPECT(). + ServiceGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName). + Return(nil, newAivenError(404, "not found")).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, dependent.Spec.Project, dependent.Spec.ServiceName, dependent.Spec.SubjectName, + mock.MatchedBy(func(in *kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn) bool { + if in.References == nil || len(*in.References) != 1 { + return false + } + ref := (*in.References)[0] + return ref.Name == "common.proto" && + ref.Subject == latestReferent.Spec.SubjectName && + ref.Version == 7 + }), + ).Return(99, nil).Once() + + res, err = r.Reconcile(t.Context(), ctrlruntime.Request{ + NamespacedName: types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace}, + }) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + afterPass2 := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: dependent.Name, Namespace: dependent.Namespace}, afterPass2)) + require.Equal(t, 99, afterPass2.Status.ID, "dependent must record the registry ID returned by the POST") + require.Equal(t, "1", afterPass2.Annotations[processedGenerationAnnotation], "dependent must have processed its generation") + }) + + t.Run("Re-POSTs when a kafkaSchemaRef referent advances to a new version", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "referent", + Namespace: "default", + Generation: 1, + Annotations: map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + }, + }, + Spec: func() v1alpha1.KafkaSchemaSpec { + s := v1alpha1.KafkaSchemaSpec{ + SubjectName: "resolved-subject", + } + s.Project = "test-project" + s.ServiceName = "test-service" + return s + }(), + // Referent has advanced to version 2. + Status: v1alpha1.KafkaSchemaStatus{ID: 200, Version: 2}, + } + + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + } + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + schema.Spec.References = []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + } + // Dependent's own Status.ID was assigned at version=1 of the referent. + schema.Status.ID = 50 + schema.Status.Version = 1 + + avn := avngen.NewMockClient(t) + avn.EXPECT(). + ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + // The registry currently has the dependent at version=1. Observe will see Status.ID match. + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{1}, nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 1). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{Id: 50, Version: 1}, nil).Once() + + // EXPECTED: a fresh POST that carries the referent's NEW version (2). + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + mock.MatchedBy(func(in *kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn) bool { + if in.References == nil || len(*in.References) != 1 { + return false + } + ref := (*in.References)[0] + return ref.Name == "common.proto" && ref.Subject == "resolved-subject" && ref.Version == 2 + }), + ).Return(60, nil).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn, referent) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.Equal(t, 60, got.Status.ID, "dependent must have re-POSTed and recorded the new ID") + require.NotContains(t, got.Annotations, instanceIsRunningAnnotation, "Update must clear the running annotation when re-POSTing") + }) + + // Remove all references when Spec.References is empty, but + // the registry currently serves the schema with reference still attached. + t.Run("Re-POSTs without References when spec drops all references", func(t *testing.T) { + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + } + schema.Spec.SchemaType = kafkaschemaregistry.SchemaTypeProtobuf + // Spec carries no references. + schema.Spec.References = nil + schema.Status.ID = 77 + schema.Status.Version = 3 + + avn := avngen.NewMockClient(t) + avn.EXPECT(). + ServiceGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, mock.Anything). + Return(runningService(), nil).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionsGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return([]int{3}, nil).Once() + // Registry reports one reference. Observe must treat this as stale. + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionGet(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, 3). + Return(&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionGetOut{ + Id: 77, + Version: 3, + References: []kafkaschemaregistry.ReferenceOut{ + {Name: "stale.proto", Subject: "stale-subject", Version: 1}, + }, + }, nil).Once() + + avn.EXPECT(). + ServiceSchemaRegistrySubjectVersionPost( + mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + mock.MatchedBy(func(in *kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn) bool { + return in.References == nil + }), + ).Return(78, nil).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.Equal(t, 78, got.Status.ID, "dependent must have re-POSTed and recorded the new ID") + require.NotContains(t, got.Annotations, instanceIsRunningAnnotation, "Update must clear the running annotation when re-POSTing") + }) + t.Run("Treats 404 on delete as already deleted", func(t *testing.T) { schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) schema.Generation = 1 @@ -348,6 +722,10 @@ func TestKafkaSchemaReconciler(t *testing.T) { avn.EXPECT(). ServiceSchemaRegistrySubjectDelete(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). Return(newAivenError(404, "not found")).Once() + avn.EXPECT(). + ServiceSchemaRegistrySubjectDelete(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, + [][2]string{kafkaschemaregistry.ServiceSchemaRegistrySubjectDeletePermanent(true)}). + Return(newAivenError(404, "not found")).Once() r, res, err := runKafkaSchemaScenario(t, schema, avn) require.NoError(t, err) @@ -357,4 +735,396 @@ func TestKafkaSchemaReconciler(t *testing.T) { err = r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got) require.True(t, apierrors.IsNotFound(err)) }) + + // The schema registry rejects soft-delete of a subject that another + // subject still references. On a transient/server error the + // reconciler must keep the finalizer and requeue. + t.Run("Keeps finalizer and requeues when delete fails with a server error", func(t *testing.T) { + schema := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + schema.Generation = 1 + schema.Finalizers = []string{instanceDeletionFinalizer} + now := metav1.Now() + schema.DeletionTimestamp = &now + + avn := avngen.NewMockClient(t) + avn.EXPECT(). + ServiceSchemaRegistrySubjectDelete(mock.Anything, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName). + Return(newAivenError(500, "subject is referenced")).Once() + + r, res, err := runKafkaSchemaScenario(t, schema, avn) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: schema.Name, Namespace: schema.Namespace}, got)) + require.Contains(t, got.Finalizers, instanceDeletionFinalizer, "finalizer must remain so deletion can retry") + }) + + // When in-namespace dependent still points at the referent via kafkaSchemaRef, + // Delete must exit before any Aiven call and the reconciler must soft requeue. + t.Run("Refuses delete and preserves Ready annotations when a kafkaSchemaRef dependent exists", func(t *testing.T) { + referent := newObjectFromYAML[v1alpha1.KafkaSchema](t, yamlKafkaSchema) + referent.Generation = 1 + referent.Finalizers = []string{instanceDeletionFinalizer} + referent.Annotations = map[string]string{ + processedGenerationAnnotation: "1", + instanceIsRunningAnnotation: "true", + } + now := metav1.Now() + referent.DeletionTimestamp = &now + + dependent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dependent", + Namespace: referent.Namespace, + }, + Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: referent.Name}}, + }, + }, + } + + avn := avngen.NewMockClient(t) + + r, res, err := runKafkaSchemaScenario(t, referent, avn, dependent) + require.NoError(t, err) + require.Equal(t, ctrlruntime.Result{RequeueAfter: requeueTimeout}, res) + + got := &v1alpha1.KafkaSchema{} + require.NoError(t, r.Get(t.Context(), types.NamespacedName{Name: referent.Name, Namespace: referent.Namespace}, got)) + require.Contains(t, got.Finalizers, instanceDeletionFinalizer) + // Both annotations must survive, resolveK8sRefs gates on that. + require.Equal(t, "true", got.Annotations[instanceIsRunningAnnotation], + "running annotation must survive the failed-delete pass so dependents can still proceed through resolveK8sRefs") + require.Equal(t, "1", got.Annotations[processedGenerationAnnotation], + "processed-generation annotation must survive so hasLatestGeneration stays true while the referent is Terminating") + }) +} + +// runKafkaSchemaScenario builds a Reconciler seeded with schema + +// additionalObjects and reconciles schema. +func runKafkaSchemaScenario( + t *testing.T, + schema *v1alpha1.KafkaSchema, + avn avngen.Client, + additionalObjects ...client.Object, +) (*Reconciler[*v1alpha1.KafkaSchema], ctrlruntime.Result, error) { + t.Helper() + + objects := append([]client.Object{schema}, additionalObjects...) + r := setupKafkaSchemaReconciler(t, avn, objects...) + res, err := r.Reconcile(t.Context(), ctrlruntime.Request{ + NamespacedName: types.NamespacedName{ + Name: schema.Name, + Namespace: schema.Namespace, + }, + }) + return r, res, err +} + +func runningService() *service.ServiceGetOut { + return &service.ServiceGetOut{ + State: service.ServiceStateTypeRunning, + NodeStates: []service.NodeStateOut{ + {State: service.NodeStateTypeRunning}, + }, + } +} + +// setupKafkaSchemaReconciler builds a KafkaSchema Reconciler backed by a +// fake client seeded with the given objects. The fake client mirrors the +// runtime field indexer so Delete-time dependent lookups work. +func setupKafkaSchemaReconciler( + t *testing.T, + avn avngen.Client, + objects ...client.Object, +) *Reconciler[*v1alpha1.KafkaSchema] { + t.Helper() + + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + require.NoError(t, v1alpha1.AddToScheme(scheme)) + + r := newKafkaSchemaReconciler(Controller{ + Client: fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(&v1alpha1.KafkaSchema{}). + WithObjects(objects...). + WithIndex(&v1alpha1.KafkaSchema{}, kafkaSchemaRefIndex, kafkaSchemaRefIndexValues). + Build(), + Scheme: scheme, + Recorder: record.NewFakeRecorder(10), + DefaultToken: "test-token", + PollInterval: testPollInterval, + }).(*Reconciler[*v1alpha1.KafkaSchema]) + r.newAivenGeneratedClient = func(_, _, _ string) (avngen.Client, error) { + return avn, nil + } + return r +} + +func TestFindKafkaSchemasReferencing(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + require.NoError(t, v1alpha1.AddToScheme(scheme)) + + target := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "target", Namespace: "default"}, + } + dependent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "dependent", Namespace: "default"}, + Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "target"}}, + }, + }, + } + unrelated := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "unrelated", Namespace: "default"}, + Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "other.proto", Subject: "other-subject", Version: 1}, + }, + }, + } + // Same name in a different namespace must not be enqueued. + otherNs := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "cross-ns-dependent", Namespace: "elsewhere"}, + Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "target"}}, + }, + }, + } + + // Register the same field indexer the controller installs at runtime. + c := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(target, dependent, unrelated, otherNs). + WithIndex(&v1alpha1.KafkaSchema{}, kafkaSchemaRefIndex, kafkaSchemaRefIndexValues). + Build() + + got := findKafkaSchemasReferencing(c)(t.Context(), target) + require.Len(t, got, 1) + require.Equal(t, "dependent", got[0].Name) + require.Equal(t, "default", got[0].Namespace) +} + +// TestKafkaSchemaRefIndexValues pins the indexer key extraction. Each +// kafkaSchemaRef contributes one key; explicit subject/version entries do +// not. A schema with two kafkaSchemaRef entries appears under both keys so +// the cache can match either referent name. +func TestKafkaSchemaRefIndexValues(t *testing.T) { + t.Run("nil for non-KafkaSchema", func(t *testing.T) { + require.Nil(t, kafkaSchemaRefIndexValues(&corev1.Secret{})) + }) + + t.Run("empty for schema with no references", func(t *testing.T) { + s := &v1alpha1.KafkaSchema{} + require.Empty(t, kafkaSchemaRefIndexValues(s)) + }) + + t.Run("ignores explicit subject+version entries", func(t *testing.T) { + s := &v1alpha1.KafkaSchema{Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "explicit.proto", Subject: "explicit-subject", Version: 1}, + }, + }} + require.Empty(t, kafkaSchemaRefIndexValues(s)) + }) + + t.Run("returns referent names from kafkaSchemaRef entries", func(t *testing.T) { + s := &v1alpha1.KafkaSchema{Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "a.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "ref-a"}}, + {Name: "b.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "ref-b"}}, + {Name: "c.proto", Subject: "c-subject", Version: 2}, + }, + }} + require.Equal(t, []string{"ref-a", "ref-b"}, kafkaSchemaRefIndexValues(s)) + }) +} + +// The schema parser identifies refs by name (the import path / $ref key), ordering guarantee +// is not documented. Reordered ref lists must compare equal. +func TestReferencesEqual(t *testing.T) { + in := func(name, subject string, version int) kafkaschemaregistry.ReferenceIn { + return kafkaschemaregistry.ReferenceIn{Name: name, Subject: subject, Version: version} + } + out := func(name, subject string, version int) kafkaschemaregistry.ReferenceOut { + return kafkaschemaregistry.ReferenceOut{Name: name, Subject: subject, Version: version} + } + + t.Run("both empty", func(t *testing.T) { + require.True(t, referencesEqual(nil, nil)) + }) + + t.Run("different lengths", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("b.proto", "b", 1)}, + )) + }) + + t.Run("identical, same order", func(t *testing.T) { + require.True(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1), in("b.proto", "b", 2)}, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("b.proto", "b", 2)}, + )) + }) + + t.Run("identical, reordered", func(t *testing.T) { + require.True(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1), in("b.proto", "b", 2)}, + []kafkaschemaregistry.ReferenceOut{out("b.proto", "b", 2), out("a.proto", "a", 1)}, + )) + }) + + t.Run("subject mismatch under same name", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a-other", 1)}, + )) + }) + + t.Run("version mismatch under same name", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 2)}, + )) + }) + + t.Run("name not present in got", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + []kafkaschemaregistry.ReferenceOut{out("z.proto", "a", 1)}, + )) + }) + + t.Run("desired empty, got non-empty", func(t *testing.T) { + require.False(t, referencesEqual( + nil, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1)}, + )) + }) + + t.Run("desired non-empty, got empty", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + nil, + )) + }) + + // If the registry returns duplicate Names, the length check must reject the comparison. + t.Run("duplicate names in got are not equal", func(t *testing.T) { + require.False(t, referencesEqual( + []kafkaschemaregistry.ReferenceIn{in("a.proto", "a", 1)}, + []kafkaschemaregistry.ReferenceOut{out("a.proto", "a", 1), out("a.proto", "a", 1)}, + )) + }) +} + +// A missing referent must surface as errPreconditionNotMet. +func TestResolveReferences(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, clientgoscheme.AddToScheme(scheme)) + require.NoError(t, v1alpha1.AddToScheme(scheme)) + + dependent := func() *v1alpha1.KafkaSchema { + return &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "dependent", Namespace: "default"}, + Spec: v1alpha1.KafkaSchemaSpec{ + References: []v1alpha1.SchemaReference{ + {Name: "common.proto", KafkaSchemaRef: &v1alpha1.LocalKafkaSchemaRef{Name: "referent"}}, + }, + }, + } + } + + t.Run("missing referent surfaces as errPreconditionNotMet", func(t *testing.T) { + c := &KafkaSchemaController{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(dependent()).Build(), + } + + _, err := c.resolveReferences(t.Context(), dependent()) + require.Error(t, err) + require.ErrorIs(t, err, errPreconditionNotMet, + "missing referent must be a precondition miss so the reconciler soft-requeues") + }) + + t.Run("referent with zero Status.Version surfaces as errPreconditionNotMet", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "referent", Namespace: "default"}, + Spec: v1alpha1.KafkaSchemaSpec{SubjectName: "resolved-subject"}, + // Status.Version intentionally zero. + } + c := &KafkaSchemaController{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(dependent(), referent).Build(), + } + + _, err := c.resolveReferences(t.Context(), dependent()) + require.Error(t, err) + require.ErrorIs(t, err, errPreconditionNotMet) + }) + + t.Run("resolved kafkaSchemaRef carries referent's spec subject and status version", func(t *testing.T) { + referent := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "referent", Namespace: "default"}, + Spec: v1alpha1.KafkaSchemaSpec{SubjectName: "resolved-subject"}, + Status: v1alpha1.KafkaSchemaStatus{Version: 7}, + } + c := &KafkaSchemaController{ + Client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(dependent(), referent).Build(), + } + + got, err := c.resolveReferences(t.Context(), dependent()) + require.NoError(t, err) + require.Len(t, got, 1) + require.Equal(t, "common.proto", got[0].Name) + require.Equal(t, "resolved-subject", got[0].Subject) + require.Equal(t, 7, got[0].Version) + }) +} + +func TestKafkaSchemaVersionChangedPredicate(t *testing.T) { + pred := kafkaSchemaVersionChangedPredicate() + + base := &v1alpha1.KafkaSchema{ + ObjectMeta: metav1.ObjectMeta{Name: "s", Namespace: "default", Generation: 1}, + Status: v1alpha1.KafkaSchemaStatus{ + Version: 1, + Conditions: []metav1.Condition{ + { + Type: "Ready", + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.NewTime(time.Unix(1_700_000_000, 0)), + Reason: "Stable", + Message: "running", + }, + }, + }, + } + + // Unchanged -> no enqueue. + require.False(t, pred.Update(event.UpdateEvent{ObjectOld: base.DeepCopy(), ObjectNew: base.DeepCopy()})) + + // Conditions-only change (timestamp / message) must not enqueue. + condsTouched := base.DeepCopy() + condsTouched.Status.Conditions[0].LastTransitionTime = metav1.NewTime(time.Unix(1_800_000_000, 0)) + condsTouched.Status.Conditions[0].Message = "still running" + require.False(t, pred.Update(event.UpdateEvent{ObjectOld: base.DeepCopy(), ObjectNew: condsTouched})) + + // Status.Version bump -> enqueue. + newer := base.DeepCopy() + newer.Status.Version = 2 + require.True(t, pred.Update(event.UpdateEvent{ObjectOld: base.DeepCopy(), ObjectNew: newer})) + + // Generation bump -> enqueue. + regen := base.DeepCopy() + regen.Generation = 2 + require.True(t, pred.Update(event.UpdateEvent{ObjectOld: base.DeepCopy(), ObjectNew: regen})) + + // Deletes must not enqueue. + require.False(t, pred.Delete(event.DeleteEvent{Object: base.DeepCopy()})) } diff --git a/controllers/reconciler.go b/controllers/reconciler.go index f72f9dff..21b09a6e 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -19,6 +19,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -67,6 +68,20 @@ type Reconciler[T v1alpha1.AivenManagedObject] struct { newObj func() T newSecret func(o objWithSecret, stringData map[string]string, addPrefix bool) *corev1.Secret options *controller.Options + watches []func(*builder.Builder) *builder.Builder + indexes []func(context.Context, ctrl.Manager) error +} + +// WithWatches registers extra watches that compose with the controller's builder. +func (r *Reconciler[T]) WithWatches(fns ...func(*builder.Builder) *builder.Builder) *Reconciler[T] { + r.watches = append(r.watches, fns...) + return r +} + +// WithIndexes registers field indexers for the controller-runtime cache. +func (r *Reconciler[T]) WithIndexes(fns ...func(context.Context, ctrl.Manager) error) *Reconciler[T] { + r.indexes = append(r.indexes, fns...) + return r } // requeueTimeout sets timeout to requeue controller @@ -200,6 +215,8 @@ func (r *Reconciler[T]) resolveK8sRefs(ctx context.Context, obj T) (requeue bool return false, fmt.Errorf("getting referenced resource %s %s: %w", ref.GroupVersionKind, ref.NamespacedName, err) } + // Block the dependent until the referent is Ready. + // Note: if the referent gets stuck, it keeps the dependent stuck too. if !IsReadyToUse(dep) { return true, nil } @@ -287,6 +304,9 @@ func (r *Reconciler[T]) createResource(ctx context.Context, controller AivenCont res, err := controller.Create(ctx, obj) if err != nil { + if requeue, ok := r.handlePreconditionNotMet(ctx, obj, err); ok { + return requeue, nil + } r.Recorder.Event(obj, corev1.EventTypeWarning, eventUnableToCreateOrUpdateAtAiven, err.Error()) meta.SetStatusCondition(obj.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err)) return ctrl.Result{}, fmt.Errorf("unable to create or update instance at aiven: %w", err) @@ -309,6 +329,10 @@ func (r *Reconciler[T]) updateResource(ctx context.Context, controller AivenCont return ctrl.Result{RequeueAfter: requeueTimeout}, nil } + if requeue, ok := r.handlePreconditionNotMet(ctx, obj, err); ok { + return requeue, nil + } + r.Recorder.Event(obj, corev1.EventTypeWarning, eventUnableToWaitForInstanceToBeRunning, err.Error()) return ctrl.Result{}, fmt.Errorf("unable to wait until instance is running: %w", err) } @@ -479,8 +503,28 @@ func (r *Reconciler[T]) handleDeleteError(ctx context.Context, orig v1alpha1.Aiv } } +// handlePreconditionNotMet matches errPreconditionNotMet and returns +// (soft-requeue result, true); for any other error it returns (zero, false). +// Create/Update may need dependency state that Observe never reads or resolves. +func (r *Reconciler[T]) handlePreconditionNotMet(ctx context.Context, obj T, err error) (ctrl.Result, bool) { + if !errors.Is(err, errPreconditionNotMet) { + return ctrl.Result{}, false + } + const msg = "preconditions are not met, requeue" + r.Recorder.Event(obj, corev1.EventTypeNormal, eventPreconditionsNotMet, msg) + logr.FromContextOrDiscard(ctx).V(1).Info(msg, "error", err) + return ctrl.Result{RequeueAfter: requeueTimeout}, true +} + // SetupWithManager sets up the controller with the Manager. func (r *Reconciler[T]) SetupWithManager(mgr ctrl.Manager) error { + // Indexers must be registered before the cache starts, so they run first. + for _, fn := range r.indexes { + if err := fn(context.Background(), mgr); err != nil { + return fmt.Errorf("registering field indexer: %w", err) + } + } + obj := r.newObj() b := ctrl.NewControllerManagedBy(mgr).For(obj) if _, ok := any(obj).(objWithSecret); ok { @@ -491,6 +535,10 @@ func (r *Reconciler[T]) SetupWithManager(mgr ctrl.Manager) error { b = b.WithOptions(*r.options) } + for _, fn := range r.watches { + b = fn(b) + } + return b.Complete(r) } diff --git a/docs/docs/resources/examples/kafkaschema.with_explicit_refs.yaml b/docs/docs/resources/examples/kafkaschema.with_explicit_refs.yaml new file mode 100644 index 00000000..c420df9f --- /dev/null +++ b/docs/docs/resources/examples/kafkaschema.with_explicit_refs.yaml @@ -0,0 +1,31 @@ +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: order-event-pinned +spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: com.example.OrderEvent + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "common/money.proto"; + message OrderEvent { + common.Money total = 1; + } + references: + # Pin the referenced subject and version explicitly. Use this when the + # referenced schema is managed outside this operator, or when you want + # the dependent to keep pointing at a specific version regardless of + # what the referent advances to. + # + # If the referent IS another KafkaSchema in the same namespace, prefer + # the kafkaSchemaRef variant — see the with_refs example. That form + # auto-propagates new versions instead of requiring manual updates here. + - name: common/money.proto + subject: common.Money + version: 1 diff --git a/docs/docs/resources/examples/kafkaschema.with_refs.yaml b/docs/docs/resources/examples/kafkaschema.with_refs.yaml new file mode 100644 index 00000000..ee9cac30 --- /dev/null +++ b/docs/docs/resources/examples/kafkaschema.with_refs.yaml @@ -0,0 +1,46 @@ +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: money-type +spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: common.Money + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + package common; + message Money { + string currency = 1; + int64 amount_cents = 2; + } +--- +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: order-event +spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: com.example.OrderEvent + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "common/money.proto"; + message OrderEvent { + common.Money total = 1; + } + references: + # Resolve subject and version from another KafkaSchema in the same namespace. + # No need to hard-code the version - dependents pick up new versions automatically. + - name: common/money.proto + kafkaSchemaRef: + name: money-type diff --git a/docs/docs/resources/kafkaschema.md b/docs/docs/resources/kafkaschema.md index 5da4d4fb..157c51a6 100644 --- a/docs/docs/resources/kafkaschema.md +++ b/docs/docs/resources/kafkaschema.md @@ -23,37 +23,129 @@ This resource uses the following API operations, and for each operation, _any_ o | [ServiceSchemaRegistrySubjectVersionPost](https://api.aiven.io/doc/#operation/ServiceSchemaRegistrySubjectVersionPost) | `service:data:write` | | [ServiceSchemaRegistrySubjectVersionsGet](https://api.aiven.io/doc/#operation/ServiceSchemaRegistrySubjectVersionsGet) | `service:data:write` | -## Usage example - -```yaml linenums="1" -apiVersion: aiven.io/v1alpha1 -kind: KafkaSchema -metadata: - name: my-schema -spec: - authSecretRef: - name: aiven-token - key: token - - project: my-aiven-project - serviceName: my-kafka - subjectName: mny-subject - compatibilityLevel: BACKWARD - schema: | - { - "doc": "example_doc", - "fields": [{ - "default": 5, - "doc": "field_doc", - "name": "field_name", - "namespace": "field_namespace", - "type": "int" - }], - "name": "example_name", - "namespace": "example_namespace", - "type": "record" - } -``` +## Usage examples + + +=== "with_explicit_refs" + + ```yaml linenums="1" + apiVersion: aiven.io/v1alpha1 + kind: KafkaSchema + metadata: + name: order-event-pinned + spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: com.example.OrderEvent + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "common/money.proto"; + message OrderEvent { + common.Money total = 1; + } + references: + # Pin the referenced subject and version explicitly. Use this when the + # referenced schema is managed outside this operator, or when you want + # the dependent to keep pointing at a specific version regardless of + # what the referent advances to. + # + # If the referent IS another KafkaSchema in the same namespace, prefer + # the kafkaSchemaRef variant — see the with_refs example. That form + # auto-propagates new versions instead of requiring manual updates here. + - name: common/money.proto + subject: common.Money + version: 1 + ``` + + +=== "with_refs" + + ```yaml linenums="1" + apiVersion: aiven.io/v1alpha1 + kind: KafkaSchema + metadata: + name: money-type + spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: common.Money + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + package common; + message Money { + string currency = 1; + int64 amount_cents = 2; + } + --- + apiVersion: aiven.io/v1alpha1 + kind: KafkaSchema + metadata: + name: order-event + spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: com.example.OrderEvent + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "common/money.proto"; + message OrderEvent { + common.Money total = 1; + } + references: + # Resolve subject and version from another KafkaSchema in the same namespace. + # No need to hard-code the version - dependents pick up new versions automatically. + - name: common/money.proto + kafkaSchemaRef: + name: money-type + ``` + + +=== "example" + + ```yaml linenums="1" + apiVersion: aiven.io/v1alpha1 + kind: KafkaSchema + metadata: + name: my-schema + spec: + authSecretRef: + name: aiven-token + key: token + + project: my-aiven-project + serviceName: my-kafka + subjectName: mny-subject + compatibilityLevel: BACKWARD + schema: | + { + "doc": "example_doc", + "fields": [{ + "default": 5, + "doc": "field_doc", + "name": "field_name", + "namespace": "field_namespace", + "type": "int" + }], + "name": "example_name", + "namespace": "example_namespace", + "type": "record" + } + ``` Apply the resource with: @@ -64,13 +156,13 @@ kubectl apply -f example.yaml Verify the newly created `KafkaSchema`: ```shell -kubectl get kafkaschemas my-schema +kubectl get kafkaschemas order-event-pinned ``` The output is similar to the following: ```shell -Name Service Name Project Subject Compatibility Level Version -my-schema my-kafka my-aiven-project mny-subject BACKWARD +Name Service Name Project Subject Version +order-event-pinned my-kafka my-aiven-project com.example.OrderEvent ``` --- @@ -79,6 +171,13 @@ my-schema my-kafka my-aiven-project mny-subject BACKWARD KafkaSchema is the Schema for the kafkaschemas API. +Self-references (A -> A) are blocked at admission; transitive cycles +(A -> B -> A) are not detected at admission time. + +Deletion: the operator performs a soft delete followed by a hard delete on +the subject. The subject disappears from the registry's listing, re-applying a KafkaSchema with the same subjectName +after deletion starts a brand-new subject at version 1. + **Required** - [`apiVersion`](#apiVersion-property){: name='apiVersion-property'} (string). Value `aiven.io/v1alpha1`. @@ -95,7 +194,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. **Required** - [`project`](#spec.project-property){: name='spec.project-property'} (string, Immutable, Pattern: `^[a-zA-Z0-9_-]+$`, MaxLength: 63). Identifies the project this resource belongs to. -- [`schema`](#spec.schema-property){: name='spec.schema-property'} (string). Kafka Schema configuration should be a valid Avro Schema JSON format. +- [`schema`](#spec.schema-property){: name='spec.schema-property'} (string). Kafka Schema definition. Format depends on schemaType (AVRO/JSON/PROTOBUF). - [`serviceName`](#spec.serviceName-property){: name='spec.serviceName-property'} (string, Immutable, Pattern: `^[a-z][-a-z0-9]+$`, MaxLength: 63). Specifies the name of the service that this resource belongs to. - [`subjectName`](#spec.subjectName-property){: name='spec.subjectName-property'} (string, Immutable). Kafka Schema Subject name. @@ -103,7 +202,8 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. - [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). - [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level. -- [`references`](#spec.references-property){: name='spec.references-property'} (array of objects). Schema references for Protobuf or JSON schemas that import other schemas. See below for [nested schema](#spec.references). +- [`references`](#spec.references-property){: name='spec.references-property'} (array of objects, MaxItems: 100). Schema references for Protobuf or JSON schemas that import other schemas. + References must form a directed acyclic graph (DAG); cycles are not allowed. See below for [nested schema](#spec.references). - [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`, Immutable). Schema type. ## authSecretRef {: #spec.authSecretRef } @@ -122,10 +222,31 @@ Authentication reference to Aiven token in a secret. _Appears on [`spec`](#spec)._ SchemaReference is a reference to another schema in the registry. +Exactly one of {subject+version} or kafkaSchemaRef must be set. + +**Required** + +- [`name`](#spec.references.name-property){: name='spec.references.name-property'} (string, MinLength: 1, MaxLength: 512). Name used to reference the schema (e.g., the import path in Protobuf). + +**Optional** + +- [`kafkaSchemaRef`](#spec.references.kafkaSchemaRef-property){: name='spec.references.kafkaSchemaRef-property'} (object). Reference to another KafkaSchema resource in the same namespace. + Mutually exclusive with subject/version. + + Cleanup order matters: delete the dependent before the referent. See below for [nested schema](#spec.references.kafkaSchemaRef). +- [`subject`](#spec.references.subject-property){: name='spec.references.subject-property'} (string, MinLength: 1, MaxLength: 512). Subject name of the referenced schema in the registry. Mutually exclusive with kafkaSchemaRef. +- [`version`](#spec.references.version-property){: name='spec.references.version-property'} (integer, Minimum: 1). Version of the referenced schema. Mutually exclusive with kafkaSchemaRef. + +### kafkaSchemaRef {: #spec.references.kafkaSchemaRef } + +_Appears on [`spec.references`](#spec.references)._ + +Reference to another KafkaSchema resource in the same namespace. +Mutually exclusive with subject/version. + +Cleanup order matters: delete the dependent before the referent. **Required** -- [`name`](#spec.references.name-property){: name='spec.references.name-property'} (string, MinLength: 1). Name used to reference the schema (e.g., the import path in Protobuf). -- [`subject`](#spec.references.subject-property){: name='spec.references.subject-property'} (string, MinLength: 1). Subject name of the referenced schema in the registry. -- [`version`](#spec.references.version-property){: name='spec.references.version-property'} (integer, Minimum: 1). Version of the referenced schema. +- [`name`](#spec.references.kafkaSchemaRef.name-property){: name='spec.references.kafkaSchemaRef.name-property'} (string, MinLength: 1, MaxLength: 253). Name of the KafkaSchema resource in the same namespace. diff --git a/go.mod b/go.mod index 2ec3b7eb..8bc06bf1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.26 require ( github.com/ClickHouse/clickhouse-go/v2 v2.46.0 github.com/aiven/go-api-schemas v1.190.0 - github.com/aiven/go-client-codegen v0.182.0 + github.com/aiven/go-client-codegen v0.184.0 github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.1 github.com/docker/go-units v0.5.0 @@ -60,9 +60,7 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.8 // indirect github.com/hashicorp/go-version v1.8.0 // indirect - github.com/huandu/xstrings v1.5.0 // indirect github.com/imdario/mergo v0.3.12 // indirect - github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.18.3 // indirect @@ -87,7 +85,6 @@ require ( github.com/rs/zerolog v1.35.1 // indirect github.com/segmentio/asm v1.2.1 // indirect github.com/shopspring/decimal v1.4.0 // indirect - github.com/spf13/cobra v1.10.2 // indirect github.com/spf13/pflag v1.0.10 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect diff --git a/go.sum b/go.sum index 223694b9..b5d195c5 100644 --- a/go.sum +++ b/go.sum @@ -2,12 +2,10 @@ github.com/ClickHouse/ch-go v0.71.0 h1:bUdZ/EZj/LcVHsMqaRUP2holqygrPWQKeMjc6nZoy github.com/ClickHouse/ch-go v0.71.0/go.mod h1:NwbNc+7jaqfY58dmdDUbG4Jl22vThgx1cYjBw0vtgXw= github.com/ClickHouse/clickhouse-go/v2 v2.46.0 h1:s3eRy+hYmu5uzotB6ZhDofgHu8kDgGN/fpmjxRkqSpk= github.com/ClickHouse/clickhouse-go/v2 v2.46.0/go.mod h1:giJfUVlMkcfUEPVfRpt51zZaGEx9i17gCos8gBl392c= -github.com/aiven/go-api-schemas v1.189.0 h1:Z5W2uk3F66zc5dDPjEvUWo9ik9SInCdJuPqupiZQw+I= -github.com/aiven/go-api-schemas v1.189.0/go.mod h1:WTWdlammndlBvINEUP2F8JDmOef6rEWkpNVhJS33GcQ= github.com/aiven/go-api-schemas v1.190.0 h1:Ym0uNhJSmea8yL5BWFf4FDJucij5fUG2eSN0dQiN09U= github.com/aiven/go-api-schemas v1.190.0/go.mod h1:WTWdlammndlBvINEUP2F8JDmOef6rEWkpNVhJS33GcQ= -github.com/aiven/go-client-codegen v0.182.0 h1:M+oYaSPOx8FhIDxJy5b0Fh35XIwHcLgEaGR3ZS3C6gc= -github.com/aiven/go-client-codegen v0.182.0/go.mod h1:wX+vwJ1nogBHPfGxwq5953zjfXHHiLMQCySw6IUlo6Q= +github.com/aiven/go-client-codegen v0.184.0 h1:0yRDGqRcYyCcbB2j4vltV8BpYvp04WJ883Qz2/yIdNM= +github.com/aiven/go-client-codegen v0.184.0/go.mod h1:wX+vwJ1nogBHPfGxwq5953zjfXHHiLMQCySw6IUlo6Q= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= @@ -17,7 +15,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/dave/jennifer v1.7.1 h1:B4jJJDHelWcDhlRQxWeo0Npa/pYKBLrirAQoTN45txo= github.com/dave/jennifer v1.7.1/go.mod h1:nXbxhEmQfOZhWml3D1cDK5M1FLnMSozpbFN/m3RmGZc= @@ -94,12 +91,8 @@ github.com/hashicorp/go-retryablehttp v0.7.8 h1:ylXZWnqa7Lhqpk0L1P1LzDtGcCR0rPVU github.com/hashicorp/go-retryablehttp v0.7.8/go.mod h1:rjiScheydd+CxvumBsIrFKlx3iS0jrZ7LvzFGFmuKbw= github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI= -github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= -github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= -github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= @@ -170,16 +163,12 @@ github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0t github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/zerolog v1.35.1 h1:m7xQeoiLIiV0BCEY4Hs+j2NG4Gp2o2KPKmhnnLiazKI= github.com/rs/zerolog v1.35.1/go.mod h1:EjML9kdfa/RMA7h/6z6pYmq1ykOuA8/mjWaEvGI+jcw= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/samber/lo v1.53.0 h1:t975lj2py4kJPQ6haz1QMgtId2gtmfktACxIXArw3HM= github.com/samber/lo v1.53.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/segmentio/asm v1.2.1 h1:DTNbBqs57ioxAD4PrArqftgypG4/qNpXoJx8TVXxPR0= github.com/segmentio/asm v1.2.1/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= -github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= -github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= -github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stoewer/go-strcase v1.3.1 h1:iS0MdW+kVTxgMoE1LAZyMiYJFKlOzLooE4MxjirtkAs= diff --git a/tests/kafkaschema_test.go b/tests/kafkaschema_test.go index 4b6fefa9..0709f548 100644 --- a/tests/kafkaschema_test.go +++ b/tests/kafkaschema_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aiven/aiven-operator/api/v1alpha1" "github.com/aiven/aiven-operator/controllers" @@ -165,7 +166,6 @@ func TestKafkaSchemaReferences(t *testing.T) { t.Parallel() defer recoverPanic(t) - // GIVEN ctx, cancel := testCtx() defer cancel() @@ -174,55 +174,377 @@ func TestKafkaSchemaReferences(t *testing.T) { defer releaseKafka() kafkaName := kafka.GetName() - refSchemaName := randName("kafka-schema-ref") - refSubjectName := randName("kafka-schema-ref") - mainSchemaName := randName("kafka-schema-main") - mainSubjectName := randName("kafka-schema-main") - s := NewSession(ctx, k8sClient) - // Create the referenced (base) schema first - refYml := getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, refSchemaName, refSubjectName) - require.NoError(t, s.Apply(refYml)) - - refSchema := new(v1alpha1.KafkaSchema) - require.NoError(t, s.GetRunning(refSchema, refSchemaName)) - assert.Equal(t, refSubjectName, refSchema.Spec.SubjectName) - assert.Equal(t, kafkaschemaregistry.SchemaTypeProtobuf, refSchema.Spec.SchemaType) - assert.Equal(t, 1, refSchema.Status.Version) - - // Create a schema that references the base schema - mainYml := getKafkaSchemaRefYaml(cfg.Project, kafkaName, mainSchemaName, mainSubjectName, refSubjectName, refSchema.Status.Version) - require.NoError(t, s.Apply(mainYml)) - - mainSchema := new(v1alpha1.KafkaSchema) - require.NoError(t, s.GetRunning(mainSchema, mainSchemaName)) - assert.Equal(t, mainSubjectName, mainSchema.Spec.SubjectName) - assert.Equal(t, kafkaschemaregistry.SchemaTypeProtobuf, mainSchema.Spec.SchemaType) - - // Verify the references are set in the spec - require.Len(t, mainSchema.Spec.References, 1) - assert.Equal(t, "customer.proto", mainSchema.Spec.References[0].Name) - assert.Equal(t, refSubjectName, mainSchema.Spec.References[0].Subject) - assert.Equal(t, refSchema.Status.Version, mainSchema.Spec.References[0].Version) - - // Verify the schema was created with references - avnSchema, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, mainSchema.Status.Version) - require.NoError(t, err) - assert.Equal(t, mainSchema.Status.ID, avnSchema.Id) - require.Len(t, avnSchema.References, 1) - assert.Equal(t, "customer.proto", avnSchema.References[0].Name) - assert.Equal(t, refSubjectName, avnSchema.References[0].Subject) - assert.Equal(t, refSchema.Status.Version, avnSchema.References[0].Version) - - // Cleanup: delete the referencing schema explicitly. - // The referenced (base) schema is cleaned up by the shared Kafka service teardown, - // because schema registry soft-delete does not clear the reference — - // a referenced schema cannot be soft-deleted even after the referencing one is removed. - assert.NoError(t, s.Delete(mainSchema, func() error { - _, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, mainSchema.Status.Version) - return err - })) + // subjectExists returns nil when the named subject still appears in the + // registry's active listing, NotFound otherwise. The active listing excludes soft-deleted subjects. + subjectExists := func(subjectName string) func() error { + return func() error { + list, err := avnGen.ServiceSchemaRegistrySubjects(ctx, cfg.Project, kafkaName) + if err != nil { + return fmt.Errorf("cannot list Kafka Subjects: %w", err) + } + for _, subject := range list { + if subject == subjectName { + return nil + } + } + return controllers.NewNotFound(fmt.Sprintf("Kafka Subject %q not found", subjectName)) + } + } + + t.Run("explicit subject and version", func(t *testing.T) { + refSchemaName := randName("kafka-schema-ref") + refSubjectName := randName("kafka-schema-ref") + mainSchemaName := randName("kafka-schema-main") + mainSubjectName := randName("kafka-schema-main") + + // Create the referenced (base) schema first + refYml := getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, refSchemaName, refSubjectName) + require.NoError(t, s.Apply(refYml)) + + refSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(refSchema, refSchemaName)) + assert.Equal(t, refSubjectName, refSchema.Spec.SubjectName) + assert.Equal(t, kafkaschemaregistry.SchemaTypeProtobuf, refSchema.Spec.SchemaType) + assert.Equal(t, 1, refSchema.Status.Version) + + // Create a schema that references the base schema + mainYml := getKafkaSchemaRefYaml(cfg.Project, kafkaName, mainSchemaName, mainSubjectName, refSubjectName, refSchema.Status.Version) + require.NoError(t, s.Apply(mainYml)) + + mainSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(mainSchema, mainSchemaName)) + assert.Equal(t, mainSubjectName, mainSchema.Spec.SubjectName) + assert.Equal(t, kafkaschemaregistry.SchemaTypeProtobuf, mainSchema.Spec.SchemaType) + + // Verify the references are set in the spec + require.Len(t, mainSchema.Spec.References, 1) + assert.Equal(t, "customer.proto", mainSchema.Spec.References[0].Name) + assert.Equal(t, refSubjectName, mainSchema.Spec.References[0].Subject) + assert.Equal(t, refSchema.Status.Version, mainSchema.Spec.References[0].Version) + + // Verify the schema was created with references + avnSchema, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, mainSchema.Status.Version) + require.NoError(t, err) + assert.Equal(t, mainSchema.Status.ID, avnSchema.Id) + require.Len(t, avnSchema.References, 1) + assert.Equal(t, "customer.proto", avnSchema.References[0].Name) + assert.Equal(t, refSubjectName, avnSchema.References[0].Subject) + assert.Equal(t, refSchema.Status.Version, avnSchema.References[0].Version) + + // Delete the dependent first. + assert.NoError(t, s.Delete(mainSchema, subjectExists(mainSubjectName))) + assert.NoError(t, s.Delete(refSchema, subjectExists(refSubjectName))) + }) + + // 1. referent moves -> dependent must propagate to the new version + // 2. dependent moves (own spec) -> reference still pins the same referent version + // 3. delete dependent then referent -> both succeed + t.Run("kafkaSchemaRef tracks referent on both sides of edits and cleans up", func(t *testing.T) { + refSchemaName := randName("kafka-schema-ref-track") + refSubjectName := randName("kafka-schema-ref-track") + mainSchemaName := randName("kafka-schema-main-track") + mainSubjectName := randName("kafka-schema-main-track") + + // Apply referent, then dependent. + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, refSchemaName, refSubjectName))) + + refSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(refSchema, refSchemaName)) + initialRefVersion := refSchema.Status.Version + require.Greater(t, initialRefVersion, 0, "referent must have a registry version") + + require.NoError(t, s.Apply(getKafkaSchemaKafkaSchemaRefYaml(cfg.Project, kafkaName, mainSchemaName, mainSubjectName, refSchemaName))) + + mainSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(mainSchema, mainSchemaName)) + initialMainVersion := mainSchema.Status.Version + + avnSchema, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, initialMainVersion) + require.NoError(t, err) + require.Len(t, avnSchema.References, 1) + assert.Equal(t, "customer.proto", avnSchema.References[0].Name) + assert.Equal(t, refSubjectName, avnSchema.References[0].Subject) + assert.Equal(t, initialRefVersion, avnSchema.References[0].Version, "dependent must reference the referent's initial version") + + // Mutate the referent. + updatedRef := refSchema.DeepCopy() + updatedRef.Spec.Schema = strings.ReplaceAll(refSchema.Spec.Schema, "string email", "string email_address") + require.NoError(t, k8sClient.Update(ctx, updatedRef)) + require.NoError(t, s.GetRunning(updatedRef, refSchemaName)) + require.Greater(t, updatedRef.Status.Version, initialRefVersion, "referent did not advance to a new version") + newRefVersion := updatedRef.Status.Version + + require.NoError(t, retryForever(ctx, "dependent advances to new referent version", func() (bool, error) { + latestMain := new(v1alpha1.KafkaSchema) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: mainSchemaName, Namespace: defaultNamespace}, latestMain); err != nil { + return true, err + } + got, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, latestMain.Status.Version) + if err != nil { + return true, err + } + if len(got.References) != 1 { + return true, fmt.Errorf("expected 1 reference, got %d", len(got.References)) + } + if got.References[0].Version != newRefVersion { + return true, nil + } + return false, nil + })) + + // Edit the dependent's own spec.schema. The new dependent version must still reference the + // referent at the same version. + latestMain := new(v1alpha1.KafkaSchema) + require.NoError(t, k8sClient.Get(ctx, client.ObjectKey{Name: mainSchemaName, Namespace: defaultNamespace}, latestMain)) + mainBeforeSelfEdit := latestMain.Status.Version + + updatedMain := latestMain.DeepCopy() + updatedMain.Spec.Schema = strings.ReplaceAll(latestMain.Spec.Schema, "string order_id = 1;", "string order_id = 1;\n string note = 3;") + require.NoError(t, k8sClient.Update(ctx, updatedMain)) + require.NoError(t, s.GetRunning(updatedMain, mainSchemaName)) + require.Greater(t, updatedMain.Status.Version, mainBeforeSelfEdit, "dependent did not advance after self-edit") + + require.NoError(t, retryForever(ctx, "dependent self-edit preserves referent version", func() (bool, error) { + latest := new(v1alpha1.KafkaSchema) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: mainSchemaName, Namespace: defaultNamespace}, latest); err != nil { + return true, err + } + got, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, latest.Status.Version) + if err != nil { + return true, err + } + if len(got.References) != 1 { + return true, fmt.Errorf("expected 1 reference after dependent self-edit, got %d", len(got.References)) + } + if got.References[0].Subject != refSubjectName { + return false, fmt.Errorf("dependent's reference subject changed: %s", got.References[0].Subject) + } + if got.References[0].Version != newRefVersion { + return true, nil + } + return false, nil + })) + + // The referent itself never moved. + stillRef := new(v1alpha1.KafkaSchema) + require.NoError(t, k8sClient.Get(ctx, client.ObjectKey{Name: refSchemaName, Namespace: defaultNamespace}, stillRef)) + assert.Equal(t, newRefVersion, stillRef.Status.Version, "referent must not have advanced during dependent self-edit") + + require.NoError(t, s.Delete(updatedMain, subjectExists(mainSubjectName))) + require.NoError(t, s.Delete(refSchema, subjectExists(refSubjectName))) + }) + + t.Run("kafkaSchemaRef removed from spec drops references in the registry", func(t *testing.T) { + refSchemaName := randName("kafka-schema-ref-drop") + refSubjectName := randName("kafka-schema-ref-drop") + mainSchemaName := randName("kafka-schema-main-drop") + mainSubjectName := randName("kafka-schema-main-drop") + + // 1) Apply referent first. + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, refSchemaName, refSubjectName))) + refSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(refSchema, refSchemaName)) + require.Greater(t, refSchema.Status.Version, 0, "referent must have a registry version") + + // 2) Apply dependent that imports the referent via kafkaSchemaRef. + require.NoError(t, s.Apply(getKafkaSchemaKafkaSchemaRefYaml(cfg.Project, kafkaName, mainSchemaName, mainSubjectName, refSchemaName))) + mainSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(mainSchema, mainSchemaName)) + + avnSchema, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, mainSchema.Status.Version) + require.NoError(t, err) + require.Len(t, avnSchema.References, 1, "dependent must start with one reference attached") + + // 3) Edit the dependent. + updatedMain := mainSchema.DeepCopy() + updatedMain.Spec.References = nil + updatedMain.Spec.Schema = `syntax = "proto3"; +message Order { + string order_id = 1; +} +` + require.NoError(t, k8sClient.Update(ctx, updatedMain)) + require.NoError(t, s.GetRunning(updatedMain, mainSchemaName)) + + // 4) The registry must report zero references. + require.NoError(t, retryForever(ctx, "registry drops references when spec drops them", func() (bool, error) { + latestMain := new(v1alpha1.KafkaSchema) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: mainSchemaName, Namespace: defaultNamespace}, latestMain); err != nil { + return true, err + } + got, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, mainSubjectName, latestMain.Status.Version) + if err != nil { + return true, err + } + if len(got.References) != 0 { + // Still stale. Retry until the operator re-POSTs without references. + return true, nil + } + return false, nil + })) + + assert.NoError(t, s.Delete(updatedMain, subjectExists(mainSubjectName))) + }) + + t.Run("referent stays in Terminating while dependent exists, then cleans up", func(t *testing.T) { + refSchemaName := randName("kafka-schema-guard-ref") + refSubjectName := randName("kafka-schema-guard-ref") + mainSchemaName := randName("kafka-schema-guard-main") + mainSubjectName := randName("kafka-schema-guard-main") + + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, refSchemaName, refSubjectName))) + refSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(refSchema, refSchemaName)) + + require.NoError(t, s.Apply(getKafkaSchemaKafkaSchemaRefYaml(cfg.Project, kafkaName, mainSchemaName, mainSubjectName, refSchemaName))) + mainSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(mainSchema, mainSchemaName)) + + // The k8s API server accepts it (sets deletionTimestamp), + // but the operator refuses while the dependent's index entry still exists. + require.NoError(t, k8sClient.Delete(ctx, refSchema)) + + require.NoError(t, retryForever(ctx, "referent stays in Terminating while dependent exists", func() (bool, error) { + latest := new(v1alpha1.KafkaSchema) + err := k8sClient.Get(ctx, client.ObjectKey{Name: refSchemaName, Namespace: defaultNamespace}, latest) + if isNotFound(err) { + return false, fmt.Errorf("referent vanished while dependent still references it — local guard regressed") + } + if err != nil { + return true, err + } + if latest.DeletionTimestamp.IsZero() { + return true, nil // delete hasn't landed yet, retry + } + return false, nil + })) + + // Subject must still be in the registry. + require.NoError(t, subjectExists(refSubjectName)(), + "referent's subject must still be registered while the local guard holds the delete back") + + // Delete the dependent. + require.NoError(t, s.Delete(mainSchema, subjectExists(mainSubjectName))) + + require.NoError(t, retryForever(ctx, "referent finishes terminating after dependent is removed", func() (bool, error) { + err := k8sClient.Get(ctx, client.ObjectKey{Name: refSchemaName, Namespace: defaultNamespace}, new(v1alpha1.KafkaSchema)) + return !isNotFound(err), nil + })) + err := subjectExists(refSubjectName)() + require.True(t, isNotFound(err), "referent's subject must be gone from the active listing after the CR finishes terminating: %v", err) + }) + + // With A -> B -> C all connected via kafkaSchemaRef, mutating A must cause B to re-POST + // against A's new version AND C to re-POST against B's new version. + t.Run("3-level chain converges when root advances", func(t *testing.T) { + aSchemaName := randName("kafka-schema-chain-a") + aSubjectName := randName("kafka-schema-chain-a") + bSchemaName := randName("kafka-schema-chain-b") + bSubjectName := randName("kafka-schema-chain-b") + cSchemaName := randName("kafka-schema-chain-c") + cSubjectName := randName("kafka-schema-chain-c") + + // A: leaf referent. customer.proto contributes the Customer type. + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, aSchemaName, aSubjectName))) + aSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(aSchema, aSchemaName)) + initialAVersion := aSchema.Status.Version + require.Greater(t, initialAVersion, 0, "A must have a registry version") + + // B: depends on A via kafkaSchemaRef. order.proto imports customer.proto. + require.NoError(t, s.Apply(getKafkaSchemaChainMidYaml(cfg.Project, kafkaName, bSchemaName, bSubjectName, aSchemaName))) + bSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(bSchema, bSchemaName)) + + avnB, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, bSubjectName, bSchema.Status.Version) + require.NoError(t, err) + require.Len(t, avnB.References, 1, "B must reference A") + require.Equal(t, initialAVersion, avnB.References[0].Version, "B must reference A's initial version") + + // C: depends on B via kafkaSchemaRef. order_confirmation.proto imports order.proto. + require.NoError(t, s.Apply(getKafkaSchemaChainTopYaml(cfg.Project, kafkaName, cSchemaName, cSubjectName, bSchemaName))) + cSchema := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(cSchema, cSchemaName)) + + avnC, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, cSubjectName, cSchema.Status.Version) + require.NoError(t, err) + require.Len(t, avnC.References, 1, "C must reference B") + require.Equal(t, bSchema.Status.Version, avnC.References[0].Version, "C must reference B's initial version") + + // Mutate A. Aiven assigns A v2. B and C must propagate. + updatedA := aSchema.DeepCopy() + updatedA.Spec.Schema = strings.ReplaceAll(aSchema.Spec.Schema, "string email", "string email_address") + require.NoError(t, k8sClient.Update(ctx, updatedA)) + require.NoError(t, s.GetRunning(updatedA, aSchemaName)) + require.Greater(t, updatedA.Status.Version, initialAVersion, "A did not advance") + newAVersion := updatedA.Status.Version + + var newBVersion int + require.NoError(t, retryForever(ctx, "B propagates A's new version", func() (bool, error) { + latestB := new(v1alpha1.KafkaSchema) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: bSchemaName, Namespace: defaultNamespace}, latestB); err != nil { + return true, err + } + got, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, bSubjectName, latestB.Status.Version) + if err != nil { + return true, err + } + if len(got.References) != 1 { + return true, fmt.Errorf("expected 1 reference on B, got %d", len(got.References)) + } + if got.References[0].Version != newAVersion { + return true, nil // still stale + } + newBVersion = latestB.Status.Version + return false, nil + })) + require.Greater(t, newBVersion, bSchema.Status.Version, "B did not advance to a new registry version") + + require.NoError(t, retryForever(ctx, "C propagates B's new version", func() (bool, error) { + latestC := new(v1alpha1.KafkaSchema) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: cSchemaName, Namespace: defaultNamespace}, latestC); err != nil { + return true, err + } + got, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, cfg.Project, kafkaName, cSubjectName, latestC.Status.Version) + if err != nil { + return true, err + } + if len(got.References) != 1 { + return true, fmt.Errorf("expected 1 reference on C, got %d", len(got.References)) + } + if got.References[0].Version != newBVersion { + return true, nil // still stale + } + return false, nil + })) + + assert.NoError(t, s.Delete(cSchema, subjectExists(cSubjectName))) + assert.NoError(t, s.Delete(bSchema, subjectExists(bSubjectName))) + assert.NoError(t, s.Delete(aSchema, subjectExists(aSubjectName))) + }) + + // Hard-delete purges the subject's metadata, re-applied KafkaSchema with + // the same subjectName starts at version 1. + t.Run("subject can be recreated cleanly after delete", func(t *testing.T) { + schemaName := randName("kafka-schema-recreate") + subjectName := randName("kafka-schema-recreate") + + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, schemaName, subjectName))) + first := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(first, schemaName)) + + require.NoError(t, s.Delete(first, subjectExists(subjectName))) + + require.NoError(t, s.Apply(getKafkaSchemaRefBaseYaml(cfg.Project, kafkaName, schemaName, subjectName))) + second := new(v1alpha1.KafkaSchema) + require.NoError(t, s.GetRunning(second, schemaName)) + + assert.Equal(t, 1, second.Status.Version, "recreated subject must start at version 1; soft-delete-only would have preserved the old version counter") + + require.NoError(t, s.Delete(second, subjectExists(subjectName))) + }) } func TestKafkaSchemaReferencesValidation(t *testing.T) { @@ -308,7 +630,7 @@ spec: subject: "" version: 1 `, randName("kafka-schema-val"), cfg.Project), - expectErrorMsgContains: "spec.references[0].subject", + expectErrorMsgContains: "set both subject and version, or set kafkaSchemaRef, but not both", }, { name: "reference with zero version", @@ -331,7 +653,103 @@ spec: subject: valid-subject version: 0 `, randName("kafka-schema-val"), cfg.Project), - expectErrorMsgContains: "spec.references[0].version", + expectErrorMsgContains: "set both subject and version, or set kafkaSchemaRef, but not both", + }, + { + name: "reference sets both explicit and kafkaSchemaRef", + yaml: fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %s +spec: + authSecretRef: + name: aiven-token + key: token + project: %s + serviceName: fake-kafka + subjectName: both-ref-kinds + schemaType: PROTOBUF + schema: 'syntax = "proto3";' + references: + - name: customer.proto + subject: valid-subject + version: 1 + kafkaSchemaRef: + name: some-other-schema +`, randName("kafka-schema-val"), cfg.Project), + expectErrorMsgContains: "set both subject and version, or set kafkaSchemaRef, but not both", + }, + { + name: "reference sets neither explicit nor kafkaSchemaRef", + yaml: fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %s +spec: + authSecretRef: + name: aiven-token + key: token + project: %s + serviceName: fake-kafka + subjectName: empty-ref + schemaType: PROTOBUF + schema: 'syntax = "proto3";' + references: + - name: customer.proto +`, randName("kafka-schema-val"), cfg.Project), + expectErrorMsgContains: "set both subject and version, or set kafkaSchemaRef, but not both", + }, + { + // Reference names must be unique within a single KafkaSchema. + name: "duplicate reference name", + yaml: fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %s +spec: + authSecretRef: + name: aiven-token + key: token + project: %s + serviceName: fake-kafka + subjectName: dup-ref-name + schemaType: PROTOBUF + schema: 'syntax = "proto3";' + references: + - name: customer.proto + subject: subject-a + version: 1 + - name: customer.proto + subject: subject-b + version: 1 +`, randName("kafka-schema-val"), cfg.Project), + expectErrorMsgContains: "duplicate", + }, + { + name: "kafkaSchemaRef points at the owning KafkaSchema", + yaml: fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %[1]s +spec: + authSecretRef: + name: aiven-token + key: token + project: %[2]s + serviceName: fake-kafka + subjectName: self-ref + schemaType: PROTOBUF + schema: 'syntax = "proto3";' + references: + - name: customer.proto + kafkaSchemaRef: + name: %[1]s +`, randName("kafka-schema-val"), cfg.Project), + expectErrorMsgContains: "kafkaSchemaRef cannot point to the KafkaSchema itself", }, } @@ -372,6 +790,99 @@ spec: `, project, kafkaName, schemaName, subjectName, refSubject, refVersion) } +// getKafkaSchemaKafkaSchemaRefYaml builds a dependent KafkaSchema that resolves +// its reference via kafkaSchemaRef instead of pinning subject+version. +func getKafkaSchemaKafkaSchemaRefYaml(project, kafkaName, schemaName, subjectName, refName string) string { + return fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %[3]s +spec: + authSecretRef: + name: aiven-token + key: token + + project: %[1]s + serviceName: %[2]s + subjectName: %[4]s + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "customer.proto"; + message Order { + string order_id = 1; + Customer customer = 2; + } + references: + - name: customer.proto + kafkaSchemaRef: + name: %[5]s +`, project, kafkaName, schemaName, subjectName, refName) +} + +// getKafkaSchemaChainMidYaml builds B in the A -> B -> C chain. B imports +// customer.proto from A and exports an Order message that C will import. +func getKafkaSchemaChainMidYaml(project, kafkaName, schemaName, subjectName, refName string) string { + return fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %[3]s +spec: + authSecretRef: + name: aiven-token + key: token + + project: %[1]s + serviceName: %[2]s + subjectName: %[4]s + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "customer.proto"; + message Order { + string order_id = 1; + Customer customer = 2; + } + references: + - name: customer.proto + kafkaSchemaRef: + name: %[5]s +`, project, kafkaName, schemaName, subjectName, refName) +} + +// getKafkaSchemaChainTopYaml builds C in the A -> B -> C chain. C imports +// order.proto from B. +func getKafkaSchemaChainTopYaml(project, kafkaName, schemaName, subjectName, refName string) string { + return fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: KafkaSchema +metadata: + name: %[3]s +spec: + authSecretRef: + name: aiven-token + key: token + + project: %[1]s + serviceName: %[2]s + subjectName: %[4]s + schemaType: PROTOBUF + schema: | + syntax = "proto3"; + import "order.proto"; + message OrderConfirmation { + Order order = 1; + string confirmation_id = 2; + } + references: + - name: order.proto + kafkaSchemaRef: + name: %[5]s +`, project, kafkaName, schemaName, subjectName, refName) +} + func getKafkaSchemaRefBaseYaml(project, kafkaName, schemaName, subjectName string) string { return fmt.Sprintf(` apiVersion: aiven.io/v1alpha1