package osupgrade import ( "context" "fmt" "strings" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1" "example.com/monok8s/pkg/kube" ) func EnsureOSUpgradeProgressForNode( ctx context.Context, clients *kube.Clients, namespace string, nodeName string, osu *monov1alpha1.OSUpgrade, ) error { if osu == nil { return fmt.Errorf("osupgrade is nil") } name := fmt.Sprintf("%s-%s", osu.Name, nodeName) now := metav1.Now() progress := &monov1alpha1.OSUpgradeProgress{ TypeMeta: metav1.TypeMeta{ APIVersion: monov1alpha1.APIVersion, Kind: "OSUpgradeProgress", }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, Spec: monov1alpha1.OSUpgradeProgressSpec{ NodeName: nodeName, SourceRef: monov1alpha1.OSUpgradeSourceRef{ Name: osu.Name, }, }, Status: &monov1alpha1.OSUpgradeProgressStatus{ Phase: monov1alpha1.OSUpgradeProgressPhasePending, LastUpdatedAt: &now, }, } created, err := createProgress(ctx, clients, progress) if err == nil { klog.InfoS("created osupgradeprogress", "name", created.Name, "namespace", created.Namespace) return nil } if !apierrors.IsAlreadyExists(err) { return fmt.Errorf("create OSUpgradeProgress %s/%s: %w", namespace, name, err) } existing, err := getProgress(ctx, clients, namespace, name) if err != nil { return fmt.Errorf("get existing OSUpgradeProgress %s/%s: %w", namespace, name, err) } if existing.Spec.NodeName != nodeName || existing.Spec.SourceRef.Name != osu.Name { return fmt.Errorf( "conflicting OSUpgradeProgress %s/%s already exists: got spec.nodeName=%q spec.sourceRef.name=%q, want nodeName=%q sourceRef.name=%q", namespace, name, existing.Spec.NodeName, existing.Spec.SourceRef.Name, nodeName, osu.Name, ) } return nil } func updateProgressRobust( ctx context.Context, clients *kube.Clients, namespace string, name string, mutate func(*monov1alpha1.OSUpgradeProgress), ) (*monov1alpha1.OSUpgradeProgress, error) { var out *monov1alpha1.OSUpgradeProgress err := retry.RetryOnConflict(retry.DefaultRetry, func() error { current, err := getProgress(ctx, clients, namespace, name) if err != nil { return err } if current.Status == nil { current.Status = &monov1alpha1.OSUpgradeProgressStatus{} } mutate(current) updated, err := updateProgressStatus(ctx, clients, current) if err != nil { if isUnknownUpdateResult(err) { latest, getErr := getProgress(ctx, clients, namespace, name) if getErr == nil { out = latest } } return err } out = updated return nil }) if err != nil && out != nil { // Unknown-result case: caller gets latest known server state plus error. return out, err } return out, err } func isUnknownUpdateResult(err error) bool { if err == nil { return false } if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err) { return true } msg := strings.ToLower(err.Error()) return strings.Contains(msg, "request timed out") || strings.Contains(msg, "context deadline exceeded") || strings.Contains(msg, "etcdserver: request timed out") || strings.Contains(msg, "connection reset by peer") || strings.Contains(msg, "http2: client connection lost") } func createProgress( ctx context.Context, clients *kube.Clients, progress *monov1alpha1.OSUpgradeProgress, ) (*monov1alpha1.OSUpgradeProgress, error) { toCreate := progress.DeepCopy() toCreate.Status = nil created, err := clients.MonoKS. Monok8sV1alpha1(). OSUpgradeProgresses(toCreate.Namespace). Create(ctx, toCreate, metav1.CreateOptions{}) if err != nil { return nil, err } if progress.Status != nil { toUpdate := created.DeepCopy() toUpdate.Status = progress.Status return updateProgressStatus(ctx, clients, toUpdate) } return created, nil } func updateProgressStatus( ctx context.Context, clients *kube.Clients, progress *monov1alpha1.OSUpgradeProgress, ) (*monov1alpha1.OSUpgradeProgress, error) { updated, err := clients.MonoKS. Monok8sV1alpha1(). OSUpgradeProgresses(progress.Namespace). UpdateStatus(ctx, progress, metav1.UpdateOptions{}) if err != nil { return nil, fmt.Errorf( "update status for OSUpgradeProgress %s/%s: %w", progress.Namespace, progress.Name, err, ) } return updated, nil } func getProgress( ctx context.Context, clients *kube.Clients, namespace, name string, ) (*monov1alpha1.OSUpgradeProgress, error) { return clients.MonoKS. Monok8sV1alpha1(). OSUpgradeProgresses(namespace). Get(ctx, name, metav1.GetOptions{}) } func failProgress( ctx context.Context, clients *kube.Clients, osup *monov1alpha1.OSUpgradeProgress, action string, cause error, ) error { _, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) { now := metav1.Now() if cur.Status == nil { cur.Status = &monov1alpha1.OSUpgradeProgressStatus{} } cur.Status.LastUpdatedAt = &now cur.Status.Message = fmt.Sprintf("%s: %v", action, cause) cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseFailed }) if err != nil { klog.ErrorS(err, "failed to update osupgradeprogress status after error", "action", action, "name", osup.Name, "namespace", osup.Namespace, ) } return fmt.Errorf("%s: %w", action, cause) } func markProgressCompleted( ctx context.Context, clients *kube.Clients, osup *monov1alpha1.OSUpgradeProgress, message string, ) error { _, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) { now := metav1.Now() if cur.Status == nil { cur.Status = &monov1alpha1.OSUpgradeProgressStatus{} } cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseCompleted cur.Status.Message = message cur.Status.CurrentVersion = osup.Status.CurrentVersion cur.Status.TargetVersion = osup.Status.TargetVersion cur.Status.LastUpdatedAt = &now cur.Status.CompletedAt = &now }) if err != nil { return fmt.Errorf("mark progress completed: %w", err) } return nil }