package system import ( "bufio" "bytes" "context" "errors" "fmt" "io" "os" "os/exec" "strings" "sync" "time" ) type Logger interface { Printf(format string, args ...any) } type RunnerConfig struct { DefaultTimeout time.Duration StreamOutput bool Logger Logger } type Runner struct { cfg RunnerConfig } func NewRunner(cfg RunnerConfig) *Runner { if cfg.DefaultTimeout <= 0 { cfg.DefaultTimeout = 30 * time.Second } return &Runner{cfg: cfg} } type Result struct { Name string Args []string ExitCode int Stdout string Stderr string Duration time.Duration StartTime time.Time EndTime time.Time } type RunOptions struct { Dir string Env []string Timeout time.Duration Stdin io.Reader Stdout io.Writer Stderr io.Writer Quiet bool RedactEnv []string // Optional line hooks. Called for each complete line seen on stdout/stderr. OnStdoutLine func(line string) OnStderrLine func(line string) } type RetryOptions struct { Attempts int Delay time.Duration } func (r *Runner) Run(ctx context.Context, name string, args ...string) (*Result, error) { return r.RunWithOptions(ctx, name, args, RunOptions{}) } func (r *Runner) RunWithOptions(ctx context.Context, name string, args []string, opt RunOptions) (*Result, error) { if strings.TrimSpace(name) == "" { return nil, errors.New("command name cannot be empty") } timeout := opt.Timeout if timeout <= 0 { timeout = r.cfg.DefaultTimeout } ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cmd := exec.CommandContext(ctx, name, args...) cmd.Dir = opt.Dir cmd.Env = mergeEnv(os.Environ(), opt.Env) cmd.Stdin = opt.Stdin var stdoutBuf bytes.Buffer var stderrBuf bytes.Buffer stdoutDst := io.Writer(&stdoutBuf) stderrDst := io.Writer(&stderrBuf) if opt.Stdout != nil { stdoutDst = io.MultiWriter(stdoutDst, opt.Stdout) } else if r.cfg.StreamOutput && !opt.Quiet { stdoutDst = io.MultiWriter(stdoutDst, os.Stdout) } if opt.Stderr != nil { stderrDst = io.MultiWriter(stderrDst, opt.Stderr) } else if r.cfg.StreamOutput && !opt.Quiet { stderrDst = io.MultiWriter(stderrDst, os.Stderr) } if opt.OnStdoutLine != nil { stdoutDst = newLineHookWriter(stdoutDst, opt.OnStdoutLine) } if opt.OnStderrLine != nil { stderrDst = newLineHookWriter(stderrDst, opt.OnStderrLine) } cmd.Stdout = stdoutDst cmd.Stderr = stderrDst start := time.Now() if r.cfg.Logger != nil { r.cfg.Logger.Printf("run: %s", formatCmd(name, args)) } err := cmd.Run() if hw, ok := stdoutDst.(interface{ Flush() }); ok { hw.Flush() } if hw, ok := stderrDst.(interface{ Flush() }); ok { hw.Flush() } end := time.Now() res := &Result{ Name: name, Args: append([]string(nil), args...), Stdout: stdoutBuf.String(), Stderr: stderrBuf.String(), Duration: end.Sub(start), StartTime: start, EndTime: end, ExitCode: exitCode(err), } if err != nil { if ctx.Err() == context.DeadlineExceeded { return res, fmt.Errorf("command timed out after %s: %s", timeout, formatCmd(name, args)) } return res, fmt.Errorf("command failed (exit=%d): %s\nstderr:\n%s", res.ExitCode, formatCmd(name, args), trimBlock(res.Stderr)) } return res, nil } func (r *Runner) RunRetry(ctx context.Context, retry RetryOptions, name string, args ...string) (*Result, error) { return r.RunRetryWithOptions(ctx, retry, name, args, RunOptions{}) } func (r *Runner) RunRetryWithOptions(ctx context.Context, retry RetryOptions, name string, args []string, opt RunOptions) (*Result, error) { if retry.Attempts <= 0 { retry.Attempts = 1 } if retry.Delay < 0 { retry.Delay = 0 } var lastRes *Result var lastErr error for attempt := 1; attempt <= retry.Attempts; attempt++ { res, err := r.RunWithOptions(ctx, name, args, opt) lastRes, lastErr = res, err if err == nil { return res, nil } if r.cfg.Logger != nil { r.cfg.Logger.Printf("attempt %d/%d failed: %v", attempt, retry.Attempts, err) } if attempt == retry.Attempts { break } select { case <-ctx.Done(): return lastRes, ctx.Err() case <-time.After(retry.Delay): } } return lastRes, lastErr } type StepFunc func(ctx context.Context, r *Runner) error type Step struct { Name string Description string Retry RetryOptions Run StepFunc } type StepEvent struct { Name string StartTime time.Time EndTime time.Time Duration time.Duration Err error } type StepReporter interface { StepStarted(event StepEvent) StepFinished(event StepEvent) } type Phase struct { Name string Steps []Step } func (r *Runner) RunPhase(ctx context.Context, phase Phase, reporter StepReporter) error { for _, step := range phase.Steps { start := time.Now() if reporter != nil { reporter.StepStarted(StepEvent{ Name: step.Name, StartTime: start, }) } var err error if step.Retry.Attempts > 0 { err = runStepWithRetry(ctx, r, step) } else { err = step.Run(ctx, r) } end := time.Now() if reporter != nil { reporter.StepFinished(StepEvent{ Name: step.Name, StartTime: start, EndTime: end, Duration: end.Sub(start), Err: err, }) } if err != nil { return fmt.Errorf("phase %q step %q failed: %w", phase.Name, step.Name, err) } } return nil } func runStepWithRetry(ctx context.Context, r *Runner, step Step) error { attempts := step.Retry.Attempts if attempts <= 0 { attempts = 1 } var lastErr error for i := 1; i <= attempts; i++ { lastErr = step.Run(ctx, r) if lastErr == nil { return nil } if i == attempts { break } if r.cfg.Logger != nil { r.cfg.Logger.Printf("step %q attempt %d/%d failed: %v", step.Name, i, attempts, lastErr) } select { case <-ctx.Done(): return ctx.Err() case <-time.After(step.Retry.Delay): } } return lastErr } func CheckCommandExists(name string) error { _, err := exec.LookPath(name) if err != nil { return fmt.Errorf("required command not found in PATH: %s", name) } return nil } func mergeEnv(base []string, extra []string) []string { if len(extra) == 0 { return base } m := map[string]string{} for _, kv := range base { k, v, ok := strings.Cut(kv, "=") if ok { m[k] = v } } for _, kv := range extra { k, v, ok := strings.Cut(kv, "=") if ok { m[k] = v } } out := make([]string, 0, len(m)) for k, v := range m { out = append(out, k+"="+v) } return out } func formatCmd(name string, args []string) string { parts := make([]string, 0, len(args)+1) parts = append(parts, shellQuote(name)) for _, a := range args { parts = append(parts, shellQuote(a)) } return strings.Join(parts, " ") } func shellQuote(s string) string { if s == "" { return "''" } if !strings.ContainsAny(s, " \t\n'\"\\$`!&|;<>()[]{}*?~") { return s } return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'" } func trimBlock(s string) string { s = strings.TrimSpace(s) if s == "" { return "(empty)" } return s } func exitCode(err error) int { if err == nil { return 0 } var exitErr *exec.ExitError if errors.As(err, &exitErr) { return exitErr.ExitCode() } return -1 } type StdLogger struct { mu sync.Mutex } func (l *StdLogger) Printf(format string, args ...any) { l.mu.Lock() defer l.mu.Unlock() fmt.Fprintf(os.Stderr, format+"\n", args...) } type lineHookWriter struct { dst io.Writer fn func(string) mu sync.Mutex buf bytes.Buffer } func newLineHookWriter(dst io.Writer, fn func(string)) *lineHookWriter { return &lineHookWriter{ dst: dst, fn: fn, } } func (w *lineHookWriter) Write(p []byte) (int, error) { w.mu.Lock() defer w.mu.Unlock() n, err := w.dst.Write(p) // Keep line processing separate from dst write result. w.buf.Write(p) w.flushCompleteLinesLocked() return n, err } func (w *lineHookWriter) flushCompleteLinesLocked() { r := bufio.NewReader(&w.buf) var pending bytes.Buffer for { line, err := r.ReadString('\n') if err == io.EOF { pending.WriteString(line) break } line = strings.TrimRight(line, "\r\n") w.fn(line) } w.buf.Reset() _, _ = w.buf.Write(pending.Bytes()) } // Flush emits the final unterminated line, if any. // Not strictly required for kubeadm, but useful and correct. func (w *lineHookWriter) Flush() { w.mu.Lock() defer w.mu.Unlock() if w.buf.Len() == 0 { return } w.fn(strings.TrimRight(w.buf.String(), "\r\n")) w.buf.Reset() }