437 lines
12 KiB
Go
437 lines
12 KiB
Go
package node
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
|
|
appsv1 "k8s.io/api/apps/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
rbacv1 "k8s.io/api/rbac/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/klog/v2"
|
|
|
|
"undecided.project/monok8s/pkg/crds"
|
|
"undecided.project/monok8s/pkg/kube"
|
|
)
|
|
|
|
const (
	// controlAgentName names every object this file manages: the
	// ServiceAccount, ClusterRole, ClusterRoleBinding, and DaemonSet.
	controlAgentName = "control-agent"
	// controlAgentDefaultNamespace is used when the node config does not
	// specify a namespace.
	controlAgentDefaultNamespace = "kube-system"
	// Nodes must carry this label/value pair for the control-agent
	// DaemonSet to schedule onto them (see the NodeSelector in
	// applyControlAgentDaemonSet).
	controlAgentNodeSelectorKey   = "monok8s.io/control-agent"
	controlAgentNodeSelectorValue = "true"
	// controlAgentImage is referenced with ImagePullPolicy: Never, so the
	// image must already be present in each node's local image store.
	controlAgentImage = "localhost/monok8s/control-agent:dev"
	// kubeconfig is the admin kubeconfig read on control-plane nodes.
	// NOTE(review): assumes a kubeadm-style layout — confirm for other
	// bootstrap mechanisms.
	kubeconfig = "/etc/kubernetes/admin.conf"
)
|
|
|
|
func ApplyCRDs(ctx context.Context, n *NodeContext) error {
|
|
if n.Config.Spec.ClusterRole != "control-plane" {
|
|
return nil
|
|
}
|
|
|
|
clients, err := kube.NewClientsFromKubeconfig(kubeconfig)
|
|
if err != nil {
|
|
return fmt.Errorf("build kube clients from %s: %w", kubeconfig, err)
|
|
}
|
|
|
|
crdClient := clients.APIExtensions.ApiextensionsV1().CustomResourceDefinitions()
|
|
|
|
for _, wanted := range crds.Definitions() {
|
|
_, err := crdClient.Create(ctx, wanted, metav1.CreateOptions{})
|
|
if err == nil {
|
|
klog.InfoS("crd created", "name", wanted.Name)
|
|
continue
|
|
}
|
|
|
|
if !apierrors.IsAlreadyExists(err) {
|
|
return fmt.Errorf("create CRD %s: %w", wanted.Name, err)
|
|
}
|
|
|
|
current, getErr := crdClient.Get(ctx, wanted.Name, metav1.GetOptions{})
|
|
if getErr != nil {
|
|
return fmt.Errorf("get existing CRD %s: %w", wanted.Name, getErr)
|
|
}
|
|
|
|
updated := wanted.DeepCopy()
|
|
updated.ResourceVersion = current.ResourceVersion
|
|
|
|
_, err = crdClient.Update(ctx, updated, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("update CRD %s: %w", wanted.Name, err)
|
|
}
|
|
|
|
klog.InfoS("crd updated", "name", wanted.Name)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ApplyControlAgentDaemonSetResources(ctx context.Context, n *NodeContext) error {
|
|
// Only the control-plane should bootstrap this DaemonSet definition.
|
|
// And only when the feature is enabled.
|
|
if strings.TrimSpace(n.Config.Spec.ClusterRole) != "control-plane" || !n.Config.Spec.EnableControlAgent {
|
|
klog.InfoS("skipped for", "clusterRole", n.Config.Spec.ClusterRole, "enableControlAgent", n.Config.Spec.EnableControlAgent)
|
|
return nil
|
|
}
|
|
|
|
err := ApplyCRDs(ctx, n)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
namespace := strings.TrimSpace(n.Config.Namespace)
|
|
if namespace == "" {
|
|
namespace = controlAgentDefaultNamespace
|
|
}
|
|
|
|
clients, err := kube.NewClientsFromKubeconfig(kubeconfig)
|
|
if err != nil {
|
|
return fmt.Errorf("build kube clients from %s: %w", kubeconfig, err)
|
|
}
|
|
|
|
labels := map[string]string{
|
|
"app.kubernetes.io/name": controlAgentName,
|
|
"app.kubernetes.io/component": "agent",
|
|
"app.kubernetes.io/part-of": "monok8s",
|
|
"app.kubernetes.io/managed-by": "ctl",
|
|
}
|
|
|
|
kubeClient := clients.Kubernetes
|
|
|
|
if err := applyControlAgentServiceAccount(ctx, kubeClient, namespace, labels); err != nil {
|
|
return fmt.Errorf("apply serviceaccount: %w", err)
|
|
}
|
|
if err := applyControlAgentClusterRole(ctx, kubeClient, labels); err != nil {
|
|
return fmt.Errorf("apply clusterrole: %w", err)
|
|
}
|
|
if err := applyControlAgentClusterRoleBinding(ctx, kubeClient, namespace, labels); err != nil {
|
|
return fmt.Errorf("apply clusterrolebinding: %w", err)
|
|
}
|
|
if err := applyControlAgentDaemonSet(ctx, kubeClient, namespace, labels); err != nil {
|
|
return fmt.Errorf("apply daemonset: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func applyControlAgentServiceAccount(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
|
|
want := &corev1.ServiceAccount{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: controlAgentName,
|
|
Namespace: namespace,
|
|
Labels: labels,
|
|
},
|
|
}
|
|
|
|
existing, err := kubeClient.CoreV1().ServiceAccounts(namespace).Get(ctx, controlAgentName, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(err) {
|
|
_, err = kubeClient.CoreV1().ServiceAccounts(namespace).Create(ctx, want, metav1.CreateOptions{})
|
|
return err
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
changed := false
|
|
if !reflect.DeepEqual(existing.Labels, want.Labels) {
|
|
existing.Labels = want.Labels
|
|
changed = true
|
|
}
|
|
|
|
if !changed {
|
|
return nil
|
|
}
|
|
|
|
_, err = kubeClient.CoreV1().ServiceAccounts(namespace).Update(ctx, existing, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
func applyControlAgentClusterRole(ctx context.Context, kubeClient kubernetes.Interface, labels map[string]string) error {
|
|
wantRules := []rbacv1.PolicyRule{
|
|
{
|
|
APIGroups: []string{"monok8s.io"},
|
|
Resources: []string{"osupgrades"},
|
|
Verbs: []string{"get", "list", "watch"},
|
|
},
|
|
{
|
|
APIGroups: []string{"monok8s.io"},
|
|
Resources: []string{"osupgrades/status"},
|
|
Verbs: []string{"get", "patch", "update"},
|
|
},
|
|
{
|
|
APIGroups: []string{""},
|
|
Resources: []string{"nodes"},
|
|
Verbs: []string{"get", "list", "watch"},
|
|
},
|
|
}
|
|
|
|
want := &rbacv1.ClusterRole{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: controlAgentName,
|
|
Labels: labels,
|
|
},
|
|
Rules: wantRules,
|
|
}
|
|
|
|
existing, err := kubeClient.RbacV1().ClusterRoles().Get(ctx, controlAgentName, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(err) {
|
|
_, err = kubeClient.RbacV1().ClusterRoles().Create(ctx, want, metav1.CreateOptions{})
|
|
return err
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
changed := false
|
|
if !reflect.DeepEqual(existing.Labels, want.Labels) {
|
|
existing.Labels = want.Labels
|
|
changed = true
|
|
}
|
|
if !reflect.DeepEqual(existing.Rules, want.Rules) {
|
|
existing.Rules = want.Rules
|
|
changed = true
|
|
}
|
|
|
|
if !changed {
|
|
return nil
|
|
}
|
|
|
|
_, err = kubeClient.RbacV1().ClusterRoles().Update(ctx, existing, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
func applyControlAgentClusterRoleBinding(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
|
|
wantRoleRef := rbacv1.RoleRef{
|
|
APIGroup: rbacv1.GroupName,
|
|
Kind: "ClusterRole",
|
|
Name: controlAgentName,
|
|
}
|
|
wantSubjects := []rbacv1.Subject{
|
|
{
|
|
Kind: "ServiceAccount",
|
|
Name: controlAgentName,
|
|
Namespace: namespace,
|
|
},
|
|
}
|
|
|
|
want := &rbacv1.ClusterRoleBinding{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: controlAgentName,
|
|
Labels: labels,
|
|
},
|
|
RoleRef: wantRoleRef,
|
|
Subjects: wantSubjects,
|
|
}
|
|
|
|
existing, err := kubeClient.RbacV1().ClusterRoleBindings().Get(ctx, controlAgentName, metav1.GetOptions{})
|
|
if apierrors.IsNotFound(err) {
|
|
_, err = kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, want, metav1.CreateOptions{})
|
|
return err
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// roleRef is immutable. If it differs, fail loudly instead of pretending we can patch it.
|
|
if !reflect.DeepEqual(existing.RoleRef, want.RoleRef) {
|
|
return fmt.Errorf("existing ClusterRoleBinding %q has different roleRef and must be recreated", controlAgentName)
|
|
}
|
|
|
|
changed := false
|
|
if !reflect.DeepEqual(existing.Labels, want.Labels) {
|
|
existing.Labels = want.Labels
|
|
changed = true
|
|
}
|
|
if !reflect.DeepEqual(existing.Subjects, want.Subjects) {
|
|
existing.Subjects = want.Subjects
|
|
changed = true
|
|
}
|
|
|
|
if !changed {
|
|
return nil
|
|
}
|
|
|
|
_, err = kubeClient.RbacV1().ClusterRoleBindings().Update(ctx, existing, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
// applyControlAgentDaemonSet ensures the control-agent DaemonSet in the
// given namespace: create when absent, otherwise update when labels or spec
// differ from the manifest built here.
//
// The pod template runs a single privileged container with host networking,
// host PID, and hostPath mounts for /dev, /opt/monok8s/config (read-only),
// /run, and /proc (read-only), all surfaced under /host inside the
// container. Scheduling is limited to nodes labeled
// monok8s.io/control-agent=true, and all taints are tolerated.
func applyControlAgentDaemonSet(ctx context.Context, kubeClient kubernetes.Interface, namespace string, labels map[string]string) error {
	// Pointer target for the SecurityContext.Privileged field below.
	privileged := true

	// Labels for the pod template. The selector below matches only on
	// app.kubernetes.io/name and is kept minimal, since a DaemonSet's
	// selector is immutable after creation.
	dsLabels := map[string]string{
		"app.kubernetes.io/name":       controlAgentName,
		"app.kubernetes.io/component":  "agent",
		"app.kubernetes.io/part-of":    "monok8s",
		"app.kubernetes.io/managed-by": "ctl",
	}

	want := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      controlAgentName,
			Namespace: namespace,
			Labels:    labels,
		},
		Spec: appsv1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app.kubernetes.io/name": controlAgentName,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: dsLabels,
				},
				Spec: corev1.PodSpec{
					ServiceAccountName: controlAgentName,
					// Host networking + host PID give the agent direct
					// visibility into the node; DNSClusterFirstWithHostNet
					// keeps cluster DNS resolution working under hostNetwork.
					HostNetwork: true,
					HostPID:     true,
					DNSPolicy:   corev1.DNSClusterFirstWithHostNet,
					// Only nodes explicitly opted in via the selector label
					// run the agent.
					NodeSelector: map[string]string{
						controlAgentNodeSelectorKey: controlAgentNodeSelectorValue,
					},
					// Tolerate every taint so the agent also lands on
					// tainted (e.g. control-plane) nodes.
					Tolerations: []corev1.Toleration{
						{Operator: corev1.TolerationOpExists},
					},
					Containers: []corev1.Container{
						{
							Name:  "agent",
							Image: controlAgentImage,
							// PullNever: the localhost/... image must already
							// exist in the node's local image store.
							ImagePullPolicy: corev1.PullNever,
							// $(CLUSTER_ENV_FILE) is expanded by Kubernetes
							// from the env var of the same name defined below.
							Args: []string{"agent", "--env-file", "$(CLUSTER_ENV_FILE)"},
							Env: []corev1.EnvVar{
								{
									// Name of the node this pod landed on,
									// via the downward API.
									Name: "NODE_NAME",
									ValueFrom: &corev1.EnvVarSource{
										FieldRef: &corev1.ObjectFieldSelector{
											APIVersion: "v1",
											FieldPath:  "spec.nodeName",
										},
									},
								},
								{
									// Lives under the read-only host-config
									// mount declared below.
									Name:  "CLUSTER_ENV_FILE",
									Value: "/host/opt/monok8s/config/cluster.env",
								},
								{
									// NOTE(review): no volume below mounts
									// anything at /host/mnt — confirm the
									// agent can actually reach this path, or
									// add a matching hostPath mount for
									// /mnt/control-agent.
									Name:  "HOST_MOUNT_ROOT",
									Value: "/host/mnt/control-agent",
								},
								{
									Name:  "HOST_DEV_DIR",
									Value: "/host/dev",
								},
								{
									Name:  "HOST_PROC_DIR",
									Value: "/host/proc",
								},
								{
									Name:  "HOST_RUN_DIR",
									Value: "/host/run",
								},
							},
							SecurityContext: &corev1.SecurityContext{
								Privileged: &privileged,
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "host-dev",
									MountPath: "/host/dev",
								},
								{
									Name:      "host-config",
									MountPath: "/host/opt/monok8s/config",
									ReadOnly:  true,
								},
								{
									Name:      "host-run",
									MountPath: "/host/run",
								},
								{
									Name:      "host-proc",
									MountPath: "/host/proc",
									ReadOnly:  true,
								},
							},
						},
					},
					// hostPath volumes backing the mounts above; all use
					// HostPathDirectory, so each directory must already
					// exist on the host.
					Volumes: []corev1.Volume{
						{
							Name: "host-dev",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/dev",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
						{
							Name: "host-config",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/opt/monok8s/config",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
						{
							Name: "host-run",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/run",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
						{
							Name: "host-proc",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/proc",
									Type: hostPathType(corev1.HostPathDirectory),
								},
							},
						},
					},
				},
			},
		},
	}

	existing, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, controlAgentName, metav1.GetOptions{})
	if apierrors.IsNotFound(err) {
		_, err = kubeClient.AppsV1().DaemonSets(namespace).Create(ctx, want, metav1.CreateOptions{})
		return err
	}
	if err != nil {
		return err
	}

	changed := false
	if !reflect.DeepEqual(existing.Labels, want.Labels) {
		existing.Labels = want.Labels
		changed = true
	}
	// NOTE(review): the API server defaults many Spec fields on the live
	// object, so DeepEqual against this sparse manifest will likely differ
	// on every reconcile and trigger an Update each run — consider comparing
	// only the managed fields or using server-side apply.
	if !reflect.DeepEqual(existing.Spec, want.Spec) {
		existing.Spec = want.Spec
		changed = true
	}

	if !changed {
		return nil
	}

	_, err = kubeClient.AppsV1().DaemonSets(namespace).Update(ctx, existing, metav1.UpdateOptions{})
	return err
}
|
|
|
|
func hostPathType(t corev1.HostPathType) *corev1.HostPathType {
|
|
return &t
|
|
}
|
|
|
|
func mountPropagationMode(m corev1.MountPropagationMode) *corev1.MountPropagationMode {
|
|
return &m
|
|
}
|