package node import ( "bytes" "context" "errors" "fmt" "net" "os" "strings" "time" "gopkg.in/yaml.v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" types "undecided.project/monok8s/pkg/apis/monok8s/v1alpha1" system "undecided.project/monok8s/pkg/system" ) const ( adminKubeconfigPath = "/etc/kubernetes/admin.conf" kubeletKubeconfigPath = "/etc/kubernetes/kubelet.conf" tmpKubeadmInitConf = "/tmp/kubeadm-init.yaml" ) func DetectLocalClusterState(ctx context.Context, nctx *NodeContext) error { _ = ctx if nctx == nil { return fmt.Errorf("node context is nil") } _, err := os.Stat(adminKubeconfigPath) hasAdmin := err == nil if err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("stat admin kubeconfig: %w", err) } _, err = os.Stat(kubeletKubeconfigPath) hasKubelet := err == nil if err != nil && !errors.Is(err, os.ErrNotExist) { return fmt.Errorf("stat kubelet kubeconfig: %w", err) } state := LocalClusterState{ HasAdminKubeconfig: hasAdmin, HasKubeletKubeconfig: hasKubelet, } switch { case !hasAdmin && !hasKubelet: state.MembershipKind = LocalMembershipFresh case !hasAdmin && hasKubelet: state.MembershipKind = LocalMembershipExistingWorker case hasAdmin && hasKubelet: state.MembershipKind = LocalMembershipExistingControlPlane case hasAdmin && !hasKubelet: state.MembershipKind = LocalMembershipPartial default: return fmt.Errorf("unreachable local cluster state") } klog.V(4).Infof("Detected local state: %+v", state) nctx.LocalClusterState = &state return nil } func WaitForExistingClusterIfNeeded(ctx context.Context, nctx *NodeContext) error { if nctx.LocalClusterState == nil { return errors.New("LocalClusterState is nil, please run dependency step first") } switch nctx.LocalClusterState.MembershipKind { case LocalMembershipFresh: klog.V(4).Infof("Nothing to to do LocalMembershipFresh") return nil case LocalMembershipExistingWorker: klog.V(4).Infof("Starting Kubelet in LocalMembershipExistingWorker") if err := StartKubelet(ctx, nctx); err != nil { return fmt.Errorf("start kubelet: %w", err) } return nil case LocalMembershipExistingControlPlane: if err := StartKubelet(ctx, nctx); err != nil { return fmt.Errorf("start kubelet: %w", err) } klog.V(4).Infof("Waiting for local apiserver in LocalMembershipExistingControlPlane") // Existing local control-plane state: wait for local apiserver if this // machine is meant to be a control-plane node. if strings.TrimSpace(nctx.Config.Spec.ClusterRole) == "control-plane" { if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil { return fmt.Errorf("wait for local apiserver: %w", err) } if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil { return fmt.Errorf("wait for admin api: %w", err) } } return nil case LocalMembershipPartial: return fmt.Errorf("partial local cluster state detected: admin=%t kubelet=%t", nctx.LocalClusterState.HasAdminKubeconfig, nctx.LocalClusterState.HasKubeletKubeconfig, ) default: return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind) } } func ClassifyBootstrapAction(ctx context.Context, nctx *NodeContext) error { _ = ctx if nctx.LocalClusterState == nil { return errors.New("LocalClusterState is nil, call detect_local_cluster_state()") } role := strings.TrimSpace(nctx.Config.Spec.ClusterRole) initControlPlane := nctx.Config.Spec.InitControlPlane wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion)) if wantVersion == "" { return errors.New("spec.kubernetesVersion is required") } state := &BootstrapState{} if nctx.BootstrapState != nil { *state = *nctx.BootstrapState } switch role { case "worker": switch nctx.LocalClusterState.MembershipKind { case LocalMembershipFresh: state.Action = BootstrapActionJoinWorker case LocalMembershipExistingWorker: state.Action = BootstrapActionManageWorker case LocalMembershipExistingControlPlane, LocalMembershipPartial: return fmt.Errorf("local state %q is invalid for worker role", nctx.LocalClusterState.MembershipKind) default: return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind) } case "control-plane": switch nctx.LocalClusterState.MembershipKind { case LocalMembershipFresh: if initControlPlane { state.Action = BootstrapActionInitControlPlane } else { state.Action = BootstrapActionJoinControlPlane } case LocalMembershipExistingControlPlane: state.Action = BootstrapActionManageControlPlane case LocalMembershipExistingWorker: return fmt.Errorf("local state %q is invalid for control-plane role", nctx.LocalClusterState.MembershipKind) case LocalMembershipPartial: return fmt.Errorf("partial local cluster state is invalid for control-plane role") default: return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind) } default: return fmt.Errorf("unsupported cluster role %q", role) } nctx.BootstrapState = state klog.V(4).Infof("Bootstrap action classified: %+v", *state) return nil } func InitControlPlane(ctx context.Context, nctx *NodeContext) error { if nctx == nil { return errors.New("node context is nil") } if nctx.Config == nil { return errors.New("node config is nil") } if strings.TrimSpace(nctx.Config.Spec.ClusterRole) != "control-plane" { return fmt.Errorf("init control-plane called for non-control-plane role %q", nctx.Config.Spec.ClusterRole) } if !nctx.Config.Spec.InitControlPlane { return errors.New("init control-plane called but spec.initControlPlane is false") } if nctx.BootstrapState.Action != BootstrapActionInitControlPlane { return fmt.Errorf("init control-plane called with bootstrap action %q", nctx.BootstrapState.Action) } // Fresh init only. Existing control-plane recovery/wait belongs elsewhere. switch nctx.LocalClusterState.MembershipKind { case LocalMembershipFresh: // continue default: return fmt.Errorf("init control-plane requires fresh local state, got %q", nctx.LocalClusterState.MembershipKind) } // Example: // if err := RunKubeadmInit(ctx, nctx); err != nil { // return fmt.Errorf("kubeadm init: %w", err) // } return nil } func waitForLocalAPIServer(ctx context.Context, nctx *NodeContext, timeout time.Duration) error { addr := strings.TrimSpace(nctx.Config.Spec.APIServerAdvertiseAddress) if addr == "" { return errors.New("spec.apiServerAdvertiseAddress is required for local control-plane wait") } d := net.Dialer{Timeout: 2 * time.Second} deadlineCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() target := net.JoinHostPort(addr, "6443") ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { conn, err := d.DialContext(deadlineCtx, "tcp", target) if err == nil { _ = conn.Close() return nil } select { case <-deadlineCtx.Done(): return fmt.Errorf("apiserver %s did not become reachable within %s", target, timeout) case <-ticker.C: } } } func waitForAPIViaKubeconfig(ctx context.Context, kubeconfigPath string, timeout time.Duration) error { deadlineCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() for { _, err := getServerVersion(deadlineCtx, kubeconfigPath) if err == nil { return nil } select { case <-deadlineCtx.Done(): return fmt.Errorf("api via kubeconfig %s did not become reachable within %s", kubeconfigPath, timeout) case <-ticker.C: } } } func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) { restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err) } // Keep this short. This is a probe, not a long-running client. restCfg.Timeout = 5 * time.Second clientset, err := kubernetes.NewForConfig(restCfg) if err != nil { return "", fmt.Errorf("create clientset: %w", err) } disc := clientset.Discovery() return discoverServerVersion(ctx, disc) } func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) { info, err := disc.ServerVersion() if err != nil { return "", err } if info == nil || strings.TrimSpace(info.GitVersion) == "" { return "", errors.New("server version is empty") } return normalizeKubeVersion(info.GitVersion), nil } type kubeVersion struct { Major int Minor int Patch int } func parseKubeVersion(s string) (kubeVersion, error) { s = strings.TrimSpace(s) s = strings.TrimPrefix(s, "v") var v kubeVersion n, err := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch) // Accepts "1.29" or "1.29.3" if err != nil || n < 2 { return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q", s) } return v, nil } // Control-plane: keep this strict. // Accept same version, or a one-minor step where the node binary is newer than the current cluster. // That covers normal control-plane upgrade flow but blocks nonsense. func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool { cv, err := parseKubeVersion(clusterVersion) if err != nil { return false } nv, err := parseKubeVersion(nodeVersion) if err != nil { return false } if cv.Major != nv.Major { return false } if cv.Minor == nv.Minor { return true } if nv.Minor == cv.Minor+1 { return true } return false } // Worker: kubelet generally must not be newer than the apiserver. // Older kubelets are allowed within supported skew range. // Your requirement says unsupported worker skew should still proceed, so this // only classifies support status and must NOT be used to block this function. func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool { cv, err := parseKubeVersion(clusterVersion) if err != nil { return false } nv, err := parseKubeVersion(nodeVersion) if err != nil { return false } if cv.Major != nv.Major { return false } // kubelet newer than apiserver => unsupported if nv.Minor > cv.Minor { return false } // kubelet up to 3 minors older than apiserver => supported if cv.Minor-nv.Minor <= 3 { return true } return false } // This should not try to taint the node directly here. // Just record intent and let a later reconcile step apply the taint. func markUnsupportedWorkerVersionSkew(nctx *NodeContext, clusterVersion, nodeVersion string) { // Replace this with whatever state carrier you already use. // // Example: // nctx.Metadata.UnsupportedWorkerVersionSkew = true // nctx.Metadata.UnsupportedWorkerVersionSkewReason = // fmt.Sprintf("unsupported worker version skew: cluster=%s node=%s", clusterVersion, nodeVersion) _ = nctx _ = clusterVersion _ = nodeVersion } // Optional helper if you want to probe readiness later through the API. // Keeping this here in case you want a very cheap liveness call elsewhere. func apiServerReady(ctx context.Context, kubeconfigPath string) error { restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { return err } restCfg.Timeout = 5 * time.Second clientset, err := kubernetes.NewForConfig(restCfg) if err != nil { return err } _, err = clientset.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{}) return err } func ValidateRequiredImagesPresent(ctx context.Context, n *NodeContext) error { if n.Config.Spec.SkipImageCheck { klog.Infof("skipping image check (skipImageCheck=true)") return nil } k8sVersion := strings.TrimSpace(n.Config.Spec.KubernetesVersion) if k8sVersion == "" { return fmt.Errorf("kubernetesVersion is required") } klog.Infof("checking required Kubernetes images for %s...", k8sVersion) result, err := n.SystemRunner.Run(ctx, "kubeadm", "config", "images", "list", "--kubernetes-version", k8sVersion, ) if err != nil { return fmt.Errorf("list required Kubernetes images for %s: %w", k8sVersion, err) } var missing []string for _, img := range strings.Fields(result.Stdout) { if err := checkImagePresent(ctx, n, img); err != nil { klog.Errorf("MISSING image: %s", img) missing = append(missing, img) continue } klog.Infof("found image: %s", img) } if len(missing) > 0 { return fmt.Errorf("preload the Kubernetes images before bootstrapping; missing: %s", strings.Join(missing, ", ")) } klog.Infof("all required images are present") return nil } func checkImagePresent(ctx context.Context, n *NodeContext, image string) error { image = strings.TrimSpace(image) if image == "" { return fmt.Errorf("image is required") } // crictl inspecti exits non-zero when the image is absent. _, err := n.SystemRunner.RunRetry(ctx, system.RetryOptions{ Attempts: 3, Delay: 1 * system.DefaultSecond, }, "crictl", "inspecti", image) if err != nil { return fmt.Errorf("image %q not present: %w", image, err) } return nil } func chooseVersionKubeconfig(state *LocalClusterState) string { if state.HasAdminKubeconfig { return adminKubeconfigPath } if state.HasKubeletKubeconfig { return kubeletKubeconfigPath } return "" } func versionEq(a, b string) bool { return normalizeKubeVersion(a) == normalizeKubeVersion(b) } func normalizeKubeVersion(v string) string { v = strings.TrimSpace(v) if v == "" { return "" } if !strings.HasPrefix(v, "v") { v = "v" + v } return v } func buildNodeRegistration(spec types.MonoKSConfigSpec) NodeRegistrationOptions { nodeName := strings.TrimSpace(spec.NodeName) criSocket := strings.TrimSpace(spec.ContainerRuntimeEndpoint) advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress) nr := NodeRegistrationOptions{ Name: nodeName, CRISocket: criSocket, ImagePullPolicy: "IfNotPresent", KubeletExtraArgs: []KubeadmArg{ {Name: "hostname-override", Value: nodeName}, {Name: "pod-manifest-path", Value: "/etc/kubernetes/manifests"}, }, } if advertiseAddress != "" { nr.KubeletExtraArgs = append(nr.KubeletExtraArgs, KubeadmArg{Name: "node-ip", Value: advertiseAddress}, ) } return nr } func maybeAddBootstrapTaint(nr *NodeRegistrationOptions, role string) { if strings.TrimSpace(role) != "worker" { return } nr.Taints = []KubeadmTaint{ { Key: "monok8s/bootstrap", Effect: "NoSchedule", }, } } func GenerateKubeadmInitConfig(_ context.Context, nctx *NodeContext) error { if nctx == nil { return fmt.Errorf("node context is nil") } spec := nctx.Config.Spec advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress) if advertiseAddress == "" { return fmt.Errorf("api server advertise address is required") } clusterName := strings.TrimSpace(spec.ClusterName) if clusterName == "" { return fmt.Errorf("cluster name is required") } kubernetesVersion := strings.TrimSpace(spec.KubernetesVersion) if kubernetesVersion == "" { return fmt.Errorf("kubernetes version is required") } podSubnet := strings.TrimSpace(spec.PodSubnet) if podSubnet == "" { return fmt.Errorf("pod subnet is required") } serviceSubnet := strings.TrimSpace(spec.ServiceSubnet) if serviceSubnet == "" { return fmt.Errorf("service subnet is required") } clusterDomain := strings.TrimSpace(spec.ClusterDomain) if clusterDomain == "" { return fmt.Errorf("cluster domain is required") } certSANs := []string{advertiseAddress} seen := map[string]struct{}{advertiseAddress: {}} for _, raw := range spec.SubjectAltNames { san := strings.TrimSpace(raw) if san == "" { continue } if _, ok := seen[san]; ok { continue } seen[san] = struct{}{} certSANs = append(certSANs, san) } nodeReg := buildNodeRegistration(spec) if spec.ClusterRole == "worker" { maybeAddBootstrapTaint(&nodeReg, spec.ClusterRole) } initCfg := InitConfiguration{ APIVersion: "kubeadm.k8s.io/v1beta4", Kind: "InitConfiguration", LocalAPIEndpoint: LocalAPIEndpoint{ AdvertiseAddress: advertiseAddress, BindPort: 6443, }, NodeRegistration: nodeReg, } clusterCfg := ClusterConfiguration{ APIVersion: "kubeadm.k8s.io/v1beta4", Kind: "ClusterConfiguration", ClusterName: clusterName, KubernetesVersion: kubernetesVersion, Networking: Networking{ PodSubnet: podSubnet, ServiceSubnet: serviceSubnet, DNSDomain: clusterDomain, }, APIServer: APIServer{ CertSANs: certSANs, }, } kubeletCfg := KubeletConfiguration{ APIVersion: "kubelet.config.k8s.io/v1beta1", Kind: "KubeletConfiguration", CgroupDriver: "cgroupfs", ContainerRuntimeEndpoint: strings.TrimSpace(spec.ContainerRuntimeEndpoint), } return writeKubeadmYAML(tmpKubeadmInitConf, initCfg, clusterCfg, kubeletCfg) } func GenerateKubeadmJoinConfig(_ context.Context, nctx *NodeContext) error { if nctx == nil { return fmt.Errorf("node context is nil") } spec := nctx.Config.Spec apiServerEndpoint := strings.TrimSpace(spec.APIServerEndpoint) if apiServerEndpoint == "" { return fmt.Errorf("spec.apiServerEndpoint is required") } bootstrapToken := strings.TrimSpace(spec.BootstrapToken) if bootstrapToken == "" { return fmt.Errorf("spec.bootstrapToken is required") } discoveryTokenCACertHash := strings.TrimSpace(spec.DiscoveryTokenCACertHash) if discoveryTokenCACertHash == "" { return fmt.Errorf("spec.discoveryTokenCACertHash is required") } nodeReg := buildNodeRegistration(spec) if nctx.BootstrapState != nil && nctx.BootstrapState.Action == BootstrapActionJoinWorker { maybeAddBootstrapTaint(&nodeReg, spec.ClusterRole) } joinCfg := JoinConfiguration{ APIVersion: "kubeadm.k8s.io/v1beta4", Kind: "JoinConfiguration", NodeRegistration: nodeReg, Discovery: Discovery{ BootstrapToken: BootstrapTokenDiscovery{ APIServerEndpoint: apiServerEndpoint, Token: bootstrapToken, CACertHashes: []string{discoveryTokenCACertHash}, }, }, } if nctx.BootstrapState != nil && nctx.BootstrapState.Action == BootstrapActionJoinControlPlane { certKey := strings.TrimSpace(spec.ControlPlaneCertKey) if certKey == "" { return fmt.Errorf("spec.controlPlaneCertKey is required for control-plane join") } advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress) if advertiseAddress == "" { return fmt.Errorf("spec.apiServerAdvertiseAddress is required for control-plane join") } joinCfg.ControlPlane = &JoinControlPlane{ CertificateKey: certKey, } joinCfg.LocalAPIEndpoint = &LocalAPIEndpoint{ AdvertiseAddress: advertiseAddress, BindPort: 6443, } } kubeletCfg := KubeletConfiguration{ APIVersion: "kubelet.config.k8s.io/v1beta1", Kind: "KubeletConfiguration", CgroupDriver: "cgroupfs", ContainerRuntimeEndpoint: strings.TrimSpace(spec.ContainerRuntimeEndpoint), } return writeKubeadmYAML(tmpKubeadmInitConf, joinCfg, kubeletCfg) } func writeKubeadmYAML(path string, docs ...any) error { var renderedDocs [][]byte for _, doc := range docs { b, err := yaml.Marshal(doc) if err != nil { return fmt.Errorf("marshal kubeadm config document: %w", err) } renderedDocs = append(renderedDocs, bytes.TrimSpace(b)) } var buf bytes.Buffer for i, doc := range renderedDocs { if i > 0 { buf.WriteString("\n---\n") } buf.Write(doc) buf.WriteByte('\n') } rendered := buf.String() if err := os.WriteFile(path, []byte(rendered), 0o600); err != nil { return fmt.Errorf("write kubeadm config to %s: %w", path, err) } klog.V(4).Infof("generated kubeadm config at %s:\n%s", path, rendered) return nil } func RunKubeadmInit(ctx context.Context, nctx *NodeContext) error { if nctx.BootstrapState == nil { return errors.New("BootstrapState is nil. Please run earlier steps first") } if nctx.BootstrapState.Action != BootstrapActionInitControlPlane { klog.V(4).Infof("skipped for %s", nctx.BootstrapState.Action) return nil } if err := GenerateKubeadmInitConfig(ctx, nctx); err != nil { return err } _, err := nctx.SystemRunner.RunWithOptions( ctx, "kubeadm", []string{"init", "--config", tmpKubeadmInitConf}, system.RunOptions{ Timeout: 10 * time.Minute, OnStdoutLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, OnStderrLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, }, ) return err } func RunKubeadmJoin(ctx context.Context, nctx *NodeContext) error { if nctx.BootstrapState == nil { return errors.New("BootstrapState is nil. Please run earlier steps first") } switch nctx.BootstrapState.Action { case BootstrapActionJoinWorker, BootstrapActionJoinControlPlane: // continue default: klog.V(4).Infof("RunKubeadmJoin skipped for action %q", nctx.BootstrapState.Action) return nil } if err := GenerateKubeadmJoinConfig(ctx, nctx); err != nil { return err } klog.Infof("running kubeadm join for action %q", nctx.BootstrapState.Action) _, err := nctx.SystemRunner.RunWithOptions( ctx, "kubeadm", []string{"join", "--config", tmpKubeadmInitConf}, system.RunOptions{ Timeout: 5 * time.Minute, OnStdoutLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, OnStderrLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, }, ) if err != nil { return fmt.Errorf("run kubeadm join: %w", err) } return nil } func RunKubeadmUpgradeApply(context.Context, *NodeContext) error { klog.Info("run_kubeadm_upgrade_apply: TODO implement kubeadm upgrade apply") return nil } func RunKubeadmUpgradeNode(context.Context, *NodeContext) error { klog.Info("run_kubeadm_upgrade_node: TODO implement kubeadm upgrade node") return nil } func ReconcileControlPlane(ctx context.Context, nctx *NodeContext) error { if nctx.BootstrapState == nil { return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") } if nctx.BootstrapState.Action != BootstrapActionReconcileControlPlane { klog.V(4).Infof("ReconcileControlPlane skipped for action %q", nctx.BootstrapState.Action) return nil } if err := StartKubelet(ctx, nctx); err != nil { return fmt.Errorf("start kubelet: %w", err) } if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil { return fmt.Errorf("wait for local apiserver: %w", err) } if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil { return fmt.Errorf("wait for admin api: %w", err) } return nil } func ReconcileWorker(ctx context.Context, nctx *NodeContext) error { if nctx.BootstrapState == nil { return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first") } if nctx.BootstrapState.Action != BootstrapActionReconcileWorker { klog.V(4).Infof("ReconcileWorker skipped for action %q", nctx.BootstrapState.Action) return nil } if err := StartKubelet(ctx, nctx); err != nil { return fmt.Errorf("start kubelet: %w", err) } if err := waitForKubeletHealthy(ctx, 2*time.Minute); err != nil { return fmt.Errorf("wait for kubelet healthy: %w", err) } return nil }