424 lines
8.3 KiB
Go
424 lines
8.3 KiB
Go
package system
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Logger is the logging sink consumed by Runner. Implementations receive a
// fmt-style format string together with its arguments.
type Logger interface {
	// Printf records one message built from format and args.
	Printf(format string, args ...any)
}
|
|
|
|
type RunnerConfig struct {
|
|
DefaultTimeout time.Duration
|
|
StreamOutput bool
|
|
Logger Logger
|
|
}
|
|
|
|
type Runner struct {
|
|
cfg RunnerConfig
|
|
}
|
|
|
|
func NewRunner(cfg RunnerConfig) *Runner {
|
|
if cfg.DefaultTimeout <= 0 {
|
|
cfg.DefaultTimeout = 30 * time.Second
|
|
}
|
|
return &Runner{cfg: cfg}
|
|
}
|
|
|
|
// Result describes one finished command invocation. RunWithOptions returns a
// populated Result even when the command fails, so callers can still inspect
// the captured output.
type Result struct {
	// Name is the command that was executed.
	Name string

	// Args is a copy of the arguments passed to the command.
	Args []string

	// ExitCode is 0 on success, the process exit code when the command
	// exited with a failure status, and -1 for any other kind of error.
	ExitCode int

	// Stdout is everything the command wrote to standard output.
	Stdout string

	// Stderr is everything the command wrote to standard error.
	Stderr string

	// Duration is EndTime minus StartTime.
	Duration time.Duration

	// StartTime is taken immediately before the command is started.
	StartTime time.Time

	// EndTime is taken after the command has returned and the line hooks
	// have been flushed.
	EndTime time.Time
}
|
|
|
|
// RunOptions customises a single command invocation. The zero value runs the
// command in the current directory with the inherited environment and the
// Runner's default timeout.
type RunOptions struct {
	// Dir is the working directory for the command (assigned to exec.Cmd.Dir).
	Dir string

	// Env lists extra KEY=VALUE entries merged over os.Environ().
	Env []string

	// Timeout bounds the command. Zero or negative selects the Runner's
	// DefaultTimeout.
	Timeout time.Duration

	// Stdin, when non-nil, is connected to the command's standard input.
	Stdin io.Reader

	// Stdout, when non-nil, receives a copy of standard output in addition to
	// the copy captured in Result.Stdout. It takes precedence over
	// RunnerConfig.StreamOutput.
	Stdout io.Writer

	// Stderr is the standard-error counterpart of Stdout.
	Stderr io.Writer

	// Quiet disables the mirroring to os.Stdout / os.Stderr that
	// RunnerConfig.StreamOutput would otherwise enable.
	Quiet bool

	// RedactEnv is not read by any code in this file.
	// NOTE(review): presumably names of variables to hide in logs; confirm.
	RedactEnv []string

	// OnStdoutLine and OnStderrLine are optional hooks called once per
	// complete line of output, with the trailing "\r\n" characters removed.
	// A final unterminated line is delivered after the command returns.
	OnStdoutLine func(line string)
	OnStderrLine func(line string)
}
|
|
|
|
// RetryOptions controls how often a failing operation is repeated.
type RetryOptions struct {
	// Attempts is the total number of tries, including the first one.
	Attempts int

	// Delay is the pause between two consecutive tries.
	Delay time.Duration
}
|
|
|
|
func (r *Runner) Run(ctx context.Context, name string, args ...string) (*Result, error) {
|
|
return r.RunWithOptions(ctx, name, args, RunOptions{})
|
|
}
|
|
|
|
func (r *Runner) RunWithOptions(ctx context.Context, name string, args []string, opt RunOptions) (*Result, error) {
|
|
if strings.TrimSpace(name) == "" {
|
|
return nil, errors.New("command name cannot be empty")
|
|
}
|
|
|
|
timeout := opt.Timeout
|
|
if timeout <= 0 {
|
|
timeout = r.cfg.DefaultTimeout
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(ctx, name, args...)
|
|
cmd.Dir = opt.Dir
|
|
cmd.Env = mergeEnv(os.Environ(), opt.Env)
|
|
cmd.Stdin = opt.Stdin
|
|
|
|
var stdoutBuf bytes.Buffer
|
|
var stderrBuf bytes.Buffer
|
|
|
|
stdoutDst := io.Writer(&stdoutBuf)
|
|
stderrDst := io.Writer(&stderrBuf)
|
|
|
|
if opt.Stdout != nil {
|
|
stdoutDst = io.MultiWriter(stdoutDst, opt.Stdout)
|
|
} else if r.cfg.StreamOutput && !opt.Quiet {
|
|
stdoutDst = io.MultiWriter(stdoutDst, os.Stdout)
|
|
}
|
|
|
|
if opt.Stderr != nil {
|
|
stderrDst = io.MultiWriter(stderrDst, opt.Stderr)
|
|
} else if r.cfg.StreamOutput && !opt.Quiet {
|
|
stderrDst = io.MultiWriter(stderrDst, os.Stderr)
|
|
}
|
|
|
|
if opt.OnStdoutLine != nil {
|
|
stdoutDst = newLineHookWriter(stdoutDst, opt.OnStdoutLine)
|
|
}
|
|
if opt.OnStderrLine != nil {
|
|
stderrDst = newLineHookWriter(stderrDst, opt.OnStderrLine)
|
|
}
|
|
|
|
cmd.Stdout = stdoutDst
|
|
cmd.Stderr = stderrDst
|
|
|
|
start := time.Now()
|
|
if r.cfg.Logger != nil {
|
|
r.cfg.Logger.Printf("run: %s", formatCmd(name, args))
|
|
}
|
|
|
|
err := cmd.Run()
|
|
if hw, ok := stdoutDst.(interface{ Flush() }); ok {
|
|
hw.Flush()
|
|
}
|
|
if hw, ok := stderrDst.(interface{ Flush() }); ok {
|
|
hw.Flush()
|
|
}
|
|
|
|
end := time.Now()
|
|
|
|
res := &Result{
|
|
Name: name,
|
|
Args: append([]string(nil), args...),
|
|
Stdout: stdoutBuf.String(),
|
|
Stderr: stderrBuf.String(),
|
|
Duration: end.Sub(start),
|
|
StartTime: start,
|
|
EndTime: end,
|
|
ExitCode: exitCode(err),
|
|
}
|
|
|
|
if err != nil {
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
return res, fmt.Errorf("command timed out after %s: %s", timeout, formatCmd(name, args))
|
|
}
|
|
return res, fmt.Errorf("command failed (exit=%d): %s\nstderr:\n%s",
|
|
res.ExitCode, formatCmd(name, args), trimBlock(res.Stderr))
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (r *Runner) RunRetry(ctx context.Context, retry RetryOptions, name string, args ...string) (*Result, error) {
|
|
return r.RunRetryWithOptions(ctx, retry, name, args, RunOptions{})
|
|
}
|
|
|
|
func (r *Runner) RunRetryWithOptions(ctx context.Context, retry RetryOptions, name string, args []string, opt RunOptions) (*Result, error) {
|
|
if retry.Attempts <= 0 {
|
|
retry.Attempts = 1
|
|
}
|
|
if retry.Delay < 0 {
|
|
retry.Delay = 0
|
|
}
|
|
|
|
var lastRes *Result
|
|
var lastErr error
|
|
|
|
for attempt := 1; attempt <= retry.Attempts; attempt++ {
|
|
res, err := r.RunWithOptions(ctx, name, args, opt)
|
|
lastRes, lastErr = res, err
|
|
if err == nil {
|
|
return res, nil
|
|
}
|
|
|
|
if r.cfg.Logger != nil {
|
|
r.cfg.Logger.Printf("attempt %d/%d failed: %v", attempt, retry.Attempts, err)
|
|
}
|
|
|
|
if attempt == retry.Attempts {
|
|
break
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return lastRes, ctx.Err()
|
|
case <-time.After(retry.Delay):
|
|
}
|
|
}
|
|
|
|
return lastRes, lastErr
|
|
}
|
|
|
|
// StepFunc is the body of a Step. It receives the context passed to RunPhase
// and the Runner executing the phase, and returns nil on success.
type StepFunc func(ctx context.Context, r *Runner) error
|
|
|
|
type Step struct {
|
|
Name string
|
|
Description string
|
|
Retry RetryOptions
|
|
Run StepFunc
|
|
}
|
|
|
|
// StepEvent is the payload handed to a StepReporter. RunPhase fills in only
// Name and StartTime for the "started" notification and every field for the
// "finished" notification.
type StepEvent struct {
	// Name is the name of the step the event refers to.
	Name string

	// StartTime is when the step began.
	StartTime time.Time

	// EndTime is when the step returned, including any retries.
	EndTime time.Time

	// Duration is EndTime minus StartTime.
	Duration time.Duration

	// Err is the step's final error, or nil when it succeeded.
	Err error
}
|
|
|
|
type StepReporter interface {
|
|
StepStarted(event StepEvent)
|
|
StepFinished(event StepEvent)
|
|
}
|
|
|
|
type Phase struct {
|
|
Name string
|
|
Steps []Step
|
|
}
|
|
|
|
func (r *Runner) RunPhase(ctx context.Context, phase Phase, reporter StepReporter) error {
|
|
for _, step := range phase.Steps {
|
|
start := time.Now()
|
|
if reporter != nil {
|
|
reporter.StepStarted(StepEvent{
|
|
Name: step.Name,
|
|
StartTime: start,
|
|
})
|
|
}
|
|
|
|
var err error
|
|
if step.Retry.Attempts > 0 {
|
|
err = runStepWithRetry(ctx, r, step)
|
|
} else {
|
|
err = step.Run(ctx, r)
|
|
}
|
|
|
|
end := time.Now()
|
|
if reporter != nil {
|
|
reporter.StepFinished(StepEvent{
|
|
Name: step.Name,
|
|
StartTime: start,
|
|
EndTime: end,
|
|
Duration: end.Sub(start),
|
|
Err: err,
|
|
})
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("phase %q step %q failed: %w", phase.Name, step.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func runStepWithRetry(ctx context.Context, r *Runner, step Step) error {
|
|
attempts := step.Retry.Attempts
|
|
if attempts <= 0 {
|
|
attempts = 1
|
|
}
|
|
|
|
var lastErr error
|
|
for i := 1; i <= attempts; i++ {
|
|
lastErr = step.Run(ctx, r)
|
|
if lastErr == nil {
|
|
return nil
|
|
}
|
|
if i == attempts {
|
|
break
|
|
}
|
|
if r.cfg.Logger != nil {
|
|
r.cfg.Logger.Printf("step %q attempt %d/%d failed: %v", step.Name, i, attempts, lastErr)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(step.Retry.Delay):
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
// CheckCommandExists reports whether name can be resolved to an executable
// with exec.LookPath. It returns nil when the lookup succeeds.
//
// On failure the returned error wraps the error from exec.LookPath, so the
// real cause stays visible in the message and callers can test for it, for
// example with errors.Is(err, exec.ErrNotFound).
func CheckCommandExists(name string) error {
	if _, err := exec.LookPath(name); err != nil {
		return fmt.Errorf("required command not found in PATH: %s: %w", name, err)
	}
	return nil
}
|
|
|
|
// mergeEnv overlays the KEY=VALUE entries of extra on top of base.
//
// When extra is empty, base is returned as is. Otherwise a new slice is
// built in which every key appears once: the last value seen for a key wins,
// with extra taking precedence over base. Entries keep the position where
// their key first appeared, so the result is the same on every call. Entries
// without an "=" are dropped. Keys are compared case-sensitively.
func mergeEnv(base []string, extra []string) []string {
	if len(extra) == 0 {
		return base
	}

	merged := make([]string, 0, len(base)+len(extra))
	index := make(map[string]int, len(base)+len(extra)) // key -> position in merged

	for _, entries := range [][]string{base, extra} {
		for _, kv := range entries {
			key, _, ok := strings.Cut(kv, "=")
			if !ok {
				continue
			}
			if i, seen := index[key]; seen {
				merged[i] = kv // later value overrides in place
				continue
			}
			index[key] = len(merged)
			merged = append(merged, kv)
		}
	}
	return merged
}
|
|
|
|
func formatCmd(name string, args []string) string {
|
|
parts := make([]string, 0, len(args)+1)
|
|
parts = append(parts, shellQuote(name))
|
|
for _, a := range args {
|
|
parts = append(parts, shellQuote(a))
|
|
}
|
|
return strings.Join(parts, " ")
|
|
}
|
|
|
|
// shellQuote returns s in a form suitable for display in a shell-like command
// line. The empty string becomes ''. A string without any of the listed
// special characters is returned unchanged. Anything else is wrapped in
// single quotes, with each embedded single quote rewritten as '\''.
func shellQuote(s string) string {
	const special = " \t\n'\"\\$`!&|;<>()[]{}*?~"

	switch {
	case s == "":
		return "''"
	case strings.IndexAny(s, special) < 0:
		return s
	default:
		escaped := strings.Join(strings.Split(s, "'"), `'\''`)
		return "'" + escaped + "'"
	}
}
|
|
|
|
// trimBlock strips leading and trailing whitespace from s. When nothing is
// left it returns the placeholder "(empty)", so error messages never show a
// blank section.
func trimBlock(s string) string {
	if trimmed := strings.TrimSpace(s); trimmed != "" {
		return trimmed
	}
	return "(empty)"
}
|
|
|
|
// exitCode maps the error returned by running a command to a numeric status:
// 0 for a nil error, the process's exit code when err is or wraps an
// *exec.ExitError, and -1 for every other error.
func exitCode(err error) int {
	code := -1

	var exitErr *exec.ExitError
	switch {
	case err == nil:
		code = 0
	case errors.As(err, &exitErr):
		code = exitErr.ExitCode()
	}
	return code
}
|
|
|
|
// StdLogger is a Logger that writes to os.Stderr. The zero value is ready to
// use. It contains a mutex, so share it by pointer rather than copying it.
type StdLogger struct {
	mu sync.Mutex // serialises writes from concurrent Printf calls
}

// Printf formats the message like fmt.Printf, appends a newline, and writes
// it to os.Stderr while holding the logger's lock.
func (l *StdLogger) Printf(format string, args ...any) {
	const newline = "\n"

	l.mu.Lock()
	defer l.mu.Unlock()

	fmt.Fprintf(os.Stderr, format+newline, args...)
}
|
|
|
|
// lineHookWriter is an io.Writer that forwards every write to dst unchanged
// and also splits the byte stream into lines, passing each complete line to
// fn. Bytes after the last newline stay buffered until more data arrives or
// Flush is called.
type lineHookWriter struct {
	// dst receives all written bytes verbatim.
	dst io.Writer

	// fn is called once per line, without the trailing "\r\n" characters.
	fn func(string)

	// mu guards buf and serialises calls to dst and fn.
	mu sync.Mutex

	// buf holds the bytes of the current, not yet terminated line.
	buf bytes.Buffer
}

// newLineHookWriter returns a lineHookWriter that forwards to dst and reports
// lines to fn.
func newLineHookWriter(dst io.Writer, fn func(string)) *lineHookWriter {
	w := new(lineHookWriter)
	w.dst, w.fn = dst, fn
	return w
}

// Write forwards p to dst and returns dst's result. Independently of that
// result, all of p is added to the line buffer and fn is called for every
// line that p completes.
func (w *lineHookWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	written, writeErr := w.dst.Write(p)

	// Line splitting deliberately ignores the outcome of the dst write.
	w.buf.Write(p)
	w.flushCompleteLinesLocked()

	return written, writeErr
}

// flushCompleteLinesLocked calls fn for each newline-terminated line in buf
// and leaves only the trailing partial line buffered. The caller must hold
// mu.
func (w *lineHookWriter) flushCompleteLinesLocked() {
	reader := bufio.NewReader(&w.buf)

	var rest string
	for {
		chunk, err := reader.ReadString('\n')
		if err != io.EOF {
			w.fn(strings.TrimRight(chunk, "\r\n"))
			continue
		}
		// End of buffered data: chunk is the partial line, possibly empty.
		rest = chunk
		break
	}

	w.buf.Reset()
	_, _ = w.buf.WriteString(rest)
}

// Flush passes any buffered, unterminated final line to fn and clears the
// buffer. It does nothing when the buffer is empty. RunWithOptions calls it
// once the command has returned.
func (w *lineHookWriter) Flush() {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.buf.Len() > 0 {
		w.fn(strings.TrimRight(w.buf.String(), "\r\n"))
		w.buf.Reset()
	}
}
|