// monok8s/clitools/pkg/node/kubeadm.go
package node
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"os"
"strings"
"time"

"gopkg.in/yaml.v3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"

types "undecided.project/monok8s/pkg/apis/monok8s/v1alpha1"
system "undecided.project/monok8s/pkg/system"
)
const (
adminKubeconfigPath = "/etc/kubernetes/admin.conf"
kubeletKubeconfigPath = "/etc/kubernetes/kubelet.conf"
tmpKubeadmInitConf = "/tmp/kubeadm-init.yaml"
tmpKubeadmJoinConf = "/tmp/kubeadm-join.yaml"
)
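// DetectLocalClusterState checks which kubeconfigs exist on disk and records
// the resulting local membership kind on the NodeContext.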
func DetectLocalClusterState(_ context.Context, nctx *NodeContext) error {
if nctx == nil {
return fmt.Errorf("node context is nil")
}
_, err := os.Stat(adminKubeconfigPath)
hasAdmin := err == nil
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("stat admin kubeconfig: %w", err)
}
_, err = os.Stat(kubeletKubeconfigPath)
hasKubelet := err == nil
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("stat kubelet kubeconfig: %w", err)
}
state := LocalClusterState{
HasAdminKubeconfig: hasAdmin,
HasKubeletKubeconfig: hasKubelet,
}
switch {
case !hasAdmin && !hasKubelet:
state.MembershipKind = LocalMembershipFresh
case !hasAdmin && hasKubelet:
state.MembershipKind = LocalMembershipExistingWorker
case hasAdmin && hasKubelet:
state.MembershipKind = LocalMembershipExistingControlPlane
case hasAdmin && !hasKubelet:
state.MembershipKind = LocalMembershipPartial
default:
return fmt.Errorf("unreachable local cluster state")
}
klog.V(4).Infof("Detected local state: %+v", state)
nctx.LocalClusterState = &state
return nil
}
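// WaitForExistingClusterIfNeeded starts the kubelet when this node already
// belongs to a cluster and, for control-plane nodes, waits until the local
// apiserver answers. Fresh nodes are a no-op.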
func WaitForExistingClusterIfNeeded(ctx context.Context, nctx *NodeContext) error {
if nctx.LocalClusterState == nil {
return errors.New("LocalClusterState is nil, please run dependency step first")
}
switch nctx.LocalClusterState.MembershipKind {
case LocalMembershipFresh:
klog.V(4).Infof("Nothing to to do LocalMembershipFresh")
return nil
case LocalMembershipExistingWorker:
klog.V(4).Infof("Starting Kubelet in LocalMembershipExistingWorker")
if err := StartKubelet(ctx, nctx); err != nil {
return fmt.Errorf("start kubelet: %w", err)
}
return nil
case LocalMembershipExistingControlPlane:
if err := StartKubelet(ctx, nctx); err != nil {
return fmt.Errorf("start kubelet: %w", err)
}
klog.V(4).Infof("Waiting for local apiserver in LocalMembershipExistingControlPlane")
// Existing local control-plane state: wait for local apiserver if this
// machine is meant to be a control-plane node.
if strings.TrimSpace(nctx.Config.Spec.ClusterRole) == "control-plane" {
if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil {
return fmt.Errorf("wait for local apiserver: %w", err)
}
if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil {
return fmt.Errorf("wait for admin api: %w", err)
}
}
return nil
case LocalMembershipPartial:
return fmt.Errorf("partial local cluster state detected: admin=%t kubelet=%t",
nctx.LocalClusterState.HasAdminKubeconfig,
nctx.LocalClusterState.HasKubeletKubeconfig,
)
default:
return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
}
}
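// ClassifyBootstrapAction maps the configured cluster role and the detected
// local membership to a bootstrap action (init, join, or manage).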
func ClassifyBootstrapAction(_ context.Context, nctx *NodeContext) error {
if nctx.LocalClusterState == nil {
return errors.New("LocalClusterState is nil, call DetectLocalClusterState first")
}
role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
initControlPlane := nctx.Config.Spec.InitControlPlane
wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
if wantVersion == "" {
return errors.New("spec.kubernetesVersion is required")
}
state := &BootstrapState{}
if nctx.BootstrapState != nil {
*state = *nctx.BootstrapState
}
switch role {
case "worker":
switch nctx.LocalClusterState.MembershipKind {
case LocalMembershipFresh:
state.Action = BootstrapActionJoinWorker
case LocalMembershipExistingWorker:
state.Action = BootstrapActionManageWorker
case LocalMembershipExistingControlPlane, LocalMembershipPartial:
return fmt.Errorf("local state %q is invalid for worker role", nctx.LocalClusterState.MembershipKind)
default:
return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
}
case "control-plane":
switch nctx.LocalClusterState.MembershipKind {
case LocalMembershipFresh:
if initControlPlane {
state.Action = BootstrapActionInitControlPlane
} else {
state.Action = BootstrapActionJoinControlPlane
}
case LocalMembershipExistingControlPlane:
state.Action = BootstrapActionManageControlPlane
case LocalMembershipExistingWorker:
return fmt.Errorf("local state %q is invalid for control-plane role", nctx.LocalClusterState.MembershipKind)
case LocalMembershipPartial:
return fmt.Errorf("partial local cluster state is invalid for control-plane role")
default:
return fmt.Errorf("unknown local membership kind %q", nctx.LocalClusterState.MembershipKind)
}
default:
return fmt.Errorf("unsupported cluster role %q", role)
}
nctx.BootstrapState = state
klog.V(4).Infof("Bootstrap action classified: %+v", *state)
return nil
}
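// InitControlPlane validates that this node is configured and classified for a
// fresh control-plane init. The actual "kubeadm init" is expected to run as
// its own step (see RunKubeadmInit).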
func InitControlPlane(ctx context.Context, nctx *NodeContext) error {
if nctx == nil {
return errors.New("node context is nil")
}
if nctx.Config == nil {
return errors.New("node config is nil")
}
if strings.TrimSpace(nctx.Config.Spec.ClusterRole) != "control-plane" {
return fmt.Errorf("init control-plane called for non-control-plane role %q", nctx.Config.Spec.ClusterRole)
}
if !nctx.Config.Spec.InitControlPlane {
return errors.New("init control-plane called but spec.initControlPlane is false")
}
if nctx.BootstrapState == nil {
return errors.New("BootstrapState is nil, call ClassifyBootstrapAction first")
}
if nctx.BootstrapState.Action != BootstrapActionInitControlPlane {
return fmt.Errorf("init control-plane called with bootstrap action %q", nctx.BootstrapState.Action)
}
// Fresh init only. Existing control-plane recovery/wait belongs elsewhere.
if nctx.LocalClusterState == nil {
return errors.New("LocalClusterState is nil, call DetectLocalClusterState first")
}
if nctx.LocalClusterState.MembershipKind != LocalMembershipFresh {
return fmt.Errorf("init control-plane requires fresh local state, got %q", nctx.LocalClusterState.MembershipKind)
}
// Example:
// if err := RunKubeadmInit(ctx, nctx); err != nil {
// return fmt.Errorf("kubeadm init: %w", err)
// }
return nil
}
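// waitForLocalAPIServer polls a TCP dial against the advertise address on port
// 6443 until a connection succeeds or the timeout elapses.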
func waitForLocalAPIServer(ctx context.Context, nctx *NodeContext, timeout time.Duration) error {
addr := strings.TrimSpace(nctx.Config.Spec.APIServerAdvertiseAddress)
if addr == "" {
return errors.New("spec.apiServerAdvertiseAddress is required for local control-plane wait")
}
d := net.Dialer{Timeout: 2 * time.Second}
deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
target := net.JoinHostPort(addr, "6443")
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
conn, err := d.DialContext(deadlineCtx, "tcp", target)
if err == nil {
_ = conn.Close()
return nil
}
select {
case <-deadlineCtx.Done():
return fmt.Errorf("apiserver %s did not become reachable within %s", target, timeout)
case <-ticker.C:
}
}
}
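// waitForAPIViaKubeconfig polls the apiserver's version endpoint through the
// given kubeconfig until it answers or the timeout elapses.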
func waitForAPIViaKubeconfig(ctx context.Context, kubeconfigPath string, timeout time.Duration) error {
deadlineCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
_, err := getServerVersion(deadlineCtx, kubeconfigPath)
if err == nil {
return nil
}
select {
case <-deadlineCtx.Done():
return fmt.Errorf("api via kubeconfig %s did not become reachable within %s", kubeconfigPath, timeout)
case <-ticker.C:
}
}
}
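// getServerVersion builds a short-timeout client from the given kubeconfig and
// asks the apiserver for its version.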
func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
}
// Keep this short. This is a probe, not a long-running client.
restCfg.Timeout = 5 * time.Second
clientset, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return "", fmt.Errorf("create clientset: %w", err)
}
disc := clientset.Discovery()
return discoverServerVersion(ctx, disc)
}
func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
info, err := disc.ServerVersion()
if err != nil {
return "", err
}
if info == nil || strings.TrimSpace(info.GitVersion) == "" {
return "", errors.New("server version is empty")
}
return normalizeKubeVersion(info.GitVersion), nil
}
type kubeVersion struct {
Major int
Minor int
Patch int
}
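// parseKubeVersion parses "v1.29.3", "1.29.3", or "1.29" into numeric parts.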
func parseKubeVersion(s string) (kubeVersion, error) {
s = strings.TrimSpace(s)
s = strings.TrimPrefix(s, "v")
var v kubeVersion
n, err := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
// Accepts "1.29" or "1.29.3". Sscanf returns a non-nil error (unexpected EOF)
// for the two-field form even though both fields scanned, so gate on the
// field count alone.
if n < 2 {
return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q: %v", s, err)
}
return v, nil
}
// Control-plane skew policy: keep this strict.
// Accept the same version, or a one-minor step where the node binary is newer
// than the current cluster. That covers the normal control-plane upgrade flow
// and blocks everything else.
func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool {
cv, err := parseKubeVersion(clusterVersion)
if err != nil {
return false
}
nv, err := parseKubeVersion(nodeVersion)
if err != nil {
return false
}
if cv.Major != nv.Major {
return false
}
if cv.Minor == nv.Minor {
return true
}
if nv.Minor == cv.Minor+1 {
return true
}
return false
}
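// For example, isSupportedControlPlaneSkew("v1.29.3", "v1.30.0") is true (node
// one minor newer), while a v1.31 node or a v1.28 node against a v1.29 cluster
// is rejected.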
// Worker skew policy: the kubelet generally must not be newer than the
// apiserver, and older kubelets are allowed within the supported skew range.
// Unsupported worker skew should still proceed, so this only classifies
// support status and must NOT be used to gate the join flow.
func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool {
cv, err := parseKubeVersion(clusterVersion)
if err != nil {
return false
}
nv, err := parseKubeVersion(nodeVersion)
if err != nil {
return false
}
if cv.Major != nv.Major {
return false
}
// kubelet newer than apiserver => unsupported
if nv.Minor > cv.Minor {
return false
}
// kubelet up to 3 minors older than apiserver => supported
if cv.Minor-nv.Minor <= 3 {
return true
}
return false
}
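// For example, isSupportedWorkerSkew("v1.30.2", "v1.27.0") is true (kubelet
// three minors older), while a v1.31 kubelet against a v1.30 cluster is
// unsupported (kubelet newer than apiserver).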
// This should not try to taint the node directly here.
// Just record intent and let a later reconcile step apply the taint.
func markUnsupportedWorkerVersionSkew(nctx *NodeContext, clusterVersion, nodeVersion string) {
// Replace this with whatever state carrier you already use.
//
// Example:
// nctx.Metadata.UnsupportedWorkerVersionSkew = true
// nctx.Metadata.UnsupportedWorkerVersionSkewReason =
// fmt.Sprintf("unsupported worker version skew: cluster=%s node=%s", clusterVersion, nodeVersion)
_ = nctx
_ = clusterVersion
_ = nodeVersion
}
// Optional helper if you want to probe readiness later through the API.
// Keeping this here in case you want a very cheap liveness call elsewhere.
func apiServerReady(ctx context.Context, kubeconfigPath string) error {
restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return err
}
restCfg.Timeout = 5 * time.Second
clientset, err := kubernetes.NewForConfig(restCfg)
if err != nil {
return err
}
_, err = clientset.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
return err
}
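// ValidateRequiredImagesPresent asks kubeadm for the images the target version
// needs and fails if any are missing from the local CRI store.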
func ValidateRequiredImagesPresent(ctx context.Context, n *NodeContext) error {
if n.Config.Spec.SkipImageCheck {
klog.Infof("skipping image check (skipImageCheck=true)")
return nil
}
k8sVersion := strings.TrimSpace(n.Config.Spec.KubernetesVersion)
if k8sVersion == "" {
return fmt.Errorf("kubernetesVersion is required")
}
klog.Infof("checking required Kubernetes images for %s...", k8sVersion)
result, err := n.SystemRunner.Run(ctx,
"kubeadm", "config", "images", "list",
"--kubernetes-version", k8sVersion,
)
if err != nil {
return fmt.Errorf("list required Kubernetes images for %s: %w", k8sVersion, err)
}
var missing []string
for _, img := range strings.Fields(result.Stdout) {
if err := checkImagePresent(ctx, n, img); err != nil {
klog.Errorf("MISSING image: %s", img)
missing = append(missing, img)
continue
}
klog.V(4).Infof("found image: %s", img)
}
if len(missing) > 0 {
return fmt.Errorf("preload the Kubernetes images before bootstrapping; missing: %s", strings.Join(missing, ", "))
}
klog.Infof("all required images are present")
return nil
}
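// checkImagePresent probes the local CRI store for the image via crictl,
// retrying briefly to ride out transient runtime errors.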
func checkImagePresent(ctx context.Context, n *NodeContext, image string) error {
image = strings.TrimSpace(image)
if image == "" {
return fmt.Errorf("image is required")
}
// crictl inspecti exits non-zero when the image is absent.
_, err := n.SystemRunner.RunRetry(ctx, system.RetryOptions{
Attempts: 3,
Delay: 1 * system.DefaultSecond,
}, "crictl", "inspecti", image)
if err != nil {
return fmt.Errorf("image %q not present: %w", image, err)
}
return nil
}
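// chooseVersionKubeconfig picks the kubeconfig to use for version probes,
// preferring the admin kubeconfig over the kubelet one.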
func chooseVersionKubeconfig(state *LocalClusterState) string {
if state.HasAdminKubeconfig {
return adminKubeconfigPath
}
if state.HasKubeletKubeconfig {
return kubeletKubeconfigPath
}
return ""
}
func versionEq(a, b string) bool {
return normalizeKubeVersion(a) == normalizeKubeVersion(b)
}
func normalizeKubeVersion(v string) string {
v = strings.TrimSpace(v)
if v == "" {
return ""
}
if !strings.HasPrefix(v, "v") {
v = "v" + v
}
return v
}
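// buildNodeRegistration assembles the kubeadm nodeRegistration stanza shared
// by the init and join configurations.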
func buildNodeRegistration(spec types.MonoKSConfigSpec) NodeRegistrationOptions {
nodeName := strings.TrimSpace(spec.NodeName)
criSocket := strings.TrimSpace(spec.ContainerRuntimeEndpoint)
advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress)
nr := NodeRegistrationOptions{
Name: nodeName,
CRISocket: criSocket,
ImagePullPolicy: "IfNotPresent",
KubeletExtraArgs: []KubeadmArg{
{Name: "hostname-override", Value: nodeName},
{Name: "pod-manifest-path", Value: "/etc/kubernetes/manifests"},
},
}
if advertiseAddress != "" {
nr.KubeletExtraArgs = append(nr.KubeletExtraArgs,
KubeadmArg{Name: "node-ip", Value: advertiseAddress},
)
}
return nr
}
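// maybeAddBootstrapTaint adds the monok8s bootstrap NoSchedule taint to worker
// registrations; other roles are left untouched.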
func maybeAddBootstrapTaint(nr *NodeRegistrationOptions, role string) {
if strings.TrimSpace(role) != "worker" {
return
}
nr.Taints = []KubeadmTaint{
{
Key: "monok8s/bootstrap",
Effect: "NoSchedule",
},
}
}
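// GenerateKubeadmInitConfig validates the spec and renders the kubeadm
// InitConfiguration, ClusterConfiguration, and KubeletConfiguration documents
// to the temporary init config path.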
func GenerateKubeadmInitConfig(_ context.Context, nctx *NodeContext) error {
if nctx == nil {
return fmt.Errorf("node context is nil")
}
spec := nctx.Config.Spec
advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress)
if advertiseAddress == "" {
return fmt.Errorf("api server advertise address is required")
}
clusterName := strings.TrimSpace(spec.ClusterName)
if clusterName == "" {
return fmt.Errorf("cluster name is required")
}
kubernetesVersion := strings.TrimSpace(spec.KubernetesVersion)
if kubernetesVersion == "" {
return fmt.Errorf("kubernetes version is required")
}
podSubnet := strings.TrimSpace(spec.PodSubnet)
if podSubnet == "" {
return fmt.Errorf("pod subnet is required")
}
serviceSubnet := strings.TrimSpace(spec.ServiceSubnet)
if serviceSubnet == "" {
return fmt.Errorf("service subnet is required")
}
clusterDomain := strings.TrimSpace(spec.ClusterDomain)
if clusterDomain == "" {
return fmt.Errorf("cluster domain is required")
}
certSANs := []string{advertiseAddress}
seen := map[string]struct{}{advertiseAddress: {}}
for _, raw := range spec.SubjectAltNames {
san := strings.TrimSpace(raw)
if san == "" {
continue
}
if _, ok := seen[san]; ok {
continue
}
seen[san] = struct{}{}
certSANs = append(certSANs, san)
}
nodeReg := buildNodeRegistration(spec)
if spec.ClusterRole == "worker" {
maybeAddBootstrapTaint(&nodeReg, spec.ClusterRole)
}
initCfg := InitConfiguration{
APIVersion: "kubeadm.k8s.io/v1beta4",
Kind: "InitConfiguration",
LocalAPIEndpoint: LocalAPIEndpoint{
AdvertiseAddress: advertiseAddress,
BindPort: 6443,
},
NodeRegistration: nodeReg,
}
clusterCfg := ClusterConfiguration{
APIVersion: "kubeadm.k8s.io/v1beta4",
Kind: "ClusterConfiguration",
ClusterName: clusterName,
KubernetesVersion: kubernetesVersion,
Networking: Networking{
PodSubnet: podSubnet,
ServiceSubnet: serviceSubnet,
DNSDomain: clusterDomain,
},
APIServer: APIServer{
CertSANs: certSANs,
},
}
kubeletCfg := KubeletConfiguration{
APIVersion: "kubelet.config.k8s.io/v1beta1",
Kind: "KubeletConfiguration",
CgroupDriver: "cgroupfs",
ContainerRuntimeEndpoint: strings.TrimSpace(spec.ContainerRuntimeEndpoint),
}
return writeKubeadmYAML(tmpKubeadmInitConf, initCfg, clusterCfg, kubeletCfg)
}
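// GenerateKubeadmJoinConfig validates the spec and renders the kubeadm
// JoinConfiguration (plus KubeletConfiguration) for worker or control-plane
// joins to the temporary join config path.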
func GenerateKubeadmJoinConfig(_ context.Context, nctx *NodeContext) error {
if nctx == nil {
return fmt.Errorf("node context is nil")
}
spec := nctx.Config.Spec
apiServerEndpoint := strings.TrimSpace(spec.APIServerEndpoint)
if apiServerEndpoint == "" {
return fmt.Errorf("spec.apiServerEndpoint is required")
}
bootstrapToken := strings.TrimSpace(spec.BootstrapToken)
if bootstrapToken == "" {
return fmt.Errorf("spec.bootstrapToken is required")
}
discoveryTokenCACertHash := strings.TrimSpace(spec.DiscoveryTokenCACertHash)
if discoveryTokenCACertHash == "" {
return fmt.Errorf("spec.discoveryTokenCACertHash is required")
}
nodeReg := buildNodeRegistration(spec)
if nctx.BootstrapState != nil && nctx.BootstrapState.Action == BootstrapActionJoinWorker {
maybeAddBootstrapTaint(&nodeReg, spec.ClusterRole)
}
joinCfg := JoinConfiguration{
APIVersion: "kubeadm.k8s.io/v1beta4",
Kind: "JoinConfiguration",
NodeRegistration: nodeReg,
Discovery: Discovery{
BootstrapToken: BootstrapTokenDiscovery{
APIServerEndpoint: apiServerEndpoint,
Token: bootstrapToken,
CACertHashes: []string{discoveryTokenCACertHash},
},
},
}
if nctx.BootstrapState != nil && nctx.BootstrapState.Action == BootstrapActionJoinControlPlane {
certKey := strings.TrimSpace(spec.ControlPlaneCertKey)
if certKey == "" {
return fmt.Errorf("spec.controlPlaneCertKey is required for control-plane join")
}
advertiseAddress := strings.TrimSpace(spec.APIServerAdvertiseAddress)
if advertiseAddress == "" {
return fmt.Errorf("spec.apiServerAdvertiseAddress is required for control-plane join")
}
joinCfg.ControlPlane = &JoinControlPlane{
CertificateKey: certKey,
}
joinCfg.LocalAPIEndpoint = &LocalAPIEndpoint{
AdvertiseAddress: advertiseAddress,
BindPort: 6443,
}
}
kubeletCfg := KubeletConfiguration{
APIVersion: "kubelet.config.k8s.io/v1beta1",
Kind: "KubeletConfiguration",
CgroupDriver: "cgroupfs",
ContainerRuntimeEndpoint: strings.TrimSpace(spec.ContainerRuntimeEndpoint),
}
return writeKubeadmYAML(tmpKubeadmJoinConf, joinCfg, kubeletCfg)
}
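// writeKubeadmYAML marshals each document and writes them as one
// "---"-separated multi-document YAML file with owner-only permissions.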
func writeKubeadmYAML(path string, docs ...any) error {
var renderedDocs [][]byte
for _, doc := range docs {
b, err := yaml.Marshal(doc)
if err != nil {
return fmt.Errorf("marshal kubeadm config document: %w", err)
}
renderedDocs = append(renderedDocs, bytes.TrimSpace(b))
}
var buf bytes.Buffer
for i, doc := range renderedDocs {
if i > 0 {
buf.WriteString("\n---\n")
}
buf.Write(doc)
buf.WriteByte('\n')
}
rendered := buf.String()
if err := os.WriteFile(path, []byte(rendered), 0o600); err != nil {
return fmt.Errorf("write kubeadm config to %s: %w", path, err)
}
klog.V(4).Infof("generated kubeadm config at %s:\n%s", path, rendered)
return nil
}
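// RunKubeadmInit renders the init config and runs "kubeadm init", streaming
// its output to the log. It is a no-op for any action other than
// BootstrapActionInitControlPlane.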
func RunKubeadmInit(ctx context.Context, nctx *NodeContext) error {
if nctx.BootstrapState == nil {
return errors.New("BootstrapState is nil. Please run earlier steps first")
}
if nctx.BootstrapState.Action != BootstrapActionInitControlPlane {
klog.V(4).Infof("skipped for %s", nctx.BootstrapState.Action)
return nil
}
if err := GenerateKubeadmInitConfig(ctx, nctx); err != nil {
return err
}
_, err := nctx.SystemRunner.RunWithOptions(
ctx,
"kubeadm",
[]string{"init", "--config", tmpKubeadmInitConf},
system.RunOptions{
Timeout: 10 * time.Minute,
OnStdoutLine: func(line string) { klog.Infof("[kubeadm] %s", line) },
OnStderrLine: func(line string) { klog.Infof("[kubeadm] %s", line) },
},
)
return err
}
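// RunKubeadmJoin renders the join config and runs "kubeadm join" for worker or
// control-plane joins, streaming its output to the log.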
func RunKubeadmJoin(ctx context.Context, nctx *NodeContext) error {
if nctx.BootstrapState == nil {
return errors.New("BootstrapState is nil. Please run earlier steps first")
}
switch nctx.BootstrapState.Action {
case BootstrapActionJoinWorker, BootstrapActionJoinControlPlane:
// continue
default:
klog.V(4).Infof("RunKubeadmJoin skipped for action %q", nctx.BootstrapState.Action)
return nil
}
if err := GenerateKubeadmJoinConfig(ctx, nctx); err != nil {
return err
}
klog.Infof("running kubeadm join for action %q", nctx.BootstrapState.Action)
_, err := nctx.SystemRunner.RunWithOptions(
ctx,
"kubeadm",
[]string{"join", "--config", tmpKubeadmInitConf},
system.RunOptions{
Timeout: 5 * time.Minute,
OnStdoutLine: func(line string) {
klog.Infof("[kubeadm] %s", line)
},
OnStderrLine: func(line string) {
klog.Infof("[kubeadm] %s", line)
},
},
)
if err != nil {
return fmt.Errorf("run kubeadm join: %w", err)
}
return nil
}
func RunKubeadmUpgradeApply(context.Context, *NodeContext) error {
klog.Info("RunKubeadmUpgradeApply: TODO implement kubeadm upgrade apply")
return nil
}
func RunKubeadmUpgradeNode(context.Context, *NodeContext) error {
klog.Info("RunKubeadmUpgradeNode: TODO implement kubeadm upgrade node")
return nil
}
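// ReconcileControlPlane handles an existing control-plane node: start the
// kubelet and wait until the local apiserver answers.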
func ReconcileControlPlane(ctx context.Context, nctx *NodeContext) error {
if nctx.BootstrapState == nil {
return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first")
}
if nctx.BootstrapState.Action != BootstrapActionManageControlPlane {
klog.V(4).Infof("ReconcileControlPlane skipped for action %q", nctx.BootstrapState.Action)
return nil
}
if err := StartKubelet(ctx, nctx); err != nil {
return fmt.Errorf("start kubelet: %w", err)
}
if err := waitForLocalAPIServer(ctx, nctx, 2*time.Minute); err != nil {
return fmt.Errorf("wait for local apiserver: %w", err)
}
if err := waitForAPIViaKubeconfig(ctx, adminKubeconfigPath, 2*time.Minute); err != nil {
return fmt.Errorf("wait for admin api: %w", err)
}
return nil
}
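// ReconcileWorker handles an existing worker node: start the kubelet and wait
// until it reports healthy.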
func ReconcileWorker(ctx context.Context, nctx *NodeContext) error {
if nctx.BootstrapState == nil {
return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first")
}
if nctx.BootstrapState.Action != BootstrapActionManageWorker {
klog.V(4).Infof("ReconcileWorker skipped for action %q", nctx.BootstrapState.Action)
return nil
}
if err := StartKubelet(ctx, nctx); err != nil {
return fmt.Errorf("start kubelet: %w", err)
}
if err := waitForKubeletHealthy(ctx, 2*time.Minute); err != nil {
return fmt.Errorf("wait for kubelet healthy: %w", err)
}
return nil
}