// Package osupgrade performs an in-place operating-system upgrade on the
// local node: it resolves the target image from a catalog, streams it onto
// the alternate partition, points the u-boot environment at it, and reboots
// the machine via the kernel's sysrq trigger. Progress is mirrored into an
// OSUpgradeProgress status object throughout.
package osupgrade

import (
	"context"
	"fmt"
	"os"
	"sync/atomic"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
	"example.com/monok8s/pkg/buildinfo"
	"example.com/monok8s/pkg/catalog"
	"example.com/monok8s/pkg/controller/osimage"
	"example.com/monok8s/pkg/kube"
	"example.com/monok8s/pkg/node/uboot"
)

// UpgradeRunner serializes upgrade attempts on this process.
// running ensures at most one upgrade body executes at a time;
// rebooting is a one-way latch that, once set, causes all future
// (and in-flight-but-not-yet-started) attempts to be skipped because
// the machine is about to go down.
type UpgradeRunner struct {
	running   atomic.Bool
	rebooting atomic.Bool
}

// r is the package-level singleton runner shared by all HandleOSUpgrade calls.
var r UpgradeRunner

// Run executes fn unless an upgrade is already running or a reboot has been
// triggered, in which case it returns nil (the attempt is silently skipped,
// not queued). The running flag is taken with CompareAndSwap so concurrent
// callers cannot both enter; rebooting is re-checked after the swap so a
// reboot latched between the first check and the CAS still suppresses fn.
func (r *UpgradeRunner) Run(fn func() error) error {
	if r.rebooting.Load() {
		return nil
	}
	if !r.running.CompareAndSwap(false, true) {
		// Another upgrade attempt holds the slot; drop this one.
		return nil
	}
	defer r.running.Store(false)
	// Double-check: a reboot may have been latched while we acquired the slot.
	if r.rebooting.Load() {
		return nil
	}
	return fn()
}

// HandleOSUpgrade is the reconcile entry point for an OSUpgrade object on
// nodeName. It funnels the real work through the package-level UpgradeRunner
// so only one upgrade runs at a time; a skipped (already-running/rebooting)
// attempt returns nil.
func HandleOSUpgrade(ctx context.Context, clients *kube.Clients,
	namespace string, nodeName string,
	osu *monov1alpha1.OSUpgrade,
) error {
	return r.Run(func() error {
		return handleOSUpgradeLocked(ctx, clients, namespace, nodeName, osu)
	})
}

// handleOSUpgradeLocked carries out one full upgrade attempt. It must only be
// called with the UpgradeRunner slot held (see HandleOSUpgrade).
//
// Sequence: heartbeat the progress object, resolve the catalog, plan the
// upgrade path, stream the first image in the path onto the alternate
// partition (reporting throttled progress into the status object), configure
// the next boot via u-boot, then latch the rebooting flag and trigger a
// kernel reboot. On the success path this function never returns — it blocks
// in `select {}` waiting for the machine to go down.
func handleOSUpgradeLocked(ctx context.Context, clients *kube.Clients,
	namespace string, nodeName string,
	osu *monov1alpha1.OSUpgrade,
) error {
	// Create/refresh the OSUpgradeProgress object we report status into.
	osup, err := ensureProgressHeartbeat(ctx, clients, namespace, nodeName, osu)
	if err != nil {
		return err
	}
	klog.InfoS("handling osupgrade",
		"name", osu.Name,
		"node", nodeName,
		"desiredVersion", osu.Spec.DesiredVersion,
	)
	kata, err := catalog.ResolveCatalog(ctx, clients.Kubernetes, namespace, osu.Spec.Catalog)
	if err != nil {
		return failProgress(ctx, clients, osup, "resolve catalog", err)
	}
	// Plan from the version this binary was built with to the desired target.
	plan, err := PlanUpgrade(buildinfo.KubeVersion, osu, kata)
	if err != nil {
		return failProgress(ctx, clients, osup, "plan upgrade", err)
	}
	// An empty path means we are already at the resolved target version.
	if len(plan.Path) == 0 {
		osup.Status.CurrentVersion = buildinfo.KubeVersion
		osup.Status.TargetVersion = buildinfo.KubeVersion
		if err := markProgressCompleted(ctx, clients, osup, "already at target version"); err != nil {
			return err
		}
		klog.InfoS("osupgrade already satisfied",
			"name", osu.Name,
			"node", nodeName,
			"target", plan.ResolvedTarget,
		)
		return nil
	}
	// Only the first step of the path is applied per attempt; presumably the
	// controller re-reconciles after reboot to take the next step — TODO confirm.
	first := plan.Path[0]
	updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name,
		func(cur *monov1alpha1.OSUpgradeProgress) {
			now := metav1.Now()
			cur.Status.CurrentVersion = buildinfo.KubeVersion
			cur.Status.TargetVersion = plan.ResolvedTarget
			cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseDownloading
			cur.Status.Message = fmt.Sprintf("downloading image: %s", first.URL)
			cur.Status.LastUpdatedAt = &now
		})
	// updateProgressRobust can return a fresher object even on error; keep it
	// so later status updates build on the latest resourceVersion.
	if updated != nil {
		osup = updated
	}
	if err != nil {
		return fmt.Errorf("update progress status: %w", err)
	}
	klog.InfoS("planned osupgrade",
		"name", osu.Name,
		"node", nodeName,
		"resolvedTarget", plan.ResolvedTarget,
		"steps", len(plan.Path),
		"currentVersion", buildinfo.KubeVersion,
		"firstVersion", first.Version,
		"firstURL", first.URL,
		"size", first.Size,
	)
	imageSHA, err := first.SHA256()
	if err != nil {
		return failProgress(ctx, clients, osup, "apply image", err)
	}
	// NOTE(review): the numeric arguments to these constructors are not
	// self-describing from here (log cadence? percent step? seconds?) —
	// confirm their units against the osimage package.
	pLogger := osimage.NewProgressLogger(2, 25)
	statusUpdater := osimage.NewTimeBasedUpdater(15)
	imageOptions := osimage.ApplyOptions{
		URL:               first.URL,
		TargetPath:        monov1alpha1.AltPartDeviceLink,
		ExpectedRawSHA256: imageSHA,
		ExpectedRawSize:   first.Size,
		BufferSize:        6 * 1024 * 1024,
		// Progress is called repeatedly during the streamed apply; status
		// writes are throttled through statusUpdater so we do not hammer the
		// API server on every callback.
		// NOTE(review): this closure rebinds the outer osup without locking —
		// assumes ApplyImageStreamed invokes Progress from the calling
		// goroutine; confirm, otherwise this is a data race.
		Progress: func(p osimage.Progress) {
			pLogger.Log(p)
			if err := statusUpdater.Run(func() error {
				klog.Infof("%s: %d%%", p.Stage, osimage.PercentOf(p.BytesComplete, p.BytesTotal))
				updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name,
					func(cur *monov1alpha1.OSUpgradeProgress) {
						now := metav1.Now()
						// Map the streamer's stage names onto progress phases;
						// unknown stages leave the phase unchanged.
						switch p.Stage {
						case "flash":
							cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseWriting
						case "verify":
							cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseVerifying
						}
						cur.Status.TargetVersion = plan.ResolvedTarget
						cur.Status.LastUpdatedAt = &now
						cur.Status.Message = fmt.Sprintf(
							"%s: %d%%",
							p.Stage,
							osimage.PercentOf(p.BytesComplete, p.BytesTotal),
						)
					})
				if updated != nil {
					osup = updated
				}
				if err != nil {
					return fmt.Errorf("update progress status: %w", err)
				}
				return nil
			}); err != nil {
				// A failed throttled status write must not abort the flash;
				// log and keep streaming.
				klog.ErrorS(err, "throttled progress update failed")
			}
		},
	}
	result, err := osimage.ApplyImageStreamed(ctx, imageOptions)
	if err != nil {
		return failProgress(ctx, clients, osup, "apply image", err)
	}
	klog.Info(result)
	// NOTE(review): cfgPath may be empty if FW_ENV_CONFIG_FILE is unset —
	// presumably ConfigureNextBoot falls back to a default; verify.
	cfgPath := os.Getenv("FW_ENV_CONFIG_FILE")
	if err := uboot.ConfigureNextBoot(ctx, cfgPath); err != nil {
		return failProgress(ctx, clients, osup, "set boot env", err)
	}
	updated, err = updateProgressRobust(ctx, clients, osup.Namespace, osup.Name,
		func(cur *monov1alpha1.OSUpgradeProgress) {
			now := metav1.Now()
			cur.Status.TargetVersion = plan.ResolvedTarget
			cur.Status.Message = "image applied, verified, and next boot environment updated"
			cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseRebooting
			cur.Status.LastUpdatedAt = &now
		})
	if updated != nil {
		osup = updated
	}
	if err != nil {
		return fmt.Errorf("update progress status: %w", err)
	}
	// TODO: Drain the node here
	// Get all running pods outta here!
	// kubectl.Run()
	// Wait for the node to be drained
	// kubectl.Wait()
	// Latch rebooting BEFORE triggering so no new upgrade attempt can start
	// in the window between the sysrq writes and the actual reboot.
	r.rebooting.Store(true)
	if err := triggerReboot(); err != nil {
		// Reboot did not fire; unlatch so a later attempt can retry.
		r.rebooting.Store(false)
		return fmt.Errorf("trigger reboot: %w", err)
	}
	// Block forever: the kernel is about to reboot the machine, so this
	// function (and the runner's running flag) intentionally never returns.
	select {}
}

// triggerReboot asks the kernel to reboot immediately via the magic sysrq
// trigger: "s" syncs filesystems, "u" remounts them read-only, and "b"
// reboots without further shutdown. The first two writes are best-effort —
// their failure must not stop the reboot itself — so only the final "b"
// write's error is returned.
func triggerReboot() error {
	_ = os.WriteFile("/proc/sysrq-trigger", []byte("s\n"), 0)
	_ = os.WriteFile("/proc/sysrq-trigger", []byte("u\n"), 0)
	return os.WriteFile("/proc/sysrq-trigger", []byte("b\n"), 0)
}