Throttle disk write to prevent etcd puking during upgrade

This commit is contained in:
2026-04-07 17:34:38 +08:00
parent 11e2c96173
commit bc4b124246
17 changed files with 639 additions and 25 deletions

View File

@@ -10,7 +10,8 @@ apk add alpine-base \
# For diagnostics
apk add \
iproute2 iproute2-ss curl bind-tools procps strace tcpdump lsof jq binutils \
openssl conntrack-tools ethtool findmnt kmod coreutils util-linux zstd libcap-utils
openssl conntrack-tools ethtool findmnt kmod coreutils util-linux zstd libcap-utils \
iotop sysstat
echo '[ -x /bin/bash ] && exec /bin/bash -l' >> "/root/.profile"
# Compat layer for kubelet for now. Will look into building it myself later, if needed.

View File

@@ -5,8 +5,8 @@ TAG=dev
# The Linux kernel, from NXP
NXP_VERSION=lf-6.18.2-1.0.0
CRIO_VERSION=cri-o.arm64.v1.33.10
KUBE_VERSION=v1.33.10
CRIO_VERSION=cri-o.arm64.v1.33.3
KUBE_VERSION=v1.33.3
# Mono's tutorial said fsl-ls1046a-rdb.dtb but our shipped board is not that one
# We need fsl-ls1046a-rdb-sdk.dtb here

View File

@@ -49,6 +49,9 @@ func NewRegistry(ctx *node.NodeContext) *Registry {
"ValidateNodeIPAndAPIServerReachability": node.ValidateNodeIPAndAPIServerReachability,
"ValidateRequiredImagesPresent": node.ValidateRequiredImagesPresent,
"WaitForExistingClusterIfNeeded": node.WaitForExistingClusterIfNeeded,
// Diagnostics
"DiagTestDiskWrite": node.DiagTestDiskWrite,
},
}
}

View File

@@ -8,6 +8,7 @@ type VersionCatalog struct {
type CatalogImage struct {
Version string `json:"version" yaml:"version"`
Patch int `json:"patch" yaml:"version"`
URL string `json:"url" yaml:"url"`
Checksum string `json:"checksum,omitempty" yaml:"checksum,omitempty"`
Size int64 `json:"size,omitempty" yaml:"size,omitempty"`

View File

@@ -5,11 +5,30 @@ import (
"fmt"
"io"
"os"
"time"
)
const (
defaultWriteBufferSize = 1 * 1024 * 1024
defaultMinWriteBPS = int64(2 * 1024 * 1024)
defaultInitialWriteBPS = int64(4 * 1024 * 1024)
defaultMaxWriteBPS = int64(8 * 1024 * 1024)
defaultBurstBytes = int64(512 * 1024)
defaultSampleInterval = 250 * time.Millisecond
defaultSyncEveryBytes = 0
defaultBusyHighPct = 80.0
defaultBusyLowPct = 40.0
defaultSlowAwait = 20 * time.Millisecond
defaultFastAwait = 5 * time.Millisecond
)
func WriteStreamToTarget(ctx context.Context,
src io.Reader,
targetPath string,
src io.Reader, targetPath string,
expectedSize int64, bufferSize int,
progress ProgressFunc,
) (int64, error) {
@@ -17,7 +36,7 @@ func WriteStreamToTarget(ctx context.Context,
return 0, fmt.Errorf("target path is required")
}
if bufferSize <= 0 {
bufferSize = 4 * 1024 * 1024
bufferSize = defaultWriteBufferSize
}
f, err := os.OpenFile(targetPath, os.O_WRONLY, 0)
@@ -26,7 +45,22 @@ func WriteStreamToTarget(ctx context.Context,
}
defer f.Close()
written, err := copyWithProgressBuffer(ctx, f, src, expectedSize, "flash", progress, make([]byte, bufferSize))
ctrl, err := newAdaptiveWriteController(targetPath)
if err != nil {
ctrl = newNoopAdaptiveWriteController()
}
written, err := copyWithProgressBuffer(
ctx,
f,
src,
expectedSize,
"flash",
progress,
make([]byte, bufferSize),
ctrl,
defaultSyncEveryBytes,
)
if err != nil {
return written, err
}
@@ -42,8 +76,19 @@ func WriteStreamToTarget(ctx context.Context,
return written, nil
}
func copyWithProgressBuffer(ctx context.Context, dst io.Writer, src io.Reader, total int64, stage string, progress ProgressFunc, buf []byte) (int64, error) {
func copyWithProgressBuffer(
ctx context.Context,
dst *os.File,
src io.Reader,
total int64,
stage string,
progress ProgressFunc,
buf []byte,
ctrl *adaptiveWriteController,
syncEvery int64,
) (int64, error) {
var written int64
var sinceSync int64
for {
select {
@@ -54,9 +99,21 @@ func copyWithProgressBuffer(ctx context.Context, dst io.Writer, src io.Reader, t
nr, er := src.Read(buf)
if nr > 0 {
if ctrl != nil {
if err := ctrl.Wait(ctx, nr); err != nil {
return written, err
}
}
nw, ew := dst.Write(buf[:nr])
if nw > 0 {
written += int64(nw)
sinceSync += int64(nw)
if ctrl != nil {
ctrl.ObserveWrite(nw)
}
if progress != nil {
progress(Progress{
Stage: stage,
@@ -64,7 +121,19 @@ func copyWithProgressBuffer(ctx context.Context, dst io.Writer, src io.Reader, t
BytesTotal: total,
})
}
if syncEvery > 0 && sinceSync >= syncEvery {
if err := dst.Sync(); err != nil {
return written, fmt.Errorf("periodic sync target: %w", err)
}
sinceSync = 0
if ctrl != nil {
ctrl.ObserveSync()
}
}
}
if ew != nil {
return written, ew
}
@@ -72,6 +141,7 @@ func copyWithProgressBuffer(ctx context.Context, dst io.Writer, src io.Reader, t
return written, io.ErrShortWrite
}
}
if er != nil {
if er == io.EOF {
return written, nil

View File

@@ -0,0 +1,106 @@
package osimage
import (
"context"
"fmt"
"io"
"time"
"k8s.io/klog/v2"
)
// repeatPatternReader is an io.Reader that emits a byte pattern over and over
// until a fixed number of bytes has been produced, then reports io.EOF.
type repeatPatternReader struct {
	pattern []byte // bytes that get repeated
	remain  int64  // bytes still to be produced before io.EOF
	off     int    // position inside pattern where the next copy resumes
}

// newRepeatPatternReader returns a reader that yields total bytes of pattern,
// repeated as often as necessary. An empty pattern selects a built-in default.
func newRepeatPatternReader(total int64, pattern []byte) *repeatPatternReader {
	r := &repeatPatternReader{remain: total, pattern: pattern}
	if len(r.pattern) == 0 {
		r.pattern = []byte("monok8s-test-pattern-0123456789abcdef")
	}
	return r
}

// Read fills p with the next bytes of the repeated pattern. It never returns
// more than the number of bytes still outstanding and returns io.EOF once the
// configured total has been delivered.
func (r *repeatPatternReader) Read(p []byte) (int, error) {
	if r.remain <= 0 {
		return 0, io.EOF
	}

	// Never hand out more than what is left of the configured total.
	want := len(p)
	if int64(want) > r.remain {
		want = int(r.remain)
	}

	dst := p[:want]
	filled := 0
	for filled < want {
		k := copy(dst[filled:], r.pattern[r.off:])
		filled += k
		// Wrap around once the end of the pattern has been reached.
		if r.off += k; r.off == len(r.pattern) {
			r.off = 0
		}
	}

	r.remain -= int64(filled)
	return filled, nil
}
// TestStreamToTarget is a diagnostic routine (not a `go test` test): it
// streams 512 MiB of a repeating pattern into targetPath through
// WriteStreamToTarget using a 128 KiB buffer, logging the running average
// throughput roughly once per second and a summary when the write finishes.
//
// It returns the wrapped error from WriteStreamToTarget, or nil on success.
func TestStreamToTarget(ctx context.Context, targetPath string) error {
	const (
		totalSize  = int64(512 * 1024 * 1024) // 512 MiB
		bufferSize = 128 * 1024               // test the conservative setting
	)

	src := newRepeatPatternReader(totalSize, nil)

	begin := time.Now()
	lastReport := begin

	report := func(p Progress) {
		at := time.Now()

		// Rate-limit the log to one line per second, but always emit the
		// final update where complete == total.
		finished := p.BytesComplete == p.BytesTotal
		if !finished && at.Sub(lastReport) < 1*time.Second {
			return
		}
		lastReport = at

		avg := 0.0
		if secs := at.Sub(begin).Seconds(); secs > 0 {
			avg = float64(p.BytesComplete) / 1024.0 / 1024.0 / secs
		}

		klog.InfoS("test write progress",
			"stage", p.Stage,
			"bytesComplete", p.BytesComplete,
			"bytesTotal", p.BytesTotal,
			"mbpsAvg", fmt.Sprintf("%.2f", avg),
		)
	}

	written, err := WriteStreamToTarget(ctx, src, targetPath, totalSize, bufferSize, report)
	if err != nil {
		return fmt.Errorf("write stream to target: %w", err)
	}

	took := time.Since(begin)
	avg := float64(written) / 1024.0 / 1024.0 / took.Seconds()

	klog.InfoS("test write complete",
		"targetPath", targetPath,
		"written", written,
		"elapsed", took.String(),
		"mbpsAvg", fmt.Sprintf("%.2f", avg),
	)
	return nil
}

View File

@@ -0,0 +1,400 @@
//go:build linux
package osimage
import (
"bufio"
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/sys/unix"
)
// adaptiveWriteController throttles a write stream with a token-bucket
// limiter and periodically retunes the limiter's rate from disk statistics
// gathered by a diskBusyMonitor.
type adaptiveWriteController struct {
	mu sync.Mutex // guards the sampling state below

	limiter *rateLimiter     // paces the bytes handed to the disk
	monitor *diskBusyMonitor // nil for the no-op controller: rate is never retuned

	sampleInterval time.Duration // minimum spacing between two disk samples
	nextSampleAt   time.Time     // earliest instant the next sample may be taken

	minBPS int64 // lower clamp for the rate when backing off
	maxBPS int64 // upper clamp for the rate when recovering

	busyHighPct float64 // utilisation at or above which the rate is halved
	busyLowPct  float64 // utilisation at or below which the rate may grow
}

// newAdaptiveWriteController builds a controller for the block device at
// targetPath using the package default rates and thresholds. It fails when no
// disk monitor can be created for the target.
func newAdaptiveWriteController(targetPath string) (*adaptiveWriteController, error) {
	monitor, err := newDiskBusyMonitor(targetPath)
	if err != nil {
		return nil, err
	}

	ctrl := &adaptiveWriteController{
		limiter:        newRateLimiter(defaultInitialWriteBPS, defaultBurstBytes),
		monitor:        monitor,
		sampleInterval: defaultSampleInterval,
		minBPS:         defaultMinWriteBPS,
		maxBPS:         defaultMaxWriteBPS,
		busyHighPct:    defaultBusyHighPct,
		busyLowPct:     defaultBusyLowPct,
	}
	ctrl.nextSampleAt = time.Now().Add(defaultSampleInterval)
	return ctrl, nil
}

// newNoopAdaptiveWriteController returns a controller with a zero-rate
// limiter and no monitor, so Wait returns immediately and observations are
// ignored.
func newNoopAdaptiveWriteController() *adaptiveWriteController {
	ctrl := new(adaptiveWriteController)
	ctrl.limiter = newRateLimiter(0, 0)
	ctrl.sampleInterval = defaultSampleInterval
	return ctrl
}

// Wait blocks until the limiter admits n bytes or ctx is done. A nil
// controller or a controller without a limiter never blocks.
func (c *adaptiveWriteController) Wait(ctx context.Context, n int) error {
	if c == nil {
		return nil
	}
	if c.limiter == nil {
		return nil
	}
	return c.limiter.Wait(ctx, n)
}

// ObserveWrite gives the controller a chance to re-sample the disk after a
// write. The byte count n is currently not used.
func (c *adaptiveWriteController) ObserveWrite(n int) {
	c.observe(false)
}

// ObserveSync gives the controller a chance to re-sample the disk after a
// sync; if a sample is taken, the rate is backed off regardless of its values.
func (c *adaptiveWriteController) ObserveSync() {
	c.observe(true)
}

// observe samples the disk at most once per sampleInterval and adjusts the
// limiter: the rate is halved (not below minBPS) when the disk looks busy or
// afterSync is set, and raised by 20% (not above maxBPS) when it looks idle.
// Sampling errors are ignored and leave the rate untouched.
func (c *adaptiveWriteController) observe(afterSync bool) {
	if c == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	now := time.Now()
	if c.monitor == nil {
		return
	}
	if now.Before(c.nextSampleAt) {
		return
	}
	c.nextSampleAt = now.Add(c.sampleInterval)

	sample, err := c.monitor.Sample(now)
	if err != nil {
		return
	}

	rate := c.limiter.Rate()
	if rate <= 0 {
		rate = c.minBPS
	}

	overloaded := sample.UtilPct >= c.busyHighPct || sample.Await >= defaultSlowAwait || afterSync
	if overloaded {
		// Back off aggressively when the disk is obviously suffering.
		slower := rate / 2
		if slower < c.minBPS {
			slower = c.minBPS
		}
		c.limiter.SetRate(slower)
		return
	}

	idle := sample.UtilPct <= c.busyLowPct && sample.Await <= defaultFastAwait
	if idle {
		// Recover slowly: +20% per sample.
		faster := rate + (rate / 5)
		if faster > c.maxBPS {
			faster = c.maxBPS
		}
		c.limiter.SetRate(faster)
	}
}
// rateLimiter is a token-bucket byte limiter. Tokens are bytes; they refill
// at rateBPS per second and are capped at burst. A rate of zero or less means
// "unlimited".
type rateLimiter struct {
	mu      sync.Mutex
	rateBPS int64     // refill rate in bytes per second; <= 0 disables limiting
	burst   int64     // bucket capacity in bytes
	tokens  float64   // bytes currently available
	last    time.Time // instant of the last refill
}

// newRateLimiter returns a limiter that starts with a full bucket. A negative
// burst is treated as zero.
func newRateLimiter(rateBPS, burst int64) *rateLimiter {
	now := time.Now()
	if burst < 0 {
		burst = 0
	}
	return &rateLimiter{
		rateBPS: rateBPS,
		burst:   burst,
		tokens:  float64(burst),
		last:    now,
	}
}

// Rate returns the current refill rate in bytes per second.
func (r *rateLimiter) Rate() int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rateBPS
}

// SetRate changes the refill rate. Tokens earned at the old rate are credited
// first. A rate of zero or less disables limiting and empties the bucket;
// any positive rate resets the bucket capacity to defaultBurstBytes.
func (r *rateLimiter) SetRate(rateBPS int64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.refillLocked(time.Now())
	r.rateBPS = rateBPS

	if rateBPS <= 0 {
		r.tokens = 0
		r.burst = 0
		return
	}

	// Keep burst small and fixed. Do not let burst scale with rate.
	r.burst = defaultBurstBytes
	if r.tokens > float64(r.burst) {
		r.tokens = float64(r.burst)
	}
}

// Wait blocks until n bytes may be sent or ctx is done, in which case it
// returns ctx.Err(). Requests larger than the bucket are admitted in
// burst-sized chunks. It returns immediately when n <= 0 or the limiter is
// unlimited.
func (r *rateLimiter) Wait(ctx context.Context, n int) error {
	if n <= 0 {
		return nil
	}

	remaining := n
	for remaining > 0 {
		r.mu.Lock()
		if r.rateBPS <= 0 {
			r.mu.Unlock()
			return nil
		}
		r.refillLocked(time.Now())

		var waitDur time.Duration
		if r.burst <= 0 {
			// A bucket without capacity can never hold enough tokens, so
			// waiting for a refill would loop until ctx is cancelled. Pace
			// purely by time instead: sleep for as long as the outstanding
			// bytes take at the current rate and then treat them as admitted.
			waitDur = time.Duration(float64(remaining) / float64(r.rateBPS) * float64(time.Second))
			remaining = 0
		} else {
			chunk := remaining
			if int64(chunk) > r.burst {
				chunk = int(r.burst)
			}

			if r.tokens >= float64(chunk) {
				r.tokens -= float64(chunk)
				r.mu.Unlock()
				remaining -= chunk
				continue
			}

			missing := float64(chunk) - r.tokens
			waitDur = time.Duration(missing / float64(r.rateBPS) * float64(time.Second))
			// Avoid busy-spinning on very short waits.
			if waitDur < 5*time.Millisecond {
				waitDur = 5 * time.Millisecond
			}
		}
		r.mu.Unlock()

		timer := time.NewTimer(waitDur)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return nil
}

// refillLocked credits the tokens earned since the last refill, capped at
// burst. The caller must hold r.mu.
func (r *rateLimiter) refillLocked(now time.Time) {
	if r.rateBPS <= 0 {
		r.last = now
		return
	}

	elapsed := now.Sub(r.last)
	if elapsed <= 0 {
		return
	}

	r.tokens += elapsed.Seconds() * float64(r.rateBPS)
	if r.tokens > float64(r.burst) {
		r.tokens = float64(r.burst)
	}
	r.last = now
}
// diskBusySample is a single measurement returned by diskBusyMonitor.Sample.
type diskBusySample struct {
	// UtilPct is the share of the sampling window the device spent doing
	// I/O, as a percentage clamped to the range 0..100.
	UtilPct float64
	// Await is the I/O time accumulated in the window divided by the number
	// of operations counted in it; zero when no operation was counted.
	Await time.Duration
}
// diskBusyMonitor derives utilisation figures for one block device from the
// difference between successive readDiskStats readings.
type diskBusyMonitor struct {
	major int // device numbers of the disk being watched
	minor int

	lastAt     time.Time // when the previous reading was taken
	lastIOMs   uint64    // I/O time counter at the previous reading
	lastWrites uint64    // operation counter at the previous reading
}

// newDiskBusyMonitor resolves targetPath to a disk and takes the baseline
// reading that the first Sample call is compared against.
func newDiskBusyMonitor(targetPath string) (*diskBusyMonitor, error) {
	major, minor, err := resolveWholeDiskMajorMinor(targetPath)
	if err != nil {
		return nil, err
	}

	baseIOMs, baseWrites, err := readDiskStats(major, minor)
	if err != nil {
		return nil, err
	}

	mon := &diskBusyMonitor{major: major, minor: minor}
	mon.lastAt = time.Now()
	mon.lastIOMs = baseIOMs
	mon.lastWrites = baseWrites
	return mon, nil
}

// Sample reads the current counters and reports the activity since the
// previous reading. If no whole millisecond has elapsed it returns a zero
// sample and leaves the stored baseline untouched.
func (m *diskBusyMonitor) Sample(now time.Time) (diskBusySample, error) {
	ioMs, writes, err := readDiskStats(m.major, m.minor)
	if err != nil {
		return diskBusySample{}, err
	}

	windowMs := now.Sub(m.lastAt).Milliseconds()
	if windowMs <= 0 {
		return diskBusySample{}, nil
	}

	// Signed deltas: a counter that moved backwards yields a negative value,
	// which is neutralised by the clamp / the > 0 check below.
	busyMs := int64(ioMs - m.lastIOMs)
	completed := int64(writes - m.lastWrites)

	m.lastAt, m.lastIOMs, m.lastWrites = now, ioMs, writes

	var sample diskBusySample

	sample.UtilPct = float64(busyMs) * 100 / float64(windowMs)
	switch {
	case sample.UtilPct < 0:
		sample.UtilPct = 0
	case sample.UtilPct > 100:
		sample.UtilPct = 100
	}

	if completed > 0 {
		sample.Await = time.Duration(busyMs/completed) * time.Millisecond
	}

	return sample, nil
}
// resolveWholeDiskMajorMinor returns the major/minor numbers of the whole
// disk backing the block device at targetPath.
//
// If targetPath is a partition, the numbers of its parent disk are returned;
// otherwise (or whenever the sysfs lookup fails) the device's own numbers are
// returned. An error is returned only when targetPath cannot be stat'ed or is
// not a block device.
func resolveWholeDiskMajorMinor(targetPath string) (int, int, error) {
	var st unix.Stat_t
	if err := unix.Stat(targetPath, &st); err != nil {
		return 0, 0, fmt.Errorf("stat target %q: %w", targetPath, err)
	}
	if st.Mode&unix.S_IFMT != unix.S_IFBLK {
		return 0, 0, fmt.Errorf("target %q is not a block device", targetPath)
	}

	major := int(unix.Major(uint64(st.Rdev)))
	minor := int(unix.Minor(uint64(st.Rdev)))

	sysfsPath := fmt.Sprintf("/sys/dev/block/%d:%d", major, minor)
	resolved, err := filepath.EvalSymlinks(sysfsPath)
	if err != nil {
		return major, minor, nil
	}

	// Only partitions carry a "partition" attribute in sysfs. Without this
	// check a whole disk whose sysfs parent happens to expose a "dev" file
	// (e.g. an NVMe namespace below its controller's character device) would
	// be resolved to a device that does not appear in /proc/diskstats.
	if _, err := os.Stat(filepath.Join(resolved, "partition")); err != nil {
		return major, minor, nil
	}

	// Partition path usually looks like .../block/sda/sda3
	// Parent whole disk is .../block/sda
	parentDevPath := filepath.Join(filepath.Dir(resolved), "dev")
	data, err := os.ReadFile(parentDevPath)
	if err != nil {
		return major, minor, nil
	}

	// The "dev" attribute holds "MAJOR:MINOR".
	parts := strings.Split(strings.TrimSpace(string(data)), ":")
	if len(parts) != 2 {
		return major, minor, nil
	}

	parentMajor, err1 := strconv.Atoi(parts[0])
	parentMinor, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil {
		return major, minor, nil
	}

	return parentMajor, parentMinor, nil
}
// readDiskStats looks up the device major:minor in /proc/diskstats and
// returns its "time spent doing I/Os" counter in milliseconds and its
// "writes completed" counter.
//
// It returns an error when /proc/diskstats cannot be read, when one of the
// two counters fails to parse, or when the device is not listed.
func readDiskStats(major, minor int) (ioMs uint64, writesCompleted uint64, err error) {
	// Column indexes within a whitespace-split /proc/diskstats line. The
	// kernel's documented "Field N" numbering starts after the three leading
	// columns (major, minor, device name), so Field N sits at index N+2.
	const (
		idxMajor           = 0
		idxMinor           = 1
		idxWritesCompleted = 7  // Field 5: writes completed
		idxIOTicks         = 12 // Field 10: milliseconds spent doing I/Os
		minColumns         = 14
	)

	f, err := os.Open("/proc/diskstats")
	if err != nil {
		return 0, 0, fmt.Errorf("open /proc/diskstats: %w", err)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		cols := strings.Fields(sc.Text())
		if len(cols) < minColumns {
			continue
		}

		lineMajor, convErr := strconv.Atoi(cols[idxMajor])
		if convErr != nil {
			continue
		}
		lineMinor, convErr := strconv.Atoi(cols[idxMinor])
		if convErr != nil {
			continue
		}
		if lineMajor != major || lineMinor != minor {
			continue
		}

		writesCompleted, err = strconv.ParseUint(cols[idxWritesCompleted], 10, 64)
		if err != nil {
			return 0, 0, fmt.Errorf("parse writes completed for %d:%d: %w", major, minor, err)
		}

		ioMs, err = strconv.ParseUint(cols[idxIOTicks], 10, 64)
		if err != nil {
			return 0, 0, fmt.Errorf("parse io_ms for %d:%d: %w", major, minor, err)
		}

		return ioMs, writesCompleted, nil
	}

	if err := sc.Err(); err != nil {
		return 0, 0, fmt.Errorf("scan /proc/diskstats: %w", err)
	}

	return 0, 0, fmt.Errorf("device %d:%d not found in /proc/diskstats", major, minor)
}

View File

@@ -0,0 +1,18 @@
//go:build !linux
package osimage
import "context"
// adaptiveWriteController is the stand-in used on non-Linux builds: it never
// throttles and ignores every observation. Its method set mirrors the Linux
// implementation so shared callers compile on every platform.
type adaptiveWriteController struct{}

// newAdaptiveWriteController returns a controller that does nothing; the
// target path is ignored and the error is always nil.
func newAdaptiveWriteController(string) (*adaptiveWriteController, error) {
	return &adaptiveWriteController{}, nil
}

// newNoopAdaptiveWriteController returns a controller that does nothing.
func newNoopAdaptiveWriteController() *adaptiveWriteController {
	return &adaptiveWriteController{}
}

// Wait never blocks and always returns nil.
func (c *adaptiveWriteController) Wait(ctx context.Context, n int) error { return nil }

// ObserveWrite is a no-op. It takes the same single argument as the Linux
// implementation so that calls of the form ObserveWrite(n) compile.
func (c *adaptiveWriteController) ObserveWrite(n int) {}

// ObserveSync is a no-op counterpart of the Linux implementation's method.
func (c *adaptiveWriteController) ObserveSync() {}

View File

@@ -93,6 +93,7 @@ func handleOSUpgradeLocked(ctx context.Context, clients *kube.Clients,
updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
now := metav1.Now()
cur.Status.CurrentVersion = buildinfo.KubeVersion
cur.Status.TargetVersion = plan.ResolvedTarget
cur.Status.Phase = monov1alpha1.OSUpgradeProgressPhaseDownloading
cur.Status.Message = fmt.Sprintf("downloading image: %s", first.URL)
@@ -134,6 +135,9 @@ func handleOSUpgradeLocked(ctx context.Context, clients *kube.Clients,
pLogger.Log(p)
if err := statusUpdater.Run(func() error {
klog.Infof("%s: %d%%", p.Stage, osimage.PercentOf(p.BytesComplete, p.BytesTotal))
updated, err := updateProgressRobust(ctx, clients, osup.Namespace, osup.Name, func(cur *monov1alpha1.OSUpgradeProgress) {
now := metav1.Now()

View File

@@ -7,6 +7,7 @@ import (
"path/filepath"
monov1alpha1 "example.com/monok8s/pkg/apis/monok8s/v1alpha1"
"example.com/monok8s/pkg/controller/osimage"
)
func EngageControlGate(ctx context.Context, nctx *NodeContext) error {
@@ -30,3 +31,7 @@ func ReleaseControlGate(ctx context.Context, nctx *NodeContext) error {
return nil
}
func DiagTestDiskWrite(ctx context.Context, nctx *NodeContext) error {
return osimage.TestStreamToTarget(ctx, monov1alpha1.AltPartDeviceLink)
}

View File

@@ -96,9 +96,6 @@ func resolvePauseImage(ctx context.Context, nctx *NodeContext, kubeVersion strin
continue
}
// examples:
// registry.k8s.io/pause:3.10
// some.registry.local/pause:3.10
if strings.Contains(line, "/pause:") || strings.HasPrefix(line, "pause:") {
return line, nil
}

View File

@@ -36,7 +36,7 @@ func MountAltImageStore(ctx context.Context, nctx *NodeContext) error {
[]string{"-o", "ro", altDev, altRootMount},
system.RunOptions{Timeout: 30 * time.Second},
); err != nil {
return fmt.Errorf("mount alt rootfs %s on %s: %w", altDev, altRootMount, err)
klog.Errorf("mount alt rootfs %s on %s: %w", altDev, altRootMount, err)
}
}

View File

@@ -32,7 +32,7 @@ func ConfigureABBoot(ctx context.Context, nctx *node.NodeContext) error {
return fmt.Errorf("read boot state: %w", err)
}
bootPart := state["BOOT_PART"]
bootPart = state["BOOT_PART"]
if bootPart == "" {
return fmt.Errorf("BOOT_PART missing")
}

View File

@@ -70,6 +70,6 @@ fi;
setenv bootdev 0:${rootpart};
setenv rootdev ${boot_source}:${rootpart};
setenv bootargs "${bootargs_console} root=${rootdev} bootpart=${boot_part} rw rootwait rootfstype=ext4";
setenv bootargs "${bootargs_console} root=${rootdev} bootpart=${boot_part} rw rootwait rootfstype=ext4 delayacct";
ext4load ${boot_iface} ${bootdev} ${kernel_addr_r} /boot/kernel.itb && bootm ${kernel_addr_r};
`

View File

@@ -25,18 +25,9 @@ flash-emmc.sh
6. Reboot into uboot, boot using the following commands
```
setenv boot_source emmc
setenv boot_part A
setenv rootpart 2;
setenv bootdev 0:${rootpart};
setenv rootdev emmc:2;
setenv kernel_addr_r 0xa0000000;
setenv bootargs "${bootargs_console} root=${rootdev} bootpart=${boot_part} rw rootwait rootfstype=ext4";
ext4load mmc ${bootdev} ${kernel_addr_r} /boot/kernel.itb && bootm ${kernel_addr_r};'
setenv bootargs "${bootargs_console} root=emmc:2 bootpart=A rw rootwait delayacct rootfstype=ext4";
ext4load mmc 0:2 ${kernel_addr_r} /boot/kernel.itb && bootm ${kernel_addr_r};
```
7. tail /var/log/monok8s/bootstrap.log
@@ -47,3 +38,12 @@ On MacOS
1. ./macos/flashusb.sh
On Windows (Work In Progress)
```
usb start;
setenv kernel_addr_r 0xa0000000;
setenv bootargs "${bootargs_console} root=usb:2 bootpart=A rw rootwait delayacct rootfstype=ext4";
ext4load usb 0:2 ${kernel_addr_r} /boot/kernel.itb && bootm ${kernel_addr_r};
```

View File

@@ -235,6 +235,9 @@ fi
if [ -z "$ROOT_DEV" ]; then
ROOT_DEV="$(find_fallback_root_for_slot "$BOOT_PART" || true)"
if [ -n "$ROOT_DEV" ]; then
if [ -z "$BOOT_PART" ]; then
BOOT_PART=A
fi
log "Preferred root not found. Falling back to first valid root device: $ROOT_DEV"
fi
fi

View File

@@ -358,3 +358,9 @@ CONFIG_DEVTMPFS_MOUNT=y
CONFIG_XEN=n
CONFIG_XEN_DOM0=n
CONFIG_VHOST_XEN=n
### For Disk IO diagnostics
CONFIG_TASK_DELAY_ACCT=y
CONFIG_TASK_IO_ACCOUNTING=y
CONFIG_TASKSTATS=y
CONFIG_TASK_XACCT=y