diff --git a/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go b/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go
index fe32b53621..248f3efd03 100644
--- a/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go
+++ b/test/e2e/performanceprofile/functests/7_performance_kubelet_node/cgroups.go
@@ -23,8 +23,10 @@ import (
 	performancev2 "github.com/openshift/cluster-node-tuning-operator/pkg/apis/performanceprofile/v2"
 	testutils "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils"
+	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/baseload"
 	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/cgroup"
 	testclient "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/client"
+	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/cluster"
 	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/discovery"
 	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/images"
 	"github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/label"
@@ -45,16 +47,18 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab
 		cgroupRoot string = "/rootfs/sys/fs/cgroup"
 	)
 	var (
-		onlineCPUSet            cpuset.CPUSet
-		workerRTNode            *corev1.Node
-		workerRTNodes           []corev1.Node
-		profile, initialProfile *performancev2.PerformanceProfile
-		poolName                string
-		ovsSliceCgroup          string
-		ctx                     context.Context = context.Background()
-		ovsSystemdServices      []string
-		isCgroupV2              bool
-		err                     error
+		onlineCPUSet                  cpuset.CPUSet
+		reservedCPUSet                cpuset.CPUSet
+		workerRTNode                  *corev1.Node
+		workerRTNodes                 []corev1.Node
+		profile, initialProfile       *performancev2.PerformanceProfile
+		poolName                      string
+		ovsSliceCgroup                string
+		ctx                           context.Context = context.Background()
+		ovsSystemdServices            []string
+		isCgroupV2                    bool
+		isWorkloadPartitioningEnabled bool
+		err                           error
 	)

 	BeforeAll(func() {
@@ -68,6 +72,14 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab
 		Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("error looking for the optional selector: %v", err))
 		workerRTNode = &workerRTNodes[0]

+		if _, ok := workerRTNode.Labels["node-role.kubernetes.io/control-plane"]; ok {
+			isSchedulable, err := cluster.IsControlPlaneSchedulable(ctx)
+			Expect(err).ToNot(HaveOccurred(), "Unable to check if control plane is schedulable")
+			if !isSchedulable {
+				Skip("workerRTNode is a control plane node but masters are not schedulable")
+			}
+		}
+
 		profile, err = profiles.GetByNodeLabels(testutils.NodeSelectorLabels)
 		Expect(err).ToNot(HaveOccurred())

@@ -78,6 +90,14 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab

 		ovsSystemdServices = ovsSystemdServicesOnOvsSlice(ctx, workerRTNode)

+		isWorkloadPartitioningEnabled, err = cluster.IsWorkloadPartitioningEnabled(ctx)
+		Expect(err).ToNot(HaveOccurred(), "Unable to check if workload partitioning is enabled")
+		testlog.Infof("Workload partitioning enabled: %v", isWorkloadPartitioningEnabled)
+
+		Expect(profile.Spec.CPU.Reserved).ToNot(BeNil())
+		reservedCPUSet, err = cpuset.Parse(string(*profile.Spec.CPU.Reserved))
+		Expect(err).ToNot(HaveOccurred(), "Failed to parse reserved CPUs from profile")
+		testlog.Infof("Reserved CPUSet: %s", reservedCPUSet)
 	})
 	BeforeEach(func() {
@@ -128,7 +148,13 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab
 			cpus := testutils.ToString(out)
 			containerCpuset, err := cpuset.Parse(cpus)
 			Expect(err).ToNot(HaveOccurred())
-			Expect(containerCpuset).To(Equal(onlineCPUSet), "Burstable pod containers cpuset.cpus do not match total online cpus")
+			if isWorkloadPartitioningEnabled {
+				Expect(containerCpuset.Equals(reservedCPUSet)).To(BeTrue(),
+					"Under workload partitioning, OVN pod cpuset.cpus should match reserved cpus, got %s expected %s", containerCpuset, reservedCPUSet)
+			} else {
+				Expect(containerCpuset.Equals(onlineCPUSet)).To(BeTrue(),
+					"Burstable pod containers cpuset.cpus do not match total online cpus, got %s expected %s", containerCpuset, onlineCPUSet)
+			}
 		}
 	})

@@ -265,205 +291,74 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab
 	var ctx context.Context = context.TODO()
 	Context("ovn-kubenode Pods affinity ", Label(string(label.Tier2)), func() {
 		It("[test_id:64100] matches with ovs process affinity", func() {
-			ovnPod, err := ovnCnfNodePod(ctx, workerRTNode)
-			Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
-			ovnContainerids, err := ovnPodContainers(&ovnPod)
-			Expect(err).ToNot(HaveOccurred())
-			// Generally there are many containers inside a kubenode pods
-			// we don't need to check cpus used by all the containers
-			// we take first container
-			containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0])
-			Expect(err).ToNot(HaveOccurred())
-			// we need to wait as process affinity can change
-			time.Sleep(30 * time.Second)
-			ctnCpuset := taskSet(ctx, containerPid, workerRTNode)
-			testlog.Infof("Cpus used by ovn Containers are %s", ctnCpuset)
-			pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
-			pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
-			for pid, cpumask := range pidToCPUs {
-				testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask)
-				Expect(ctnCpuset.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ctnCpuset, pid, cpumask)
-			}
+			By("Collecting OVN container and OVS process affinities")
+			ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode)
+			ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)

+			By("Verifying OVS affinity matches expected")
+			verifyOvsMatchesExpected(isWorkloadPartitioningEnabled, ovnAffinity, ovsAffinities,
+				onlineCPUSet, reservedCPUSet, cpuset.New())
 		})

 		It("[test_id:64101] Creating gu pods modifies affinity of ovs", func() {
-			var testpod *corev1.Pod
-			var err error
-			testpod = pods.GetTestPod()
-			testpod.Namespace = testutils.NamespaceTesting
-			testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{Limits: corev1.ResourceList{
-				corev1.ResourceCPU:    resource.MustParse("2"),
-				corev1.ResourceMemory: resource.MustParse("200Mi"),
-			},
-			}
-			testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name}
-			err = testclient.DataPlaneClient.Create(ctx, testpod)
-			Expect(err).ToNot(HaveOccurred())
-			testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute)
-			Expect(err).ToNot(HaveOccurred())
-			Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed))
+			By("Creating a guaranteed pod on the worker node")
+			testpod := createGuPod(ctx, workerRTNode)

-			cmd := []string{"taskset", "-pc", "1"}
-			outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod, "", cmd)
-			Expect(err).ToNot(HaveOccurred())
-			testpodCpus := bytes.Split(outputb, []byte(":"))
-			testlog.Infof("%v pod is using cpus %v", testpod.Name, string(testpodCpus[1]))
+			By("Collecting OVN container and OVS process affinities")
+			ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode)
+			ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)

-			By("Get ovnpods running on the worker cnf node")
-			ovnPod, err := ovnCnfNodePod(ctx, workerRTNode)
-			Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
-
-			By("Get cpu used by ovn pod containers")
-			// We are fetching the container Process pid and
-			// using taskset we are fetching cpus used by the container process
-			// instead of using containers cpuset.cpus
-			ovnContainers, err := ovnPodContainers(&ovnPod)
-			Expect(err).ToNot(HaveOccurred())
-			containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainers[0])
-			Expect(err).ToNot(HaveOccurred())
-			// we need to wait as process affinity can change
-			time.Sleep(30 * time.Second)
-			ctnCpuset := taskSet(ctx, containerPid, workerRTNode)
-			testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ctnCpuset)
+			By("Verifying OVS affinity excludes guaranteed pod CPUs")
+			guCPUs := getGuPodCPUs(ctx, testpod)
+			verifyOvsMatchesExpected(isWorkloadPartitioningEnabled, ovnAffinity, ovsAffinities,
+				onlineCPUSet, reservedCPUSet, guCPUs)

-			pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
-
-			pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
-			for pid, cpumask := range pidToCPUs {
-				testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask)
-				Expect(ctnCpuset.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ctnCpuset, pid, cpumask)
-			}
 			Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod)).To(Succeed())
-
 		})

 		It("[test_id:64102] Create and remove gu pods to verify affinity of ovs are changed appropriately", func() {
-			var testpod1, testpod2 *corev1.Pod
-			var err error
-			ovnPod, err := ovnCnfNodePod(ctx, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
 			checkCpuCount(ctx, workerRTNode)
-			// Create testpod1
-			testpod1 = pods.GetTestPod()
-			testpod1.Namespace = testutils.NamespaceTesting
-			testpod1.Spec.Containers[0].Resources = corev1.ResourceRequirements{
-				Limits: corev1.ResourceList{
-					corev1.ResourceCPU:    resource.MustParse("2"),
-					corev1.ResourceMemory: resource.MustParse("200Mi"),
-				},
-			}
-			testpod1.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name}
-			err = testclient.DataPlaneClient.Create(ctx, testpod1)
-			Expect(err).ToNot(HaveOccurred())
-			testpod1, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod1), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute)
-			Expect(err).ToNot(HaveOccurred())
-			Expect(testpod1.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed))
-
-			tasksetcmd := []string{"taskset", "-pc", "1"}
-			testpod1Cpus, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod1, "", tasksetcmd)
-			Expect(err).ToNot(HaveOccurred())
-			testlog.Infof("%v pod is using %v cpus", testpod1.Name, string(testpod1Cpus))
-
-			// Create testpod2
-			testpod2 = pods.GetTestPod()
-			testpod2.Namespace = testutils.NamespaceTesting
-			testpod2.Spec.Containers[0].Resources = corev1.ResourceRequirements{
-				Limits: corev1.ResourceList{
-					corev1.ResourceCPU:    resource.MustParse("2"),
resource.MustParse("2"), - corev1.ResourceMemory: resource.MustParse("200Mi"), - }, - } - testpod2.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name} - err = testclient.DataPlaneClient.Create(ctx, testpod2) - Expect(err).ToNot(HaveOccurred()) - testpod2, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod2), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute) - Expect(err).ToNot(HaveOccurred()) - Expect(testpod1.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed)) + By("Creating two guaranteed pods on the worker node") + testpod1 := createGuPod(ctx, workerRTNode) + testpod2 := createGuPod(ctx, workerRTNode) - By("fetch cpus used by container process using taskset") - testpod2Cpus, err := pods.ExecCommandOnPod(testclient.K8sClient, testpod2, "", tasksetcmd) - Expect(err).ToNot(HaveOccurred()) - testlog.Infof("%v pod is using %v cpus", testpod2.Name, string(testpod2Cpus)) + By("Collecting affinities with both GU pods running") + ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) - // Get cpus used by the ovnkubenode-pods containers - // Each kubenode pods have many containers, we check cpus of only 1 container - ovnContainers, err := ovnPodContainers(&ovnPod) - Expect(err).ToNot(HaveOccurred()) - containerPid, err := nodes.ContainerPid(context.TODO(), workerRTNode, ovnContainers[0]) - Expect(err).ToNot(HaveOccurred()) - // we need to wait as process affinity can change - time.Sleep(30 * time.Second) - ovnContainerCpuset1 := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ovnContainerCpuset1) - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + By("Verifying OVS affinity excludes both GU pods' CPUs") + bothGU := getGuPodCPUs(ctx, testpod1).Union(getGuPodCPUs(ctx, testpod2)) + verifyOvsMatchesExpected(isWorkloadPartitioningEnabled, ovnAffinity, ovsAffinities, + onlineCPUSet, reservedCPUSet, bothGU) - // We wait for 30 seconds for ovs process cpu affinity to be updated - time.Sleep(30 * time.Second) - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpuset1.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset1, pid, cpumask) - } - // Delete testpod1 - testlog.Infof("Deleting pod %v", testpod1.Name) - Expect(pods.DeleteAndSync(context.TODO(), testclient.DataPlaneClient, testpod1)).To(Succeed()) + By("Deleting first GU pod and verifying OVS affinity adjusts") + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod1)).To(Succeed()) + ovnAffinityAfterDelete := getOvnContainerAffinity(ctx, workerRTNode) + ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + verifyOvsMatchesExpected(isWorkloadPartitioningEnabled, ovnAffinityAfterDelete, ovsAffinities, + onlineCPUSet, reservedCPUSet, getGuPodCPUs(ctx, testpod2)) - time.Sleep(30 * time.Second) - // Check the cpus of ovnkubenode pods - ovnContainerCpuset2 := taskSet(ctx, containerPid, workerRTNode) - testlog.Infof("cpus used by ovn kube node pods after deleting pod %v is %v", testpod1.Name, ovnContainerCpuset2) - // we wait some time for ovs process affinity 
to change - time.Sleep(30 * time.Second) - - // Verify ovs-vswitchd and ovsdb-server process affinity is updated - pidToCPUs, err = getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask) - Expect(ovnContainerCpuset2.Equals(cpumask)).To(BeTrue(), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset2, pid, cpumask) - } - // Delete testpod2 - Expect(pods.DeleteAndSync(context.TODO(), testclient.DataPlaneClient, testpod2)).To(Succeed()) + By("Cleaning up second GU pod") + Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod2)).To(Succeed()) }) It("[test_id:64103] ovs process affinity still excludes guaranteed pods after reboot", func() { - checkCpuCount(context.TODO(), workerRTNode) - var dp *appsv1.Deployment = newDeployment() - // create a deployment to deploy gu pods - testNode := make(map[string]string) - testNode["kubernetes.io/hostname"] = workerRTNode.Name - dp.Spec.Template.Spec.NodeSelector = testNode - err := testclient.DataPlaneClient.Create(ctx, dp) - Expect(err).ToNot(HaveOccurred(), "Unable to create Deployment") + checkCpuCount(ctx, workerRTNode) + + By("Creating a deployment with guaranteed pods on the worker node") + dp := newDeployment() + dp.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": workerRTNode.Name} + Expect(testclient.DataPlaneClient.Create(ctx, dp)).To(Succeed(), "Unable to create Deployment") defer func() { - // delete deployment - testlog.Infof("Deleting Deployment %v", dp.Name) - err := testclient.DataPlaneClient.Delete(ctx, dp) - Expect(err).ToNot(HaveOccurred()) - // once deployment is deleted - // wait till the ovs process affinity is reverted back - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + testlog.Infof("Deleting Deployment %s from Namespace %s", dp.Name, dp.Namespace) + Expect(testclient.DataPlaneClient.Delete(ctx, dp)).To(Succeed()) Eventually(func() bool { - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred(), "Unable to fetch affinity of ovs services") - for pid, cpumask := range pidToCPUs { - // since cpuset.CPUSet contains map in its struct field we can't compare - // the structs directly. After the deployment is deleted, the cpu mask - // of ovs services should contain all cpus , which is generally 0-N (where - // N is total number of cpus, this should be easy to compare. 
-						if !cpumask.Equals(onlineCPUSet) {
-							testlog.Warningf("ovs servics pid %s cpu mask is %s instead of %s", pid, cpumask, onlineCPUSet)
+					affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+					for pid, mask := range affinities {
+						if !mask.Equals(onlineCPUSet) {
+							testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, onlineCPUSet)
 							return false
 						}
 					}
@@ -471,131 +366,142 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab
 				}, 5*time.Minute, 10*time.Second).Should(BeTrue())
 			}()

-			ovnPod, err := ovnCnfNodePod(ctx, workerRTNode)
-			Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
-			ovnContainerids, err := ovnPodContainers(&ovnPod)
-			Expect(err).ToNot(HaveOccurred())
-			containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0])
-			Expect(err).ToNot(HaveOccurred())
-			// we need to wait as process affinity can change
-			time.Sleep(30 * time.Second)
-			ovnContainerCpuset := taskSet(ctx, containerPid, workerRTNode)
-			testlog.Infof("Container of ovn pod %s is using cpus %s", ovnPod.Name, ovnContainerCpuset)
-			pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
+			dpListOpts := &client.ListOptions{
+				Namespace:     dp.Namespace,
+				FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}),
+				LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}),
+			}

-			//wait for 30 seconds for ovs process to have its cpu affinity updated
-			time.Sleep(30 * time.Second)
-			// Verify ovs-vswitchd and ovsdb-server process affinity is updated
-			pidToCPUs1, err := getCPUMaskForPids(ctx, pidList, workerRTNode)
-			Expect(err).ToNot(HaveOccurred())
-			for pid, cpumask := range pidToCPUs1 {
-				testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask)
-				Expect(ovnContainerCpuset).To(Equal(cpumask), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpuset, pid, cpumask)
+			collectAndVerify := func(phase string) {
+				ovnAffinity := getOvnContainerAffinity(ctx, workerRTNode)
+				ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+				guCPUs := collectGuCPUsFromPodList(ctx, dpListOpts)
+				testlog.Infof("Phase %s: GU CPUs = %s", phase, guCPUs)
+				verifyOvsMatchesExpected(isWorkloadPartitioningEnabled,
+					ovnAffinity, ovsAffinities, onlineCPUSet, reservedCPUSet, guCPUs)
 			}

-			testlog.Info("Rebooting the node")
-			rebootCmd := "chroot /rootfs systemctl reboot"
-			testlog.TaggedInfof("Reboot", "Node %q: Rebooting", workerRTNode.Name)
-			_, _ = nodes.ExecCommand(ctx, workerRTNode, []string{"sh", "-c", rebootCmd})
-			testlog.Info("Node Rebooted")
-			By("Waiting for node to go into not ready state after reboot")
+			waitForDeploymentReady(ctx, dp, dpListOpts, 2)
+
+			By("Verifying affinities before reboot")
+			collectAndVerify("before-reboot")
+
+			By(fmt.Sprintf("Rebooting the worker node %q", workerRTNode.Name))
+			_, _ = nodes.ExecCommand(ctx, workerRTNode, []string{"sh", "-c", "chroot /rootfs systemctl reboot"})
 			nodes.WaitForNotReadyOrFail("Reboot", workerRTNode.Name, 10*time.Minute, 30*time.Second)
-			By("Waiting for node to be ready again after reboot")
 			nodes.WaitForReadyOrFail("Reboot", workerRTNode.Name, 10*time.Minute, 30*time.Second)

-			// After reboot verify test pod created using deployment is running
-			// Get pods from the deployment
-			listOptions := &client.ListOptions{
-				Namespace:     dp.Namespace,
-				FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}),
-				LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}),
+			By("Waiting for deployment pods to be ready after reboot")
+			waitForDeploymentReady(ctx, dp, dpListOpts, 2)
+
+			By("Verifying affinities after reboot")
+			collectAndVerify("after-reboot")
+		})
+
+		// This test must run on both compact/SNO clusters (where workerRTNode is
+		// itself a control-plane node) and MNO clusters with schedulable control
+		// plane. On MNO, the CP node may be governed by a different profile, so we
+		// avoid relying on the worker profile's reservedCPUSet. Instead we verify
+		// OVS affinity against the node's online CPU set, which is always correct
+		// regardless of which profile manages the node.
+		It("Verify OVS affinity is not restricted to reserved CPUs after control plane node reboot", func() {
+			isSchedulable, err := cluster.IsControlPlaneSchedulable(ctx)
+			Expect(err).ToNot(HaveOccurred(), "Unable to check if control plane is schedulable")
+			if !isSchedulable {
+				Skip("Control plane nodes are not schedulable")
 			}
-			podList := &corev1.PodList{}
-			dpObj := client.ObjectKeyFromObject(dp)
-			Eventually(func() bool {
-				if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil {
-					return false
-				}
-				if err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp); err != nil {
-					return false
-				}
-				if dp.Status.ReadyReplicas != int32(2) {
-					testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas)
-					return false
-				}
-				for _, s := range podList.Items[0].Status.ContainerStatuses {
-					if !s.Ready {
-						return false
+
+			By("Finding a control plane node with OVS dynamic pinning")
+			var cpNode *corev1.Node
+
+			if _, ok := workerRTNode.Labels["node-role.kubernetes.io/control-plane"]; ok {
+				cpNode = workerRTNode
+			} else {
+				cpNodes, err := nodes.GetByLabels(map[string]string{"node-role.kubernetes.io/control-plane": ""})
+				Expect(err).ToNot(HaveOccurred())
+				for i := range cpNodes {
+					cmd := []string{"ls", activation_file}
+					_, err := nodes.ExecCommand(ctx, &cpNodes[i], cmd)
+					if err == nil {
+						cpNode = &cpNodes[i]
+						break
 					}
 				}
-				return true
-			}, 5*time.Minute, 10*time.Second).Should(BeTrue())
-			ovnPodAfterReboot, err := ovnCnfNodePod(ctx, workerRTNode)
-			Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
-			ovnContainerIdsAfterReboot, err := ovnPodContainers(&ovnPodAfterReboot)
-			Expect(err).ToNot(HaveOccurred())
-			containerPid, err = nodes.ContainerPid(ctx, workerRTNode, ovnContainerIdsAfterReboot[0])
-			Expect(err).ToNot(HaveOccurred())
-			// we need to wait as process affinity can change
-			time.Sleep(30 * time.Second)
-			ovnContainerCpusetAfterReboot := taskSet(ctx, containerPid, workerRTNode)
-			testlog.Infof("cpus used by ovn kube node pods %v", ovnContainerCpusetAfterReboot)
-			pidListAfterReboot, err := ovsPids(ctx, ovsSystemdServices, workerRTNode)
+			}

-			Expect(err).ToNot(HaveOccurred())
+			if cpNode == nil {
+				Skip("No control plane node with OVS dynamic pinning found")
+			}
+			testlog.Infof("Using control plane node: %s", cpNode.Name)

-			// Verify ovs-vswitchd and ovsdb-server process affinity is updated
-			pidToCPUs2, err := getCPUMaskForPids(ctx, pidListAfterReboot, workerRTNode)
+			cpOnlineCPUs, err := nodes.GetOnlineCPUsSet(ctx, cpNode)
 			Expect(err).ToNot(HaveOccurred())
-			for pid, cpumask := range pidToCPUs2 {
-				testlog.Infof("OVS service pid %s is using cpus %s", pid, cpumask)
-				Expect(ovnContainerCpusetAfterReboot).To(Equal(cpumask), "affinity of ovn kube node pods(%s) do not match with ovservices pid %s (%s)", ovnContainerCpusetAfterReboot, pid, cpumask)
-			}
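+
+			// The OVS systemd services are looked up again on the control plane
+			// node, since its service list may differ from the worker node's.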
with ovservices pid %s (%s)", ovnContainerCpusetAfterReboot, pid, cpumask) + + cpOvsServices := ovsSystemdServicesOnOvsSlice(ctx, cpNode) + + By("Verify OVS affinity before reboot spans all online CPUs") + ovsBeforeReboot := getOvsAffinities(ctx, cpOvsServices, cpNode) + Expect(ovsBeforeReboot).ToNot(BeEmpty(), "Expected non-empty OVS affinities on control-plane node before reboot") + for pid, mask := range ovsBeforeReboot { + testlog.Infof("Control plane OVS pid %s affinity before reboot: %s", pid, mask) + Expect(mask.Equals(cpOnlineCPUs)).To(BeTrue(), + "OVS pid %s affinity (%s) should match online CPUs (%s) before reboot", + pid, mask, cpOnlineCPUs) + } + + By(fmt.Sprintf("Rebooting the control plane node %q", cpNode.Name)) + _, _ = nodes.ExecCommand(ctx, cpNode, []string{"sh", "-c", "chroot /rootfs systemctl reboot"}) + nodes.WaitForNotReadyOrFail("Reboot", cpNode.Name, 10*time.Minute, 30*time.Second) + nodes.WaitForReadyOrFail("Reboot", cpNode.Name, 10*time.Minute, 30*time.Second) + + By("Waiting for OVN pod to be ready on the control plane node") + Eventually(func() error { + _, err := ovnCnfNodePod(ctx, cpNode) + return err + }, 5*time.Minute, 10*time.Second).Should(Succeed(), + "OVN pod did not become ready on control plane node after reboot") + + By("Verify OVS affinity after reboot spans all online CPUs") + ovsAfterReboot := getOvsAffinities(ctx, cpOvsServices, cpNode) + Expect(ovsAfterReboot).ToNot(BeEmpty(), "Expected non-empty OVS affinities on control-plane node after reboot") + for pid, mask := range ovsAfterReboot { + testlog.Infof("Control plane OVS pid %s affinity after reboot: %s", pid, mask) + Expect(mask.Equals(cpOnlineCPUs)).To(BeTrue(), + "OVS pid %s affinity (%s) should match online CPUs (%s) after reboot", + pid, mask, cpOnlineCPUs) } }) // Automates OCPBUGS-35347: ovs-vswitchd is using isolated cpu pool instead of reserved pool It("[test_id:75257] verify ovs-switchd threads inherit cpu affinity", func() { - checkCpuCount(context.TODO(), workerRTNode) - By("Get Thread Affinity of ovs-vswitchd process") - //Get ovs-switchd thread affinity + checkCpuCount(ctx, workerRTNode) + + By("Verifying ovs-vswitchd thread affinity covers online CPUs") threadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) Expect(err).ToNot(HaveOccurred()) - - // Verify thread affinity contains all the online cpu's for _, line := range threadAffinity { if line != "" { - cpumask := strings.Split(line, ":") - threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1])) + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) Expect(err).ToNot(HaveOccurred()) - Expect(threadsCpuset.Equals(onlineCPUSet), "actual cpuset %s not equals to expected cpuset %s", threadsCpuset, onlineCPUSet) + Expect(threadsCpuset.Equals(onlineCPUSet)).To(BeTrue(), + "actual cpuset %s not equals to expected cpuset %s", threadsCpuset, onlineCPUSet) } } - // create deployment with 2 replicas and each pod have 2 cpus - var dp *appsv1.Deployment = newDeployment() - testNode := make(map[string]string) - testNode["kubernetes.io/hostname"] = workerRTNode.Name - dp.Spec.Template.Spec.NodeSelector = testNode - err = testclient.DataPlaneClient.Create(ctx, dp) - Expect(err).ToNot(HaveOccurred(), "Unable to create Deployment") + By("Creating a deployment with 2 guaranteed replicas") + dp := newDeployment() + dp.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": workerRTNode.Name} + Expect(testclient.DataPlaneClient.Create(ctx, dp)).To(Succeed(), 
"Unable to create Deployment") - // Delete deployment defer func() { - By("Delete Deployment") - testlog.Infof("Deleting Deployment %v", dp.Name) - err := testclient.DataPlaneClient.Delete(ctx, dp) - Expect(err).ToNot(HaveOccurred()) - // once deployment is deleted - // wait till the ovs process affinity is reverted back - pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode) - Expect(err).ToNot(HaveOccurred()) + testlog.Infof("Deleting Deployment %s", dp.Name) + Expect(testclient.DataPlaneClient.Delete(ctx, dp)).To(Succeed()) Eventually(func() bool { - pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode) - Expect(err).ToNot(HaveOccurred()) - for pid, cpumask := range pidToCPUs { - if !cpumask.Equals(onlineCPUSet) { - testlog.Warningf("ovs servics pid %s cpu mask is %s instead of %s", pid, cpumask, onlineCPUSet) + affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode) + for pid, mask := range affinities { + if !mask.Equals(onlineCPUSet) { + testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, onlineCPUSet) return false } } @@ -603,110 +509,221 @@ var _ = Describe("[performance] Cgroups and affinity", Ordered, Label(string(lab }, 2*time.Minute, 10*time.Second).Should(BeTrue()) }() - testlog.Info("Get the pods list form the deployment") - // Get pods from the deployment - listOptions := &client.ListOptions{ + listOpts := &client.ListOptions{ Namespace: dp.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), } - podList := &corev1.PodList{} - dpObj := client.ObjectKeyFromObject(dp) - Eventually(func() bool { - if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil { - return false - } - if err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp); err != nil { - return false - } - if dp.Status.ReadyReplicas != int32(2) { - testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas) - return false - } - for _, s := range podList.Items[0].Status.ContainerStatuses { - if !s.Ready { - return false - } - } - return true - }, 10*time.Second, 5*time.Second).Should(BeTrue()) - testlog.Info("Get ovs-vswitchd threads affinity post deployment") - postDeploymentThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) + podList := waitForDeploymentReady(ctx, dp, listOpts, 2) + + By("Verifying GU pod CPUs are not a subset of ovs-vswitchd thread affinity") + postDeployThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) Expect(err).ToNot(HaveOccurred()) - for _, pod := range podList.Items { - cmd := []string{"taskset", "-pc", "1"} - outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, &pod, "", cmd) - Expect(err).ToNot(HaveOccurred()) - testpodCpus := bytes.Split(outputb, []byte(":")) - testlog.Infof("%v pod is using cpus %v", pod.Name, string(testpodCpus[1])) - podcpus, err := cpuset.Parse(strings.TrimSpace(string(testpodCpus[1]))) - Expect(err).ToNot(HaveOccurred()) - for _, line := range postDeploymentThreadAffinity { + for i := range podList.Items { + podcpus := getGuPodCPUs(ctx, &podList.Items[i]) + for _, line := range postDeployThreadAffinity { if line != "" { - cpumask := strings.Split(line, ":") - threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1])) + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) 
Expect(err).ToNot(HaveOccurred()) - testlog.Infof("ovs-switchd thread CpuAffinity: %s, pod %s Affinity: %s", threadsCpuset, pod.Name, podcpus) - Expect(podcpus.IsSubsetOf(threadsCpuset)).To(Equal(false)) + testlog.Infof("ovs-vswitchd thread affinity: %s, pod %s affinity: %s", threadsCpuset, podList.Items[i].Name, podcpus) + Expect(podcpus.IsSubsetOf(threadsCpuset)).To(BeFalse()) } } } - testlog.Infof("Deleting one of the pods of the deployment %q", dpObj.String()) + + By("Deleting one pod and waiting for replacement") podToDelete := podList.Items[0] Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, &podToDelete)).To(Succeed()) + podList = waitForDeploymentReady(ctx, dp, listOpts, 2) - Eventually(func() bool { - err = testclient.DataPlaneClient.Get(context.TODO(), dpObj, dp) - if dp.Status.ReadyReplicas != int32(2) { - testlog.Warningf("Waiting for deployment: %q to have %d replicas ready, current number of replicas: %d", dpObj.String(), int32(2), dp.Status.ReadyReplicas) - return false + By("Verifying GU pod CPUs are still not a subset of ovs-vswitchd thread affinity") + refreshedThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode) + Expect(err).ToNot(HaveOccurred()) + for i := range podList.Items { + podcpus := getGuPodCPUs(ctx, &podList.Items[i]) + for _, line := range refreshedThreadAffinity { + if line != "" { + parts := strings.Split(line, ":") + threadsCpuset, err := cpuset.Parse(strings.TrimSpace(parts[1])) + Expect(err).ToNot(HaveOccurred()) + testlog.Infof("ovs-vswitchd thread affinity: %s, pod %s affinity: %s", threadsCpuset, podList.Items[i].Name, podcpus) + Expect(podcpus.IsSubsetOf(threadsCpuset)).To(BeFalse()) + } } - return true - }).WithTimeout(time.Minute*5).WithPolling(time.Second*30).Should(BeTrue(), "deployment %q failed to have %d running replicas within the defined period", dpObj.String(), int32(2)) + } + }) + }) - // Get pods from the deployment - listOptions = &client.ListOptions{ - Namespace: dp.Namespace, - FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": workerRTNode.Name}), - LabelSelector: labels.SelectorFromSet(labels.Set{"type": "telco"}), + Context("Workload Partitioning OVS affinity", Label(string(label.Tier2)), func() { + BeforeEach(func() { + if !isWorkloadPartitioningEnabled { + Skip("Workload partitioning is not enabled on this cluster") } - podList = &corev1.PodList{} - Eventually(func() bool { - if err := testclient.DataPlaneClient.List(context.TODO(), podList, listOptions); err != nil { - return false - } - if len(podList.Items) < 1 { - return false + }) + + It("Verify OVN pod is restricted to reserved CPUs under workload partitioning", func() { + ovnPod, err := ovnCnfNodePod(ctx, workerRTNode) + Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod") + containerIds, err := ovnPodContainers(&ovnPod) + Expect(err).ToNot(HaveOccurred()) + + By("Verify each OVN container's cgroup cpuset and process affinity are restricted to reserved CPUs") + for _, ctn := range containerIds { + pid, err := nodes.ContainerPid(ctx, workerRTNode, ctn) + Expect(err).ToNot(HaveOccurred()) + + ctnAffinity := taskSet(ctx, pid, workerRTNode) + testlog.Infof("OVN container pid %s affinity: %s", pid, ctnAffinity) + Expect(ctnAffinity).To(Equal(reservedCPUSet), + "Under workload partitioning, OVN container pid %s affinity (%s) should equal reserved CPUs (%s)", + pid, ctnAffinity, reservedCPUSet) + } + }) + + It("Verify OVS affinity is wider than OVN pod under workload partitioning", func() { + By("Get OVN container affinity") + ovnPod, 
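+		// Under workload partitioning the OVN pod is a platform workload pinned
+		// to the reserved CPUs, while the OVS systemd services keep a wider
+		// affinity: every CPU not claimed by a guaranteed pod. The tests below
+		// verify both halves of that relationship.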
+		It("Verify OVS affinity is wider than OVN pod under workload partitioning", func() {
+			By("Get OVN container affinity")
+			ovnPod, err := ovnCnfNodePod(ctx, workerRTNode)
+			Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
+			ovnContainerids, err := ovnPodContainers(&ovnPod)
+			Expect(err).ToNot(HaveOccurred())
+			containerPid, err := nodes.ContainerPid(ctx, workerRTNode, ovnContainerids[0])
+			Expect(err).ToNot(HaveOccurred())
+
+			time.Sleep(30 * time.Second)
+			ctnCpuset := taskSet(ctx, containerPid, workerRTNode)
+			testlog.Infof("OVN container affinity: %s", ctnCpuset)
+			Expect(ctnCpuset.Equals(reservedCPUSet)).To(BeTrue(),
+				"OVN container should be restricted to reserved CPUs under workload partitioning")
+
+			By("Get OVS process affinity and verify it is wider")
+			pidList, err := ovsPids(ctx, ovsSystemdServices, workerRTNode)
+			Expect(err).ToNot(HaveOccurred())
+			pidToCPUs, err := getCPUMaskForPids(ctx, pidList, workerRTNode)
+			Expect(err).ToNot(HaveOccurred())
+
+			for pid, ovsAffinity := range pidToCPUs {
+				testlog.Infof("OVS service pid %s affinity: %s", pid, ovsAffinity)
+				Expect(reservedCPUSet.IsSubsetOf(ovsAffinity)).To(BeTrue(),
+					"Reserved CPUs (%s) should be a subset of OVS affinity (%s)", reservedCPUSet, ovsAffinity)
+				Expect(ovsAffinity.Equals(reservedCPUSet)).To(BeFalse(),
+					"OVS affinity (%s) should be wider than reserved CPUs (%s) when no GU pods exist", ovsAffinity, reservedCPUSet)
+			}
+		})
+
+		It("Verify reserved CPUs are always included in OVS affinity under workload partitioning", func() {
+			checkCpuCount(ctx, workerRTNode)
+
+			By("Verify reserved CPUs are part of OVS affinity before creating GU pods")
+			ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+			for pid, mask := range ovsAffinities {
+				Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(),
+					"Reserved CPUs (%s) should be a subset of OVS affinity (%s) for pid %s", reservedCPUSet, mask, pid)
+			}
+
+			By("Create a GU pod and verify reserved CPUs remain in OVS affinity")
+			testpod := createGuPod(ctx, workerRTNode)
+			guCPUs := getGuPodCPUs(ctx, testpod)
+
+			ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+			for pid, mask := range ovsAffinities {
+				testlog.Infof("OVS pid %s affinity with GU pod: %s", pid, mask)
+				Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(),
+					"Reserved CPUs (%s) should still be a subset of OVS affinity (%s) for pid %s", reservedCPUSet, mask, pid)
+				Expect(guCPUs.IsSubsetOf(mask)).To(BeFalse(),
+					"GU pod CPUs (%s) should NOT be a subset of OVS affinity (%s)", guCPUs, mask)
+			}
+
+			By("Delete the GU pod and verify reserved CPUs are still in OVS affinity")
+			Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, testpod)).To(Succeed())
+
+			ovsAffinities = getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+			for pid, mask := range ovsAffinities {
+				testlog.Infof("OVS pid %s affinity after GU pod deletion: %s", pid, mask)
+				Expect(reservedCPUSet.IsSubsetOf(mask)).To(BeTrue(),
+					"Reserved CPUs (%s) should be a subset of OVS affinity (%s) for pid %s after deletion", reservedCPUSet, mask, pid)
+				Expect(mask.Equals(onlineCPUSet)).To(BeTrue(),
+					"OVS affinity should return to all online CPUs after GU pod deletion")
+			}
+		})
+
+		It("Verify OVS affinity excludes CPUs pinned by guaranteed pods under workload partitioning", func() {
+			checkCpuCount(ctx, workerRTNode)
+
+			isolatedCPUs := onlineCPUSet.Difference(reservedCPUSet)
+			isolatedCount := isolatedCPUs.Size()
+			if isolatedCount < 2 {
+				Skip("Not enough isolated CPUs to run this test")
+			}
+
+			nodeLoad, err := baseload.ForNode(ctx, testclient.DataPlaneClient, workerRTNode.Name)
+			Expect(err).ToNot(HaveOccurred())
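+			// baseload.ForNode is assumed to account for the node's background
+			// CPU requests, so AvailableCPUs reports only isolated CPUs that are
+			// genuinely free for the GU pods created below.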
+			availableCPUs := nodeLoad.AvailableCPUs(isolatedCount)
+			availableCPUs = availableCPUs &^ 1 // round down to even to satisfy SMT alignment
+			testlog.Infof("Isolated CPUs: %d, already consumed: %d, available (even-aligned): %d",
+				isolatedCount, nodeLoad.CPURequestedCores(), availableCPUs)
+			if availableCPUs < 2 {
+				Skip(fmt.Sprintf("Not enough available isolated CPUs: %d available out of %d total", availableCPUs, isolatedCount))
+			}
+
+			By(fmt.Sprintf("Creating GU pods to consume %d available isolated CPUs", availableCPUs))
+			var guPods []*corev1.Pod
+			remainingIsolated := availableCPUs
+			for remainingIsolated > 0 {
+				cpuRequest := remainingIsolated
+				if cpuRequest > 2 {
+					cpuRequest = 2
 				}
 			}
-			return true
-		}, 10*time.Second, 5*time.Second).Should(BeTrue())
-		// Post deletion of deployment pods verify ovs-vswitchd thread affinity
-		refresshedThreadAffinity, err := ovsSwitchdThreadAffinity(ctx, workerRTNode)
-		Expect(err).ToNot(HaveOccurred())
-		for _, pod := range podList.Items {
-			cmd := []string{"taskset", "-pc", "1"}
-			outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, &pod, "", cmd)
+				testpod := pods.GetTestPod()
+				testpod.Namespace = testutils.NamespaceTesting
+				testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{
+					Limits: corev1.ResourceList{
+						corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", cpuRequest)),
+						corev1.ResourceMemory: resource.MustParse("200Mi"),
+					},
+				}
+				testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: workerRTNode.Name}
+				err := testclient.DataPlaneClient.Create(ctx, testpod)
 				Expect(err).ToNot(HaveOccurred())
-			testpodCpus := bytes.Split(outputb, []byte(":"))
-			testlog.Infof("%v pod is using cpus %v", pod.Name, string(testpodCpus[1]))
-			podcpus, err := cpuset.Parse(strings.TrimSpace(string(testpodCpus[1])))
+				testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute)
 				Expect(err).ToNot(HaveOccurred())
-			for _, line := range refresshedThreadAffinity {
-				if line != "" {
-					cpumask := strings.Split(line, ":")
-					threadsCpuset, err := cpuset.Parse(strings.TrimSpace(cpumask[1]))
-					Expect(err).ToNot(HaveOccurred())
-					testlog.Infof("ovs-switchd thread CpuAffinity: %s, pod %s Affinity: %s", threadsCpuset, pod.Name, podcpus)
-					Expect(podcpus.IsSubsetOf(threadsCpuset)).To(Equal(false))
-				}
-			}
+				Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed))
+				guPods = append(guPods, testpod)
+				remainingIsolated -= cpuRequest
 			}
+
+			defer func() {
+				for _, p := range guPods {
+					testlog.Infof("Cleaning up GU pod %s", p.Name)
+					Expect(pods.DeleteAndSync(ctx, testclient.DataPlaneClient, p)).To(Succeed())
+				}
+				Eventually(func() bool {
+					affinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
+					for pid, mask := range affinities {
+						if !mask.Equals(onlineCPUSet) {
+							testlog.Warningf("OVS pid %s mask is %s instead of %s", pid, mask, onlineCPUSet)
+							return false
+						}
+					}
+					return true
+				}, 5*time.Minute, 10*time.Second).Should(BeTrue())
+			}()
+
+			guPodCPUs := cpuset.New()
+			for _, p := range guPods {
+				guPodCPUs = guPodCPUs.Union(getGuPodCPUs(ctx, p))
+			}
+			expected := expectedOvsAffinity(onlineCPUSet, guPodCPUs)
+			testlog.Infof("GU pod CPUs: %s, expected OVS affinity: %s", guPodCPUs, expected)
+
+			By("Verify OVS affinity excludes the guaranteed pods' pinned CPUs")
+			ovsAffinities := getOvsAffinities(ctx, ovsSystemdServices, workerRTNode)
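+			// Worked example: with online CPUs 0-7 and GU pods pinned to 4-7,
+			// expectedOvsAffinity yields 0-3 (online minus GU-pinned).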
+			for pid, mask := range ovsAffinities {
+				testlog.Infof("OVS pid %s affinity: %s (expected: %s)", pid, mask, expected)
+				Expect(mask.Equals(expected)).To(BeTrue(),
+					"OVS pid %s affinity (%s) should equal expected (%s) after GU pod pinning",
+					pid, mask, expected)
 			}
 		})
 	})

@@ -732,6 +749,7 @@ func cpuSpecToString(cpus *performancev2.CPU) string {

 // checkCpuCount check if the node has sufficient cpus
 func checkCpuCount(ctx context.Context, workerNode *corev1.Node) {
+	GinkgoHelper()
 	out, err := nodes.ExecCommand(ctx, workerNode, []string{"nproc", "--all"})
 	if err != nil {
 		Fail(fmt.Sprintf("Failed to fetch online CPUs: %v", err))
@@ -850,6 +868,7 @@ func newDeployment() *appsv1.Deployment {

 // ovsSystemdServicesOnOvsSlice returns the systemd services dependent on ovs.slice cgroup
 func ovsSystemdServicesOnOvsSlice(ctx context.Context, workerRTNode *corev1.Node) []string {
+	GinkgoHelper()
 	ovsServices, err := systemd.ShowProperty(context.TODO(), "ovs.slice", "RequiredBy", workerRTNode)
 	Expect(err).ToNot(HaveOccurred())
 	serviceList := strings.Split(strings.TrimSpace(ovsServices), "=")
@@ -883,6 +902,7 @@ func ovsPids(ctx context.Context, ovsSystemdServices []string, workerRTNode *cor

 // taskSet returns cpus used by the pid
 func taskSet(ctx context.Context, pid string, workerRTNode *corev1.Node) cpuset.CPUSet {
+	GinkgoHelper()
 	cmd := []string{"taskset", "-pc", pid}
 	out, err := nodes.ExecCommand(ctx, workerRTNode, cmd)
 	Expect(err).ToNot(HaveOccurred(), "unable to fetch cpus using taskset")
@@ -910,3 +930,144 @@ func ovsSwitchdThreadAffinity(ctx context.Context, workerRTNode *corev1.Node) ([
 	threadAffinity := strings.Split(string(out), "\n")
 	return threadAffinity, nil
 }
+
+// expectedOvsAffinity computes the expected OVS process CPU affinity set.
+// Formula: Online - GU_Pinned
+// Guaranteed pods are pinned exclusively to isolated CPUs, so removing them
+// from the full online set yields the correct expected affinity.
+func expectedOvsAffinity(onlineCPUs, guPodCPUs cpuset.CPUSet) cpuset.CPUSet {
+	return onlineCPUs.Difference(guPodCPUs)
+}
+
+// getOvnContainerAffinity returns the CPU affinity of the first OVN container
+// on the given node, waiting 30s for affinity to stabilise.
+func getOvnContainerAffinity(ctx context.Context, node *corev1.Node) cpuset.CPUSet {
+	GinkgoHelper()
+	ovnPod, err := ovnCnfNodePod(ctx, node)
+	Expect(err).ToNot(HaveOccurred(), "Unable to get ovnPod")
+	containerIds, err := ovnPodContainers(&ovnPod)
+	Expect(err).ToNot(HaveOccurred())
+	pid, err := nodes.ContainerPid(ctx, node, containerIds[0])
+	Expect(err).ToNot(HaveOccurred())
+	time.Sleep(30 * time.Second)
+	affinity := taskSet(ctx, pid, node)
+	testlog.Infof("OVN container %s affinity: %s", ovnPod.Name, affinity)
+	return affinity
+}
+
+// getOvsAffinities returns a pid-to-cpuset map for all OVS service processes
+// on the given node, waiting 30s for affinity to stabilise.
+func getOvsAffinities(ctx context.Context, services []string, node *corev1.Node) map[string]cpuset.CPUSet {
+	GinkgoHelper()
+	pidList, err := ovsPids(ctx, services, node)
+	Expect(err).ToNot(HaveOccurred())
+	time.Sleep(30 * time.Second)
+	affinities, err := getCPUMaskForPids(ctx, pidList, node)
+	Expect(err).ToNot(HaveOccurred())
+	return affinities
+}
+
+// verifyOvsAffinity asserts that every OVS process has the given expected CPU set.
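+// Callers compute the expected set themselves, either via expectedOvsAffinity
+// (Online - GU_Pinned) or by passing the OVN container affinity directly.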
+func verifyOvsAffinity(ovsAffinities map[string]cpuset.CPUSet, expected cpuset.CPUSet) {
+	GinkgoHelper()
+	for pid, mask := range ovsAffinities {
+		testlog.Infof("OVS pid %s affinity: %s (expected: %s)", pid, mask, expected)
+		Expect(mask.Equals(expected)).To(BeTrue(),
+			"OVS pid %s affinity (%s) does not match expected (%s)", pid, mask, expected)
+	}
+}
+
+// verifyOvsMatchesExpected handles the WP/non-WP verification pattern:
+//   - Under WP: asserts OVN is on reservedCPUs, computes expected OVS affinity
+//     via the formula, and verifies all OVS pids match.
+//   - Without WP: asserts all OVS pids match the OVN container affinity.
+func verifyOvsMatchesExpected(isWP bool, ovnAffinity cpuset.CPUSet,
+	ovsAffinities map[string]cpuset.CPUSet, onlineCPUs, reservedCPUs, guCPUs cpuset.CPUSet) {
+	GinkgoHelper()
+	if isWP {
+		Expect(ovnAffinity.Equals(reservedCPUs)).To(BeTrue(),
+			"Under WP, OVN container should be restricted to reserved cpus")
+		expected := expectedOvsAffinity(onlineCPUs, guCPUs)
+		verifyOvsAffinity(ovsAffinities, expected)
+	} else {
+		verifyOvsAffinity(ovsAffinities, ovnAffinity)
+	}
+}
+
+// createGuPod creates a 2-CPU Guaranteed QoS pod on the given node, waits for
+// it to be ready, and returns it.
+func createGuPod(ctx context.Context, node *corev1.Node) *corev1.Pod {
+	GinkgoHelper()
+	testpod := pods.GetTestPod()
+	testpod.Namespace = testutils.NamespaceTesting
+	testpod.Spec.Containers[0].Resources = corev1.ResourceRequirements{
+		Limits: corev1.ResourceList{
+			corev1.ResourceCPU:    resource.MustParse("2"),
+			corev1.ResourceMemory: resource.MustParse("200Mi"),
+		},
+	}
+	testpod.Spec.NodeSelector = map[string]string{testutils.LabelHostname: node.Name}
+	err := testclient.DataPlaneClient.Create(ctx, testpod)
+	Expect(err).ToNot(HaveOccurred())
+	testpod, err = pods.WaitForCondition(ctx, client.ObjectKeyFromObject(testpod), corev1.PodReady, corev1.ConditionTrue, 5*time.Minute)
+	Expect(err).ToNot(HaveOccurred())
+	Expect(testpod.Status.QOSClass).To(Equal(corev1.PodQOSGuaranteed))
+	testlog.Infof("GU pod %s pinned to cpus %s", testpod.Name, getGuPodCPUs(ctx, testpod))
+	return testpod
+}
+
+// waitForDeploymentReady polls until the deployment has the expected number of
+// ready replicas and returns the matching pod list.
+func waitForDeploymentReady(ctx context.Context, dp *appsv1.Deployment, listOpts *client.ListOptions, replicas int32) *corev1.PodList {
+	GinkgoHelper()
+	podList := &corev1.PodList{}
+	dpObj := client.ObjectKeyFromObject(dp)
+	Eventually(func() bool {
+		if err := testclient.DataPlaneClient.List(ctx, podList, listOpts); err != nil {
+			return false
+		}
+		if err := testclient.DataPlaneClient.Get(ctx, dpObj, dp); err != nil {
+			return false
+		}
+		if dp.Status.ReadyReplicas != replicas {
+			testlog.Warningf("Deployment %q: %d/%d replicas ready", dpObj.String(), dp.Status.ReadyReplicas, replicas)
+			return false
+		}
+		if len(podList.Items) == 0 {
+			return false
+		}
+		for _, s := range podList.Items[0].Status.ContainerStatuses {
+			if !s.Ready {
+				return false
+			}
+		}
+		return true
+	}, 5*time.Minute, 10*time.Second).Should(BeTrue())
+	return podList
+}
+
+// collectGuCPUsFromPodList lists pods matching the given options and returns
+// the union of all their exclusively pinned CPUs.
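+// For example (illustrative): pods pinned to CPUs 4-5 and 6-7 yield 4-7.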
+func collectGuCPUsFromPodList(ctx context.Context, listOpts *client.ListOptions) cpuset.CPUSet {
+	GinkgoHelper()
+	podList := &corev1.PodList{}
+	err := testclient.DataPlaneClient.List(ctx, podList, listOpts)
+	Expect(err).ToNot(HaveOccurred())
+	allCPUs := cpuset.New()
+	for i := range podList.Items {
+		allCPUs = allCPUs.Union(getGuPodCPUs(ctx, &podList.Items[i]))
+	}
+	return allCPUs
+}
+
+// getGuPodCPUs extracts the exclusively pinned CPUs from a Guaranteed QoS pod
+// by running taskset inside the pod.
+func getGuPodCPUs(ctx context.Context, pod *corev1.Pod) cpuset.CPUSet {
+	GinkgoHelper()
+	cmd := []string{"taskset", "-pc", "1"}
+	outputb, err := pods.ExecCommandOnPod(testclient.K8sClient, pod, "", cmd)
+	Expect(err).ToNot(HaveOccurred())
+	parts := bytes.Split(outputb, []byte(":"))
+	podcpus, err := cpuset.Parse(strings.TrimSpace(string(parts[1])))
+	Expect(err).ToNot(HaveOccurred())
+	return podcpus
+}
diff --git a/test/e2e/performanceprofile/functests/utils/cluster/cluster.go b/test/e2e/performanceprofile/functests/utils/cluster/cluster.go
index 44bc1e2dba..ce17a4c878 100644
--- a/test/e2e/performanceprofile/functests/utils/cluster/cluster.go
+++ b/test/e2e/performanceprofile/functests/utils/cluster/cluster.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"

+	configv1 "github.com/openshift/api/config/v1"
 	clientconfigv1 "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
 	testclient "github.com/openshift/cluster-node-tuning-operator/test/e2e/performanceprofile/functests/utils/client"
 	corev1 "k8s.io/api/core/v1"
@@ -47,3 +48,19 @@ func IsControlPlaneSchedulable(ctx context.Context) (bool, error) {
 	}
 	return schedulerInfo.Spec.MastersSchedulable, nil
 }
+
+// IsWorkloadPartitioningEnabled checks whether CPU partitioning is enabled
+// cluster-wide by querying the Infrastructure resource's CPUPartitioning status.
+func IsWorkloadPartitioningEnabled(ctx context.Context) (bool, error) {
+	cfg, err := config.GetConfig()
+	if err != nil {
+		return false, err
+	}
+
+	openshiftConfigClient := clientconfigv1.NewForConfigOrDie(cfg)
+	infra, err := openshiftConfigClient.Infrastructures().Get(ctx, "cluster", metav1.GetOptions{})
+	if err != nil {
+		return false, err
+	}
+	return infra.Status.CPUPartitioning == configv1.CPUPartitioningAllNodes, nil
+}