package controller

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"

	"github.com/spf13/cobra"
	"k8s.io/cli-runtime/pkg/genericclioptions"
	"k8s.io/klog/v2"

	mkscontroller "example.com/monok8s/pkg/controller"
	osupgradectrl "example.com/monok8s/pkg/controller/osupgrade"
	"example.com/monok8s/pkg/kube"
)

// ServerConfig holds the settings consumed by the controller command and the
// HTTP servers it starts.
type ServerConfig struct {
	// Namespace is filled in from the kubeconfig loader when the command runs.
	Namespace string `json:"namespace,omitempty"`
	// TLSCertFile is the value of --tls-cert-file.
	TLSCertFile string `json:"tlsCertFile,omitempty"`
	// TLSPrivateKeyFile is the value of --tls-private-key-file.
	TLSPrivateKeyFile string `json:"tlsPrivateKeyFile,omitempty"`
}

// NewCmdController returns the "controller" command. When run, it starts the
// OSUpgrade watch loop and the HTTP servers, and blocks until the command
// context is canceled or either of them stops.
//
// The command returns the context error when the context is canceled, the
// failure of whichever component stopped first otherwise, and nil when that
// component stopped with a nil or context.Canceled error.
func NewCmdController(flags *genericclioptions.ConfigFlags) *cobra.Command {
	var conf ServerConfig

	cmd := &cobra.Command{
		Use:   "controller",
		Short: "Start a controller that handles OSUpgrade resources",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Reject a half-specified TLS pair before doing any work. Without
			// this check a lone --tls-private-key-file would silently serve
			// the controller endpoint over plain HTTP.
			if (conf.TLSCertFile == "") != (conf.TLSPrivateKeyFile == "") {
				return errors.New("--tls-cert-file and --tls-private-key-file must be set together")
			}

			ns, _, err := flags.ToRawKubeConfigLoader().Namespace()
			if err != nil {
				return err
			}
			conf.Namespace = ns

			klog.InfoS("starting controller", "namespace", conf.Namespace)

			clients, err := kube.NewClients(flags)
			if err != nil {
				return err
			}

			ctx, cancel := context.WithCancel(cmd.Context())
			defer cancel()

			// Both channels are buffered so that a goroutine finishing after
			// this function has returned never blocks on its send.
			httpErrCh := make(chan error, 1)
			watchErrCh := make(chan error, 1)

			go func() {
				klog.InfoS("starting OSUpgrade watch loop", "namespace", conf.Namespace)
				watchErrCh <- osupgradectrl.Watch(ctx, clients, conf.Namespace)
			}()

			go func() {
				httpErrCh <- listenAndServe(ctx, clients, conf)
			}()

			var (
				runErr      error
				httpStopped bool
			)
			select {
			case <-ctx.Done():
				klog.InfoS("controller context canceled")
				runErr = ctx.Err()
			case err := <-watchErrCh:
				if err != nil && !errors.Is(err, context.Canceled) {
					runErr = err
				}
			case err := <-httpErrCh:
				httpStopped = true
				if err != nil && !errors.Is(err, context.Canceled) {
					runErr = err
				}
			}

			// Stop the remaining component and let the HTTP servers finish
			// their graceful shutdown before returning; listenAndServe bounds
			// that shutdown with its own timeout. The watch loop is not
			// awaited because its shutdown behaviour is not visible here.
			cancel()
			if !httpStopped {
				if err := <-httpErrCh; err != nil && !errors.Is(err, context.Canceled) {
					klog.ErrorS(err, "HTTP servers did not shut down cleanly")
				}
			}
			return runErr
		},
	}

	cmd.Flags().StringVar(&conf.TLSCertFile, "tls-cert-file", conf.TLSCertFile,
		"File containing x509 Certificate used for serving HTTPS (with intermediate certs, if any, concatenated after server cert).")
	cmd.Flags().StringVar(&conf.TLSPrivateKeyFile, "tls-private-key-file", conf.TLSPrivateKeyFile,
		"File containing x509 private key matching --tls-cert-file.")

	return cmd
}

// listenAndServe runs the health server on port 8080 and the controller server
// on port 8443 until ctx is canceled or one of them stops.
//
// The controller server uses TLS when either TLS file is configured, so a
// missing counterpart fails while loading the key pair instead of falling
// back to plain HTTP.
//
// It returns context.Canceled after a clean shutdown triggered by ctx, the
// joined shutdown and server errors if that shutdown was not clean, and a
// non-nil error whenever a server stops before ctx is canceled.
func listenAndServe(ctx context.Context, clients *kube.Clients, conf ServerConfig) error {
	nodeName := os.Getenv("NODE_NAME")

	controllerServer := mkscontroller.NewServer(ctx, clients, conf.Namespace, nodeName)

	// Liveness and readiness share one handler: both always answer 200 "ok".
	okHandler := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok\n")) // best effort; the probe client may already be gone
	}
	healthMux := http.NewServeMux()
	healthMux.HandleFunc("/healthz", okHandler)
	healthMux.HandleFunc("/readyz", okHandler)

	healthAddr := net.JoinHostPort("", "8080")
	controllerAddr := net.JoinHostPort("", "8443")

	healthHTTPServer := &http.Server{
		Addr:           healthAddr,
		Handler:        healthMux,
		IdleTimeout:    90 * time.Second,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}

	controllerHTTPServer := &http.Server{
		Addr:           controllerAddr,
		Handler:        controllerServer,
		IdleTimeout:    90 * time.Second,
		ReadTimeout:    4 * time.Minute,
		WriteTimeout:   4 * time.Minute,
		MaxHeaderBytes: 1 << 20,
	}

	// shutdownServers gracefully stops both servers, sharing one 10s budget,
	// and reports every shutdown failure.
	shutdownServers := func() []error {
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		var errs []error
		if err := healthHTTPServer.Shutdown(shutdownCtx); err != nil {
			errs = append(errs, fmt.Errorf("shutdown health HTTP server: %w", err))
		}
		if err := controllerHTTPServer.Shutdown(shutdownCtx); err != nil {
			errs = append(errs, fmt.Errorf("shutdown controller HTTP server: %w", err))
		}
		return errs
	}

	// Buffered for both servers so neither goroutine blocks on its final send,
	// even when nobody is left to receive it.
	serverErrCh := make(chan error, 2)

	go func() {
		klog.InfoS("starting health HTTP server", "addr", healthAddr)

		err := healthHTTPServer.ListenAndServe()
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErrCh <- fmt.Errorf("health HTTP server: %w", err)
			return
		}
		serverErrCh <- nil
	}()

	useTLS := conf.TLSCertFile != "" || conf.TLSPrivateKeyFile != ""

	go func() {
		if useTLS {
			klog.InfoS("starting controller HTTPS server",
				"addr", controllerAddr,
				"certFile", conf.TLSCertFile,
				"keyFile", conf.TLSPrivateKeyFile,
			)

			err := controllerHTTPServer.ListenAndServeTLS(conf.TLSCertFile, conf.TLSPrivateKeyFile)
			if err != nil && !errors.Is(err, http.ErrServerClosed) {
				serverErrCh <- fmt.Errorf("controller HTTPS server: %w", err)
				return
			}
			serverErrCh <- nil
			return
		}

		klog.InfoS("starting controller HTTP server", "addr", controllerAddr)

		err := controllerHTTPServer.ListenAndServe()
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErrCh <- fmt.Errorf("controller HTTP server: %w", err)
			return
		}
		serverErrCh <- nil
	}()

	select {
	case <-ctx.Done():
		klog.InfoS("shutting down HTTP servers",
			"healthAddr", healthAddr,
			"controllerAddr", controllerAddr,
		)

		errs := shutdownServers()

		// Collect the exit status of both serving goroutines.
		for i := 0; i < 2; i++ {
			if err := <-serverErrCh; err != nil {
				errs = append(errs, err)
			}
		}

		if len(errs) > 0 {
			return errors.Join(errs...)
		}
		return context.Canceled

	case err := <-serverErrCh:
		// One server is gone; stop the other so it does not keep serving
		// (and its goroutine does not linger) after this function returns.
		for _, shutdownErr := range shutdownServers() {
			klog.ErrorS(shutdownErr, "stopping remaining HTTP server")
		}

		if err != nil {
			klog.ErrorS(err, "HTTP server failed")
			return err
		}

		// One server exited cleanly unexpectedly. Treat that as failure because
		// the process should keep both servers alive until ctx is canceled.
		return errors.New("HTTP server exited unexpectedly")
	}
}