package agent

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/spf13/cobra"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
	mkscmd "example.com/monok8s/pkg/cmd"
	osupgradeController "example.com/monok8s/pkg/controller/osupgrade"
	"example.com/monok8s/pkg/kube"
	"example.com/monok8s/pkg/templates"
)

// NewCmdAgent returns the "agent" command.
//
// When run, the command loads MKS_* variables from --env-file and renders the
// default MonoKS config to obtain this node's name. It then waits until the
// ".control-gate" marker next to the env file is absent. Finally it watches
// OSUpgradeProgress resources in --namespace, handing those that target this
// node to the osupgrade controller. flags supplies the kubeconfig/client
// settings used to build the API clients.
func NewCmdAgent(flags *genericclioptions.ConfigFlags) *cobra.Command {
	var namespace string
	var envFile string

	cmd := &cobra.Command{
		Use:   "agent --env-file path",
		Short: "Watch OSUpgradeProgress resources for this node and process upgrades",
		RunE: func(cmd *cobra.Command, _ []string) error {
			if envFile == "" {
				return fmt.Errorf("--env-file is required")
			}
			if err := mkscmd.LoadEnvFile(envFile); err != nil {
				return fmt.Errorf("load env file %q: %w", envFile, err)
			}

			// The node name comes from the rendered config, which is driven by
			// the environment variables loaded above.
			vals := templates.LoadTemplateValuesFromEnv()
			rendered := templates.DefaultMonoKSConfig(vals)
			nodeName := rendered.Spec.NodeName
			if nodeName == "" {
				return fmt.Errorf("node name is empty in rendered config")
			}

			ctx := cmd.Context()

			// Do not start processing upgrades while the control gate marker exists.
			if err := waitForControlGate(ctx, envFile, 2*time.Second); err != nil {
				return fmt.Errorf("wait for control gate to release: %w", err)
			}

			klog.InfoS("starting agent",
				"node", nodeName,
				"namespace", namespace,
				"envFile", envFile,
			)

			clients, err := kube.NewClients(flags)
			if err != nil {
				return fmt.Errorf("create kube clients: %w", err)
			}

			return runWatchLoop(ctx, clients, namespace, nodeName)
		},
	}

	cmd.Flags().StringVar(&namespace, "namespace", templates.DefaultNamespace, "namespace to watch")
	cmd.Flags().StringVar(&envFile, "env-file", "", "path to env file containing MKS_* variables")

	return cmd
}

// waitForControlGate blocks while a ".control-gate" file exists in the
// directory that contains envFile, re-checking every pollInterval. A
// non-positive pollInterval falls back to 2s.
//
// It returns nil once the marker is absent and ctx.Err() if ctx is done first.
// It returns a wrapped error if stat fails for any reason other than
// non-existence.
func waitForControlGate(ctx context.Context, envFile string, pollInterval time.Duration) error {
	marker := filepath.Join(filepath.Dir(envFile), ".control-gate")
	if pollInterval <= 0 {
		pollInterval = 2 * time.Second
	}

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	loggedWaiting := false
	for {
		_, err := os.Stat(marker)
		switch {
		case err == nil:
			// Log at Info only the first time; repeating it on every poll
			// would flood the log while the gate stays closed.
			if !loggedWaiting {
				klog.InfoS("Control gate is present; waiting before starting watch loop", "path", marker)
				loggedWaiting = true
			} else {
				klog.V(2).InfoS("Control gate is present; waiting before starting watch loop", "path", marker)
			}
		case os.IsNotExist(err):
			klog.InfoS("Control gate not present; starting watch loop", "path", marker)
			return nil
		default:
			return fmt.Errorf("stat control gate marker %s: %w", marker, err)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// runWatchLoop calls watchOnce repeatedly until ctx is done. After a failed
// attempt it logs the error and waits 2s before retrying. Its only return
// value is ctx.Err().
func runWatchLoop(ctx context.Context, clients *kube.Clients, namespace, nodeName string) error {
	const retryDelay = 2 * time.Second

	// watchOnce records the last observed resourceVersion here. Each
	// watchOnce call starts with a fresh List, so this value is only
	// reported in the retry log below.
	var resourceVersion string
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		err := watchOnce(ctx, clients, namespace, nodeName, &resourceVersion)
		if err == nil {
			continue
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}

		klog.ErrorS(err, "watch failed; retrying",
			"namespace", namespace,
			"node", nodeName,
			"resourceVersion", resourceVersion,
		)

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryDelay):
		}
	}
}

// watchOnce performs one list-then-watch cycle over OSUpgradeProgress
// resources in namespace.
//
// It first lists all resources. Every item that targets nodeName and whose
// phase is empty or Pending is passed to
// osupgradeController.HandleOSUpgradeProgress. It then watches from the
// list's resourceVersion and does the same for each Added or Modified object.
// Deleted objects are never handed to the controller. Handler failures are
// logged and do not end the cycle. *resourceVersion is updated with the
// latest resourceVersion observed.
//
// The returned error is non-nil when listing or watching fails, when the
// watch channel closes, when the server sends an Error event, or when ctx is
// done (ctx.Err()).
func watchOnce(
	ctx context.Context,
	clients *kube.Clients,
	namespace string,
	nodeName string,
	resourceVersion *string,
) error {
	list, err := clients.MonoKS.
		Monok8sV1alpha1().
		OSUpgradeProgresses(namespace).
		List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("list osupgradeprogresses: %w", err)
	}

	for i := range list.Items {
		item := &list.Items[i]
		if !targetsNode(item, nodeName) || !shouldHandle(item) {
			continue
		}

		klog.InfoS("found existing osupgradeprogress",
			"name", item.Name,
			"node", nodeName,
			"phase", progressPhase(item.Status),
			"resourceVersion", item.ResourceVersion,
		)

		if err := osupgradeController.HandleOSUpgradeProgress(ctx, clients, namespace, nodeName, item); err != nil {
			klog.ErrorS(err, "failed to handle existing osupgradeprogress",
				"name", item.Name,
				"node", nodeName,
			)
		}
	}

	// Start the watch exactly where the list left off so no event between
	// the two calls is missed.
	*resourceVersion = list.ResourceVersion

	w, err := clients.MonoKS.
		Monok8sV1alpha1().
		OSUpgradeProgresses(namespace).
		Watch(ctx, metav1.ListOptions{
			ResourceVersion: *resourceVersion,
		})
	if err != nil {
		return fmt.Errorf("watch osupgradeprogresses: %w", err)
	}
	defer w.Stop()

	klog.InfoS("watching osupgradeprogresses",
		"namespace", namespace,
		"node", nodeName,
		"resourceVersion", *resourceVersion,
	)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case evt, ok := <-w.ResultChan():
			if !ok {
				return fmt.Errorf("watch channel closed")
			}

			switch evt.Type {
			case watch.Bookmark:
				obj, ok := evt.Object.(*monov1alpha1.OSUpgradeProgress)
				if ok && obj != nil && obj.ResourceVersion != "" {
					*resourceVersion = obj.ResourceVersion
				}
				continue
			case watch.Error:
				// Error events normally carry a *metav1.Status. Surface its
				// details so the caller's log explains why the watch ended.
				if st, ok := evt.Object.(*metav1.Status); ok && st != nil {
					return fmt.Errorf("watch returned error event: %s (reason=%s, code=%d)",
						st.Message, st.Reason, st.Code)
				}
				return fmt.Errorf("watch returned error event (object type %T)", evt.Object)
			}

			osup, ok := evt.Object.(*monov1alpha1.OSUpgradeProgress)
			if !ok {
				klog.V(1).InfoS("skipping unexpected watch object type",
					"type", fmt.Sprintf("%T", evt.Object),
				)
				continue
			}

			if osup.ResourceVersion != "" {
				*resourceVersion = osup.ResourceVersion
			}

			if !targetsNode(osup, nodeName) {
				continue
			}

			// The object no longer exists, so it must not be processed as an
			// upgrade request even if its last-known phase was Pending.
			if evt.Type == watch.Deleted {
				klog.V(2).InfoS("skipping deleted osupgradeprogress",
					"name", osup.Name,
					"node", nodeName,
					"phase", progressPhase(osup.Status),
				)
				continue
			}

			if !shouldHandle(osup) {
				klog.V(2).InfoS("skipping osupgradeprogress due to phase",
					"name", osup.Name,
					"node", nodeName,
					"phase", progressPhase(osup.Status),
					"eventType", evt.Type,
				)
				continue
			}

			klog.InfoS("received osupgradeprogress event",
				"name", osup.Name,
				"node", nodeName,
				"phase", progressPhase(osup.Status),
				"eventType", evt.Type,
				"resourceVersion", osup.ResourceVersion,
			)

			if err := osupgradeController.HandleOSUpgradeProgress(ctx, clients, namespace, nodeName, osup); err != nil {
				klog.ErrorS(err, "failed to handle osupgradeprogress",
					"name", osup.Name,
					"node", nodeName,
					"eventType", evt.Type,
				)
			}
		}
	}
}

// targetsNode reports whether osup is non-nil and its Spec.NodeName equals
// nodeName.
func targetsNode(osup *monov1alpha1.OSUpgradeProgress, nodeName string) bool {
	if osup == nil {
		return false
	}
	return osup.Spec.NodeName == nodeName
}

// shouldHandle reports whether osup is still waiting to be processed. This is
// true when it has no status yet, or its phase is empty or Pending.
func shouldHandle(osup *monov1alpha1.OSUpgradeProgress) bool {
	if osup == nil {
		return false
	}
	// Status is a pointer (it is what progressPhase receives). A freshly
	// created object may not have one yet; treat that like an empty phase
	// instead of dereferencing nil.
	if osup.Status == nil {
		return true
	}
	switch osup.Status.Phase {
	case "", monov1alpha1.OSUpgradeProgressPhasePending:
		return true
	default:
		return false
	}
}

// progressPhase returns st.Phase as a string for logging, or "" when st is
// nil.
func progressPhase(st *monov1alpha1.OSUpgradeProgressStatus) string {
	if st == nil {
		return ""
	}
	return string(st.Phase)
}