Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions internal/controller/checkpointrestoreoperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func (r *CheckpointRestoreOperatorReconciler) Reconcile(ctx context.Context, req
}

func resetAllPoliciesToDefault(log logr.Logger) {
policyMutex.Lock()
defer policyMutex.Unlock()

retainOrphan = nil
maxCheckpointsPerContainer = 10
maxCheckpointsPerPod = 20
Expand Down Expand Up @@ -354,9 +357,6 @@ type Policy struct {
}

func applyPolicies(log logr.Logger, details *checkpointDetails) {
policyMutex.Lock()
defer policyMutex.Unlock()

// Function to handle default "infinity" value for count-based policies
toInfinityCount := func(value *int) int {
if value == nil {
Expand Down Expand Up @@ -402,6 +402,8 @@ func applyPolicies(log logr.Logger, details *checkpointDetails) {
MaxTotalSize: toInfinitySize(policy.MaxTotalSize),
})
} else {
policyMutex.RLock()

// Apply global policies if no specific policy found
handleCheckpointsForLevel(log, details, "container", Policy{
RetainOrphan: *retainOrphan,
Expand All @@ -421,10 +423,15 @@ func applyPolicies(log logr.Logger, details *checkpointDetails) {
MaxCheckpointSize: maxCheckpointSize,
MaxTotalSize: maxTotalSizePerNamespace,
})

policyMutex.RUnlock()
}
}

func findContainerPolicy(details *checkpointDetails) *criuorgv1.ContainerPolicySpec {
policyMutex.RLock()
defer policyMutex.RUnlock()

for _, policy := range containerPolicies {
if policy.Namespace == details.namespace && policy.Pod == details.pod && policy.Container == details.container {
return &policy
Expand All @@ -434,6 +441,9 @@ func findContainerPolicy(details *checkpointDetails) *criuorgv1.ContainerPolicyS
}

func findPodPolicy(details *checkpointDetails) *criuorgv1.PodPolicySpec {
policyMutex.RLock()
defer policyMutex.RUnlock()

for _, policy := range podPolicies {
if policy.Namespace == details.namespace && policy.Pod == details.pod {
return &policy
Expand All @@ -443,6 +453,9 @@ func findPodPolicy(details *checkpointDetails) *criuorgv1.PodPolicySpec {
}

func findNamespacePolicy(details *checkpointDetails) *criuorgv1.NamespacePolicySpec {
policyMutex.RLock()
defer policyMutex.RUnlock()

for _, policy := range namespacePolicies {
if policy.Namespace == details.namespace {
return &policy
Expand Down Expand Up @@ -514,6 +527,9 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
return
}

// Read Lock for checkpointDirectory
policyMutex.RLock()

var globPattern string
switch level {
case "container":
Expand Down Expand Up @@ -545,6 +561,9 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
)
}

// Removing RLock for checkpointDirectory
policyMutex.RUnlock()

checkpointArchives, err := filepath.Glob(globPattern)
if err != nil {
log.Error(err, "error looking for checkpoint archives", "pattern", globPattern)
Expand Down Expand Up @@ -931,6 +950,8 @@ func (gc *garbageCollector) runGarbageCollector() {
}
}()

policyMutex.RLock()

// Add a path.
log.Info("Watching", "directory", checkpointDirectory)
log.Info("MaxCheckpointsPerContainer", "maxCheckpointsPerContainer", maxCheckpointsPerContainer)
Expand All @@ -941,6 +962,8 @@ func (gc *garbageCollector) runGarbageCollector() {
log.Info("MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod)
log.Info("MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer)

policyMutex.RUnlock()

err = watcher.Add(checkpointDirectory)
if err != nil {
log.Error(err, "runGarbageCollector()")
Expand Down