// Package agent implements the "agent" command, which polls OSUpgrade
// resources and processes the ones that target the local node.
package agent

import (
	"context"
	"fmt"
	"time"

	"github.com/spf13/cobra"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/klog/v2"

	monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
	mkscmd "example.com/monok8s/pkg/cmd"
	osupgradeController "example.com/monok8s/pkg/controller/osupgrade"
	"example.com/monok8s/pkg/kube"
	"example.com/monok8s/pkg/templates"
)

// defaultPollInterval is the default value of the --poll-interval flag.
const defaultPollInterval = 15 * time.Second

// runtimeDefaultUnstructuredConverter is the converter used to decode
// unstructured objects into typed OSUpgrade values.
// NOTE(review): presumably kept in a package variable so it can be swapped
// in tests — confirm.
var runtimeDefaultUnstructuredConverter = runtime.DefaultUnstructuredConverter

// NewCmdAgent returns the "agent" cobra command.
//
// When run, the command loads the env file given by --env-file, renders the
// default config from the resulting template values to obtain the node name,
// builds Kubernetes clients from flags, and then polls OSUpgrade resources in
// --namespace every --poll-interval until the command context is canceled.
func NewCmdAgent(flags *genericclioptions.ConfigFlags) *cobra.Command {
	var (
		namespace    string
		envFile      string
		pollInterval time.Duration
	)

	cmd := &cobra.Command{
		Use:   "agent --env-file path",
		Short: "Watch OSUpgrade resources and process matching upgrades for this node",
		RunE: func(cmd *cobra.Command, _ []string) error {
			if envFile == "" {
				return fmt.Errorf("--env-file is required")
			}
			// Fail fast: time.NewTicker panics on a non-positive duration,
			// so reject bad values before doing any other work.
			if pollInterval <= 0 {
				return fmt.Errorf("--poll-interval must be positive, got %s", pollInterval)
			}

			if err := mkscmd.LoadEnvFile(envFile); err != nil {
				return fmt.Errorf("load env file %q: %w", envFile, err)
			}

			vals := templates.LoadTemplateValuesFromEnv()
			cfg := templates.DefaultMonoKSConfig(vals)
			nodeName := cfg.Spec.NodeName
			if nodeName == "" {
				return fmt.Errorf("node name is empty in rendered config")
			}

			klog.InfoS("starting agent",
				"node", nodeName,
				"namespace", namespace,
				"envFile", envFile,
				"pollInterval", pollInterval,
			)

			clients, err := kube.NewClients(flags)
			if err != nil {
				return fmt.Errorf("create kube clients: %w", err)
			}

			return runPollLoop(cmd.Context(), clients, namespace, nodeName, pollInterval)
		},
	}

	cmd.Flags().StringVar(&namespace, "namespace", "kube-system", "namespace to watch")
	cmd.Flags().StringVar(&envFile, "env-file", "", "path to env file containing MKS_* variables")
	cmd.Flags().DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "poll interval for OSUpgrade resources")

	return cmd
}

// runPollLoop polls OSUpgrade resources once immediately and then once per
// interval until ctx is done, at which point it returns ctx.Err().
//
// A failed poll is logged and does not stop the loop. A non-positive interval
// is rejected with an error because time.NewTicker would panic on it.
func runPollLoop(ctx context.Context, clients *kube.Clients, namespace, nodeName string, interval time.Duration) error {
	if interval <= 0 {
		return fmt.Errorf("poll interval must be positive, got %s", interval)
	}

	gvr := schema.GroupVersionResource{
		Group:    monov1alpha1.Group,
		Version:  monov1alpha1.Version,
		Resource: "osupgrades",
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		if err := pollOnce(ctx, clients, gvr, namespace, nodeName); err != nil {
			klog.ErrorS(err, "poll failed", "namespace", namespace, "node", nodeName)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// pollOnce lists the OSUpgrade resources in namespace through the dynamic
// client and hands every one that targets this node to
// osupgradeController.HandleOSUpgrade.
//
// Only a failure of the list call is returned. Decode and handler errors for
// individual items are logged and the remaining items are still processed.
func pollOnce(
	ctx context.Context,
	clients *kube.Clients,
	gvr schema.GroupVersionResource,
	namespace string,
	nodeName string,
) error {
	list, err := clients.Dynamic.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("list osupgrades: %w", err)
	}

	klog.InfoS("agent tick", "namespace", namespace, "items", len(list.Items), "node", nodeName)

	// The label set that node selectors are evaluated against is synthesized
	// from the node name here; it is not read from the Node object.
	// NOTE(review): selectors that require any other label cannot match —
	// confirm this is intended.
	nodeLabels := labels.Set{
		"kubernetes.io/hostname":   nodeName,
		"monok8s.io/node-name":     nodeName,
		"monok8s.io/control-agent": "true",
	}

	for i := range list.Items {
		item := &list.Items[i]

		osu, err := decodeOSUpgrade(item)
		if err != nil {
			klog.ErrorS(err, "failed to decode osupgrade",
				"name", item.GetName(),
				"resourceVersion", item.GetResourceVersion(),
			)
			continue
		}

		if !matchesNode(osu, nodeName, nodeLabels) {
			klog.V(2).InfoS("skipping osupgrade; not targeted to this node",
				"name", osu.Name,
				"node", nodeName,
			)
			continue
		}

		klog.InfoS("matched osupgrade",
			"name", osu.Name,
			"node", nodeName,
			"desiredVersion", osu.Spec.DesiredVersion,
			"phase", statusPhase(osu.Status),
			"resourceVersion", osu.ResourceVersion,
		)

		if err := osupgradeController.HandleOSUpgrade(ctx, clients, namespace, nodeName, osu); err != nil {
			klog.ErrorS(err, "failed to handle osupgrade",
				"name", osu.Name,
				"node", nodeName,
			)
		}
	}

	return nil
}

// decodeOSUpgrade converts an unstructured object into a typed OSUpgrade.
// It returns a nil result together with a wrapped error when the conversion
// fails.
func decodeOSUpgrade(item *unstructured.Unstructured) (*monov1alpha1.OSUpgrade, error) {
	var osu monov1alpha1.OSUpgrade
	if err := runtimeDefaultUnstructuredConverter.FromUnstructured(item.Object, &osu); err != nil {
		return nil, fmt.Errorf("convert unstructured to OSUpgrade: %w", err)
	}
	return &osu, nil
}

// matchesNode reports whether osu targets the node described by nodeLabels.
//
// A nil osu never matches. A nil or empty node selector matches every node.
// A selector that cannot be converted is logged and treated as a non-match.
// nodeName is currently unused; matching is decided by nodeLabels alone.
func matchesNode(osu *monov1alpha1.OSUpgrade, nodeName string, nodeLabels labels.Set) bool {
	if osu == nil {
		return false
	}

	sel := osu.Spec.NodeSelector
	if sel == nil {
		// No selector means "match all nodes".
		return true
	}

	selector, err := metav1.LabelSelectorAsSelector(sel)
	if err != nil {
		klog.ErrorS(err, "invalid node selector on osupgrade", "name", osu.Name)
		return false
	}

	return selector.Empty() || selector.Matches(nodeLabels)
}

// statusPhase returns the phase of st as a string, or "" when st is nil.
func statusPhase(st *monov1alpha1.OSUpgradeStatus) string {
	if st == nil {
		return ""
	}
	return string(st.Phase)
}