// Package node: pre-upgrade self health check and kubeadm upgrade-apply helpers.
package node
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/klog/v2"
|
|
|
|
"example.com/monok8s/pkg/kube"
|
|
"example.com/monok8s/pkg/system"
|
|
)
|
|
|
|
const (
	// healthCheckNamespace is where the pre-upgrade health-check pod is created.
	healthCheckNamespace = "kube-system"
	// healthCheckTimeout bounds how long we wait for that pod to become Ready.
	healthCheckTimeout = 60 * time.Second
)
|
|
|
|
func RunKubeadmUpgradeApply(ctx context.Context, nctx *NodeContext) error {
|
|
if nctx.BootstrapState == nil {
|
|
return errors.New("BootstrapState is nil. Please run earlier steps first")
|
|
}
|
|
|
|
if nctx.BootstrapState.Action != BootstrapActionUpgradeControlPlane {
|
|
klog.V(4).Infof("skipped for %s", nctx.BootstrapState.Action)
|
|
return nil
|
|
}
|
|
|
|
if strings.TrimSpace(tmpKubeadmInitConf) == "" {
|
|
return fmt.Errorf("tmp kubeadm config path is empty")
|
|
}
|
|
|
|
pauseImage, err := resolvePauseImage(ctx, nctx, nctx.Config.Spec.KubernetesVersion)
|
|
if err != nil {
|
|
return fmt.Errorf("resolve pause image: %w", err)
|
|
}
|
|
klog.InfoS("resolved kubeadm pause image", "image", pauseImage)
|
|
|
|
clients, err := kube.NewClientsFromKubeconfig(adminKubeconfigPath)
|
|
if err != nil {
|
|
return fmt.Errorf("build kube clients from %s: %w", adminKubeconfigPath, err)
|
|
}
|
|
|
|
if err := runUpgradeSelfHealthCheck(ctx, clients.Kubernetes, pauseImage); err != nil {
|
|
return fmt.Errorf("pre-upgrade self health check failed: %w", err)
|
|
}
|
|
|
|
args := []string{
|
|
"upgrade", "apply", "-y",
|
|
nctx.Config.Spec.KubernetesVersion,
|
|
"--ignore-preflight-errors=CreateJob",
|
|
}
|
|
|
|
_, err = nctx.SystemRunner.RunWithOptions(
|
|
ctx,
|
|
"kubeadm",
|
|
args,
|
|
system.RunOptions{
|
|
Timeout: 15 * time.Minute,
|
|
OnStdoutLine: func(line string) {
|
|
klog.Infof("[kubeadm] %s", line)
|
|
},
|
|
OnStderrLine: func(line string) {
|
|
klog.Infof("[kubeadm] %s", line)
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("run kubeadm upgrade apply: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func resolvePauseImage(ctx context.Context, nctx *NodeContext, kubeVersion string) (string, error) {
|
|
result, err := nctx.SystemRunner.Run(
|
|
ctx,
|
|
"kubeadm",
|
|
"config", "images", "list",
|
|
"--kubernetes-version", kubeVersion,
|
|
)
|
|
if err != nil {
|
|
return "", fmt.Errorf("kubeadm config images list: %w", err)
|
|
}
|
|
|
|
for _, line := range strings.Split(result.Stdout, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
if strings.Contains(line, "/pause:") || strings.HasPrefix(line, "pause:") {
|
|
return line, nil
|
|
}
|
|
}
|
|
|
|
return "", fmt.Errorf("pause image not found in kubeadm image list output")
|
|
}
|
|
|
|
func runUpgradeSelfHealthCheck(ctx context.Context, kubeClient kubernetes.Interface, pauseImage string) error {
|
|
name := fmt.Sprintf("preupgrade-health-check-%d", time.Now().UnixMilli())
|
|
|
|
pod := &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: healthCheckNamespace,
|
|
Labels: map[string]string{
|
|
"app.kubernetes.io/name": "preupgrade-health-check",
|
|
"app.kubernetes.io/managed-by": "monok8s",
|
|
},
|
|
},
|
|
Spec: corev1.PodSpec{
|
|
RestartPolicy: corev1.RestartPolicyNever,
|
|
Tolerations: []corev1.Toleration{
|
|
{
|
|
Operator: corev1.TolerationOpExists,
|
|
},
|
|
},
|
|
Containers: []corev1.Container{
|
|
{
|
|
Name: "check",
|
|
Image: pauseImage,
|
|
ImagePullPolicy: corev1.PullIfNotPresent,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
klog.InfoS("creating pre-upgrade health-check pod", "namespace", pod.Namespace, "name", pod.Name, "image", pauseImage)
|
|
|
|
created, err := kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("create health-check pod %s/%s: %w", pod.Namespace, pod.Name, err)
|
|
}
|
|
|
|
defer func() {
|
|
delCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
|
defer cancel()
|
|
|
|
propagation := metav1.DeletePropagationBackground
|
|
err := kubeClient.CoreV1().Pods(created.Namespace).Delete(delCtx, created.Name, metav1.DeleteOptions{
|
|
PropagationPolicy: &propagation,
|
|
})
|
|
if err != nil && !apierrors.IsNotFound(err) {
|
|
klog.ErrorS(err, "failed to delete health-check pod", "namespace", created.Namespace, "name", created.Name)
|
|
}
|
|
}()
|
|
|
|
waitCtx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
|
|
defer cancel()
|
|
|
|
err = wait.PollUntilContextCancel(waitCtx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
|
|
cur, err := kubeClient.CoreV1().Pods(created.Namespace).Get(ctx, created.Name, metav1.GetOptions{})
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
switch cur.Status.Phase {
|
|
case corev1.PodRunning:
|
|
if isPodReady(cur) {
|
|
klog.InfoS("pre-upgrade health-check pod is ready", "namespace", cur.Namespace, "name", cur.Name, "node", cur.Spec.NodeName)
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
|
|
case corev1.PodSucceeded:
|
|
// unlikely for pause, but fine if it somehow happens
|
|
klog.InfoS("pre-upgrade health-check pod succeeded", "namespace", cur.Namespace, "name", cur.Name, "node", cur.Spec.NodeName)
|
|
return true, nil
|
|
|
|
case corev1.PodFailed:
|
|
return false, fmt.Errorf("health-check pod failed: reason=%q message=%q", cur.Status.Reason, cur.Status.Message)
|
|
|
|
default:
|
|
return false, nil
|
|
}
|
|
})
|
|
if err != nil {
|
|
descErr := describeHealthCheckFailure(ctx, kubeClient, created.Namespace, created.Name)
|
|
if descErr != nil {
|
|
klog.ErrorS(descErr, "failed to collect health-check diagnostics", "namespace", created.Namespace, "name", created.Name)
|
|
}
|
|
return fmt.Errorf("wait for health-check pod readiness: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func isPodReady(pod *corev1.Pod) bool {
|
|
for _, cond := range pod.Status.Conditions {
|
|
if cond.Type == corev1.PodReady {
|
|
return cond.Status == corev1.ConditionTrue
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func describeHealthCheckFailure(ctx context.Context, kubeClient kubernetes.Interface, namespace, name string) error {
|
|
pod, err := kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("get failed health-check pod: %w", err)
|
|
}
|
|
|
|
klog.ErrorS(nil, "health-check pod did not become ready",
|
|
"namespace", pod.Namespace,
|
|
"name", pod.Name,
|
|
"phase", pod.Status.Phase,
|
|
"reason", pod.Status.Reason,
|
|
"message", pod.Status.Message,
|
|
"node", pod.Spec.NodeName,
|
|
)
|
|
|
|
for _, cs := range pod.Status.ContainerStatuses {
|
|
if cs.State.Waiting != nil {
|
|
klog.ErrorS(nil, "container waiting",
|
|
"container", cs.Name,
|
|
"reason", cs.State.Waiting.Reason,
|
|
"message", cs.State.Waiting.Message,
|
|
)
|
|
}
|
|
if cs.State.Terminated != nil {
|
|
klog.ErrorS(nil, "container terminated",
|
|
"container", cs.Name,
|
|
"reason", cs.State.Terminated.Reason,
|
|
"message", cs.State.Terminated.Message,
|
|
"exitCode", cs.State.Terminated.ExitCode,
|
|
)
|
|
}
|
|
}
|
|
|
|
events, err := kubeClient.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{
|
|
FieldSelector: fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s", name),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("list pod events: %w", err)
|
|
}
|
|
|
|
for _, ev := range events.Items {
|
|
klog.ErrorS(nil, "health-check pod event",
|
|
"type", ev.Type,
|
|
"reason", ev.Reason,
|
|
"message", ev.Message,
|
|
"count", ev.Count,
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|