Refactor into RenderAgent and ApplyAgent
@@ -3,37 +3,27 @@ package node
import (
	"context"
	"fmt"
	"reflect"
	"strings"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
	"example.com/monok8s/pkg/kube"
	"example.com/monok8s/pkg/render"
	templates "example.com/monok8s/pkg/templates"
)

const (
	controlAgentImage = "localhost/monok8s/node-control:dev"
	kubeconfig        = "/etc/kubernetes/admin.conf"
)
const kubeconfig = "/etc/kubernetes/admin.conf"

func ApplyNodeControlDaemonSetResources(ctx context.Context, n *NodeContext) error {
	// Only the control-plane should bootstrap this DaemonSet definition,
	// and only when the feature is enabled.
	if strings.TrimSpace(n.Config.Spec.ClusterRole) != "control-plane" || !n.Config.Spec.EnableNodeControl {
		klog.InfoS("skipped for", "clusterRole", n.Config.Spec.ClusterRole, "enableNodeAgent", n.Config.Spec.EnableNodeControl)
		klog.InfoS("skipped for",
			"clusterRole", n.Config.Spec.ClusterRole,
			"enableNodeAgent", n.Config.Spec.EnableNodeControl,
		)
		return nil
	}

	err := ApplyCRDs(ctx, n)
	if err != nil {
	if err := ApplyCRDs(ctx, n); err != nil {
		return err
	}

@@ -47,363 +37,13 @@ func ApplyNodeControlDaemonSetResources(ctx context.Context, n *NodeContext) err
		return fmt.Errorf("build kube clients from %s: %w", kubeconfig, err)
	}

	labels := map[string]string{
		"app.kubernetes.io/name":       monov1alpha1.NodeAgentName,
		"app.kubernetes.io/component":  "agent",
		"app.kubernetes.io/part-of":    "monok8s",
		"app.kubernetes.io/managed-by": monov1alpha1.NodeControlName,
	conf := render.AgentConf{
		Namespace: namespace,
	}

	kubeClient := clients.Kubernetes

	if err := ensureNamespace(ctx, kubeClient, namespace, labels); err != nil {
		return fmt.Errorf("ensure namespace %q: %w", namespace, err)
	}
	if err := applyNodeAgentServiceAccount(ctx, kubeClient, namespace, labels); err != nil {
		return fmt.Errorf("apply serviceaccount: %w", err)
	}
	if err := applyNodeAgentClusterRole(ctx, kubeClient, labels); err != nil {
		return fmt.Errorf("apply clusterrole: %w", err)
	}
	if err := applyNodeAgentClusterRoleBinding(ctx, kubeClient, namespace, labels); err != nil {
		return fmt.Errorf("apply clusterrolebinding: %w", err)
	}
	if err := applyNodeAgentDaemonSet(ctx, kubeClient, namespace, labels); err != nil {
		return fmt.Errorf("apply daemonset: %w", err)
	if err := render.ApplyAgentDaemonSets(ctx, clients.Kubernetes, conf); err != nil {
		return fmt.Errorf("apply node agent daemonset resources: %w", err)
	}

	return nil
}

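For context, a minimal sketch of the render-side API this commit delegates to. Only AgentConf.Namespace and the ApplyAgentDaemonSets signature are known from the call site above; everything else here is an assumption about what pkg/render looks like, not its actual contents.

package render

import (
	"context"

	"k8s.io/client-go/kubernetes"
)

// AgentConf carries the inputs the renderer needs to produce the node-agent
// manifests. Only Namespace is visible at the call site; any other fields are
// unknown.
type AgentConf struct {
	Namespace string
}

// ApplyAgentDaemonSets would render the node-agent DaemonSet together with its
// ServiceAccount and RBAC objects for conf, then create or update each object
// via client — presumably absorbing the per-resource helpers that follow.
func ApplyAgentDaemonSets(ctx context.Context, client kubernetes.Interface, conf AgentConf) error {
	_ = ctx    // the real implementation threads ctx through the API calls
	_ = client // ... and applies the rendered objects with this client
	_ = conf
	return nil
}
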
func ensureNamespace(
	ctx context.Context,
	kubeClient kubernetes.Interface,
	namespace string,
	labels map[string]string,
) error {
	_, err := kubeClient.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
	if err == nil {
		return nil
	}
	if !apierrors.IsNotFound(err) {
		return fmt.Errorf("get namespace: %w", err)
	}

	ns := &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name:   namespace,
			Labels: copyStringMap(labels),
		},
	}

	_, err = kubeClient.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
	if err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("create namespace: %w", err)
	}

	return nil
}

func copyStringMap(in map[string]string) map[string]string {
	if len(in) == 0 {
		return nil
	}
	out := make(map[string]string, len(in))
	for k, v := range in {
		out[k] = v
	}
	return out
}

func applyNodeAgentServiceAccount(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
	want := &corev1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{
			Name:      monov1alpha1.NodeAgentName,
			Namespace: namespace,
			Labels:    labels,
		},
	}

	existing, err := kubeClient.CoreV1().ServiceAccounts(namespace).Get(ctx, monov1alpha1.NodeAgentName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = kubeClient.CoreV1().ServiceAccounts(namespace).Create(ctx, want, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	changed := false
	if !reflect.DeepEqual(existing.Labels, want.Labels) {
		existing.Labels = want.Labels
		changed = true
	}

	if !changed {
		return nil
	}

	_, err = kubeClient.CoreV1().ServiceAccounts(namespace).Update(ctx, existing, metav1.UpdateOptions{})
	return err
}

func applyNodeAgentClusterRole(ctx context.Context, kubeClient kubernetes.Interface, labels map[string]string) error {
	wantRules := []rbacv1.PolicyRule{
		{
			APIGroups: []string{monov1alpha1.Group},
			Resources: []string{"osupgrades"},
			Verbs:     []string{"get"},
		},
		{
			APIGroups: []string{monov1alpha1.Group},
			Resources: []string{"osupgradeprogresses"},
			Verbs:     []string{"get", "list", "watch", "create", "patch", "update"},
		},
		{
			APIGroups: []string{monov1alpha1.Group},
			Resources: []string{"osupgradeprogresses/status"},
			Verbs:     []string{"get", "list", "watch", "create", "patch", "update"},
		},
		{
			APIGroups: []string{""},
			Resources: []string{"nodes"},
			Verbs:     []string{"get", "list", "watch"},
		},
	}

	want := &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name:   monov1alpha1.NodeAgentName,
			Labels: labels,
		},
		Rules: wantRules,
	}

	existing, err := kubeClient.RbacV1().ClusterRoles().Get(ctx, monov1alpha1.NodeAgentName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = kubeClient.RbacV1().ClusterRoles().Create(ctx, want, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	changed := false
	if !reflect.DeepEqual(existing.Labels, want.Labels) {
		existing.Labels = want.Labels
		changed = true
	}
	if !reflect.DeepEqual(existing.Rules, want.Rules) {
		existing.Rules = want.Rules
		changed = true
	}

	if !changed {
		return nil
	}

	_, err = kubeClient.RbacV1().ClusterRoles().Update(ctx, existing, metav1.UpdateOptions{})
	return err
}

func applyNodeAgentClusterRoleBinding(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
	wantRoleRef := rbacv1.RoleRef{
		APIGroup: rbacv1.GroupName,
		Kind:     "ClusterRole",
		Name:     monov1alpha1.NodeAgentName,
	}
	wantSubjects := []rbacv1.Subject{
		{
			Kind:      "ServiceAccount",
			Name:      monov1alpha1.NodeAgentName,
			Namespace: namespace,
		},
	}

	want := &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name:   monov1alpha1.NodeAgentName,
			Labels: labels,
		},
		RoleRef:  wantRoleRef,
		Subjects: wantSubjects,
	}

	existing, err := kubeClient.RbacV1().ClusterRoleBindings().Get(ctx, monov1alpha1.NodeAgentName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, want, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	// roleRef is immutable. If it differs, fail loudly instead of pretending we can patch it.
	if !reflect.DeepEqual(existing.RoleRef, want.RoleRef) {
		return fmt.Errorf("existing ClusterRoleBinding %q has different roleRef and must be recreated", monov1alpha1.NodeAgentName)
	}

	changed := false
	if !reflect.DeepEqual(existing.Labels, want.Labels) {
		existing.Labels = want.Labels
		changed = true
	}
	if !reflect.DeepEqual(existing.Subjects, want.Subjects) {
		existing.Subjects = want.Subjects
		changed = true
	}

	if !changed {
		return nil
	}

	_, err = kubeClient.RbacV1().ClusterRoleBindings().Update(ctx, existing, metav1.UpdateOptions{})
	return err
}

func applyNodeAgentDaemonSet(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
	privileged := true

	dsLabels := monov1alpha1.NodeAgentLabels()

	want := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      monov1alpha1.NodeAgentName,
			Namespace: namespace,
			Labels:    labels,
		},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app.kubernetes.io/name": monov1alpha1.NodeAgentName,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: dsLabels,
				},
				Spec: corev1.PodSpec{
					ServiceAccountName: monov1alpha1.NodeAgentName,
					HostNetwork:        true,
					HostPID:            true,
					DNSPolicy:          corev1.DNSClusterFirstWithHostNet,
					NodeSelector: map[string]string{
						monov1alpha1.NodeControlKey: "true",
					},
					Tolerations: []corev1.Toleration{
						{Operator: corev1.TolerationOpExists},
					},
					Containers: []corev1.Container{
						{
							Name:            "agent",
							Image:           controlAgentImage,
							ImagePullPolicy: corev1.PullNever,
							Args:            []string{"agent", "--env-file", "$(CLUSTER_ENV_FILE)"},
							Env: []corev1.EnvVar{
								{
									Name: "NODE_NAME",
									ValueFrom: &corev1.EnvVarSource{
										FieldRef: &corev1.ObjectFieldSelector{
											APIVersion: "v1",
											FieldPath:  "spec.nodeName",
										},
									},
								},
								{
									Name:  "CLUSTER_ENV_FILE",
									Value: "/host/opt/monok8s/config/cluster.env",
								},
								{
									Name:  "FW_ENV_CONFIG_FILE",
									Value: "/host/etc/fw_env.config",
								},
							},
							SecurityContext: &corev1.SecurityContext{
								Privileged: &privileged,
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "host-dev",
									MountPath: "/dev",
								},
								{
									Name:      "host-etc",
									MountPath: "/host/etc",
									ReadOnly:  true,
								},
								{
									Name:      "host-config",
									MountPath: "/host/opt/monok8s/config",
									ReadOnly:  true,
								},
							},
						},
					},
					Volumes: []corev1.Volume{
						{
							Name: "host-dev",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/dev",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
						{
							Name: "host-etc",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/etc",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
						{
							Name: "host-config",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/opt/monok8s/config",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
					},
				},
			},
		},
	}

	existing, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, monov1alpha1.NodeAgentName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = kubeClient.AppsV1().DaemonSets(namespace).Create(ctx, want, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	changed := false
	if !reflect.DeepEqual(existing.Labels, want.Labels) {
		existing.Labels = want.Labels
		changed = true
	}
	if !reflect.DeepEqual(existing.Spec, want.Spec) {
		existing.Spec = want.Spec
		changed = true
	}

	if !changed {
		return nil
	}

	_, err = kubeClient.AppsV1().DaemonSets(namespace).Update(ctx, existing, metav1.UpdateOptions{})
	return err
}

func hostPathType(t corev1.HostPathType) *corev1.HostPathType {
	return &t
}

func mountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropagationMode {
	return &m
}

@@ -12,9 +12,6 @@ import (
	"time"

	"gopkg.in/yaml.v3"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
@@ -27,6 +24,16 @@ const (
	tmpKubeadmInitConf = "/tmp/kubeadm-init.yaml"
)

func chooseVersionKubeconfig(state *LocalClusterState) string {
	if state.HasAdminKubeconfig {
		return adminKubeconfigPath
	}
	if state.HasKubeletKubeconfig {
		return kubeletKubeconfigPath
	}
	return ""
}

func DetectLocalClusterState(ctx context.Context, nctx *NodeContext) error {
	_ = ctx

@@ -259,110 +266,6 @@ func waitForAPIViaKubeconfig(ctx context.Context, kubeconfigPath string, timeout
	}
}

func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
	}

	// Keep this short. This is a probe, not a long-running client.
	restCfg.Timeout = 5 * time.Second

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return "", fmt.Errorf("create clientset: %w", err)
	}

	disc := clientset.Discovery()
	return discoverServerVersion(ctx, disc)
}

func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
	info, err := disc.ServerVersion()
	if err != nil {
		return "", err
	}
	if info == nil || strings.TrimSpace(info.GitVersion) == "" {
		return "", errors.New("server version is empty")
	}
	return normalizeKubeVersion(info.GitVersion), nil
}

type kubeVersion struct {
	Major int
	Minor int
	Patch int
}

func parseKubeVersion(s string) (kubeVersion, error) {
	s = strings.TrimSpace(s)
	s = strings.TrimPrefix(s, "v")

	var v kubeVersion
	// Accepts "1.29" or "1.29.3". Sscanf reports an "unexpected EOF" error for
	// the two-part form, so only require that major and minor were parsed.
	n, _ := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
	if n < 2 {
		return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q", s)
	}
	return v, nil
}

// Control-plane: keep this strict.
// Accept same version, or a one-minor step where the node binary is newer than the current cluster.
// That covers normal control-plane upgrade flow but blocks nonsense.
func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool {
	cv, err := parseKubeVersion(clusterVersion)
	if err != nil {
		return false
	}
	nv, err := parseKubeVersion(nodeVersion)
	if err != nil {
		return false
	}

	if cv.Major != nv.Major {
		return false
	}
	if cv.Minor == nv.Minor {
		return true
	}
	if nv.Minor == cv.Minor+1 {
		return true
	}
	return false
}

// Worker: kubelet generally must not be newer than the apiserver.
// Older kubelets are allowed within the supported skew range.
// The requirement is that an unsupported worker skew should still proceed, so
// this only classifies support status and must NOT be used to block the upgrade.
func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool {
	cv, err := parseKubeVersion(clusterVersion)
	if err != nil {
		return false
	}
	nv, err := parseKubeVersion(nodeVersion)
	if err != nil {
		return false
	}

	if cv.Major != nv.Major {
		return false
	}

	// kubelet newer than apiserver => unsupported
	if nv.Minor > cv.Minor {
		return false
	}

	// kubelet up to 3 minors older than apiserver => supported
	if cv.Minor-nv.Minor <= 3 {
		return true
	}

	return false
}

func ValidateRequiredImagesPresent(ctx context.Context, n *NodeContext) error {
	if n.Config.Spec.SkipImageCheck {
		klog.Infof("skipping image check (skipImageCheck=true)")
@@ -419,31 +322,6 @@ func checkImagePresent(ctx context.Context, n *NodeContext, image string) error
	return nil
}

func chooseVersionKubeconfig(state *LocalClusterState) string {
	if state.HasAdminKubeconfig {
		return adminKubeconfigPath
	}
	if state.HasKubeletKubeconfig {
		return kubeletKubeconfigPath
	}
	return ""
}

func versionEq(a, b string) bool {
	return normalizeKubeVersion(a) == normalizeKubeVersion(b)
}

func normalizeKubeVersion(v string) string {
	v = strings.TrimSpace(v)
	if v == "" {
		return ""
	}
	if !strings.HasPrefix(v, "v") {
		v = "v" + v
	}
	return v
}

func buildNodeRegistration(spec monov1alpha1.MonoKSConfigSpec) NodeRegistrationOptions {
	nodeName := strings.TrimSpace(spec.NodeName)
	criSocket := strings.TrimSpace(spec.ContainerRuntimeEndpoint)
@@ -781,11 +659,6 @@ func RunKubeadmJoin(ctx context.Context, nctx *NodeContext) error {
	return nil
}

func RunKubeadmUpgradeNode(context.Context, *NodeContext) error {
	klog.Info("run_kubeadm_upgrade_node: TODO implement kubeadm upgrade node")
	return nil
}

func ReconcileControlPlane(ctx context.Context, nctx *NodeContext) error {
	if nctx.BootstrapState == nil {
		return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first")

@@ -257,3 +257,95 @@ func describeHealthCheckFailure(ctx context.Context, kubeClient kubernetes.Inter

	return nil
}

func RunKubeadmUpgradeNode(ctx context.Context, nctx *NodeContext) error {
	if nctx == nil {
		return errors.New("node context is nil")
	}
	if nctx.Config == nil {
		return errors.New("node config is nil")
	}
	if nctx.LocalClusterState == nil {
		return errors.New("LocalClusterState is nil, run earlier steps first")
	}
	if nctx.BootstrapState == nil {
		return errors.New("BootstrapState is nil, run earlier steps first")
	}

	switch nctx.BootstrapState.Action {
	case BootstrapActionUpgradeWorker:
		// continue
	default:
		klog.V(4).Infof("RunKubeadmUpgradeNode skipped for action %q", nctx.BootstrapState.Action)
		return nil
	}

	wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
	if wantVersion == "" {
		return errors.New("spec.kubernetesVersion is required")
	}

	kubeconfigPath := chooseVersionKubeconfig(nctx.LocalClusterState)
	if kubeconfigPath == "" {
		return errors.New("no kubeconfig available for detecting cluster version")
	}

	clusterVersion := strings.TrimSpace(nctx.BootstrapState.DetectedClusterVersion)
	if clusterVersion == "" {
		var err error
		clusterVersion, err = getServerVersion(ctx, kubeconfigPath)
		if err != nil {
			if nctx.BootstrapState.UnsupportedWorkerVersionSkew {
				klog.Warningf(
					"cluster version unavailable but worker skew was marked unsupported/permissive, continuing: reason=%s",
					nctx.BootstrapState.VersionSkewReason,
				)
			} else {
				return fmt.Errorf("get cluster version via %s: %w", kubeconfigPath, err)
			}
		}
	}

	if clusterVersion != "" && !isSupportedWorkerSkew(clusterVersion, wantVersion) {
		klog.Warningf(
			"unsupported worker version skew detected, continuing anyway: cluster=%s node=%s",
			clusterVersion,
			wantVersion,
		)
	}

	klog.Infof(
		"running kubeadm upgrade node: role=%s clusterVersion=%s nodeVersion=%s kubeconfig=%s",
		strings.TrimSpace(nctx.Config.Spec.ClusterRole),
		clusterVersion,
		wantVersion,
		kubeconfigPath,
	)

	args := []string{
		"upgrade",
		"node",
		"--kubeconfig",
		kubeconfigPath,
	}

	_, err := nctx.SystemRunner.RunWithOptions(
		ctx,
		"kubeadm",
		args,
		system.RunOptions{
			Timeout: 10 * time.Minute,
			OnStdoutLine: func(line string) {
				klog.Infof("[kubeadm] %s", line)
			},
			OnStderrLine: func(line string) {
				klog.Infof("[kubeadm] %s", line)
			},
		},
	)
	if err != nil {
		return fmt.Errorf("run kubeadm upgrade node: %w", err)
	}

	return nil
}

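For reference, a minimal sketch of the runner contract the call above assumes. Only the RunWithOptions signature and the three RunOptions fields used here are visible in this diff; the Runner interface name and the meaning of the discarded string return (combined output) are assumptions.

package system

import (
	"context"
	"time"
)

// RunOptions configures one external command invocation.
type RunOptions struct {
	Timeout      time.Duration     // hard deadline for the whole command
	OnStdoutLine func(line string) // called for each stdout line, if set
	OnStderrLine func(line string) // called for each stderr line, if set
}

// Runner abstracts command execution so callers such as RunKubeadmUpgradeNode
// can be unit-tested without shelling out to kubeadm.
type Runner interface {
	RunWithOptions(ctx context.Context, name string, args []string, opts RunOptions) (string, error)
}
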
@@ -8,9 +8,18 @@ import (
	"strings"
	"time"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
)

type kubeVersion struct {
	Major int
	Minor int
	Patch int
}

func ValidateNodeIPAndAPIServerReachability(ctx context.Context, nct *NodeContext) error {
	requireLocalIP := func(wantedIP string) error {
		wantedIP = strings.TrimSpace(wantedIP)
@@ -189,3 +198,116 @@ func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error {

	return nil
}

func versionEq(a, b string) bool {
	return normalizeKubeVersion(a) == normalizeKubeVersion(b)
}

func normalizeKubeVersion(v string) string {
	v = strings.TrimSpace(v)
	if v == "" {
		return ""
	}
	if !strings.HasPrefix(v, "v") {
		v = "v" + v
	}
	return v
}

func parseKubeVersion(s string) (kubeVersion, error) {
	s = strings.TrimSpace(s)
	s = strings.TrimPrefix(s, "v")

	var v kubeVersion
	// Accepts "1.29" or "1.29.3". Sscanf reports an "unexpected EOF" error for
	// the two-part form, so only require that major and minor were parsed.
	n, _ := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
	if n < 2 {
		return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q", s)
	}
	return v, nil
}

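A few worked inputs for parseKubeVersion, following the logic above (illustrative only, not part of the commit):

func parseKubeVersionExamples() {
	v, _ := parseKubeVersion("v1.29.3")
	fmt.Println(v) // {1 29 3}

	v, _ = parseKubeVersion("1.29") // two-part form: Patch stays 0
	fmt.Println(v) // {1 29 0}

	if _, err := parseKubeVersion("latest"); err != nil {
		fmt.Println(err) // invalid kubernetes version "latest"
	}
}
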
// Control-plane: keep this strict.
// Accept same version, or a one-minor step where the node binary is newer than the current cluster.
// That covers normal control-plane upgrade flow but blocks nonsense.
func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool {
	cv, err := parseKubeVersion(clusterVersion)
	if err != nil {
		return false
	}
	nv, err := parseKubeVersion(nodeVersion)
	if err != nil {
		return false
	}

	if cv.Major != nv.Major {
		return false
	}
	if cv.Minor == nv.Minor {
		return true
	}
	if nv.Minor == cv.Minor+1 {
		return true
	}
	return false
}

// Worker: kubelet generally must not be newer than the apiserver.
// Older kubelets are allowed within the supported skew range.
// The requirement is that an unsupported worker skew should still proceed, so
// this only classifies support status and must NOT be used to block the upgrade.
func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool {
	cv, err := parseKubeVersion(clusterVersion)
	if err != nil {
		return false
	}
	nv, err := parseKubeVersion(nodeVersion)
	if err != nil {
		return false
	}

	if cv.Major != nv.Major {
		return false
	}

	// kubelet newer than apiserver => unsupported
	if nv.Minor > cv.Minor {
		return false
	}

	// kubelet up to 3 minors older than apiserver => supported
	if cv.Minor-nv.Minor <= 3 {
		return true
	}

	return false
}

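Illustrative only: how the two skew policies above classify a few version pairs (arguments are clusterVersion, then nodeVersion):

func skewExamples() {
	fmt.Println(isSupportedControlPlaneSkew("v1.29.3", "v1.29.0")) // true: same minor
	fmt.Println(isSupportedControlPlaneSkew("v1.29.3", "v1.30.0")) // true: node one minor newer
	fmt.Println(isSupportedControlPlaneSkew("v1.29.3", "v1.28.9")) // false: node older than cluster
	fmt.Println(isSupportedWorkerSkew("v1.29.3", "v1.30.0"))       // false: kubelet newer than apiserver
	fmt.Println(isSupportedWorkerSkew("v1.29.3", "v1.26.0"))       // true: 3 minors older is in range
	fmt.Println(isSupportedWorkerSkew("v1.29.3", "v1.25.0"))       // false: 4 minors older
}
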
func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
	}

	// Keep this short. This is a probe, not a long-running client.
	restCfg.Timeout = 5 * time.Second

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return "", fmt.Errorf("create clientset: %w", err)
	}

	disc := clientset.Discovery()
	return discoverServerVersion(ctx, disc)
}

func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
	info, err := disc.ServerVersion()
	if err != nil {
		return "", err
	}
	if info == nil || strings.TrimSpace(info.GitVersion) == "" {
		return "", errors.New("server version is empty")
	}
	return normalizeKubeVersion(info.GitVersion), nil
}

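discoverServerVersion takes the DiscoveryInterface rather than a kubeconfig path precisely so it can be exercised without a live apiserver; a test sketch using client-go's fake discovery client (test-only, not part of the commit):

import (
	"context"
	"testing"

	"k8s.io/apimachinery/pkg/version"
	fakediscovery "k8s.io/client-go/discovery/fake"
	clienttesting "k8s.io/client-go/testing"
)

func TestDiscoverServerVersion(t *testing.T) {
	// FakeDiscovery serves the canned version below instead of hitting a server.
	disc := &fakediscovery.FakeDiscovery{
		Fake:               &clienttesting.Fake{},
		FakedServerVersion: &version.Info{GitVersion: "v1.29.3"},
	}

	got, err := discoverServerVersion(context.Background(), disc)
	if err != nil {
		t.Fatalf("discoverServerVersion: %v", err)
	}
	if got != "v1.29.3" {
		t.Fatalf("got %q, want %q", got, "v1.29.3")
	}
}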