Files
monok8s/clitools/pkg/cmd/agent/agent.go

205 lines
5.2 KiB
Go

package agent
import (
"context"
"fmt"
"time"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/klog/v2"
monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
mkscmd "example.com/monok8s/pkg/cmd"
osupgradeController "example.com/monok8s/pkg/controller/osupgrade"
"example.com/monok8s/pkg/kube"
"example.com/monok8s/pkg/templates"
)
const defaultPollInterval = 15 * time.Second
var runtimeDefaultUnstructuredConverter = runtime.DefaultUnstructuredConverter
func NewCmdAgent(flags *genericclioptions.ConfigFlags) *cobra.Command {
var namespace string
var envFile string
var pollInterval time.Duration
cmd := &cobra.Command{
Use: "agent --env-file path",
Short: "Watch OSUpgrade resources and process matching upgrades for this node",
RunE: func(cmd *cobra.Command, _ []string) error {
if envFile == "" {
return fmt.Errorf("--env-file is required")
}
if err := mkscmd.LoadEnvFile(envFile); err != nil {
return fmt.Errorf("load env file %q: %w", envFile, err)
}
vals := templates.LoadTemplateValuesFromEnv()
rendered := templates.DefaultMonoKSConfig(vals)
cfg := &rendered
if cfg.Spec.NodeName == "" {
return fmt.Errorf("node name is empty in rendered config")
}
klog.InfoS("starting agent",
"node", cfg.Spec.NodeName,
"namespace", namespace,
"envFile", envFile,
"pollInterval", pollInterval,
)
clients, err := kube.NewClients(flags)
if err != nil {
return fmt.Errorf("create kube clients: %w", err)
}
ctx := cmd.Context()
return runPollLoop(ctx, clients, namespace, cfg.Spec.NodeName, pollInterval)
},
}
cmd.Flags().StringVar(&namespace, "namespace", "kube-system", "namespace to watch")
cmd.Flags().StringVar(&envFile, "env-file", "", "path to env file containing MKS_* variables")
cmd.Flags().DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "poll interval for OSUpgrade resources")
return cmd
}
func runPollLoop(ctx context.Context, clients *kube.Clients, namespace, nodeName string, interval time.Duration) error {
gvr := schema.GroupVersionResource{
Group: monov1alpha1.Group,
Version: monov1alpha1.Version,
Resource: "osupgrades",
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
if err := pollOnce(ctx, clients, gvr, namespace, nodeName); err != nil {
klog.ErrorS(err, "poll failed", "namespace", namespace, "node", nodeName)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
}
func pollOnce(
ctx context.Context,
clients *kube.Clients,
gvr schema.GroupVersionResource,
namespace string,
nodeName string,
) error {
list, err := clients.Dynamic.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("list osupgrades: %w", err)
}
klog.InfoS("agent tick", "namespace", namespace, "items", len(list.Items), "node", nodeName)
nodeLabels := labels.Set{
"kubernetes.io/hostname": nodeName,
"monok8s.io/node-name": nodeName,
"monok8s.io/control-agent": "true",
}
for i := range list.Items {
item := &list.Items[i]
osu, err := decodeOSUpgrade(item)
if err != nil {
klog.ErrorS(err, "failed to decode osupgrade",
"name", item.GetName(),
"resourceVersion", item.GetResourceVersion(),
)
continue
}
if !matchesNode(osu, nodeName, nodeLabels) {
klog.V(2).InfoS("skipping osupgrade; not targeted to this node",
"name", osu.Name,
"node", nodeName,
)
continue
}
klog.InfoS("matched osupgrade",
"name", osu.Name,
"node", nodeName,
"desiredVersion", osu.Spec.DesiredVersion,
"phase", statusPhase(osu.Status),
"resourceVersion", osu.ResourceVersion,
)
if err := osupgradeController.HandleOSUpgrade(ctx, clients, namespace, nodeName, osu); err != nil {
klog.ErrorS(err, "failed to handle osupgrade",
"name", osu.Name,
"node", nodeName,
)
continue
}
}
return nil
}
func decodeOSUpgrade(item *unstructured.Unstructured) (*monov1alpha1.OSUpgrade, error) {
var osu monov1alpha1.OSUpgrade
if err := runtimeDefaultUnstructuredConverter.FromUnstructured(item.Object, &osu); err != nil {
return nil, fmt.Errorf("convert unstructured to OSUpgrade: %w", err)
}
return &osu, nil
}
func matchesNode(osu *monov1alpha1.OSUpgrade, nodeName string, nodeLabels labels.Set) bool {
if osu == nil {
return false
}
sel := osu.Spec.NodeSelector
if sel == nil {
// No selector means "match all nodes".
return true
}
selector, err := metav1.LabelSelectorAsSelector(sel)
if err != nil {
klog.ErrorS(err, "invalid node selector on osupgrade", "name", osu.Name)
return false
}
if selector.Empty() {
return true
}
if selector.Matches(nodeLabels) {
return true
}
// Cheap fallback in case you temporarily target by explicit hostname-ish labels only.
return nodeName != "" && selector.Matches(labels.Set{
"kubernetes.io/hostname": nodeName,
})
}
func statusPhase(st *monov1alpha1.OSUpgradeStatus) string {
if st == nil {
return ""
}
return string(st.Phase)
}