From 03a5e5bedbba50f83114c9d179e7cdaacb8d7bdba1d713b5690b6f643d87e720 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=96=9F=E9=85=8C=20=E9=B5=AC=E5=85=84?=
Date: Sun, 29 Mar 2026 22:26:54 +0800
Subject: [PATCH] detect local cluster states

---
 clitools/makefile                   |   2 +-
 clitools/pkg/bootstrap/registry.go  |   4 +-
 clitools/pkg/bootstrap/runner.go    |   4 +-
 clitools/pkg/node/context.go        |  43 ++-
 clitools/pkg/node/kubeadm.go        | 476 +++++++++++++++++++++++++++-
 clitools/pkg/node/kubelet.go        |   9 +-
 clitools/pkg/templates/templates.go |   2 +-
 7 files changed, 524 insertions(+), 16 deletions(-)

diff --git a/clitools/makefile b/clitools/makefile
index f4ce3f3..86e32cd 100644
--- a/clitools/makefile
+++ b/clitools/makefile
@@ -1,6 +1,6 @@
 VERSION ?= dev
 
-KUBE_VERSION=v1.35.1
+KUBE_VERSION=v1.35.3
 
 BIN_DIR := bin
 
diff --git a/clitools/pkg/bootstrap/registry.go b/clitools/pkg/bootstrap/registry.go
index 964ec2d..777c425 100644
--- a/clitools/pkg/bootstrap/registry.go
+++ b/clitools/pkg/bootstrap/registry.go
@@ -23,13 +23,14 @@ func NewRegistry(ctx *node.NodeContext) *Registry {
 	return &Registry{
 		steps: map[string]node.Step{
 			"validate_network_requirements": node.ValidateNetworkRequirements,
+			"detect_local_cluster_state":    node.DetectLocalClusterState,
 			"configure_default_cni":         node.ConfigureDefaultCNI,
 			"start_crio":                    node.StartCRIO,
 			"wait_for_existing_cluster_if_needed": node.WaitForExistingClusterIfNeeded,
-			"check_required_images":         node.CheckRequiredImages,
+			"check_required_images":         node.ValidateRequiredImagesPresent,
+			"check_for_version_skew":        node.CheckForVersionSkew,
 			"generate_kubeadm_config":       node.GenerateKubeadmConfig,
 			"run_kubeadm_init":              node.RunKubeadmInit,
-			"restart_kubelet":               node.RestartKubelet,
 			"apply_local_node_metadata_if_possible": node.ApplyLocalNodeMetadataIfPossible,
 			"allow_single_node_scheduling":  node.AllowSingleNodeScheduling,
 			"ensure_ip_forward":             node.EnsureIPForward,
diff --git a/clitools/pkg/bootstrap/runner.go b/clitools/pkg/bootstrap/runner.go
index 677c8d3..7a6c86a 100644
--- a/clitools/pkg/bootstrap/runner.go
+++ b/clitools/pkg/bootstrap/runner.go
@@ -30,10 +30,12 @@ func (r *Runner) Init(ctx context.Context) error {
 		"configure_hostname",
 		"configure_dns",
 		"configure_mgmt_interface",
+		"detect_local_cluster_state",
 		"validate_network_requirements",
 		"configure_default_cni",
 		"start_crio",
-		"check_container_images",
+		"check_required_images",
+		"check_for_version_skew",
 		"wait_for_existing_cluster_if_needed",
 		"decide_bootstrap_action",
 		"check_required_images",
diff --git a/clitools/pkg/node/context.go b/clitools/pkg/node/context.go
index 4a19a4d..ba01c63 100644
--- a/clitools/pkg/node/context.go
+++ b/clitools/pkg/node/context.go
@@ -7,9 +7,46 @@ import (
 	"undecided.project/monok8s/pkg/system"
 )
 
+type Step func(context.Context, *NodeContext) error
+
 type NodeContext struct {
-	Config       *monov1alpha1.MonoKSConfig
-	SystemRunner *system.Runner
+	Config            *monov1alpha1.MonoKSConfig
+	SystemRunner      *system.Runner
+	LocalClusterState *LocalClusterState
+	BootstrapState    *BootstrapState
 }
 
-type Step func(context.Context, *NodeContext) error
+type LocalMembershipKind string
+
+const (
+	LocalMembershipFresh                LocalMembershipKind = "fresh"
+	LocalMembershipExistingWorker       LocalMembershipKind = "existing-worker"
+	LocalMembershipExistingControlPlane LocalMembershipKind = "existing-control-plane"
+	LocalMembershipPartial              LocalMembershipKind = "partial"
+)
+
+type LocalClusterState struct {
+	HasAdminKubeconfig   bool
+	HasKubeletKubeconfig bool
+	MembershipKind       LocalMembershipKind
+}
+
+type BootstrapAction string
+
+const (
+	BootstrapActionInitControlPlane      BootstrapAction = "init-control-plane"
+	BootstrapActionJoinWorker            BootstrapAction = "join-worker"
+	BootstrapActionJoinControlPlane      BootstrapAction = "join-control-plane"
+	BootstrapActionReconcileWorker       BootstrapAction = "reconcile-worker"
+	BootstrapActionReconcileControlPlane BootstrapAction = "reconcile-control-plane"
+	BootstrapActionUpgradeWorker         BootstrapAction = "upgrade-worker"
+	BootstrapActionUpgradeControlPlane   BootstrapAction = "upgrade-control-plane"
+)
+
+type BootstrapState struct {
+	Action                 BootstrapAction
+	DetectedClusterVersion string
+
+	UnsupportedWorkerVersionSkew bool
+	VersionSkewReason            string
+}
diff --git a/clitools/pkg/node/kubeadm.go b/clitools/pkg/node/kubeadm.go
index b8a8805..ccbd268 100644
--- a/clitools/pkg/node/kubeadm.go
+++ b/clitools/pkg/node/kubeadm.go
@@ -2,18 +2,462 @@ package node
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"net"
+	"os"
 	"strings"
+	"time"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/klog/v2"
 )
 
-func WaitForExistingClusterIfNeeded(context.Context, *NodeContext) error {
-	klog.Info("wait_for_existing_cluster_if_needed: TODO implement kubelet/admin.conf waits")
+const (
+	adminKubeconfigPath   = "/etc/kubernetes/admin.conf"
+	kubeletKubeconfigPath = "/etc/kubernetes/kubelet.conf"
+)
+
+func DetectLocalClusterState(ctx context.Context, nctx *NodeContext) error {
+	_ = ctx
+
+	if nctx == nil {
+		return fmt.Errorf("node context is nil")
+	}
+
+	_, err := os.Stat(adminKubeconfigPath)
+	hasAdmin := err == nil
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		return fmt.Errorf("stat admin kubeconfig: %w", err)
+	}
+
+	_, err = os.Stat(kubeletKubeconfigPath)
+	hasKubelet := err == nil
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		return fmt.Errorf("stat kubelet kubeconfig: %w", err)
+	}
+
+	state := LocalClusterState{
+		HasAdminKubeconfig:   hasAdmin,
+		HasKubeletKubeconfig: hasKubelet,
+	}
+
+	switch {
+	case !hasAdmin && !hasKubelet:
+		state.MembershipKind = LocalMembershipFresh
+	case !hasAdmin && hasKubelet:
+		state.MembershipKind = LocalMembershipExistingWorker
+	case hasAdmin && hasKubelet:
+		state.MembershipKind = LocalMembershipExistingControlPlane
+	case hasAdmin && !hasKubelet:
+		state.MembershipKind = LocalMembershipPartial
+	default:
+		return fmt.Errorf("unreachable local cluster state")
+	}
+
+	nctx.LocalClusterState = &state
 	return nil
 }
 
-func CheckRequiredImages(ctx context.Context, n *NodeContext) error {
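+// Illustrative only: the mapping DetectLocalClusterState produces from the two
+// kubeconfig checks above, assuming nothing else about the host is inspected.
+//
+//	admin.conf absent,  kubelet.conf absent  -> fresh
+//	admin.conf absent,  kubelet.conf present -> existing-worker
+//	admin.conf present, kubelet.conf present -> existing-control-plane
+//	admin.conf present, kubelet.conf absent  -> partial (rejected later)
+//
+// Example (sketch):
+//
+//	if err := DetectLocalClusterState(ctx, nctx); err == nil {
+//		klog.Infof("local membership: %s", nctx.LocalClusterState.MembershipKind)
+//	}
+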
+func WaitForExistingClusterIfNeeded(ctx context.Context, nctx *NodeContext) error {
+	switch nctx.LocalClusterState.MembershipKind {
+	case LocalMembershipFresh:
+		return nil
+
+	case LocalMembershipExistingWorker:
+		if err := StartKubelet(ctx, nctx); err != nil {
+			return fmt.Errorf("start kubelet: %w", err)
+		}
+		return nil
+
+	case LocalMembershipExistingControlPlane:
+		if err := StartKubelet(ctx, nctx); err != nil {
+			return fmt.Errorf("start kubelet: %w", err)
+		}
+
+		// Existing local control-plane state: wait for the local apiserver if
+		// this machine is meant to be a control-plane node.
+		if strings.TrimSpace(nctx.Config.Spec.ClusterRole) == "control-plane" {
+			if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil {
+				return fmt.Errorf("wait for local apiserver: %w", err)
+			}
+			if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil {
+				return fmt.Errorf("wait for admin api: %w", err)
+			}
+		}
+		return nil
+
+	case LocalMembershipPartial:
+		// Be strict here. Partial state is suspicious.
+		return fmt.Errorf("partial local cluster state detected: admin=%t kubelet=%t",
+			nctx.LocalClusterState.HasAdminKubeconfig,
+			nctx.LocalClusterState.HasKubeletKubeconfig,
+		)
+
+	default:
+		return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
+	}
+}
+
+func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error {
+	role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
+	wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
+	if wantVersion == "" {
+		return errors.New("spec.kubernetesVersion is required")
+	}
+
+	// This step runs before decide_bootstrap_action, so the shared state may
+	// not exist yet; create it instead of dereferencing a nil pointer.
+	if nctx.BootstrapState == nil {
+		nctx.BootstrapState = &BootstrapState{}
+	}
+
+	switch nctx.LocalClusterState.MembershipKind {
+	case LocalMembershipFresh:
+		// Fresh node has no existing cluster membership to compare against.
+		return nil
+	case LocalMembershipPartial:
+		return fmt.Errorf("cannot check version skew with partial local cluster state")
+	}
+
+	versionKubeconfig := chooseVersionKubeconfig(nctx.LocalClusterState)
+	if versionKubeconfig == "" {
+		return fmt.Errorf("no kubeconfig available for version detection")
+	}
+
+	currentVersion, err := getServerVersion(ctx, versionKubeconfig)
+	if err != nil {
+		if role == "control-plane" {
+			return fmt.Errorf("existing control-plane state found, but cluster version could not be determined: %w", err)
+		}
+
+		// Worker path stays permissive.
+		nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
+		nctx.BootstrapState.VersionSkewReason = "cluster version could not be determined"
+		return nil
+	}
+
+	nctx.BootstrapState.DetectedClusterVersion = currentVersion
+
+	switch role {
+	case "control-plane":
+		if !isSupportedControlPlaneSkew(currentVersion, wantVersion) {
+			return fmt.Errorf(
+				"unsupported control-plane version skew: cluster=%s node=%s",
+				currentVersion, wantVersion,
+			)
+		}
+
+	case "worker":
+		if !isSupportedWorkerSkew(currentVersion, wantVersion) {
+			nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
+			nctx.BootstrapState.VersionSkewReason = fmt.Sprintf(
+				"unsupported worker version skew: cluster=%s node=%s",
+				currentVersion, wantVersion,
+			)
+		}
+
+	default:
+		return fmt.Errorf("unsupported cluster role %q", role)
+	}
+
+	return nil
+}
+
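+// Illustrative outcomes, not a test: with a v1.35.1 cluster and this node at
+// v1.35.3, both roles pass. With this node at v1.37.0, a control-plane role
+// returns an error, while a worker role records the skew on BootstrapState and
+// lets the bootstrap continue:
+//
+//	if err := CheckForVersionSkew(ctx, nctx); err != nil {
+//		return err // control-plane skew, or local state unusable
+//	}
+//	if nctx.BootstrapState.UnsupportedWorkerVersionSkew {
+//		klog.Warning(nctx.BootstrapState.VersionSkewReason)
+//	}
+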
+func ClassifyBootstrapAction(ctx context.Context, nctx *NodeContext) error {
+	_ = ctx
+
+	role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
+	initControlPlane := nctx.Config.Spec.InitControlPlane
+	wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
+	if wantVersion == "" {
+		return errors.New("spec.kubernetesVersion is required")
+	}
+
+	// Keep whatever CheckForVersionSkew already recorded (detected version,
+	// skew flags); only allocate the state when it does not exist yet.
+	if nctx.BootstrapState == nil {
+		nctx.BootstrapState = &BootstrapState{}
+	}
+
+	switch role {
+	case "worker":
+		switch nctx.LocalClusterState.MembershipKind {
+		case LocalMembershipFresh:
+			nctx.BootstrapState.Action = BootstrapActionJoinWorker
+			return nil
+
+		case LocalMembershipExistingWorker:
+			if nctx.BootstrapState.DetectedClusterVersion == "" {
+				nctx.BootstrapState.Action = BootstrapActionReconcileWorker
+				return nil
+			}
+
+			if versionEq(nctx.BootstrapState.DetectedClusterVersion, wantVersion) {
+				nctx.BootstrapState.Action = BootstrapActionReconcileWorker
+			} else {
+				nctx.BootstrapState.Action = BootstrapActionUpgradeWorker
+			}
+			return nil
+
+		case LocalMembershipExistingControlPlane, LocalMembershipPartial:
+			return fmt.Errorf("local state %q is invalid for worker role", nctx.LocalClusterState.MembershipKind)
+
+		default:
+			return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
+		}
+
+	case "control-plane":
+		switch nctx.LocalClusterState.MembershipKind {
+		case LocalMembershipFresh:
+			if initControlPlane {
+				nctx.BootstrapState.Action = BootstrapActionInitControlPlane
+			} else {
+				nctx.BootstrapState.Action = BootstrapActionJoinControlPlane
+			}
+			return nil
+
+		case LocalMembershipExistingControlPlane:
+			if nctx.BootstrapState.DetectedClusterVersion == "" {
+				return errors.New("existing control-plane state found, but detected cluster version is empty")
+			}
+
+			if versionEq(nctx.BootstrapState.DetectedClusterVersion, wantVersion) {
+				nctx.BootstrapState.Action = BootstrapActionReconcileControlPlane
+			} else {
+				nctx.BootstrapState.Action = BootstrapActionUpgradeControlPlane
+			}
+			return nil
+
+		case LocalMembershipExistingWorker:
+			return fmt.Errorf("local state %q is invalid for control-plane role", nctx.LocalClusterState.MembershipKind)
+
+		case LocalMembershipPartial:
+			return fmt.Errorf("partial local cluster state is invalid for control-plane role")
+
+		default:
+			return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
+		}
+
+	default:
+		return fmt.Errorf("unsupported cluster role %q", role)
+	}
+}
+
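+// Illustrative summary of the classification above (role x local state);
+// version comparison only matters in the "existing" rows:
+//
+//	worker        + fresh                              -> join-worker
+//	worker        + existing-worker                    -> reconcile-worker or upgrade-worker
+//	control-plane + fresh, spec.initControlPlane=true  -> init-control-plane
+//	control-plane + fresh, spec.initControlPlane=false -> join-control-plane
+//	control-plane + existing-control-plane             -> reconcile- or upgrade-control-plane
+//	any other combination                              -> error
+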
+func InitControlPlane(ctx context.Context, nctx *NodeContext) error {
+	if nctx == nil {
+		return errors.New("node context is nil")
+	}
+	if nctx.Config == nil {
+		return errors.New("node config is nil")
+	}
+
+	if strings.TrimSpace(nctx.Config.Spec.ClusterRole) != "control-plane" {
+		return fmt.Errorf("init control-plane called for non-control-plane role %q", nctx.Config.Spec.ClusterRole)
+	}
+	if !nctx.Config.Spec.InitControlPlane {
+		return errors.New("init control-plane called but spec.initControlPlane is false")
+	}
+	if nctx.BootstrapState.Action != BootstrapActionInitControlPlane {
+		return fmt.Errorf("init control-plane called with bootstrap action %q", nctx.BootstrapState.Action)
+	}
+
+	// Fresh init only. Existing control-plane recovery/wait belongs elsewhere.
+	switch nctx.LocalClusterState.MembershipKind {
+	case LocalMembershipFresh:
+		// continue
+	default:
+		return fmt.Errorf("init control-plane requires fresh local state, got %q", nctx.LocalClusterState.MembershipKind)
+	}
+
+	// Example:
+	// if err := RunKubeadmInit(ctx, nctx); err != nil {
+	//	return fmt.Errorf("kubeadm init: %w", err)
+	// }
+
+	return nil
+}
+
+func waitForLocalAPIServer(ctx context.Context, nctx *NodeContext, timeout time.Duration) error {
+	addr := strings.TrimSpace(nctx.Config.Spec.APIServerAdvertiseAddress)
+	if addr == "" {
+		return errors.New("spec.apiServerAdvertiseAddress is required for local control-plane wait")
+	}
+
+	d := net.Dialer{Timeout: 2 * time.Second}
+	deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	target := net.JoinHostPort(addr, "6443")
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		conn, err := d.DialContext(deadlineCtx, "tcp", target)
+		if err == nil {
+			_ = conn.Close()
+			return nil
+		}
+
+		select {
+		case <-deadlineCtx.Done():
+			return fmt.Errorf("apiserver %s did not become reachable within %s", target, timeout)
+		case <-ticker.C:
+		}
+	}
+}
+
+func waitForAPIViaKubeconfig(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
+	deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		_, err := getServerVersion(deadlineCtx, kubeconfigPath)
+		if err == nil {
+			return nil
+		}
+
+		select {
+		case <-deadlineCtx.Done():
+			return fmt.Errorf("api via kubeconfig %s did not become reachable within %s", kubeconfigPath, timeout)
+		case <-ticker.C:
+		}
+	}
+}
+
+func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
+	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+	if err != nil {
+		return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
+	}
+
+	// Keep this short. This is a probe, not a long-running client.
+	restCfg.Timeout = 5 * time.Second
+
+	clientset, err := kubernetes.NewForConfig(restCfg)
+	if err != nil {
+		return "", fmt.Errorf("create clientset: %w", err)
+	}
+
+	disc := clientset.Discovery()
+	return discoverServerVersion(ctx, disc)
+}
+
+func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
+	info, err := disc.ServerVersion()
+	if err != nil {
+		return "", err
+	}
+	if info == nil || strings.TrimSpace(info.GitVersion) == "" {
+		return "", errors.New("server version is empty")
+	}
+	return normalizeKubeVersion(info.GitVersion), nil
+}
+
+type kubeVersion struct {
+	Major int
+	Minor int
+	Patch int
+}
+
+func parseKubeVersion(s string) (kubeVersion, error) {
+	s = strings.TrimSpace(s)
+	s = strings.TrimPrefix(s, "v")
+
+	var v kubeVersion
+	n, err := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
+	// Accepts "1.29" or "1.29.3". Sscanf reports an EOF error for the short
+	// form, so the error only matters when fewer than two fields were parsed.
+	if n < 2 {
+		return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q (parse: %v)", s, err)
+	}
+	return v, nil
+}
+
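+// Example inputs (illustrative):
+//
+//	v, _ := parseKubeVersion("v1.35.3") // {Major: 1, Minor: 35, Patch: 3}
+//	v, _ = parseKubeVersion("1.29")     // {Major: 1, Minor: 29, Patch: 0}
+//	_, err := parseKubeVersion("v1")    // err != nil
+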
+// Control-plane: keep this strict.
+// Accept same version, or a one-minor step where the node binary is newer than the current cluster.
+// That covers normal control-plane upgrade flow but blocks nonsense.
+func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool {
+	cv, err := parseKubeVersion(clusterVersion)
+	if err != nil {
+		return false
+	}
+	nv, err := parseKubeVersion(nodeVersion)
+	if err != nil {
+		return false
+	}
+
+	if cv.Major != nv.Major {
+		return false
+	}
+	if cv.Minor == nv.Minor {
+		return true
+	}
+	if nv.Minor == cv.Minor+1 {
+		return true
+	}
+	return false
+}
+
+// Worker: the kubelet generally must not be newer than the apiserver.
+// Older kubelets are allowed within the supported skew range.
+// Unsupported worker skew should still proceed, so this only classifies
+// support status and must NOT be used to block the bootstrap.
+func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool {
+	cv, err := parseKubeVersion(clusterVersion)
+	if err != nil {
+		return false
+	}
+	nv, err := parseKubeVersion(nodeVersion)
+	if err != nil {
+		return false
+	}
+
+	if cv.Major != nv.Major {
+		return false
+	}
+
+	// kubelet newer than apiserver => unsupported
+	if nv.Minor > cv.Minor {
+		return false
+	}
+
+	// kubelet up to 3 minors older than apiserver => supported
+	if cv.Minor-nv.Minor <= 3 {
+		return true
+	}
+
+	return false
+}
+
+// This should not try to taint the node directly here.
+// Just record intent and let a later reconcile step apply the taint.
+func markUnsupportedWorkerVersionSkew(nctx *NodeContext, clusterVersion, nodeVersion string) {
+	if nctx.BootstrapState == nil {
+		nctx.BootstrapState = &BootstrapState{}
+	}
+	nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
+	nctx.BootstrapState.VersionSkewReason = fmt.Sprintf(
+		"unsupported worker version skew: cluster=%s node=%s", clusterVersion, nodeVersion)
+}
+
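+// Worker-skew examples (illustrative), against a v1.35.x cluster:
+//
+//	node v1.35.x -> supported (same minor)
+//	node v1.32.x -> supported (three minors older)
+//	node v1.31.x -> unsupported (more than three minors older)
+//	node v1.36.x -> unsupported (kubelet newer than the apiserver)
+//
+// "Unsupported" only flips the BootstrapState flags via the helpers above; the
+// taint itself is expected to be applied by a later reconcile step.
+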
+// Optional helper if you want to probe readiness later through the API.
+// Keeping this here in case you want a very cheap liveness call elsewhere.
+func apiServerReady(ctx context.Context, kubeconfigPath string) error {
+	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
+	if err != nil {
+		return err
+	}
+	restCfg.Timeout = 5 * time.Second
+
+	clientset, err := kubernetes.NewForConfig(restCfg)
+	if err != nil {
+		return err
+	}
+
+	_, err = clientset.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
+	return err
+}
+
+func ValidateRequiredImagesPresent(ctx context.Context, n *NodeContext) error {
 	if n.Config.Spec.SkipImageCheck {
 		klog.Infof("skipping image check (skipImageCheck=true)")
 		return nil
@@ -65,6 +509,32 @@ }
 	return nil
 }
+
+func chooseVersionKubeconfig(state *LocalClusterState) string {
+	if state.HasAdminKubeconfig {
+		return adminKubeconfigPath
+	}
+	if state.HasKubeletKubeconfig {
+		return kubeletKubeconfigPath
+	}
+	return ""
+}
+
+func versionEq(a, b string) bool {
+	return normalizeKubeVersion(a) == normalizeKubeVersion(b)
+}
+
+func normalizeKubeVersion(v string) string {
+	v = strings.TrimSpace(v)
+	if v == "" {
+		return ""
+	}
+	if !strings.HasPrefix(v, "v") {
+		v = "v" + v
+	}
+	return v
+}
+
 
 func GenerateKubeadmConfig(context.Context, *NodeContext) error {
 	klog.Info("generate_kubeadm_config: TODO render kubeadm v1beta4 config from MonoKSConfig")
 	return nil
diff --git a/clitools/pkg/node/kubelet.go b/clitools/pkg/node/kubelet.go
index a7003fd..6096976 100644
--- a/clitools/pkg/node/kubelet.go
+++ b/clitools/pkg/node/kubelet.go
@@ -1,12 +1,11 @@
 package node
 
 import (
-	"context"
+	"context"
 
-	"k8s.io/klog/v2"
+	"undecided.project/monok8s/pkg/system"
 )
 
-func RestartKubelet(context.Context, *NodeContext) error {
-	klog.Info("restart_kubelet: TODO implement rc-service kubelet restart")
-	return nil
+func StartKubelet(ctx context.Context, n *NodeContext) error {
+	return system.EnsureServiceRunning(ctx, n.SystemRunner, "kubelet")
 }
diff --git a/clitools/pkg/templates/templates.go b/clitools/pkg/templates/templates.go
index ac9cae0..619a2e6 100644
--- a/clitools/pkg/templates/templates.go
+++ b/clitools/pkg/templates/templates.go
@@ -55,7 +55,7 @@ func DefaultMonoKSConfig() types.MonoKSConfig {
 
 		Network: types.NetworkSpec{
 			Hostname:        "monok8s-master-1",
-			ManagementIface: "eth0",
+			ManagementIface: "eth1",
 			ManagementCIDR:  "10.0.0.10/24",
 			ManagementGW:    "10.0.0.1",
 			DNSNameservers: []string{