356 lines
9.0 KiB
Go
356 lines
9.0 KiB
Go
package osupgrade
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/client-go/util/retry"
|
|
"k8s.io/klog/v2"
|
|
|
|
monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
|
|
"example.com/monok8s/pkg/buildinfo"
|
|
"example.com/monok8s/pkg/kube"
|
|
)
|
|
|
|
var (
|
|
unstructuredConverter = runtime.DefaultUnstructuredConverter
|
|
|
|
osup_gvr = schema.GroupVersionResource{
|
|
Group: monov1alpha1.Group,
|
|
Version: monov1alpha1.Version,
|
|
Resource: "osupgradeprogresses",
|
|
}
|
|
)
|
|
|
|
func ensureProgressHeartbeat(ctx context.Context, clients *kube.Clients,
|
|
namespace string, nodeName string,
|
|
osu *monov1alpha1.OSUpgrade,
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
name := fmt.Sprintf("%s-%s", osu.Name, nodeName)
|
|
now := metav1.Now()
|
|
|
|
currentVersion := buildinfo.KubeVersion
|
|
targetVersion := ""
|
|
if osu.Status != nil {
|
|
targetVersion = osu.Status.ResolvedVersion
|
|
}
|
|
|
|
progress := &monov1alpha1.OSUpgradeProgress{
|
|
TypeMeta: metav1.TypeMeta{
|
|
APIVersion: monov1alpha1.APIVersion,
|
|
Kind: "OSUpgradeProgress",
|
|
},
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: name,
|
|
Namespace: namespace,
|
|
},
|
|
Spec: monov1alpha1.OSUpgradeProgressSpec{
|
|
NodeName: nodeName,
|
|
SourceRef: monov1alpha1.OSUpgradeSourceRef{
|
|
Name: osu.Name,
|
|
},
|
|
},
|
|
Status: &monov1alpha1.OSUpgradeProgressStatus{
|
|
CurrentVersion: currentVersion,
|
|
TargetVersion: targetVersion,
|
|
Phase: monov1alpha1.OSUpgradeProgressPhasePending,
|
|
LastUpdatedAt: &now,
|
|
Message: "acknowledged",
|
|
},
|
|
}
|
|
|
|
created, err := createProgress(ctx, clients, osup_gvr, progress)
|
|
if err == nil {
|
|
klog.InfoS("created osupgradeprogress", "name", created.Name, "namespace", created.Namespace)
|
|
return created, nil
|
|
}
|
|
if !apierrors.IsAlreadyExists(err) {
|
|
return nil, fmt.Errorf("create OSUpgradeProgress %s/%s: %w", namespace, name, err)
|
|
}
|
|
|
|
var out *monov1alpha1.OSUpgradeProgress
|
|
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
existing, err := getProgress(ctx, clients, osup_gvr, namespace, name)
|
|
if err != nil {
|
|
return fmt.Errorf("get existing OSUpgradeProgress %s/%s: %w", namespace, name, err)
|
|
}
|
|
|
|
// Keep spec aligned with source and node.
|
|
existing.Spec.NodeName = nodeName
|
|
existing.Spec.SourceRef.Name = osu.Name
|
|
|
|
existing, err = updateProgressSpec(ctx, clients, osup_gvr, existing)
|
|
if err != nil {
|
|
if isUnknownUpdateResult(err) {
|
|
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
|
|
if getErr == nil {
|
|
out = latest
|
|
}
|
|
}
|
|
return fmt.Errorf("update OSUpgradeProgress spec %s/%s: %w", namespace, name, err)
|
|
}
|
|
|
|
if existing.Status == nil {
|
|
existing.Status = &monov1alpha1.OSUpgradeProgressStatus{}
|
|
}
|
|
|
|
existing.Status.CurrentVersion = currentVersion
|
|
existing.Status.TargetVersion = targetVersion
|
|
existing.Status.LastUpdatedAt = &now
|
|
|
|
if existing.Status.Phase == "" {
|
|
existing.Status.Phase = monov1alpha1.OSUpgradeProgressPhasePending
|
|
}
|
|
if existing.Status.Message == "" {
|
|
existing.Status.Message = "acknowledged"
|
|
}
|
|
|
|
existing, err = updateProgressStatus(ctx, clients, osup_gvr, existing)
|
|
if err != nil {
|
|
if isUnknownUpdateResult(err) {
|
|
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
|
|
if getErr == nil {
|
|
out = latest
|
|
}
|
|
}
|
|
return fmt.Errorf("update OSUpgradeProgress status %s/%s: %w", namespace, name, err)
|
|
}
|
|
|
|
out = existing
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
if out != nil {
|
|
return out, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
klog.InfoS("updated osupgradeprogress", "name", out.Name, "namespace", out.Namespace)
|
|
return out, nil
|
|
}
|
|
|
|
func updateProgressRobust(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
namespace string,
|
|
name string,
|
|
mutate func(*monov1alpha1.OSUpgradeProgress),
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
var out *monov1alpha1.OSUpgradeProgress
|
|
|
|
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
current, err := getProgress(ctx, clients, osup_gvr, namespace, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if current.Status == nil {
|
|
current.Status = &monov1alpha1.OSUpgradeProgressStatus{}
|
|
}
|
|
|
|
mutate(current)
|
|
|
|
updated, err := updateProgressStatus(ctx, clients, osup_gvr, current)
|
|
if err != nil {
|
|
if isUnknownUpdateResult(err) {
|
|
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
|
|
if getErr == nil {
|
|
out = latest
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
out = updated
|
|
return nil
|
|
})
|
|
|
|
if err != nil && out != nil {
|
|
// Unknown-result case: caller gets latest known server state plus error.
|
|
return out, err
|
|
}
|
|
|
|
return out, err
|
|
}
|
|
|
|
func isUnknownUpdateResult(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
if apierrors.IsTimeout(err) ||
|
|
apierrors.IsServerTimeout(err) ||
|
|
apierrors.IsTooManyRequests(err) {
|
|
return true
|
|
}
|
|
|
|
msg := strings.ToLower(err.Error())
|
|
return strings.Contains(msg, "request timed out") ||
|
|
strings.Contains(msg, "context deadline exceeded") ||
|
|
strings.Contains(msg, "etcdserver: request timed out") ||
|
|
strings.Contains(msg, "connection reset by peer") ||
|
|
strings.Contains(msg, "http2: client connection lost")
|
|
}
|
|
|
|
func createProgress(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
gvr schema.GroupVersionResource,
|
|
progress *monov1alpha1.OSUpgradeProgress,
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
obj, err := toUnstructured(progress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
created, err := clients.Dynamic.
|
|
Resource(gvr).
|
|
Namespace(progress.Namespace).
|
|
Create(ctx, obj, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fromUnstructuredProgress(created)
|
|
}
|
|
|
|
func getProgress(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
gvr schema.GroupVersionResource,
|
|
namespace, name string,
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
got, err := clients.Dynamic.
|
|
Resource(gvr).
|
|
Namespace(namespace).
|
|
Get(ctx, name, metav1.GetOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fromUnstructuredProgress(got)
|
|
}
|
|
|
|
func updateProgressSpec(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
gvr schema.GroupVersionResource,
|
|
progress *monov1alpha1.OSUpgradeProgress,
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
obj, err := toUnstructured(progress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
updated, err := clients.Dynamic.
|
|
Resource(gvr).
|
|
Namespace(progress.Namespace).
|
|
Update(ctx, obj, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fromUnstructuredProgress(updated)
|
|
}
|
|
|
|
func updateProgressStatus(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
gvr schema.GroupVersionResource,
|
|
progress *monov1alpha1.OSUpgradeProgress,
|
|
) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
obj, err := toUnstructured(progress)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
updated, err := clients.Dynamic.
|
|
Resource(gvr).
|
|
Namespace(progress.Namespace).
|
|
UpdateStatus(ctx, obj, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return fromUnstructuredProgress(updated)
|
|
}
|
|
|
|
func failProgress(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
osup *monov1alpha1.OSUpgradeProgress,
|
|
action string,
|
|
cause error,
|
|
) error {
|
|
_, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
|
|
now := metav1.Now()
|
|
|
|
if cur.Status == nil {
|
|
cur.Status = &monov1alpha1.OSUpgradeProgressStatus{}
|
|
}
|
|
|
|
cur.Status.LastUpdatedAt = &now
|
|
cur.Status.Message = fmt.Sprintf("%s: %v", action, cause)
|
|
cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseFailed
|
|
})
|
|
if err != nil {
|
|
klog.ErrorS(err, "failed to update osupgradeprogress status after error",
|
|
"action", action,
|
|
"name", osup.Name,
|
|
"namespace", osup.Namespace,
|
|
)
|
|
}
|
|
|
|
return fmt.Errorf("%s: %w", action, cause)
|
|
}
|
|
|
|
func markProgressCompleted(
|
|
ctx context.Context,
|
|
clients *kube.Clients,
|
|
osup *monov1alpha1.OSUpgradeProgress,
|
|
message string,
|
|
) error {
|
|
_, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
|
|
now := metav1.Now()
|
|
|
|
if cur.Status == nil {
|
|
cur.Status = &monov1alpha1.OSUpgradeProgressStatus{}
|
|
}
|
|
|
|
cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseCompleted
|
|
cur.Status.Message = message
|
|
cur.Status.CurrentVersion = osup.Status.CurrentVersion
|
|
cur.Status.TargetVersion = osup.Status.TargetVersion
|
|
cur.Status.LastUpdatedAt = &now
|
|
cur.Status.CompletedAt = &now
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("mark progress completed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func toUnstructured(progress *monov1alpha1.OSUpgradeProgress) (*unstructured.Unstructured, error) {
|
|
m, err := unstructuredConverter.ToUnstructured(progress)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("convert OSUpgradeProgress to unstructured: %w", err)
|
|
}
|
|
return &unstructured.Unstructured{Object: m}, nil
|
|
}
|
|
|
|
func fromUnstructuredProgress(obj *unstructured.Unstructured) (*monov1alpha1.OSUpgradeProgress, error) {
|
|
var progress monov1alpha1.OSUpgradeProgress
|
|
if err := unstructuredConverter.FromUnstructured(obj.Object, &progress); err != nil {
|
|
return nil, fmt.Errorf("convert unstructured to OSUpgradeProgress: %w", err)
|
|
}
|
|
return &progress, nil
|
|
}
|