334 lines
8.3 KiB
Go
334 lines
8.3 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
// kubeVersion holds the numeric major.minor.patch core of a Kubernetes
// version string, as produced by parseKubeVersion.
type kubeVersion struct {
	Major, Minor, Patch int
}
|
|
|
|
func ValidateNodeIPAndAPIServerReachability(ctx context.Context, nct *NodeContext) error {
|
|
requireLocalIP := func(wantedIP string) error {
|
|
wantedIP = strings.TrimSpace(wantedIP)
|
|
if wantedIP == "" {
|
|
return fmt.Errorf("API server advertise address is required")
|
|
}
|
|
|
|
ip := net.ParseIP(wantedIP)
|
|
if ip == nil {
|
|
return fmt.Errorf("invalid API server advertise address %q", wantedIP)
|
|
}
|
|
|
|
ifaces, err := net.Interfaces()
|
|
if err != nil {
|
|
return fmt.Errorf("list interfaces: %w", err)
|
|
}
|
|
|
|
for _, iface := range ifaces {
|
|
addrs, err := iface.Addrs()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, addr := range addrs {
|
|
var got net.IP
|
|
switch v := addr.(type) {
|
|
case *net.IPNet:
|
|
got = v.IP
|
|
case *net.IPAddr:
|
|
got = v.IP
|
|
}
|
|
if got != nil && got.Equal(ip) {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("required local IP is not present on any interface: %s", wantedIP)
|
|
}
|
|
|
|
checkAPIServerReachable := func(endpoint string) error {
|
|
endpoint = strings.TrimSpace(endpoint)
|
|
if endpoint == "" {
|
|
return fmt.Errorf("API server endpoint is required")
|
|
}
|
|
|
|
host, port, err := net.SplitHostPort(endpoint)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid API server endpoint %q: %w", endpoint, err)
|
|
}
|
|
if strings.TrimSpace(host) == "" || strings.TrimSpace(port) == "" {
|
|
return fmt.Errorf("invalid API server endpoint %q", endpoint)
|
|
}
|
|
|
|
klog.Infof("checking API server reachability: %s:%s", host, port)
|
|
|
|
var lastErr error
|
|
for i := 0; i < 20; i++ {
|
|
d := net.Dialer{Timeout: 1 * time.Second}
|
|
conn, err := d.DialContext(ctx, "tcp", endpoint)
|
|
if err == nil {
|
|
_ = conn.Close()
|
|
klog.Infof("API server is reachable")
|
|
return nil
|
|
}
|
|
lastErr = err
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
}
|
|
|
|
return fmt.Errorf("cannot reach API server at %s: %w", endpoint, lastErr)
|
|
}
|
|
|
|
cfg := nct.Config.Spec
|
|
switch strings.TrimSpace(cfg.ClusterRole) {
|
|
case "control-plane":
|
|
if err := requireLocalIP(cfg.APIServerAdvertiseAddress); err != nil {
|
|
return err
|
|
}
|
|
case "worker":
|
|
if err := requireLocalIP(cfg.APIServerAdvertiseAddress); err != nil {
|
|
return err
|
|
}
|
|
if err := checkAPIServerReachable(cfg.APIServerEndpoint); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
return fmt.Errorf("Incorrect ClusterRole: %s", cfg.ClusterRole)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func CheckForVersionSkew(ctx context.Context, nctx *NodeContext) error {
|
|
if nctx.BootstrapState == nil {
|
|
return errors.New("BootstrapState is nil, call ClassifyBootstrapAction() first")
|
|
}
|
|
|
|
role := strings.TrimSpace(nctx.Config.Spec.ClusterRole)
|
|
wantVersion := normalizeKubeVersion(strings.TrimSpace(nctx.Config.Spec.KubernetesVersion))
|
|
if wantVersion == "" {
|
|
return errors.New("spec.kubernetesVersion is required")
|
|
}
|
|
|
|
switch nctx.LocalClusterState.MembershipKind {
|
|
case LocalMembershipFresh:
|
|
// Nothing to compare for fresh nodes.
|
|
return nil
|
|
case LocalMembershipPartial:
|
|
return fmt.Errorf("cannot check version skew with partial local cluster state")
|
|
}
|
|
|
|
versionKubeconfig := chooseVersionKubeconfig(nctx.LocalClusterState)
|
|
if versionKubeconfig == "" {
|
|
return fmt.Errorf("no kubeconfig available for version detection")
|
|
}
|
|
|
|
currentVersion, err := getServerVersion(ctx, versionKubeconfig)
|
|
if err != nil {
|
|
if role == "control-plane" {
|
|
return fmt.Errorf("existing control-plane state found, but cluster version could not be determined: %w", err)
|
|
}
|
|
|
|
// Worker path stays permissive.
|
|
nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
|
|
nctx.BootstrapState.VersionSkewReason = "cluster version could not be determined"
|
|
|
|
if nctx.BootstrapState.Action == BootstrapActionManageWorker {
|
|
nctx.BootstrapState.Action = BootstrapActionReconcileWorker
|
|
}
|
|
return nil
|
|
}
|
|
|
|
nctx.BootstrapState.DetectedClusterVersion = currentVersion
|
|
|
|
switch role {
|
|
case "control-plane":
|
|
if !isSupportedControlPlaneSkew(currentVersion, wantVersion) {
|
|
return fmt.Errorf(
|
|
"unsupported control-plane version skew: cluster=%s node=%s",
|
|
currentVersion, wantVersion,
|
|
)
|
|
}
|
|
|
|
if nctx.BootstrapState.Action == BootstrapActionManageControlPlane {
|
|
if versionEq(currentVersion, wantVersion) {
|
|
nctx.BootstrapState.Action = BootstrapActionReconcileControlPlane
|
|
} else {
|
|
nctx.BootstrapState.Action = BootstrapActionUpgradeControlPlane
|
|
}
|
|
}
|
|
|
|
case "worker":
|
|
if !isSupportedWorkerSkew(currentVersion, wantVersion) {
|
|
nctx.BootstrapState.UnsupportedWorkerVersionSkew = true
|
|
nctx.BootstrapState.VersionSkewReason = fmt.Sprintf(
|
|
"unsupported worker version skew: cluster=%s node=%s",
|
|
currentVersion, wantVersion,
|
|
)
|
|
}
|
|
|
|
if nctx.BootstrapState.Action == BootstrapActionManageWorker {
|
|
if versionEq(currentVersion, wantVersion) {
|
|
nctx.BootstrapState.Action = BootstrapActionReconcileWorker
|
|
} else {
|
|
nctx.BootstrapState.Action = BootstrapActionUpgradeWorker
|
|
}
|
|
}
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported cluster role %q", role)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func versionEq(a, b string) bool {
|
|
return normalizeKubeVersion(a) == normalizeKubeVersion(b)
|
|
}
|
|
|
|
func versionLt(a, b string) (bool, error) {
|
|
av, err := parseKubeVersion(a)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
bv, err := parseKubeVersion(b)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if av.Major != bv.Major {
|
|
return av.Major < bv.Major, nil
|
|
}
|
|
if av.Minor != bv.Minor {
|
|
return av.Minor < bv.Minor, nil
|
|
}
|
|
return av.Patch < bv.Patch, nil
|
|
}
|
|
|
|
// normalizeKubeVersion trims surrounding whitespace from v and guarantees a
// leading lowercase "v" (e.g. "1.29.3" -> "v1.29.3"). Blank input yields "".
// Nothing else about the string is validated or rewritten.
func normalizeKubeVersion(v string) string {
	trimmed := strings.TrimSpace(v)
	switch {
	case trimmed == "":
		return ""
	case strings.HasPrefix(trimmed, "v"):
		return trimmed
	default:
		return "v" + trimmed
	}
}
|
|
|
|
func parseKubeVersion(s string) (kubeVersion, error) {
|
|
s = strings.TrimSpace(s)
|
|
s = strings.TrimPrefix(s, "v")
|
|
|
|
var v kubeVersion
|
|
n, err := fmt.Sscanf(s, "%d.%d.%d", &v.Major, &v.Minor, &v.Patch)
|
|
// Accepts "1.29" or "1.29.3"
|
|
if err != nil || n < 2 {
|
|
return kubeVersion{}, fmt.Errorf("invalid kubernetes version %q", s)
|
|
}
|
|
return v, nil
|
|
}
|
|
|
|
// Control-plane: keep this strict.
|
|
// Accept same version, or a one-minor step where the node binary is newer than the current cluster.
|
|
// That covers normal control-plane upgrade flow but blocks nonsense.
|
|
func isSupportedControlPlaneSkew(clusterVersion, nodeVersion string) bool {
|
|
cv, err := parseKubeVersion(clusterVersion)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
nv, err := parseKubeVersion(nodeVersion)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if cv.Major != nv.Major {
|
|
return false
|
|
}
|
|
if cv.Minor == nv.Minor {
|
|
return true
|
|
}
|
|
if nv.Minor == cv.Minor+1 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Worker: kubelet generally must not be newer than the apiserver.
|
|
// Older kubelets are allowed within supported skew range.
|
|
// Your requirement says unsupported worker skew should still proceed, so this
|
|
// only classifies support status and must NOT be used to block this function.
|
|
func isSupportedWorkerSkew(clusterVersion, nodeVersion string) bool {
|
|
cv, err := parseKubeVersion(clusterVersion)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
nv, err := parseKubeVersion(nodeVersion)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if cv.Major != nv.Major {
|
|
return false
|
|
}
|
|
|
|
// kubelet newer than apiserver => unsupported
|
|
if nv.Minor > cv.Minor {
|
|
return false
|
|
}
|
|
|
|
// kubelet up to 3 minors older than apiserver => supported
|
|
if cv.Minor-nv.Minor <= 3 {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func getServerVersion(ctx context.Context, kubeconfigPath string) (string, error) {
|
|
restCfg, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("build kubeconfig %s: %w", kubeconfigPath, err)
|
|
}
|
|
|
|
// Keep this short. This is a probe, not a long-running client.
|
|
restCfg.Timeout = 5 * time.Second
|
|
|
|
clientset, err := kubernetes.NewForConfig(restCfg)
|
|
if err != nil {
|
|
return "", fmt.Errorf("create clientset: %w", err)
|
|
}
|
|
|
|
disc := clientset.Discovery()
|
|
return discoverServerVersion(ctx, disc)
|
|
}
|
|
|
|
func discoverServerVersion(ctx context.Context, disc discovery.DiscoveryInterface) (string, error) {
|
|
info, err := disc.ServerVersion()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if info == nil || strings.TrimSpace(info.GitVersion) == "" {
|
|
return "", errors.New("server version is empty")
|
|
}
|
|
return normalizeKubeVersion(info.GitVersion), nil
|
|
}
|