package osupgrade

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
	"example.com/monok8s/pkg/kube"
)

// Watch runs the OSUpgrade list/watch loop for namespace until ctx is done.
//
// Each iteration delegates to watchOnce, which only returns with an error.
// On failure Watch logs the error, waits two seconds and tries again. When the
// failure says the remembered resourceVersion is no longer served (Expired or
// Gone), the version is cleared so that the next attempt starts with a fresh
// list. The only error Watch ever returns is ctx.Err().
func Watch(ctx context.Context, clients *kube.Clients, namespace string) error {
	var resourceVersion string

	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		err := watchOnce(ctx, clients, namespace, &resourceVersion)
		if err == nil {
			continue
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}

		// Expired RV is normal enough; clear it and relist. Both reasons are
		// checked because a 410 response may be reported as either one.
		if apierrors.IsResourceExpired(err) || apierrors.IsGone(err) {
			klog.InfoS("OSUpgrade watch resourceVersion expired; resetting",
				"namespace", namespace,
				"resourceVersion", resourceVersion,
			)
			resourceVersion = ""
		} else {
			klog.ErrorS(err, "OSUpgrade watch failed; retrying",
				"namespace", namespace,
				"resourceVersion", resourceVersion,
			)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
}

// watchOnce performs one list (if needed) plus watch cycle.
//
// When *resourceVersion is empty, every existing OSUpgrade in namespace is
// listed and passed to handleOSUpgrade, and *resourceVersion is set to the
// list's resourceVersion. A watch is then opened from *resourceVersion and
// Added/Modified events are passed to handleOSUpgrade. *resourceVersion is
// advanced from bookmarks and from each received object so the caller can
// resume from the latest known position.
//
// Per-object reconcile failures are logged and skipped. The function returns
// only with a non-nil error: ctx.Err(), a list/watch call failure, a closed
// result channel, or the decoded content of a watch Error event.
func watchOnce(
	ctx context.Context,
	clients *kube.Clients,
	namespace string,
	resourceVersion *string,
) error {
	// Cold start: list existing objects once, handle them, then watch from list RV.
	if *resourceVersion == "" {
		list, err := clients.MonoKS.
			Monok8sV1alpha1().
			OSUpgrades(namespace).
			List(ctx, metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("list OSUpgrades: %w", err)
		}

		for i := range list.Items {
			osu := &list.Items[i]
			handled, err := handleOSUpgrade(ctx, clients, namespace, osu)
			if err != nil {
				klog.ErrorS(err, "reconcile existing OSUpgrade failed",
					"name", osu.Name,
					"resourceVersion", osu.ResourceVersion,
				)
				continue
			}
			if !handled {
				klog.V(2).InfoS("skipping existing OSUpgrade",
					"name", osu.Name,
					"phase", osu.StatusPhase(),
				)
			}
		}

		*resourceVersion = list.ResourceVersion
		klog.InfoS("initial OSUpgrade sync complete",
			"namespace", namespace,
			"resourceVersion", *resourceVersion,
			"count", len(list.Items),
		)
	}

	w, err := clients.MonoKS.
		Monok8sV1alpha1().
		OSUpgrades(namespace).
		Watch(ctx, metav1.ListOptions{
			ResourceVersion:     *resourceVersion,
			AllowWatchBookmarks: true,
		})
	if err != nil {
		return fmt.Errorf("watch OSUpgrades: %w", err)
	}
	defer w.Stop()

	klog.InfoS("watching OSUpgrades",
		"namespace", namespace,
		"resourceVersion", *resourceVersion,
	)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case evt, ok := <-w.ResultChan():
			if !ok {
				return fmt.Errorf("watch channel closed")
			}

			switch evt.Type {
			case watch.Bookmark:
				if rv := extractResourceVersion(evt.Object); rv != "" {
					*resourceVersion = rv
				}
				continue

			case watch.Error:
				// Let outer loop retry / relist. The event object is decoded
				// into an API error and wrapped, so the caller can still
				// recognise an expired resourceVersion and reset it instead of
				// retrying the same dead version forever.
				return fmt.Errorf("watch returned error event: %w", apierrors.FromObject(evt.Object))

			case watch.Deleted:
				// Top-level delete does not require action here.
				continue

			case watch.Added, watch.Modified:
				// handled below

			default:
				klog.V(1).InfoS("skipping unexpected watch event type",
					"eventType", evt.Type,
				)
				continue
			}

			osu, ok := evt.Object.(*monov1alpha1.OSUpgrade)
			if !ok {
				klog.V(1).InfoS("skipping unexpected watch object type",
					"type", fmt.Sprintf("%T", evt.Object),
				)
				continue
			}

			// Record the position before reconciling: a failed reconcile is
			// logged and skipped, not replayed.
			if osu.ResourceVersion != "" {
				*resourceVersion = osu.ResourceVersion
			}

			handled, err := handleOSUpgrade(ctx, clients, namespace, osu)
			if err != nil {
				klog.ErrorS(err, "reconcile OSUpgrade failed",
					"name", osu.Name,
					"eventType", evt.Type,
					"resourceVersion", osu.ResourceVersion,
				)
				continue
			}
			if !handled {
				klog.V(2).InfoS("skipping OSUpgrade",
					"name", osu.Name,
					"eventType", evt.Type,
					"phase", osu.StatusPhase(),
				)
			}
		}
	}
}

// handleOSUpgrade dispatches a single OSUpgrade to the matching reconcile step.
//
// It returns (false, nil) when shouldHandle rejects the object. Otherwise it
// runs reconcileSpec when the status is missing or its ObservedGeneration
// differs from the object's Generation, or reconcileFanout when the phase is
// Accepted, and returns true together with that step's error.
func handleOSUpgrade(
	ctx context.Context,
	clients *kube.Clients,
	namespace string,
	osu *monov1alpha1.OSUpgrade,
) (bool, error) {
	if !shouldHandle(osu) {
		return false, nil
	}

	if osu.Status == nil || osu.Status.ObservedGeneration != osu.Generation {
		return true, reconcileSpec(ctx, clients, namespace, osu)
	}

	if osu.Status.Phase == monov1alpha1.OSUpgradePhaseAccepted {
		return true, reconcileFanout(ctx, clients, namespace, osu)
	}

	return false, nil
}

// reconcileSpec marks the current spec generation as accepted.
//
// It works on a deep copy of osu, replaces the whole status with phase
// Accepted, ResolvedVersion set to Spec.DesiredVersion and ObservedGeneration
// set to the object's Generation, and writes it through UpdateStatus. The
// caller's object is not modified.
func reconcileSpec(
	ctx context.Context,
	clients *kube.Clients,
	namespace string,
	osu *monov1alpha1.OSUpgrade,
) error {
	osu = osu.DeepCopy()

	osu.Status = &monov1alpha1.OSUpgradeStatus{
		Phase:              monov1alpha1.OSUpgradePhaseAccepted,
		ResolvedVersion:    osu.Spec.DesiredVersion,
		ObservedGeneration: osu.Generation,
	}

	_, err := clients.MonoKS.
		Monok8sV1alpha1().
		OSUpgrades(namespace).
		UpdateStatus(ctx, osu, metav1.UpdateOptions{})
	return err
}

// reconcileFanout calls EnsureOSUpgradeProgressForNode for every node targeted
// by osu.
//
// Only a failure to list the target nodes is returned. Per-node failures are
// logged and do not stop the remaining nodes from being processed.
func reconcileFanout(
	ctx context.Context,
	clients *kube.Clients,
	namespace string,
	osu *monov1alpha1.OSUpgrade,
) error {
	nodeNames, err := listTargetNodeNames(ctx, clients, osu)
	if err != nil {
		return fmt.Errorf("list target nodes for %s: %w", osu.Name, err)
	}

	if len(nodeNames) == 0 {
		klog.InfoS("no targets", "osupgrade", osu.Name)
		return nil
	}

	klog.InfoS("ensuring OSUpgradeProgress for target nodes",
		"osupgrade", osu.Name,
		"targets", len(nodeNames),
	)

	for _, nodeName := range nodeNames {
		if err := EnsureOSUpgradeProgressForNode(
			ctx,
			clients,
			namespace,
			nodeName,
			osu,
		); err != nil {
			// Best effort: keep going so one bad node does not block the rest.
			klog.ErrorS(err, "ensure OSUpgradeProgress for node failed",
				"osupgrade", osu.Name,
				"node", nodeName,
			)
		}
	}

	return nil
}

// listTargetNodeNames returns the names of the nodes osu applies to.
//
// Nodes must carry the label monov1alpha1.ControlAgentKey="true". When
// Spec.NodeSelector is set, its requirements are added to that selector. The
// listed nodes are additionally filtered through shouldUseNode.
func listTargetNodeNames(
	ctx context.Context,
	clients *kube.Clients,
	osu *monov1alpha1.OSUpgrade,
) ([]string, error) {
	selector := labels.SelectorFromSet(labels.Set{
		monov1alpha1.ControlAgentKey: "true",
	})

	if osu.Spec.NodeSelector != nil {
		sel, err := metav1.LabelSelectorAsSelector(osu.Spec.NodeSelector)
		if err != nil {
			return nil, fmt.Errorf("invalid nodeSelector: %w", err)
		}

		reqs, selectable := sel.Requirements()
		if !selectable {
			return nil, fmt.Errorf("nodeSelector is not selectable")
		}

		selector = selector.Add(reqs...)
	}

	list, err := clients.Kubernetes.CoreV1().
		Nodes().
		List(ctx, metav1.ListOptions{
			LabelSelector: selector.String(),
		})
	if err != nil {
		return nil, fmt.Errorf("list nodes: %w", err)
	}

	out := make([]string, 0, len(list.Items))
	for i := range list.Items {
		node := &list.Items[i]
		if shouldUseNode(node) {
			out = append(out, node.Name)
		}
	}

	return out, nil
}

// shouldUseNode reports whether node is usable as an upgrade target.
func shouldUseNode(node *corev1.Node) bool {
	// Keep this conservative for now. Tighten if you want only Ready nodes.
	return node != nil && node.Name != ""
}

// shouldHandle reports whether osu currently needs any reconcile work.
//
// Nil objects, objects being deleted and objects without a desired version are
// rejected. Otherwise the object is handled when it has no status yet, when
// its status lags behind the current generation, or when it is in the
// Accepted phase.
func shouldHandle(osu *monov1alpha1.OSUpgrade) bool {
	if osu == nil || osu.DeletionTimestamp != nil {
		return false
	}

	if osu.Spec.DesiredVersion == "" {
		return false
	}

	// Initial processing stage: nothing has been recorded yet.
	if osu.Status == nil {
		return true
	}

	// Reconcile if spec changed.
	if osu.Status.ObservedGeneration != osu.Generation {
		return true
	}

	// Fanout stage.
	return osu.Status.Phase == monov1alpha1.OSUpgradePhaseAccepted
}

// extractResourceVersion returns obj's resourceVersion when obj exposes a
// GetResourceVersion method, and "" otherwise.
func extractResourceVersion(obj interface{}) string {
	type hasRV interface {
		GetResourceVersion() string
	}

	if o, ok := obj.(hasRV); ok {
		return o.GetResourceVersion()
	}
	return ""
}