package node

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	"example.com/monok8s/pkg/kube"
	"example.com/monok8s/pkg/system"
)

const (
	// healthCheckNamespace is where the pre-upgrade probe pod is created.
	healthCheckNamespace = "kube-system"
	// healthCheckTimeout bounds how long we wait for the probe pod to
	// become ready before declaring the pre-upgrade check failed.
	healthCheckTimeout = 60 * time.Second
)

// RunKubeadmUpgradeApply runs "kubeadm upgrade apply" for the control plane.
//
// It is a no-op unless the bootstrap state requests a control-plane upgrade.
// Before invoking kubeadm it resolves the pause image for the target
// Kubernetes version and schedules a short-lived health-check pod to verify
// the cluster can still admit and run workloads; the upgrade is aborted if
// that probe never becomes ready.
func RunKubeadmUpgradeApply(ctx context.Context, nctx *NodeContext) error {
	if nctx.BootstrapState == nil {
		return errors.New("BootstrapState is nil. Please run earlier steps first")
	}
	if nctx.BootstrapState.Action != BootstrapActionUpgradeControlPlane {
		klog.V(4).Infof("skipped for %s", nctx.BootstrapState.Action)
		return nil
	}
	// tmpKubeadmInitConf is produced by an earlier step; an empty path means
	// that step did not run. NOTE(review): the path is validated but not
	// passed to kubeadm below — confirm whether "--config" is intended here.
	if strings.TrimSpace(tmpKubeadmInitConf) == "" {
		return fmt.Errorf("tmp kubeadm config path is empty")
	}

	pauseImage, err := resolvePauseImage(ctx, nctx, nctx.Config.Spec.KubernetesVersion)
	if err != nil {
		return fmt.Errorf("resolve pause image: %w", err)
	}
	klog.InfoS("resolved kubeadm pause image", "image", pauseImage)

	clients, err := kube.NewClientsFromKubeconfig(adminKubeconfigPath)
	if err != nil {
		return fmt.Errorf("build kube clients from %s: %w", adminKubeconfigPath, err)
	}
	if err := runUpgradeSelfHealthCheck(ctx, clients.Kubernetes, pauseImage); err != nil {
		return fmt.Errorf("pre-upgrade self health check failed: %w", err)
	}

	args := []string{
		"upgrade", "apply", "-y", nctx.Config.Spec.KubernetesVersion,
		"--ignore-preflight-errors=CreateJob",
	}
	// Stream kubeadm output into our log as it is produced; an upgrade can
	// run for a long time and silent waiting is hard to debug.
	_, err = nctx.SystemRunner.RunWithOptions(
		ctx,
		"kubeadm",
		args,
		system.RunOptions{
			Timeout: 15 * time.Minute,
			OnStdoutLine: func(line string) {
				klog.Infof("[kubeadm] %s", line)
			},
			OnStderrLine: func(line string) {
				klog.Infof("[kubeadm] %s", line)
			},
		},
	)
	if err != nil {
		return fmt.Errorf("run kubeadm upgrade apply: %w", err)
	}
	return nil
}

// resolvePauseImage returns the pause image reference that kubeadm would use
// for the given Kubernetes version, by parsing the output of
// "kubeadm config images list".
func resolvePauseImage(ctx context.Context, nctx *NodeContext, kubeVersion string) (string, error) {
	result, err := nctx.SystemRunner.Run(
		ctx,
		"kubeadm", "config", "images", "list",
		"--kubernetes-version", kubeVersion,
	)
	if err != nil {
		return "", fmt.Errorf("kubeadm config images list: %w", err)
	}
	for _, line := range strings.Split(result.Stdout, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		// examples:
		//   registry.k8s.io/pause:3.10
		//   some.registry.local/pause:3.10
		// The bare "pause:" prefix covers output with no registry component.
		if strings.Contains(line, "/pause:") || strings.HasPrefix(line, "pause:") {
			return line, nil
		}
	}
	return "", fmt.Errorf("pause image not found in kubeadm image list output")
}

// runUpgradeSelfHealthCheck creates a throwaway pause pod in
// healthCheckNamespace and waits up to healthCheckTimeout for it to become
// ready. The pod tolerates all taints so it can land on a control-plane node.
// The pod is always deleted on return, and on failure best-effort diagnostics
// (pod status, container states, events) are written to the log.
func runUpgradeSelfHealthCheck(ctx context.Context, kubeClient kubernetes.Interface, pauseImage string) error {
	// Unique name so repeated runs never collide with a pod still terminating.
	name := fmt.Sprintf("preupgrade-health-check-%d", time.Now().UnixMilli())
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: healthCheckNamespace,
			Labels: map[string]string{
				"app.kubernetes.io/name":       "preupgrade-health-check",
				"app.kubernetes.io/managed-by": "monok8s",
			},
		},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			// Tolerate everything: the probe must be schedulable even on
			// tainted (e.g. control-plane) nodes.
			Tolerations: []corev1.Toleration{
				{
					Operator: corev1.TolerationOpExists,
				},
			},
			Containers: []corev1.Container{
				{
					Name:            "check",
					Image:           pauseImage,
					ImagePullPolicy: corev1.PullIfNotPresent,
				},
			},
		},
	}

	klog.InfoS("creating pre-upgrade health-check pod",
		"namespace", pod.Namespace, "name", pod.Name, "image", pauseImage)

	created, err := kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("create health-check pod %s/%s: %w", pod.Namespace, pod.Name, err)
	}
	defer func() {
		// Cleanup must work even when ctx is already cancelled/expired, so
		// use a fresh short-lived context.
		delCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		propagation := metav1.DeletePropagationBackground
		err := kubeClient.CoreV1().Pods(created.Namespace).Delete(delCtx, created.Name, metav1.DeleteOptions{
			PropagationPolicy: &propagation,
		})
		if err != nil && !apierrors.IsNotFound(err) {
			klog.ErrorS(err, "failed to delete health-check pod",
				"namespace", created.Namespace, "name", created.Name)
		}
	}()

	waitCtx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
	defer cancel()

	err = wait.PollUntilContextCancel(waitCtx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
		cur, err := kubeClient.CoreV1().Pods(created.Namespace).Get(ctx, created.Name, metav1.GetOptions{})
		if err != nil {
			// A transient NotFound right after creation is retriable (cache lag).
			if apierrors.IsNotFound(err) {
				return false, nil
			}
			return false, err
		}
		switch cur.Status.Phase {
		case corev1.PodRunning:
			if isPodReady(cur) {
				klog.InfoS("pre-upgrade health-check pod is ready",
					"namespace", cur.Namespace, "name", cur.Name, "node", cur.Spec.NodeName)
				return true, nil
			}
			return false, nil
		case corev1.PodSucceeded:
			// unlikely for pause, but fine if it somehow happens
			klog.InfoS("pre-upgrade health-check pod succeeded",
				"namespace", cur.Namespace, "name", cur.Name, "node", cur.Spec.NodeName)
			return true, nil
		case corev1.PodFailed:
			// Terminal: stop polling immediately with a descriptive error.
			return false, fmt.Errorf("health-check pod failed: reason=%q message=%q",
				cur.Status.Reason, cur.Status.Message)
		default:
			return false, nil
		}
	})
	if err != nil {
		// Collect diagnostics on a fresh context: the wait usually fails
		// because waitCtx (and possibly ctx) is already done, and reusing it
		// would make every diagnostic API call fail with a context error.
		diagCtx, diagCancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer diagCancel()
		descErr := describeHealthCheckFailure(diagCtx, kubeClient, created.Namespace, created.Name)
		if descErr != nil {
			klog.ErrorS(descErr, "failed to collect health-check diagnostics",
				"namespace", created.Namespace, "name", created.Name)
		}
		return fmt.Errorf("wait for health-check pod readiness: %w", err)
	}
	return nil
}

// isPodReady reports whether the pod's Ready condition is True.
func isPodReady(pod *corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}

// describeHealthCheckFailure logs the pod's status, container states, and
// related events to aid debugging a health-check that never became ready.
// It returns an error only when the diagnostics themselves cannot be
// gathered (the logging calls do not fail).
func describeHealthCheckFailure(ctx context.Context, kubeClient kubernetes.Interface, namespace, name string) error {
	pod, err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("get failed health-check pod: %w", err)
	}
	klog.ErrorS(nil, "health-check pod did not become ready",
		"namespace", pod.Namespace,
		"name", pod.Name,
		"phase", pod.Status.Phase,
		"reason", pod.Status.Reason,
		"message", pod.Status.Message,
		"node", pod.Spec.NodeName,
	)
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.State.Waiting != nil {
			klog.ErrorS(nil, "container waiting",
				"container", cs.Name,
				"reason", cs.State.Waiting.Reason,
				"message", cs.State.Waiting.Message,
			)
		}
		if cs.State.Terminated != nil {
			klog.ErrorS(nil, "container terminated",
				"container", cs.Name,
				"reason", cs.State.Terminated.Reason,
				"message", cs.State.Terminated.Message,
				"exitCode", cs.State.Terminated.ExitCode,
			)
		}
	}
	// List is already scoped to the namespace; the selector narrows it to
	// events about this specific pod.
	events, err := kubeClient.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s", name),
	})
	if err != nil {
		return fmt.Errorf("list pod events: %w", err)
	}
	for _, ev := range events.Items {
		klog.ErrorS(nil, "health-check pod event",
			"type", ev.Type,
			"reason", ev.Reason,
			"message", ev.Message,
			"count", ev.Count,
		)
	}
	return nil
}