package node import ( "context" "fmt" "reflect" "strings" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "undecided.project/monok8s/pkg/crds" "undecided.project/monok8s/pkg/kube" ) const ( controlAgentName = "control-agent" controlAgentDefaultNamespace = "kube-system" controlAgentNodeSelectorKey = "monok8s.io/control-agent" controlAgentNodeSelectorValue = "true" controlAgentImage = "localhost/monok8s/control-agent:dev" kubeconfig = "/etc/kubernetes/admin.conf" ) func ApplyCRDs(ctx context.Context, n *NodeContext) error { if n.Config.Spec.ClusterRole != "control-plane" { return nil } clients, err := kube.NewClientsFromKubeconfig(kubeconfig) if err != nil { return fmt.Errorf("build kube clients from %s: %w", kubeconfig, err) } crdClient := clients.APIExtensions.ApiextensionsV1().CustomResourceDefinitions() for _, wanted := range crds.Definitions() { _, err := crdClient.Create(ctx, wanted, metav1.CreateOptions{}) if err == nil { klog.InfoS("crd created", "name", wanted.Name) continue } if !apierrors.IsAlreadyExists(err) { return fmt.Errorf("create CRD %s: %w", wanted.Name, err) } current, getErr := crdClient.Get(ctx, wanted.Name, metav1.GetOptions{}) if getErr != nil { return fmt.Errorf("get existing CRD %s: %w", wanted.Name, getErr) } updated := wanted.DeepCopy() updated.ResourceVersion = current.ResourceVersion _, err = crdClient.Update(ctx, updated, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("update CRD %s: %w", wanted.Name, err) } klog.InfoS("crd updated", "name", wanted.Name) } return nil } func ApplyControlAgentDaemonSetResources(ctx context.Context, n *NodeContext) error { // Only the control-plane should bootstrap this DaemonSet definition. // And only when the feature is enabled. if strings.TrimSpace(n.Config.Spec.ClusterRole) != "control-plane" || !n.Config.Spec.EnableControlAgent { klog.InfoS("skipped for", "clusterRole", n.Config.Spec.ClusterRole, "enableControlAgent", n.Config.Spec.EnableControlAgent) return nil } err := ApplyCRDs(ctx, n) if err != nil { return err } namespace := strings.TrimSpace(n.Config.Namespace) if namespace == "" { namespace = controlAgentDefaultNamespace } clients, err := kube.NewClientsFromKubeconfig(kubeconfig) if err != nil { return fmt.Errorf("build kube clients from %s: %w", kubeconfig, err) } labels := map[string]string{ "app.kubernetes.io/name": controlAgentName, "app.kubernetes.io/component": "agent", "app.kubernetes.io/part-of": "monok8s", "app.kubernetes.io/managed-by": "ctl", } kubeClient := clients.Kubernetes if err := applyControlAgentServiceAccount(ctx, kubeClient, namespace, labels); err != nil { return fmt.Errorf("apply serviceaccount: %w", err) } if err := applyControlAgentClusterRole(ctx, kubeClient, labels); err != nil { return fmt.Errorf("apply clusterrole: %w", err) } if err := applyControlAgentClusterRoleBinding(ctx, kubeClient, namespace, labels); err != nil { return fmt.Errorf("apply clusterrolebinding: %w", err) } if err := applyControlAgentDaemonSet(ctx, kubeClient, namespace, labels); err != nil { return fmt.Errorf("apply daemonset: %w", err) } return nil } func applyControlAgentServiceAccount(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error { want := &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: controlAgentName, Namespace: namespace, Labels: labels, }, } existing, err := kubeClient.CoreV1().ServiceAccounts(namespace).Get(ctx, controlAgentName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { _, err = kubeClient.CoreV1().ServiceAccounts(namespace).Create(ctx, want, metav1.CreateOptions{}) return err } if err != nil { return err } changed := false if !reflect.DeepEqual(existing.Labels, want.Labels) { existing.Labels = want.Labels changed = true } if !changed { return nil } _, err = kubeClient.CoreV1().ServiceAccounts(namespace).Update(ctx, existing, metav1.UpdateOptions{}) return err } func applyControlAgentClusterRole(ctx context.Context, kubeClient kubernetes.Interface, labels map[string]string) error { wantRules := []rbacv1.PolicyRule{ { APIGroups: []string{"monok8s.io"}, Resources: []string{"osupgrades"}, Verbs: []string{"get", "list", "watch"}, }, { APIGroups: []string{"monok8s.io"}, Resources: []string{"osupgrades/status"}, Verbs: []string{"get", "patch", "update"}, }, { APIGroups: []string{""}, Resources: []string{"nodes"}, Verbs: []string{"get", "list", "watch"}, }, } want := &rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{ Name: controlAgentName, Labels: labels, }, Rules: wantRules, } existing, err := kubeClient.RbacV1().ClusterRoles().Get(ctx, controlAgentName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { _, err = kubeClient.RbacV1().ClusterRoles().Create(ctx, want, metav1.CreateOptions{}) return err } if err != nil { return err } changed := false if !reflect.DeepEqual(existing.Labels, want.Labels) { existing.Labels = want.Labels changed = true } if !reflect.DeepEqual(existing.Rules, want.Rules) { existing.Rules = want.Rules changed = true } if !changed { return nil } _, err = kubeClient.RbacV1().ClusterRoles().Update(ctx, existing, metav1.UpdateOptions{}) return err } func applyControlAgentClusterRoleBinding(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error { wantRoleRef := rbacv1.RoleRef{ APIGroup: rbacv1.GroupName, Kind: "ClusterRole", Name: controlAgentName, } wantSubjects := []rbacv1.Subject{ { Kind: "ServiceAccount", Name: controlAgentName, Namespace: namespace, }, } want := &rbacv1.ClusterRoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: controlAgentName, Labels: labels, }, RoleRef: wantRoleRef, Subjects: wantSubjects, } existing, err := kubeClient.RbacV1().ClusterRoleBindings().Get(ctx, controlAgentName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { _, err = kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, want, metav1.CreateOptions{}) return err } if err != nil { return err } // roleRef is immutable. If it differs, fail loudly instead of pretending we can patch it. if !reflect.DeepEqual(existing.RoleRef, want.RoleRef) { return fmt.Errorf("existing ClusterRoleBinding %q has different roleRef and must be recreated", controlAgentName) } changed := false if !reflect.DeepEqual(existing.Labels, want.Labels) { existing.Labels = want.Labels changed = true } if !reflect.DeepEqual(existing.Subjects, want.Subjects) { existing.Subjects = want.Subjects changed = true } if !changed { return nil } _, err = kubeClient.RbacV1().ClusterRoleBindings().Update(ctx, existing, metav1.UpdateOptions{}) return err } func applyControlAgentDaemonSet(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error { privileged := true dsLabels := map[string]string{ "app.kubernetes.io/name": controlAgentName, "app.kubernetes.io/component": "agent", "app.kubernetes.io/part-of": "monok8s", "app.kubernetes.io/managed-by": "ctl", } want := &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: controlAgentName, Namespace: namespace, Labels: labels, }, Spec: appsv1.DaemonSetSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app.kubernetes.io/name": controlAgentName, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: dsLabels, }, Spec: corev1.PodSpec{ ServiceAccountName: controlAgentName, HostNetwork: true, HostPID: true, DNSPolicy: corev1.DNSClusterFirstWithHostNet, NodeSelector: map[string]string{ controlAgentNodeSelectorKey: controlAgentNodeSelectorValue, }, Tolerations: []corev1.Toleration{ {Operator: corev1.TolerationOpExists}, }, Containers: []corev1.Container{ { Name: "agent", Image: controlAgentImage, ImagePullPolicy: corev1.PullNever, Args: []string{"agent", "--env-file", "$(CLUSTER_ENV_FILE)"}, Env: []corev1.EnvVar{ { Name: "NODE_NAME", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ APIVersion: "v1", FieldPath: "spec.nodeName", }, }, }, { Name: "CLUSTER_ENV_FILE", Value: "/host/opt/monok8s/config/cluster.env", }, { Name: "HOST_MOUNT_ROOT", Value: "/host/mnt/control-agent", }, { Name: "HOST_DEV_DIR", Value: "/host/dev", }, { Name: "HOST_PROC_DIR", Value: "/host/proc", }, { Name: "HOST_RUN_DIR", Value: "/host/run", }, }, SecurityContext: &corev1.SecurityContext{ Privileged: &privileged, }, VolumeMounts: []corev1.VolumeMount{ { Name: "host-dev", MountPath: "/host/dev", }, { Name: "host-config", MountPath: "/host/opt/monok8s/config", ReadOnly: true, }, { Name: "host-run", MountPath: "/host/run", }, { Name: "host-proc", MountPath: "/host/proc", ReadOnly: true, }, }, }, }, Volumes: []corev1.Volume{ { Name: "host-dev", VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: "/dev", Type: hostPathType(corev1.HostPathDirectory), }, }, }, { Name: "host-config", VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: "/opt/monok8s/config", Type: hostPathType(corev1.HostPathDirectory), }, }, }, { Name: "host-run", VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: "/run", Type: hostPathType(corev1.HostPathDirectory), }, }, }, { Name: "host-proc", VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ Path: "/proc", Type: hostPathType(corev1.HostPathDirectory), }, }, }, }, }, }, }, } existing, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, controlAgentName, metav1.GetOptions{}) if apierrors.IsNotFound(err) { _, err = kubeClient.AppsV1().DaemonSets(namespace).Create(ctx, want, metav1.CreateOptions{}) return err } if err != nil { return err } changed := false if !reflect.DeepEqual(existing.Labels, want.Labels) { existing.Labels = want.Labels changed = true } if !reflect.DeepEqual(existing.Spec, want.Spec) { existing.Spec = want.Spec changed = true } if !changed { return nil } _, err = kubeClient.AppsV1().DaemonSets(namespace).Update(ctx, existing, metav1.UpdateOptions{}) return err } func hostPathType(t corev1.HostPathType) *corev1.HostPathType { return &t } func mountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropagationMode { return &m }