From 68e7dcd0019f3004ce548a944de4e616bc58a8e03e11b3dcf1c46319100d1878 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=96=9F=E9=85=8C=20=E9=B5=AC=E5=85=84?= Date: Mon, 30 Mar 2026 18:41:18 +0800 Subject: [PATCH] Can now reconcile --- clitools/pkg/bootstrap/registry.go | 5 +- clitools/pkg/bootstrap/runner.go | 36 +++------ clitools/pkg/node/kubeadm.go | 121 +++++++++------------------- clitools/pkg/node/kubelet.go | 38 +++++++++ clitools/pkg/node/metadata.go | 97 +++++++++++++++------- clitools/pkg/node/prereqs.go | 83 ++++++++++++++++++- clitools/pkg/templates/templates.go | 6 +- 7 files changed, 245 insertions(+), 141 deletions(-) diff --git a/clitools/pkg/bootstrap/registry.go b/clitools/pkg/bootstrap/registry.go index ff84686..922f619 100644 --- a/clitools/pkg/bootstrap/registry.go +++ b/clitools/pkg/bootstrap/registry.go @@ -25,7 +25,6 @@ func NewRegistry(ctx *node.NodeContext) *Registry { "AllowSingleNodeScheduling": node.AllowSingleNodeScheduling, "ApplyLocalNodeMetadataIfPossible": node.ApplyLocalNodeMetadataIfPossible, "CheckForVersionSkew": node.CheckForVersionSkew, - "CheckUpgradePrereqs": node.CheckUpgradePrereqs, "ClassifyBootstrapAction": node.ClassifyBootstrapAction, "ConfigureDNS": node.ConfigureDNS(netCfg), "ConfigureDefaultCNI": node.ConfigureDefaultCNI, @@ -33,14 +32,12 @@ func NewRegistry(ctx *node.NodeContext) *Registry { "ConfigureMgmtInterface": node.ConfigureMgmtInterface(netCfg), "DetectLocalClusterState": node.DetectLocalClusterState, "EnsureIPForward": node.EnsureIPForward, - "PrintSummary": node.PrintSummary, "ReconcileControlPlane": node.ReconcileControlPlane, - "ReconcileNode": node.ReconcileNode, + "ReconcileWorker": node.ReconcileWorker, "RunKubeadmInit": node.RunKubeadmInit, "RunKubeadmJoin": node.RunKubeadmJoin, "RunKubeadmUpgradeApply": node.RunKubeadmUpgradeApply, "RunKubeadmUpgradeNode": node.RunKubeadmUpgradeNode, - "SetHostnameIfNeeded": node.SetHostnameIfNeeded, "StartCRIO": node.StartCRIO, 
"ValidateNodeIPAndAPIServerReachability": node.ValidateNodeIPAndAPIServerReachability, "ValidateRequiredImagesPresent": node.ValidateRequiredImagesPresent, diff --git a/clitools/pkg/bootstrap/runner.go b/clitools/pkg/bootstrap/runner.go index 9f3d0c7..f4b30a0 100644 --- a/clitools/pkg/bootstrap/runner.go +++ b/clitools/pkg/bootstrap/runner.go @@ -86,16 +86,16 @@ func NewRunner(cfg *monov1alpha1.MonoKSConfig) *Runner { Name: "Classify bootstrap action", Desc: "Decide whether to init, join, upgrade, or reconcile based on local state and desired version", }, - { - RegKey: "CheckForVersionSkew", - Name: "Check for version skew", - Desc: "Validate wether version satisfy the requirements againts current cluster if any", - }, { RegKey: "RunKubeadmInit", Name: "Run kubeadm init", Desc: "Initialize a new Kubernetes control plane using kubeadm", }, + { + RegKey: "RunKubeadmJoin", + Name: "Run kubeadm join", + Desc: "Join node to existing cluster as worker or control-plane", + }, { RegKey: "WaitForExistingClusterIfNeeded", Name: "Wait for existing cluster", @@ -107,25 +107,20 @@ func NewRunner(cfg *monov1alpha1.MonoKSConfig) *Runner { Desc: "Ensure control plane components match desired state without full reinitialization", }, { - RegKey: "CheckUpgradePrereqs", - Name: "Check upgrade prerequisites", - Desc: "Validate cluster state and version compatibility before upgrade", + RegKey: "ReconcileWorker", + Name: "Reconcile worker node", + Desc: "Reconcile the worker node", + }, + { + RegKey: "CheckForVersionSkew", + Name: "Check for version skew", + Desc: "Validate whether version satisfies the requirements against current cluster if any", + }, { RegKey: "RunKubeadmUpgradeApply", Name: "Run kubeadm upgrade apply", Desc: "Upgrade control plane components using kubeadm", }, - { - RegKey: "RunKubeadmJoin", - Name: "Run kubeadm join", - Desc: "Join node to existing cluster as worker or control-plane", - }, - { - RegKey: "ReconcileNode", - Name: "Reconcile node state", - Desc: "Ensure 
node configuration matches desired state after join or upgrade", - }, { RegKey: "RunKubeadmUpgradeNode", Name: "Run kubeadm upgrade node", @@ -141,11 +136,6 @@ func NewRunner(cfg *monov1alpha1.MonoKSConfig) *Runner { Name: "Allow single-node scheduling", Desc: "Remove control-plane taints to allow workloads on single-node clusters", }, - { - RegKey: "PrintSummary", - Name: "Print summary", - Desc: "Output final bootstrap summary and detected state", - }, }, } } diff --git a/clitools/pkg/node/kubeadm.go b/clitools/pkg/node/kubeadm.go index fd943ce..adc9b86 100644 --- a/clitools/pkg/node/kubeadm.go +++ b/clitools/pkg/node/kubeadm.go @@ -114,89 +114,6 @@ func WaitForExistingClusterIfNeeded(ctx context.Context, nctx *NodeContext) erro } } -func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error { - if nctx.BootstrapState == nil { - return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") - } - - role := strings.TrimSpace(nctx.Config.Spec.ClusterRole) - wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion)) - if wantVersion == "" { - return errors.New("spec.kubernetesVersion is required") - } - - switch nctx.LocalClusterState.MembershipKind { - case LocalMembershipFresh: - // Nothing to compare for fresh nodes. - return nil - case LocalMembershipPartial: - return fmt.Errorf("cannot check version skew with partial local cluster state") - } - - versionKubeconfig := chooseVersionKubeconfig(nctx.LocalClusterState) - if versionKubeconfig == "" { - return fmt.Errorf("no kubeconfig available for version detection") - } - - currentVersion, err := getServerVersion(ctx, versionKubeconfig) - if err != nil { - if role == "control-plane" { - return fmt.Errorf("existing control-plane state found, but cluster version could not be determined: %w", err) - } - - // Worker path stays permissive. 
- nctx.BootstrapState.UnsupportedWorkerVersionSkew = true - nctx.BootstrapState.VersionSkewReason = "cluster version could not be determined" - - if nctx.BootstrapState.Action == BootstrapActionManageWorker { - nctx.BootstrapState.Action = BootstrapActionReconcileWorker - } - return nil - } - - nctx.BootstrapState.DetectedClusterVersion = currentVersion - - switch role { - case "control-plane": - if !isSupportedControlPlaneSkew(currentVersion, wantVersion) { - return fmt.Errorf( - "unsupported control-plane version skew: cluster=%s node=%s", - currentVersion, wantVersion, - ) - } - - if nctx.BootstrapState.Action == BootstrapActionManageControlPlane { - if versionEq(currentVersion, wantVersion) { - nctx.BootstrapState.Action = BootstrapActionReconcileControlPlane - } else { - nctx.BootstrapState.Action = BootstrapActionUpgradeControlPlane - } - } - - case "worker": - if !isSupportedWorkerSkew(currentVersion, wantVersion) { - nctx.BootstrapState.UnsupportedWorkerVersionSkew = true - nctx.BootstrapState.VersionSkewReason = fmt.Sprintf( - "unsupported worker version skew: cluster=%s node=%s", - currentVersion, wantVersion, - ) - } - - if nctx.BootstrapState.Action == BootstrapActionManageWorker { - if versionEq(currentVersion, wantVersion) { - nctx.BootstrapState.Action = BootstrapActionReconcileWorker - } else { - nctx.BootstrapState.Action = BootstrapActionUpgradeWorker - } - } - - default: - return fmt.Errorf("unsupported cluster role %q", role) - } - - return nil -} - func ClassifyBootstrapAction(ctx context.Context, nctx *NodeContext) error { _ = ctx @@ -863,3 +780,41 @@ func RunKubeadmUpgradeNode(context.Context, *NodeContext) error { klog.Info("run_kubeadm_upgrade_node: TODO implement kubeadm upgrade node") return nil } + +func ReconcileControlPlane(ctx context.Context, nctx *NodeContext) error { + if nctx.BootstrapState == nil { + return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") + } + + if nctx.BootstrapState.Action != 
BootstrapActionReconcileControlPlane { + klog.V(4).Infof("ReconcileControlPlane skipped for action %q", nctx.BootstrapState.Action) + return nil + } + if err := StartKubelet(ctx, nctx); err != nil { + return fmt.Errorf("start kubelet: %w", err) + } + if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil { + return fmt.Errorf("wait for local apiserver: %w", err) + } + if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil { + return fmt.Errorf("wait for admin api: %w", err) + } + return nil +} + +func ReconcileWorker(ctx context.Context, nctx *NodeContext) error { + if nctx.BootstrapState == nil { + return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") + } + if nctx.BootstrapState.Action != BootstrapActionReconcileWorker { + klog.V(4).Infof("ReconcileWorker skipped for action %q", nctx.BootstrapState.Action) + return nil + } + if err := StartKubelet(ctx, nctx); err != nil { + return fmt.Errorf("start kubelet: %w", err) + } + if err := waitForKubeletHealthy(ctx, 2*time.Minute); err != nil { + return fmt.Errorf("wait for kubelet healthy: %w", err) + } + return nil +} diff --git a/clitools/pkg/node/kubelet.go b/clitools/pkg/node/kubelet.go index 6096976..6227bfc 100644 --- a/clitools/pkg/node/kubelet.go +++ b/clitools/pkg/node/kubelet.go @@ -2,6 +2,11 @@ package node import ( "context" + "fmt" + "io" + "net/http" + "strings" + "time" system "undecided.project/monok8s/pkg/system" ) @@ -9,3 +14,36 @@ import ( func StartKubelet(ctx context.Context, n *NodeContext) error { return system.EnsureServiceRunning(ctx, n.SystemRunner, "kubelet") } + +func waitForKubeletHealthy(ctx context.Context, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + url := "http://127.0.0.1:10248/healthz" + client := &http.Client{ + Timeout: 2 * time.Second, + } + + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + + for { + req, err := 
http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err == nil { + resp, err := client.Do(req) + if err == nil { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode == http.StatusOK && strings.TrimSpace(string(body)) == "ok" { + return nil + } + } + } + + select { + case <-ctx.Done(): + return fmt.Errorf("kubelet health endpoint did not become ready: %w", ctx.Err()) + case <-ticker.C: + } + } +} diff --git a/clitools/pkg/node/metadata.go b/clitools/pkg/node/metadata.go index cbd869b..125d470 100644 --- a/clitools/pkg/node/metadata.go +++ b/clitools/pkg/node/metadata.go @@ -1,37 +1,80 @@ package node import ( - "context" + "context" + "fmt" + "os" + "strings" - "k8s.io/klog/v2" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubernetes "k8s.io/client-go/kubernetes" ) -func ApplyLocalNodeMetadataIfPossible(context.Context, *NodeContext) error { - klog.Info("apply_local_node_metadata_if_possible: TODO implement node labels/annotations") - return nil +func ApplyLocalNodeMetadataIfPossible(ctx context.Context, nctx *NodeContext) error { + spec := nctx.Config.Spec + + if len(spec.NodeAnnotations) == 0 && len(spec.NodeLabels) == 0 { + return nil // nothing to do + } + + nodeName := strings.TrimSpace(spec.NodeName) + if nodeName == "" { + return fmt.Errorf("spec.nodeName is required") + } + + // Prefer admin kubeconfig (control-plane) + kubeconfig := adminKubeconfigPath + + if _, err := os.Stat(kubeconfig); err != nil { + klog.V(2).Infof("admin kubeconfig not found, skipping node metadata apply") + return nil + } + + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return fmt.Errorf("build kubeconfig: %w", err) + } + + client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("create kubernetes client: %w", err) + } + + node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { 
+ return fmt.Errorf("get node %q: %w", nodeName, err) + } + + if node.Labels == nil { + node.Labels = make(map[string]string) + } + if node.Annotations == nil { + node.Annotations = make(map[string]string) + } + + // Apply labels + for k, v := range spec.NodeLabels { + node.Labels[k] = v + } + + // Apply annotations + for k, v := range spec.NodeAnnotations { + node.Annotations[k] = v + } + + _, err = client.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("update node metadata: %w", err) + } + + klog.Infof("applied labels/annotations to node %q", nodeName) + return nil } func AllowSingleNodeScheduling(context.Context, *NodeContext) error { - klog.Info("allow_single_node_scheduling: TODO implement control-plane taint removal") - return nil -} - -func SetHostnameIfNeeded(context.Context, *NodeContext) error { - klog.Info("set_hostname_if_needed: TODO implement hostname and /etc/hostname reconciliation") - return nil -} - -func PrintSummary(context.Context, *NodeContext) error { - klog.Info("print_summary: TODO emit final summary") - return nil -} - -func ReconcileControlPlane(context.Context, *NodeContext) error { - klog.Info("reconcile_control_plane: TODO implement existing CP reconciliation") - return nil -} - -func ReconcileNode(context.Context, *NodeContext) error { - klog.Info("reconcile_node: TODO implement existing joined node reconciliation") - return nil + klog.Info("allow_single_node_scheduling: TODO implement control-plane taint removal") + return nil } diff --git a/clitools/pkg/node/prereqs.go b/clitools/pkg/node/prereqs.go index 5bb4e73..6551116 100644 --- a/clitools/pkg/node/prereqs.go +++ b/clitools/pkg/node/prereqs.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "net" "strings" @@ -106,7 +107,85 @@ func ValidateNodeIPAndAPIServerReachability(ctx context.Context, nct *NodeContex return nil } -func CheckUpgradePrereqs(context.Context, *NodeContext) error { - 
klog.Info("check_upgrade_prereqs: TODO implement kubeadm version / skew checks") +func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error { + if nctx.BootstrapState == nil { + return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") + } + + role := strings.TrimSpace(nctx.Config.Spec.ClusterRole) + wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion)) + if wantVersion == "" { + return errors.New("spec.kubernetesVersion is required") + } + + switch nctx.LocalClusterState.MembershipKind { + case LocalMembershipFresh: + // Nothing to compare for fresh nodes. + return nil + case LocalMembershipPartial: + return fmt.Errorf("cannot check version skew with partial local cluster state") + } + + versionKubeconfig := chooseVersionKubeconfig(nctx.LocalClusterState) + if versionKubeconfig == "" { + return fmt.Errorf("no kubeconfig available for version detection") + } + + currentVersion, err := getServerVersion(ctx, versionKubeconfig) + if err != nil { + if role == "control-plane" { + return fmt.Errorf("existing control-plane state found, but cluster version could not be determined: %w", err) + } + + // Worker path stays permissive. 
+ nctx.BootstrapState.UnsupportedWorkerVersionSkew = true + nctx.BootstrapState.VersionSkewReason = "cluster version could not be determined" + + if nctx.BootstrapState.Action == BootstrapActionManageWorker { + nctx.BootstrapState.Action = BootstrapActionReconcileWorker + } + return nil + } + + nctx.BootstrapState.DetectedClusterVersion = currentVersion + + switch role { + case "control-plane": + if !isSupportedControlPlaneSkew(currentVersion, wantVersion) { + return fmt.Errorf( + "unsupported control-plane version skew: cluster=%s node=%s", + currentVersion, wantVersion, + ) + } + + if nctx.BootstrapState.Action == BootstrapActionManageControlPlane { + if versionEq(currentVersion, wantVersion) { + nctx.BootstrapState.Action = BootstrapActionReconcileControlPlane + } else { + nctx.BootstrapState.Action = BootstrapActionUpgradeControlPlane + } + } + + case "worker": + if !isSupportedWorkerSkew(currentVersion, wantVersion) { + nctx.BootstrapState.UnsupportedWorkerVersionSkew = true + nctx.BootstrapState.VersionSkewReason = fmt.Sprintf( + "unsupported worker version skew: cluster=%s node=%s", + currentVersion, wantVersion, + ) + } + + if nctx.BootstrapState.Action == BootstrapActionManageWorker { + if versionEq(currentVersion, wantVersion) { + nctx.BootstrapState.Action = BootstrapActionReconcileWorker + } else { + nctx.BootstrapState.Action = BootstrapActionUpgradeWorker + } + } + + default: + return fmt.Errorf("unsupported cluster role %q", role) + } + return nil } diff --git a/clitools/pkg/templates/templates.go b/clitools/pkg/templates/templates.go index e68a96e..7598a09 100644 --- a/clitools/pkg/templates/templates.go +++ b/clitools/pkg/templates/templates.go @@ -66,10 +66,12 @@ func DefaultMonoKSConfig() types.MonoKSConfig { }, NodeLabels: map[string]string{ - "monok8s.io/label": "label", + "monok8s.io/label": "value", }, - NodeAnnotations: map[string]string{}, + NodeAnnotations: map[string]string{ + "monok8s.io/annotation": "value", + }, Network: 
types.NetworkSpec{ Hostname: "monok8s-worker-1",