package node

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"strings"
	"time"

	"gopkg.in/yaml.v3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"

	system "undecided.project/monok8s/pkg/system"
)

const (
	adminKubeconfigPath   = "/etc/kubernetes/admin.conf"
	kubeletKubeconfigPath = "/etc/kubernetes/kubelet.conf"
	tmpKubeadmInitConf    = "/tmp/kubeadm-init.yaml"
)

// DetectLocalClusterState inspects the well-known kubeconfig paths on disk and
// records which cluster-membership state this machine is in (fresh, existing
// worker, existing control-plane, or partial) on nctx.LocalClusterState.
func DetectLocalClusterState(ctx context.Context, nctx *NodeContext) error {
	_ = ctx // purely local filesystem probing; no cancellation points

	if nctx == nil {
		return fmt.Errorf("node context is nil")
	}

	// A stat error other than "not exist" (e.g. permission denied) is fatal:
	// we cannot tell whether the file is present, so guessing is unsafe.
	_, err := os.Stat(adminKubeconfigPath)
	hasAdmin := err == nil
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("stat admin kubeconfig: %w", err)
	}

	_, err = os.Stat(kubeletKubeconfigPath)
	hasKubelet := err == nil
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return fmt.Errorf("stat kubelet kubeconfig: %w", err)
	}

	state := LocalClusterState{
		HasAdminKubeconfig:   hasAdmin,
		HasKubeletKubeconfig: hasKubelet,
	}

	switch {
	case !hasAdmin && !hasKubelet:
		state.MembershipKind = LocalMembershipFresh
	case !hasAdmin && hasKubelet:
		state.MembershipKind = LocalMembershipExistingWorker
	case hasAdmin && hasKubelet:
		state.MembershipKind = LocalMembershipExistingControlPlane
	case hasAdmin && !hasKubelet:
		// admin.conf without kubelet.conf is an inconsistent half-installed
		// state; later steps refuse to act on it.
		state.MembershipKind = LocalMembershipPartial
	default:
		return fmt.Errorf("unreachable local cluster state")
	}

	klog.V(4).Infof("Detected local state: %+v", state)
	nctx.LocalClusterState = &state
	return nil
}

// WaitForExistingClusterIfNeeded starts the kubelet for nodes that already
// belong to a cluster and, for an existing control-plane node, waits until the
// local apiserver answers both on TCP and through the admin kubeconfig.
// A fresh node is a no-op; a partial state is an error.
func WaitForExistingClusterIfNeeded(ctx context.Context, nctx *NodeContext) error {
	if nctx.LocalClusterState == nil {
		return errors.New("LocalClusterState is nil, please run dependency step first")
	}

	switch nctx.LocalClusterState.MembershipKind {
	case LocalMembershipFresh:
		klog.V(4).Infof("Nothing to do for LocalMembershipFresh")
		return nil
	case LocalMembershipExistingWorker:
		klog.V(4).Infof("Starting Kubelet in LocalMembershipExistingWorker")
		if err := StartKubelet(ctx, nctx); err != nil {
			return fmt.Errorf("start kubelet: %w", err)
		}
		return nil
	case LocalMembershipExistingControlPlane:
		if err := StartKubelet(ctx, nctx); err != nil {
			return fmt.Errorf("start kubelet: %w", err)
		}
		klog.V(4).Infof("Waiting for local apiserver in LocalMembershipExistingControlPlane")
		// Existing local control-plane state: wait for local apiserver if this
		// machine is meant to be a control-plane node.
		if strings.TrimSpace(nctx.Config.Spec.ClusterRole) == "control-plane" {
			if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil {
				return fmt.Errorf("wait for local apiserver: %w", err)
			}
			if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil {
				return fmt.Errorf("wait for admin api: %w", err)
			}
		}
		return nil
	case LocalMembershipPartial:
		return fmt.Errorf("partial local cluster state detected: admin=%t kubelet=%t",
			nctx.LocalClusterState.HasAdminKubeconfig,
			nctx.LocalClusterState.HasKubeletKubeconfig,
		)
	default:
		return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
	}
}

// CheckForVersionSkew compares the desired spec.kubernetesVersion against the
// version reported by the existing cluster, if any. Control-plane nodes fail
// hard on unsupported skew; workers only record the condition on
// nctx.BootstrapState so a later reconcile step can act on it.
func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error {
	if nctx.LocalClusterState == nil {
		return errors.New("LocalClusterState is nil, please run dependency step first")
	}

	role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
	wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
	if wantVersion == "" {
		return errors.New("spec.kubernetesVersion is required")
	}

	switch nctx.LocalClusterState.MembershipKind {
	case LocalMembershipFresh:
		// Fresh node has no existing cluster membership to compare against.
		return nil
	case LocalMembershipPartial:
		return fmt.Errorf("cannot check version skew with partial local cluster state")
	}

	// This step runs before ClassifyBootstrapAction allocates the state
	// carrier, so create it here before writing to it (previously this
	// dereferenced a nil BootstrapState on the worker path).
	if nctx.BootstrapState == nil {
		nctx.BootstrapState = &BootstrapState{}
	}

	versionKubeconfig := chooseVersionKubeconfig(nctx.LocalClusterState)
	if versionKubeconfig == "" {
		return fmt.Errorf("no kubeconfig available for version detection")
	}

	currentVersion, err := getServerVersion(ctx, versionKubeconfig)
	if err != nil {
		if role == "control-plane" {
			return fmt.Errorf("existing control-plane state found, but cluster version could not be determined: %w", err)
		}
		// Worker path stays permissive.
		nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
		nctx.BootstrapState.VersionSkewReason = "cluster version could not be determined"
		return nil
	}
	nctx.BootstrapState.DetectedClusterVersion = currentVersion

	switch role {
	case "control-plane":
		if !isSupportedControlPlaneSkew(currentVersion, wantVersion) {
			return fmt.Errorf(
				"unsupported control-plane version skew: cluster=%s node=%s",
				currentVersion, wantVersion,
			)
		}
	case "worker":
		if !isSupportedWorkerSkew(currentVersion, wantVersion) {
			nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
			nctx.BootstrapState.VersionSkewReason = fmt.Sprintf(
				"unsupported worker version skew: cluster=%s node=%s",
				currentVersion, wantVersion,
			)
		}
	default:
		return fmt.Errorf("unsupported cluster role %q", role)
	}
	return nil
}

// ClassifyBootstrapAction decides which bootstrap action (init, join,
// reconcile, or upgrade) this node should take, based on its configured role
// and the detected local membership state, and stores the decision on
// nctx.BootstrapState.
func ClassifyBootstrapAction(ctx context.Context, nctx *NodeContext) error {
	_ = ctx // pure in-memory classification

	if nctx.LocalClusterState == nil {
		return errors.New("LocalClusterState is nil, call detect_local_cluster_state()")
	}

	role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
	initControlPlane := nctx.Config.Spec.InitControlPlane
	wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
	if wantVersion == "" {
		return errors.New("spec.kubernetesVersion is required")
	}

	state := &BootstrapState{}
	// Preserve already-detected info if earlier steps populated it. The skew
	// classification from CheckForVersionSkew is consumed by a later
	// reconcile step, so it must not be dropped when we replace the struct.
	if nctx.BootstrapState != nil {
		state.DetectedClusterVersion = nctx.BootstrapState.DetectedClusterVersion
		state.UnsupportedWorkerVersionSkew = nctx.BootstrapState.UnsupportedWorkerVersionSkew
		state.VersionSkewReason = nctx.BootstrapState.VersionSkewReason
	}

	switch role {
	case "worker":
		switch nctx.LocalClusterState.MembershipKind {
		case LocalMembershipFresh:
			state.Action = BootstrapActionJoinWorker
		case LocalMembershipExistingWorker:
			// Without a detected cluster version we cannot tell whether an
			// upgrade is needed, so default to reconcile.
			if state.DetectedClusterVersion == "" {
				state.Action = BootstrapActionReconcileWorker
			} else if versionEq(state.DetectedClusterVersion, wantVersion) {
				state.Action = BootstrapActionReconcileWorker
			} else {
				state.Action = BootstrapActionUpgradeWorker
			}
		case LocalMembershipExistingControlPlane, LocalMembershipPartial:
			return fmt.Errorf("local state %q is invalid for worker role", nctx.LocalClusterState.MembershipKind)
		default:
			return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
		}
	case "control-plane":
		switch nctx.LocalClusterState.MembershipKind {
		case LocalMembershipFresh:
			if initControlPlane {
				state.Action = BootstrapActionInitControlPlane
			} else {
				state.Action = BootstrapActionJoinControlPlane
			}
		case LocalMembershipExistingControlPlane:
			if state.DetectedClusterVersion == "" {
				return errors.New("existing control-plane state found, but detected cluster version is empty")
			}
			if versionEq(state.DetectedClusterVersion, wantVersion) {
				state.Action = BootstrapActionReconcileControlPlane
			} else {
				state.Action = BootstrapActionUpgradeControlPlane
			}
		case LocalMembershipExistingWorker:
			return fmt.Errorf("local state %q is invalid for control-plane role", nctx.LocalClusterState.MembershipKind)
		case LocalMembershipPartial:
			return fmt.Errorf("partial local cluster state is invalid for control-plane role")
		default:
			return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
		}
	default:
		return fmt.Errorf("unsupported cluster role %q", role)
	}

	nctx.BootstrapState = state
	klog.V(4).Infof("Bootstrap action: %+v", *state)
	return nil
}

// InitControlPlane validates all preconditions for a fresh `kubeadm init` on
// this node. The actual init invocation is expected to be wired in here (see
// the example at the bottom); existing control-plane recovery belongs to
// WaitForExistingClusterIfNeeded instead.
func InitControlPlane(ctx context.Context, nctx *NodeContext) error {
	if nctx == nil {
		return errors.New("node context is nil")
	}
	if nctx.Config == nil {
		return errors.New("node config is nil")
	}
	// Guard the dependent state pointers explicitly instead of panicking on a
	// nil dereference when steps ran out of order.
	if nctx.LocalClusterState == nil {
		return errors.New("LocalClusterState is nil, call detect_local_cluster_state()")
	}
	if nctx.BootstrapState == nil {
		return errors.New("BootstrapState is nil, run ClassifyBootstrapAction first")
	}
	if strings.TrimSpace(nctx.Config.Spec.ClusterRole) != "control-plane" {
		return fmt.Errorf("init control-plane called for non-control-plane role %q", nctx.Config.Spec.ClusterRole)
	}
	if !nctx.Config.Spec.InitControlPlane {
		return errors.New("init control-plane called but spec.initControlPlane is false")
	}
	if nctx.BootstrapState.Action != BootstrapActionInitControlPlane {
		return fmt.Errorf("init control-plane called with bootstrap action %q", nctx.BootstrapState.Action)
	}

	// Fresh init only. Existing control-plane recovery/wait belongs elsewhere.
	switch nctx.LocalClusterState.MembershipKind {
	case LocalMembershipFresh:
		// continue
	default:
		return fmt.Errorf("init control-plane requires fresh local state, got %q", nctx.LocalClusterState.MembershipKind)
	}

	// Example:
	// if err := RunKubeadmInit(ctx, nctx); err != nil {
	//     return fmt.Errorf("kubeadm init: %w", err)
	// }
	return nil
}

// waitForLocalAPIServer polls a raw TCP connect to <advertiseAddress>:6443
// every two seconds until it succeeds or timeout elapses. It proves the
// apiserver socket is open, not that the API is healthy — pair it with
// waitForAPIViaKubeconfig for the latter.
func waitForLocalAPIServer(ctx context.Context, nctx *NodeContext, timeout time.Duration) error {
	addr := strings.TrimSpace(nctx.Config.Spec.APIServerAdvertiseAddress)
	if addr == "" {
		return errors.New("spec.apiServerAdvertiseAddress is required for local control-plane wait")
	}

	d := net.Dialer{Timeout: 2 * time.Second}
	deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// JoinHostPort handles IPv6 bracketing correctly.
	target := net.JoinHostPort(addr, "6443")

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		conn, err := d.DialContext(deadlineCtx, "tcp", target)
		if err == nil {
			_ = conn.Close() // probe only; nothing useful to do with the error
			return nil
		}
		select {
		case <-deadlineCtx.Done():
			return fmt.Errorf("apiserver %s did not become reachable within %s", target, timeout)
		case <-ticker.C:
		}
	}
}

// waitForAPIViaKubeconfig polls the cluster's /version endpoint through the
// given kubeconfig every two seconds until it answers or timeout elapses.
func waitForAPIViaKubeconfig(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
	deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		_, err := getServerVersion(deadlineCtx, kubeconfigPath)
		if err == nil {
			return nil
		}
		select {
		case <-deadlineCtx.Done():
			return fmt.Errorf("api via kubeconfig %s did not become reachable within %s", kubeconfigPath, timeout)
		case <-ticker.C:
		}
	}
}

// getServerVersion builds a short-timeout client from kubeconfigPath and
// returns the cluster's normalized GitVersion (e.g. "v1.29.3").
func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
	restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
	}
	// Keep this short. This is a probe, not a long-running client.
	restCfg.Timeout = 5 * time.Second

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return "", fmt.Errorf("create clientset: %w", err)
	}
	disc := clientset.Discovery()
	return discoverServerVersion(ctx, disc)
}

// discoverServerVersion extracts and normalizes the server GitVersion from a
// discovery client. ctx is accepted for interface symmetry; the underlying
// ServerVersion call does not take a context.
func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
	_ = ctx
	info, err := disc.ServerVersion()
	if err != nil {
		return "", err
	}
	if info == nil || strings.TrimSpace(info.GitVersion) == "" {
		return "", errors.New("server version is empty")
	}
	return normalizeKubeVersion(info.GitVersion), nil
}

// kubeVersion is a parsed semantic Kubernetes version. Patch is zero when the
// input omitted it (e.g. "1.29").
type kubeVersion struct {
	Major int
	Minor int
	Patch int
}

// parseKubeVersion parses "1.29", "1.29.3", or the same with a leading "v".
// Anything with fewer than two numeric components is rejected.
func parseKubeVersion(s string) (kubeVersion, error) {
	s = strings.TrimPrefix(strings.TrimSpace(s), "v")

	var v kubeVersion
	// Sscanf reports io.ErrUnexpectedEOF for two-part versions like "1.29"
	// even though Major and Minor parsed fine, so the error is deliberately
	// ignored and only the parsed-item count decides validity. (The previous
	// `err != nil || n < 2` check wrongly rejected "1.29".)
	n, _ := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
	if n < 2 {
		return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q", s)
	}
	return v, nil
}

// Control-plane: keep this strict.
// Accept same version, or a one-minor step where the node binary is newer than the current cluster.
// That covers normal control-plane upgrade flow but blocks nonsense.
func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool { cv, err := parseKubeVersion(clusterVersion) if err != nil { return false } nv, err := parseKubeVersion(nodeVersion) if err != nil { return false } if cv.Major != nv.Major { return false } if cv.Minor == nv.Minor { return true } if nv.Minor == cv.Minor+1 { return true } return false } // Worker: kubelet generally must not be newer than the apiserver. // Older kubelets are allowed within supported skew range. // Your requirement says unsupported worker skew should still proceed, so this // only classifies support status and must NOT be used to block this function. func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool { cv, err := parseKubeVersion(clusterVersion) if err != nil { return false } nv, err := parseKubeVersion(nodeVersion) if err != nil { return false } if cv.Major != nv.Major { return false } // kubelet newer than apiserver => unsupported if nv.Minor > cv.Minor { return false } // kubelet up to 3 minors older than apiserver => supported if cv.Minor-nv.Minor <= 3 { return true } return false } // This should not try to taint the node directly here. // Just record intent and let a later reconcile step apply the taint. func markUnsupportedWorkerVersionSkew(nctx *NodeContext, clusterVersion, nodeVersion string) { // Replace this with whatever state carrier you already use. // // Example: // nctx.Metadata.UnsupportedWorkerVersionSkew = true // nctx.Metadata.UnsupportedWorkerVersionSkewReason = // fmt.Sprintf("unsupported worker version skew: cluster=%s node=%s", clusterVersion, nodeVersion) _ = nctx _ = clusterVersion _ = nodeVersion } // Optional helper if you want to probe readiness later through the API. // Keeping this here in case you want a very cheap liveness call elsewhere. 
func apiServerReady(ctx context.Context, kubeconfigPath string) error { restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) if err != nil { return err } restCfg.Timeout = 5 * time.Second clientset, err := kubernetes.NewForConfig(restCfg) if err != nil { return err } _, err = clientset.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{}) return err } func ValidateRequiredImagesPresent(ctx context.Context, n *NodeContext) error { if n.Config.Spec.SkipImageCheck { klog.Infof("skipping image check (skipImageCheck=true)") return nil } k8sVersion := strings.TrimSpace(n.Config.Spec.KubernetesVersion) if k8sVersion == "" { return fmt.Errorf("kubernetesVersion is required") } klog.Infof("checking required Kubernetes images for %s...", k8sVersion) result, err := n.SystemRunner.Run(ctx, "kubeadm", "config", "images", "list", "--kubernetes-version", k8sVersion, ) if err != nil { return fmt.Errorf("list required Kubernetes images for %s: %w", k8sVersion, err) } var missing []string for _, img := range strings.Fields(result.Stdout) { if err := checkImagePresent(ctx, n, img); err != nil { klog.Errorf("MISSING image: %s", img) missing = append(missing, img) continue } klog.Infof("found image: %s", img) } if len(missing) > 0 { return fmt.Errorf("preload the Kubernetes images before bootstrapping; missing: %s", strings.Join(missing, ", ")) } klog.Infof("all required images are present") return nil } func checkImagePresent(ctx context.Context, n *NodeContext, image string) error { image = strings.TrimSpace(image) if image == "" { return fmt.Errorf("image is required") } // crictl inspecti exits non-zero when the image is absent. 
_, err := n.SystemRunner.Run(ctx, "crictl", "inspecti", image) if err != nil { return fmt.Errorf("image %q not present: %w", image, err) } return nil } func chooseVersionKubeconfig(state *LocalClusterState) string { if state.HasAdminKubeconfig { return adminKubeconfigPath } if state.HasKubeletKubeconfig { return kubeletKubeconfigPath } return "" } func versionEq(a, b string) bool { return normalizeKubeVersion(a) == normalizeKubeVersion(b) } func normalizeKubeVersion(v string) string { v = strings.TrimSpace(v) if v == "" { return "" } if !strings.HasPrefix(v, "v") { v = "v" + v } return v } func GenerateKubeadmConfig(_ context.Context, nctx *NodeContext) error { if nctx == nil { return fmt.Errorf("node context is nil") } spec := nctx.Config.Spec advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress) if advertiseAddress == "" { return fmt.Errorf("api server advertise address is required") } nodeName := strings.TrimSpace(spec.NodeName) if nodeName == "" { return fmt.Errorf("node name is required") } criSocket := strings.TrimSpace(spec.ContainerRuntimeEndpoint) if criSocket == "" { return fmt.Errorf("container runtime endpoint is required") } clusterName := strings.TrimSpace(spec.ClusterName) if clusterName == "" { return fmt.Errorf("cluster name is required") } kubernetesVersion := strings.TrimSpace(spec.KubernetesVersion) if kubernetesVersion == "" { return fmt.Errorf("kubernetes version is required") } podSubnet := strings.TrimSpace(spec.PodSubnet) if podSubnet == "" { return fmt.Errorf("pod subnet is required") } serviceSubnet := strings.TrimSpace(spec.ServiceSubnet) if serviceSubnet == "" { return fmt.Errorf("service subnet is required") } clusterDomain := strings.TrimSpace(spec.ClusterDomain) if clusterDomain == "" { return fmt.Errorf("cluster domain is required") } certSANs := []string{advertiseAddress} seen := map[string]struct{}{ advertiseAddress: {}, } for _, raw := range spec.SubjectAltNames { san := strings.TrimSpace(raw) if san == "" { 
continue } if _, ok := seen[san]; ok { continue } seen[san] = struct{}{} certSANs = append(certSANs, san) } type kubeadmInitConfiguration struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` LocalAPIEndpoint struct { AdvertiseAddress string `yaml:"advertiseAddress"` BindPort int `yaml:"bindPort"` } `yaml:"localAPIEndpoint"` NodeRegistration struct { Name string `yaml:"name"` CRISocket string `yaml:"criSocket"` ImagePullPolicy string `yaml:"imagePullPolicy"` KubeletExtraArgs []struct { Name string `yaml:"name"` Value string `yaml:"value"` } `yaml:"kubeletExtraArgs"` } `yaml:"nodeRegistration"` } type kubeadmClusterConfiguration struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` ClusterName string `yaml:"clusterName"` KubernetesVersion string `yaml:"kubernetesVersion"` Networking struct { PodSubnet string `yaml:"podSubnet"` ServiceSubnet string `yaml:"serviceSubnet"` DNSDomain string `yaml:"dnsDomain"` } `yaml:"networking"` APIServer struct { CertSANs []string `yaml:"certSANs"` } `yaml:"apiServer"` } type kubeletConfiguration struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` CgroupDriver string `yaml:"cgroupDriver"` ContainerRuntimeEndpoint string `yaml:"containerRuntimeEndpoint"` } initCfg := kubeadmInitConfiguration{ APIVersion: "kubeadm.k8s.io/v1beta4", Kind: "InitConfiguration", } initCfg.LocalAPIEndpoint.AdvertiseAddress = advertiseAddress initCfg.LocalAPIEndpoint.BindPort = 6443 initCfg.NodeRegistration.Name = nodeName initCfg.NodeRegistration.CRISocket = criSocket initCfg.NodeRegistration.ImagePullPolicy = "IfNotPresent" initCfg.NodeRegistration.KubeletExtraArgs = []struct { Name string `yaml:"name"` Value string `yaml:"value"` }{ {Name: "hostname-override", Value: nodeName}, {Name: "node-ip", Value: advertiseAddress}, {Name: "pod-manifest-path", Value: "/etc/kubernetes/manifests"}, } clusterCfg := kubeadmClusterConfiguration{ APIVersion: "kubeadm.k8s.io/v1beta4", Kind: "ClusterConfiguration", 
ClusterName: clusterName, KubernetesVersion: kubernetesVersion, } clusterCfg.Networking.PodSubnet = podSubnet clusterCfg.Networking.ServiceSubnet = serviceSubnet clusterCfg.Networking.DNSDomain = clusterDomain clusterCfg.APIServer.CertSANs = certSANs kubeletCfg := kubeletConfiguration{ APIVersion: "kubelet.config.k8s.io/v1beta1", Kind: "KubeletConfiguration", CgroupDriver: "cgroupfs", ContainerRuntimeEndpoint: criSocket, } var docs [][]byte for _, doc := range []any{initCfg, clusterCfg, kubeletCfg} { b, err := yaml.Marshal(doc) if err != nil { return fmt.Errorf("marshal kubeadm config document: %w", err) } docs = append(docs, bytes.TrimSpace(b)) } var buf bytes.Buffer for i, doc := range docs { if i > 0 { buf.WriteString("\n---\n") } buf.Write(doc) buf.WriteByte('\n') } rendered := buf.String() if err := os.WriteFile(tmpKubeadmInitConf, []byte(rendered), 0o600); err != nil { return fmt.Errorf("write kubeadm config to %s: %w", tmpKubeadmInitConf, err) } klog.V(4).Infof("generated kubeadm config at %s:\n%s", tmpKubeadmInitConf, rendered) return nil } func RunKubeadmInit(ctx context.Context, nctx *NodeContext) error { if err := GenerateKubeadmConfig(ctx, nctx); err != nil { return err } _, err := nctx.SystemRunner.RunWithOptions( ctx, "kubeadm", []string{"init", "--config", tmpKubeadmInitConf}, system.RunOptions{ Timeout: 10 * time.Minute, OnStdoutLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, OnStderrLine: func(line string) { klog.Infof("[kubeadm] %s", line) }, }, ) return err } func RunKubeadmUpgradeApply(context.Context, *NodeContext) error { klog.Info("run_kubeadm_upgrade_apply: TODO implement kubeadm upgrade apply") return nil } func RunKubeadmJoin(context.Context, *NodeContext) error { /* run_kubeadm_join() { log "running kubeadm join..." 
case "$JOIN_KIND" in worker) kubeadm join "${API_SERVER_ENDPOINT}" \ --token "${BOOTSTRAP_TOKEN}" \ --discovery-token-ca-cert-hash "${DISCOVERY_TOKEN_CA_CERT_HASH}" \ --node-name "${NODE_NAME}" \ --cri-socket "${CONTAINER_RUNTIME_ENDPOINT}" ;; control-plane) kubeadm join "${API_SERVER_ENDPOINT}" \ --token "${BOOTSTRAP_TOKEN}" \ --discovery-token-ca-cert-hash "${DISCOVERY_TOKEN_CA_CERT_HASH}" \ --control-plane \ --certificate-key "${CONTROL_PLANE_CERT_KEY}" \ --apiserver-advertise-address "${APISERVER_ADVERTISE_ADDRESS}" \ --node-name "${NODE_NAME}" \ --cri-socket "${CONTAINER_RUNTIME_ENDPOINT}" ;; esac } */ return nil } func RunKubeadmUpgradeNode(context.Context, *NodeContext) error { klog.Info("run_kubeadm_upgrade_node: TODO implement kubeadm upgrade node") return nil }