Fixed some race conditions

This commit is contained in:
2026-04-06 05:18:06 +08:00
parent 50d9440e0a
commit d662162921
4 changed files with 259 additions and 87 deletions

View File

@@ -2,6 +2,7 @@ package osimage
import ( import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"sync"
"time" "time"
) )
@@ -92,8 +93,10 @@ func (l *ProgressLogger) Log(p Progress) {
} }
type TimeBasedUpdater struct { type TimeBasedUpdater struct {
mu sync.Mutex
interval time.Duration interval time.Duration
lastRun time.Time lastRun time.Time
inFlight bool
} }
func NewTimeBasedUpdater(seconds int) *TimeBasedUpdater { func NewTimeBasedUpdater(seconds int) *TimeBasedUpdater {
@@ -106,16 +109,28 @@ func NewTimeBasedUpdater(seconds int) *TimeBasedUpdater {
} }
func (u *TimeBasedUpdater) Run(fn func() error) error { func (u *TimeBasedUpdater) Run(fn func() error) error {
u.mu.Lock()
now := time.Now() now := time.Now()
if !u.lastRun.IsZero() && now.Sub(u.lastRun) < u.interval { if u.inFlight {
u.mu.Unlock()
return nil return nil
} }
if err := fn(); err != nil { if !u.lastRun.IsZero() && now.Sub(u.lastRun) < u.interval {
return err u.mu.Unlock()
return nil
} }
u.lastRun = now u.lastRun = now
return nil u.inFlight = true
u.mu.Unlock()
defer func() {
u.mu.Lock()
u.inFlight = false
u.mu.Unlock()
}()
return fn()
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"os" "os"
"sync/atomic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@@ -16,9 +17,42 @@ import (
"example.com/monok8s/pkg/node/uboot" "example.com/monok8s/pkg/node/uboot"
) )
type UpgradeRunner struct {
running atomic.Bool
rebooting atomic.Bool
}
var r UpgradeRunner
func (r *UpgradeRunner) Run(fn func() error) error {
if r.rebooting.Load() {
return nil
}
if !r.running.CompareAndSwap(false, true) {
return nil
}
defer r.running.Store(false)
if r.rebooting.Load() {
return nil
}
return fn()
}
func HandleOSUpgrade(ctx context.Context, clients *kube.Clients, func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
namespace string, nodeName string, namespace string, nodeName string,
osu *monov1alpha1.OSUpgrade, osu *monov1alpha1.OSUpgrade,
) error {
return r.Run(func() error {
return handleOSUpgradeLocked(ctx, clients, namespace, nodeName, osu)
})
}
func handleOSUpgradeLocked(ctx context.Context, clients *kube.Clients,
namespace string, nodeName string,
osu *monov1alpha1.OSUpgrade,
) error { ) error {
osup, err := ensureProgressHeartbeat(ctx, clients, namespace, nodeName, osu) osup, err := ensureProgressHeartbeat(ctx, clients, namespace, nodeName, osu)
if err != nil { if err != nil {
@@ -57,14 +91,16 @@ func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
first := plan.Path[0] first := plan.Path[0]
osup.Status.TargetVersion = plan.ResolvedTarget updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseDownloading
osup.Status.Message = fmt.Sprintf("downloading image: %s", first.URL)
now := metav1.Now() now := metav1.Now()
osup.Status.LastUpdatedAt = &now cur.Status.TargetVersion = plan.ResolvedTarget
osup, err = updateProgressStatus(ctx, clients, osup_gvr, osup) cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseDownloading
cur.Status.Message = fmt.Sprintf("downloading image: %s", first.URL)
cur.Status.LastUpdatedAt = &now
})
if updated != nil {
osup = updated
}
if err != nil { if err != nil {
return fmt.Errorf("update progress status: %w", err) return fmt.Errorf("update progress status: %w", err)
} }
@@ -82,7 +118,6 @@ func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
imageSHA, err := first.SHA256() imageSHA, err := first.SHA256()
if err != nil { if err != nil {
now = metav1.Now()
return failProgress(ctx, clients, osup, "apply image", err) return failProgress(ctx, clients, osup, "apply image", err)
} }
@@ -91,31 +126,38 @@ func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
imageOptions := osimage.ApplyOptions{ imageOptions := osimage.ApplyOptions{
URL: first.URL, URL: first.URL,
TargetPath: "/dev/sda?", TargetPath: "/dev/mksaltpart",
ExpectedRawSHA256: imageSHA, ExpectedRawSHA256: imageSHA,
ExpectedRawSize: first.Size, ExpectedRawSize: first.Size,
BufferSize: 6 * 1024 * 1024, BufferSize: 6 * 1024 * 1024,
Progress: func(p osimage.Progress) { Progress: func(p osimage.Progress) {
pLogger.Log(p) pLogger.Log(p)
if err := statusUpdater.Run(func() error {
if err := statusUpdater.Run(func() error {
updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
now := metav1.Now() now := metav1.Now()
switch p.Stage { switch p.Stage {
case "flash": case "flash":
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseWriting cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseWriting
case "verify": case "verify":
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseVerifying cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseVerifying
}
osup.Status.LastUpdatedAt = &now
osup.Status.Message = fmt.Sprintf("%s: %d%%", p.Stage, osimage.PercentOf(p.BytesComplete, p.BytesTotal))
updated, err := updateProgressStatus(ctx, clients, osup_gvr, osup)
if err != nil {
klog.ErrorS(err, "update progress status")
return err
} }
cur.Status.TargetVersion = plan.ResolvedTarget
cur.Status.LastUpdatedAt = &now
cur.Status.Message = fmt.Sprintf(
"%s: %d%%",
p.Stage,
osimage.PercentOf(p.BytesComplete, p.BytesTotal),
)
})
if updated != nil {
osup = updated osup = updated
}
if err != nil {
return fmt.Errorf("update progress status: %w", err)
}
return nil return nil
}); err != nil { }); err != nil {
klog.ErrorS(err, "throttled progress update failed") klog.ErrorS(err, "throttled progress update failed")
@@ -125,7 +167,6 @@ func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
result, err := osimage.ApplyImageStreamed(ctx, imageOptions) result, err := osimage.ApplyImageStreamed(ctx, imageOptions)
if err != nil { if err != nil {
now = metav1.Now()
return failProgress(ctx, clients, osup, "apply image", err) return failProgress(ctx, clients, osup, "apply image", err)
} }
@@ -136,18 +177,36 @@ func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
return failProgress(ctx, clients, osup, "set boot env", err) return failProgress(ctx, clients, osup, "set boot env", err)
} }
now = metav1.Now() updated, err = updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
osup.Status.LastUpdatedAt = &now now := metav1.Now()
osup.Status.Message = "image applied, verified, and next boot environment updated" cur.Status.TargetVersion = plan.ResolvedTarget
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseRebooting cur.Status.Message = "image applied, verified, and next boot environment updated"
cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseRebooting
osup, err = updateProgressStatus(ctx, clients, osup_gvr, osup) cur.Status.LastUpdatedAt = &now
})
if updated != nil {
osup = updated
}
if err != nil { if err != nil {
return fmt.Errorf("update progress status: %w", err) return fmt.Errorf("update progress status: %w", err)
} }
// TODO: Drain the node here // TODO: Drain the node here
// TODO: Issue Reboot // Get all running pods outta here!
// kubectl.Run()
// Wait for the node to be drained
// kubectl.Wait()
return nil r.rebooting.Store(true)
if err := triggerReboot(); err != nil {
r.rebooting.Store(false)
return fmt.Errorf("trigger reboot: %w", err)
}
select {}
}
func triggerReboot() error {
_ = os.WriteFile("/proc/sysrq-trigger", []byte("s\n"), 0)
_ = os.WriteFile("/proc/sysrq-trigger", []byte("u\n"), 0)
return os.WriteFile("/proc/sysrq-trigger", []byte("b\n"), 0)
} }

View File

@@ -3,12 +3,14 @@ package osupgrade
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2" "k8s.io/klog/v2"
monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1" monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
@@ -30,13 +32,11 @@ func ensureProgressHeartbeat(ctx context.Context, clients *kube.Clients,
namespace string, nodeName string, namespace string, nodeName string,
osu *monov1alpha1.OSUpgrade, osu *monov1alpha1.OSUpgrade,
) (*monov1alpha1.OSUpgradeProgress, error) { ) (*monov1alpha1.OSUpgradeProgress, error) {
name := fmt.Sprintf("%s-%s", osu.Name, nodeName) name := fmt.Sprintf("%s-%s", osu.Name, nodeName)
now := metav1.Now() now := metav1.Now()
currentVersion := buildinfo.KubeVersion currentVersion := buildinfo.KubeVersion
targetVersion := "" targetVersion := ""
if osu.Status != nil { if osu.Status != nil {
targetVersion = osu.Status.ResolvedVersion targetVersion = osu.Status.ResolvedVersion
} }
@@ -74,17 +74,26 @@ func ensureProgressHeartbeat(ctx context.Context, clients *kube.Clients,
return nil, fmt.Errorf("create OSUpgradeProgress %s/%s: %w", namespace, name, err) return nil, fmt.Errorf("create OSUpgradeProgress %s/%s: %w", namespace, name, err)
} }
var out *monov1alpha1.OSUpgradeProgress
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
existing, err := getProgress(ctx, clients, osup_gvr, namespace, name) existing, err := getProgress(ctx, clients, osup_gvr, namespace, name)
if err != nil { if err != nil {
return nil, fmt.Errorf("get existing OSUpgradeProgress %s/%s: %w", namespace, name, err) return fmt.Errorf("get existing OSUpgradeProgress %s/%s: %w", namespace, name, err)
} }
// Spec should remain aligned with the source and node. // Keep spec aligned with source and node.
existing.Spec.NodeName = nodeName existing.Spec.NodeName = nodeName
existing.Spec.SourceRef.Name = osu.Name existing.Spec.SourceRef.Name = osu.Name
if existing, err = updateProgressSpec(ctx, clients, osup_gvr, existing); err != nil { existing, err = updateProgressSpec(ctx, clients, osup_gvr, existing)
return nil, fmt.Errorf("update OSUpgradeProgress spec %s/%s: %w", namespace, name, err) if err != nil {
if isUnknownUpdateResult(err) {
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
if getErr == nil {
out = latest
}
}
return fmt.Errorf("update OSUpgradeProgress spec %s/%s: %w", namespace, name, err)
} }
if existing.Status == nil { if existing.Status == nil {
@@ -95,18 +104,99 @@ func ensureProgressHeartbeat(ctx context.Context, clients *kube.Clients,
existing.Status.TargetVersion = targetVersion existing.Status.TargetVersion = targetVersion
existing.Status.LastUpdatedAt = &now existing.Status.LastUpdatedAt = &now
// Only set phase/message if they are still empty, so later real state machine
// updates are not clobbered by the heartbeat.
if existing.Status.Phase == "" { if existing.Status.Phase == "" {
existing.Status.Phase = monov1alpha1.OSUpgradeProgressPhasePending existing.Status.Phase = monov1alpha1.OSUpgradeProgressPhasePending
} }
if existing.Status.Message == "" {
if existing, err = updateProgressStatus(ctx, clients, osup_gvr, existing); err != nil { existing.Status.Message = "acknowledged"
return nil, fmt.Errorf("update OSUpgradeProgress status %s/%s: %w", namespace, name, err)
} }
klog.InfoS("updated osupgradeprogress", "name", existing.Name, "namespace", existing.Namespace) existing, err = updateProgressStatus(ctx, clients, osup_gvr, existing)
return existing, nil if err != nil {
if isUnknownUpdateResult(err) {
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
if getErr == nil {
out = latest
}
}
return fmt.Errorf("update OSUpgradeProgress status %s/%s: %w", namespace, name, err)
}
out = existing
return nil
})
if err != nil {
if out != nil {
return out, nil
}
return nil, err
}
klog.InfoS("updated osupgradeprogress", "name", out.Name, "namespace", out.Namespace)
return out, nil
}
func updateProgressRobust(
ctx context.Context,
clients *kube.Clients,
namespace string,
name string,
mutate func(*monov1alpha1.OSUpgradeProgress),
) (*monov1alpha1.OSUpgradeProgress, error) {
var out *monov1alpha1.OSUpgradeProgress
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
current, err := getProgress(ctx, clients, osup_gvr, namespace, name)
if err != nil {
return err
}
if current.Status == nil {
current.Status = &monov1alpha1.OSUpgradeProgressStatus{}
}
mutate(current)
updated, err := updateProgressStatus(ctx, clients, osup_gvr, current)
if err != nil {
if isUnknownUpdateResult(err) {
latest, getErr := getProgress(ctx, clients, osup_gvr, namespace, name)
if getErr == nil {
out = latest
}
}
return err
}
out = updated
return nil
})
if err != nil && out != nil {
// Unknown-result case: caller gets latest known server state plus error.
return out, err
}
return out, err
}
func isUnknownUpdateResult(err error) bool {
if err == nil {
return false
}
if apierrors.IsTimeout(err) ||
apierrors.IsServerTimeout(err) ||
apierrors.IsTooManyRequests(err) {
return true
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "request timed out") ||
strings.Contains(msg, "context deadline exceeded") ||
strings.Contains(msg, "etcdserver: request timed out") ||
strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "http2: client connection lost")
} }
func createProgress( func createProgress(
@@ -199,17 +289,18 @@ func failProgress(
action string, action string,
cause error, cause error,
) error { ) error {
_, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
now := metav1.Now() now := metav1.Now()
if osup.Status == nil { if cur.Status == nil {
osup.Status = &monov1alpha1.OSUpgradeProgressStatus{} cur.Status = &monov1alpha1.OSUpgradeProgressStatus{}
} }
osup.Status.LastUpdatedAt = &now cur.Status.LastUpdatedAt = &now
osup.Status.Message = fmt.Sprintf("%s: %v", action, cause) cur.Status.Message = fmt.Sprintf("%s: %v", action, cause)
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseFailed cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseFailed
})
if _, err := updateProgressStatus(ctx, clients, osup_gvr, osup); err != nil { if err != nil {
klog.ErrorS(err, "failed to update osupgradeprogress status after error", klog.ErrorS(err, "failed to update osupgradeprogress status after error",
"action", action, "action", action,
"name", osup.Name, "name", osup.Name,
@@ -226,18 +317,18 @@ func markProgressCompleted(
osup *monov1alpha1.OSUpgradeProgress, osup *monov1alpha1.OSUpgradeProgress,
message string, message string,
) error { ) error {
_, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
now := metav1.Now() now := metav1.Now()
if osup.Status == nil { if cur.Status == nil {
osup.Status = &monov1alpha1.OSUpgradeProgressStatus{} cur.Status = &monov1alpha1.OSUpgradeProgressStatus{}
} }
osup.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseCompleted cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseCompleted
osup.Status.Message = message cur.Status.Message = message
osup.Status.LastUpdatedAt = &now cur.Status.LastUpdatedAt = &now
osup.Status.CompletedAt = &now cur.Status.CompletedAt = &now
})
_, err := updateProgressStatus(ctx, clients, osup_gvr, osup)
if err != nil { if err != nil {
return fmt.Errorf("mark progress completed: %w", err) return fmt.Errorf("mark progress completed: %w", err)
} }

View File

@@ -174,8 +174,6 @@ resolve_preferred_root() {
find_part_by_partuuid "$pref_root" find_part_by_partuuid "$pref_root"
} }
# Decide which root PARTNAME we want for the requested slot.
# Keep a compatibility fallback for legacy "rootfs" as slot A.
wanted_root_labels_for_slot() { wanted_root_labels_for_slot() {
slot="$1" slot="$1"
@@ -184,8 +182,7 @@ wanted_root_labels_for_slot() {
echo "rootfsB" echo "rootfsB"
;; ;;
*) *)
# Try modern rootfsA first, then legacy rootfs echo "rootfsA"
echo "rootfsA rootfs"
;; ;;
esac esac
} }
@@ -276,6 +273,16 @@ mount_or_panic -t overlay overlay \
-o "lowerdir=/newroot/etc,upperdir=/newroot/data/etc-overlay/upper,workdir=/newroot/data/etc-overlay/work" \ -o "lowerdir=/newroot/etc,upperdir=/newroot/data/etc-overlay/upper,workdir=/newroot/data/etc-overlay/work" \
/newroot/etc /newroot/etc
if [ "$BOOT_PART" = "A" ]; then
ALT_PART="$(find_sibling_part_on_same_disk "$ROOT_DEV" rootfsB || true)"
else
ALT_PART="$(find_sibling_part_on_same_disk "$ROOT_DEV" rootfsA || true)"
fi
if [ -n "$ALT_PART" ]; then
ln -sf "$ALT_PART" /dev/mksaltpart
fi
mount_or_panic --move /dev /newroot/dev mount_or_panic --move /dev /newroot/dev
mount_or_panic --move /proc /newroot/proc mount_or_panic --move /proc /newroot/proc
mount_or_panic --move /sys /newroot/sys mount_or_panic --move /sys /newroot/sys