Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,16 @@ func (m *manager) GetAllCPUs() cpuset.CPUSet {

// reconciledContainer identifies one container handled during CPU manager
// state reconciliation; it is reported back in the success/failure lists.
type reconciledContainer struct {
	podName       string // name of the pod that owns the container
	podUID        string // pod UID; used as the key into the CPU manager state
	containerName string // container name as it appears in the pod spec
	containerID   string // runtime container ID looked up from the pod status (may be empty when that lookup failed)
}

// reconciledContainerAllocation pairs a reconciled container with the CPU set
// that reconciliation determined it should be confined to.
type reconciledContainerAllocation struct {
	reconciledContainer
	allocatedSet cpuset.CPUSet // target CPU set to apply via updateContainerCPUSet
}

func (m *manager) removeStaleState(rootLogger logr.Logger) {
// Only once all sources are ready do we attempt to remove any stale state.
// This ensures that the call to `m.activePods()` below will succeed with
Expand Down Expand Up @@ -473,15 +479,19 @@ func (m *manager) reconcileState(ctx context.Context) (success []reconciledConta
failure = []reconciledContainer{}

rootLogger := klog.FromContext(ctx)

m.removeStaleState(rootLogger)

exclusiveCPUContainers := []reconciledContainerAllocation{}
nonExclusiveCPUContainers := []reconciledContainerAllocation{}

m.Lock()
for _, pod := range m.activePods() {
podLogger := klog.LoggerWithValues(rootLogger, "pod", klog.KObj(pod))

pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
if !ok {
podLogger.V(5).Info("skipping pod; status not found")
failure = append(failure, reconciledContainer{pod.Name, "", ""})
failure = append(failure, reconciledContainer{pod.Name, string(pod.UID), "", ""})
continue
}

Expand All @@ -493,25 +503,24 @@ func (m *manager) reconcileState(ctx context.Context) (success []reconciledConta
containerID, err := findContainerIDByName(&pstatus, container.Name)
if err != nil {
logger.V(5).Info("skipping container; ID not found in pod status", "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
failure = append(failure, reconciledContainer{pod.Name, string(pod.UID), container.Name, ""})
continue
}

cstatus, err := findContainerStatusByName(&pstatus, container.Name)
if err != nil {
logger.V(5).Info("skipping container; container status not found in pod status", "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
failure = append(failure, reconciledContainer{pod.Name, string(pod.UID), container.Name, ""})
continue
}

if cstatus.State.Waiting != nil ||
(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
logger.V(4).Info("skipping container; container still in the waiting state", "err", err)
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
failure = append(failure, reconciledContainer{pod.Name, string(pod.UID), container.Name, ""})
continue
}

m.Lock()
if cstatus.State.Terminated != nil {
// The container is terminated but we can't call m.RemoveContainer()
// here because it could remove the allocated cpuset for the container
Expand All @@ -522,38 +531,91 @@ func (m *manager) reconcileState(ctx context.Context) (success []reconciledConta
if err == nil {
logger.V(4).Info("ignoring terminated container", "containerID", containerID)
}
m.Unlock()
continue
}

// Once we make it here we know we have a running container.
// Idempotently add it to the containerMap incase it is missing.
// This can happen after a kubelet restart, for example.
m.containerMap.Add(string(pod.UID), container.Name, containerID)
m.Unlock()

cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
cset, exclusive := m.state.GetCPUSet(string(pod.UID), container.Name)
if !exclusive {
cset = m.state.GetDefaultCPUSet()
}
if cset.IsEmpty() {
// NOTE: This should not happen outside of tests.
logger.V(2).Info("ReconcileState: skipping container; empty cpuset assigned")
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
failure = append(failure, reconciledContainer{pod.Name, string(pod.UID), container.Name, containerID})
continue
}

lcset := m.lastUpdateState.GetCPUSetOrDefault(string(pod.UID), container.Name)
if !cset.Equals(lcset) {
logger.V(5).Info("updating container", "containerID", containerID, "cpuSet", cset)
err = m.updateContainerCPUSet(ctx, containerID, cset)
rca := reconciledContainerAllocation{
reconciledContainer{pod.Name, string(pod.UID), container.Name, containerID},
cset,
}
if exclusive {
exclusiveCPUContainers = append(exclusiveCPUContainers, rca)
} else {
nonExclusiveCPUContainers = append(nonExclusiveCPUContainers, rca)
}

}
}
m.Unlock()

failedContainersCPUSet := cpuset.New()

updateContainers := func(containers []reconciledContainerAllocation, preliminary bool) {
for _, rca := range containers {
logger := klog.LoggerWithValues(rootLogger, "podName", rca.podName, "containerName", rca.containerName)

lcset := m.lastUpdateState.GetCPUSetOrDefault(rca.podUID, rca.containerName)

// Determine the CPU set to use based on the pass
var targetCPUSet cpuset.CPUSet
if preliminary {
targetCPUSet = rca.allocatedSet.Intersection(lcset)
} else {
targetCPUSet = rca.allocatedSet
}

// Check if update is needed
if !targetCPUSet.Equals(lcset) {
if !preliminary && !targetCPUSet.Intersection(failedContainersCPUSet).IsEmpty() {
logger.Error(fmt.Errorf("Conflict with previously failed container CPUSet updates"), "failed to update container", "containerID", rca.containerID, "cpuSet", rca.allocatedSet)
failure = append(failure, rca.reconciledContainer)
failedContainersCPUSet = failedContainersCPUSet.Union(lcset)
continue
}

logger.V(5).Info("updating container", "containerID", rca.containerID, "cpuSet", targetCPUSet)
err := m.updateContainerCPUSet(ctx, rca.containerID, targetCPUSet)
if err != nil {
logger.Error(err, "failed to update container", "containerID", containerID, "cpuSet", cset)
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
logger.Error(err, "failed to update container", "containerID", rca.containerID, "cpuSet", targetCPUSet)
failure = append(failure, rca.reconciledContainer)
failedContainersCPUSet = failedContainersCPUSet.Union(lcset)
continue
}
m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
m.lastUpdateState.SetCPUSet(rca.podUID, rca.containerName, targetCPUSet)
}

// Add to success list if required
if !preliminary {
success = append(success, rca.reconciledContainer)
}
success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
}
}

// first pass - only remove CPUs from containers using exclusive CPUs
updateContainers(exclusiveCPUContainers, true)

// second pass - apply CPU sets to non exclusive CPU containers
updateContainers(nonExclusiveCPUContainers, false)

// third pass - apply final CPU set to containers using exclusive CPUs
updateContainers(exclusiveCPUContainers, false)

return success, failure
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/kubelet/cm/cpumanager/cpu_manager_others_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//go:build !windows

/*
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cpumanager

import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/utils/cpuset"
)

// getCPUSetFromResources extracts the Linux cpuset assignment from the given
// container resources. It returns an empty set when no Linux resources are
// present, and reports a test error (also returning an empty set) when the
// cpuset string cannot be parsed.
func (rt mockRuntimeService) getCPUSetFromResources(resources *runtimeapi.ContainerResources) cpuset.CPUSet {
	// No resources or no Linux section means nothing to extract.
	if resources == nil || resources.Linux == nil {
		return cpuset.New()
	}
	parsed, err := cpuset.Parse(resources.Linux.CpusetCpus)
	if err != nil {
		rt.t.Errorf("(%v) Cannot parse Linux CPUSet resources %v", rt.testCaseDescription, resources.Linux.CpusetCpus)
		return cpuset.New()
	}
	return parsed
}
Loading