]> ruderich.org/simon Gitweb - safcm/safcm.git/blobdiff - cmd/safcm/sync.go
Use SPDX license identifiers
[safcm/safcm.git] / cmd / safcm / sync.go
index 6f38daf37d100713fa00340b4aa54321944cd4a8..69431435e5e9f3066ebf82aa4ebb8add1afbacb0 100644 (file)
@@ -1,37 +1,25 @@
 // "sync" sub-command: sync data to remote hosts
 
-// Copyright (C) 2021  Simon Ruderich
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-//
-// You should have received a copy of the GNU General Public License
-// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+// SPDX-License-Identifier: GPL-3.0-or-later
+// Copyright (C) 2021-2024  Simon Ruderich
 
 package main
 
 import (
        "flag"
        "fmt"
+       "io/fs"
        "log"
        "os"
-       "os/signal"
        "runtime"
        "sort"
        "strings"
-       "sync"
 
        "golang.org/x/term"
 
        "ruderich.org/simon/safcm"
        "ruderich.org/simon/safcm/cmd/safcm/config"
+       "ruderich.org/simon/safcm/frontend"
        "ruderich.org/simon/safcm/rpc"
 )
 
@@ -42,25 +30,10 @@ type Sync struct {
        allHosts  *config.Hosts       // known hosts
        allGroups map[string][]string // known groups
 
-       events chan<- Event // all events generated by/for this host
-
        isTTY bool
-}
-
-type Event struct {
-       Host *config.Host
-
-       // Only one of Error, Log and ConnEvent is set in a single event
-       Error     error
-       Log       Log
-       ConnEvent rpc.ConnEvent
-
-       Escaped bool // true if untrusted input is already escaped
-}
 
-type Log struct {
-       Level safcm.LogLevel
-       Text  string
+       loop    *frontend.Loop
+       logFunc func(level safcm.LogLevel, escaped bool, msg string)
 }
 
 func MainSync(args []string) error {
@@ -80,24 +53,11 @@ func MainSync(args []string) error {
        optionSshConfig := flag.String("sshconfig", "",
                "`path` to ssh configuration file; used for tests")
 
-       flag.CommandLine.Parse(args[2:])
-
-       var level safcm.LogLevel
-       switch *optionLog {
-       case "error":
-               level = safcm.LogError
-       case "info":
-               level = safcm.LogInfo
-       case "verbose":
-               level = safcm.LogVerbose
-       case "debug":
-               level = safcm.LogDebug
-       case "debug2":
-               level = safcm.LogDebug2
-       case "debug3":
-               level = safcm.LogDebug3
-       default:
-               return fmt.Errorf("invalid -log value %q", *optionLog)
+       flag.CommandLine.Parse(args[2:]) //nolint:errcheck
+
+       level, err := safcm.ParseLogLevel(*optionLog)
+       if err != nil {
+               return fmt.Errorf("-log: %v", err)
        }
 
        names := flag.Args()
@@ -130,90 +90,36 @@ func MainSync(args []string) error {
        isTTY := term.IsTerminal(int(os.Stdout.Fd())) &&
                term.IsTerminal(int(os.Stderr.Fd()))
 
-       done := make(chan bool)
-       // Collect events from all hosts and print them
-       events := make(chan Event)
-       go func() {
-               var failed bool
-               for {
-                       x := <-events
-                       if x.Host == nil {
-                               break
-                       }
-                       logEvent(x, cfg.LogLevel, isTTY, &failed)
-               }
-               done <- failed
-       }()
-
-       hostsLeft := make(map[string]bool)
-       for _, x := range toSync {
-               hostsLeft[x.Name] = true
+       loop := &frontend.Loop{
+               DebugConn: cfg.LogLevel >= safcm.LogDebug3,
+               LogEventFunc: func(x frontend.Event, failed *bool) {
+                       frontend.LogEvent(x, cfg.LogLevel, isTTY, failed)
+               },
+               SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error {
+                       return host.(*Sync).Host(conn)
+               },
        }
-       var hostsLeftMutex sync.Mutex // protects hostsLeft
-
-       // Show unfinished hosts on Ctrl-C
-       sigint := make(chan os.Signal, 1)   // buffered for Notify()
-       signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
-       go func() {
-               // Running `ssh` processes get killed by SIGINT which is sent
-               // to all processes
-
-               <-sigint
-               log.Print("Received SIGINT, aborting ...")
-
-               // Print all queued events
-               events <- Event{} // poison pill
-               <-done
-               // "races" with <-done in the main function and will hang here
-               // if the other is faster. This is fine because then all hosts
-               // were synced successfully.
-
-               hostsLeftMutex.Lock()
-               var hosts []string
-               for x := range hostsLeft {
-                       hosts = append(hosts, x)
-               }
-               sort.Strings(hosts)
-               log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
-       }()
 
-       // Sync all hosts concurrently
-       var wg sync.WaitGroup
+       var hosts []frontend.Host
        for _, x := range toSync {
-               x := x
-
-               // Once in sync.Host() and once in the go func below
-               wg.Add(2)
-
-               go func() {
-                       sync := Sync{
-                               host:      x,
-                               config:    cfg,
-                               allHosts:  allHosts,
-                               allGroups: allGroups,
-                               events:    events,
-                               isTTY:     isTTY,
-                       }
-                       err := sync.Host(&wg)
-                       if err != nil {
-                               events <- Event{
-                                       Host:  x,
-                                       Error: err,
-                               }
-                       }
-                       wg.Done()
-
-                       hostsLeftMutex.Lock()
-                       defer hostsLeftMutex.Unlock()
-                       delete(hostsLeft, x.Name)
-               }()
+               s := &Sync{
+                       host:      x,
+                       config:    cfg,
+                       allHosts:  allHosts,
+                       allGroups: allGroups,
+                       isTTY:     isTTY,
+                       loop:      loop,
+               }
+               s.logFunc = func(level safcm.LogLevel, escaped bool,
+                       msg string) {
+                       s.loop.Log(s, level, escaped, msg)
+               }
+               hosts = append(hosts, s)
        }
 
-       wg.Wait()
-       events <- Event{} // poison pill
-       failed := <-done
+       succ := loop.Run(hosts)
 
-       if failed {
+       if !succ {
                // Exit instead of returning an error to prevent an extra log
                // message from main()
                os.Exit(1)
@@ -295,91 +201,30 @@ are only available after the hosts were contacted.
        return res, nil
 }
 
-func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
-       // We have multiple event sources so this is somewhat ugly.
-       var prefix, data string
-       var color Color
-       if x.Error != nil {
-               prefix = "[error]"
-               data = x.Error.Error()
-               color = ColorRed
-               // We logged an error, tell the caller
-               *failed = true
-       } else if x.Log.Level != 0 {
-               // LogError and LogDebug3 should not occur here
-               switch x.Log.Level {
-               case safcm.LogInfo:
-                       prefix = "[info]"
-               case safcm.LogVerbose:
-                       prefix = "[verbose]"
-               case safcm.LogDebug:
-                       prefix = "[debug]"
-               case safcm.LogDebug2:
-                       prefix = "[debug2]"
-               default:
-                       prefix = fmt.Sprintf("[INVALID=%d]", x.Log.Level)
-                       color = ColorRed
-               }
-               data = x.Log.Text
-       } else {
-               switch x.ConnEvent.Type {
-               case rpc.ConnEventStderr:
-                       prefix = "[stderr]"
-               case rpc.ConnEventDebug:
-                       prefix = "[debug3]"
-               case rpc.ConnEventUpload:
-                       if level < safcm.LogInfo {
-                               return
-                       }
-                       prefix = "[info]"
-                       x.ConnEvent.Data = "remote helper upload in progress"
-               default:
-                       prefix = fmt.Sprintf("[INVALID=%d]", x.ConnEvent.Type)
-                       color = ColorRed
-               }
-               data = x.ConnEvent.Data
-       }
-
-       host := x.Host.Name
-       if color != 0 {
-               host = ColorString(isTTY, color, host)
-       }
-       // Make sure to escape control characters to prevent terminal
-       // injection attacks
-       if !x.Escaped {
-               data = EscapeControlCharacters(isTTY, data)
-       }
-       log.Printf("%-9s [%s] %s", prefix, host, data)
+func (s *Sync) Name() string {
+       return s.host.Name
 }
 
-func (s *Sync) Host(wg *sync.WaitGroup) error {
-       conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3)
-       // Pass all connection events to main loop
-       go func() {
-               for {
-                       x, ok := <-conn.Events
-                       if !ok {
-                               break
-                       }
-                       s.events <- Event{
-                               Host:      s.host,
-                               ConnEvent: x,
-                       }
-               }
-               wg.Done()
-       }()
+func (s *Sync) Dial(conn *rpc.Conn) error {
+       helpers, err := fs.Sub(RemoteHelpers, "remote")
+       if err != nil {
+               return err
+       }
 
        // Connect to remote host
        user := s.host.SshUser
        if user == "" {
                user = s.config.SshUser
        }
-       err := conn.DialSSH(user, s.host.Name, s.config.SshConfig)
-       if err != nil {
-               return err
-       }
-       defer conn.Kill()
+       return conn.DialSSH(rpc.SSHConfig{
+               Host:          s.host.Name,
+               User:          user,
+               SshConfig:     s.config.SshConfig,
+               RemoteHelpers: helpers,
+       })
+}
 
+func (s *Sync) Host(conn *rpc.Conn) error {
        // Collect information about remote host
        detectedGroups, err := s.hostInfo(conn)
        if err != nil {
@@ -392,35 +237,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error {
                return err
        }
 
-       // Terminate connection to remote host
-       err = conn.Send(safcm.MsgQuitReq{})
-       if err != nil {
-               return err
-       }
-       _, err = conn.Recv()
-       if err != nil {
-               return err
-       }
-       err = conn.Wait()
-       if err != nil {
-               return err
-       }
-
        return nil
 }
 
 func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) {
-       if s.config.LogLevel < level {
-               return
-       }
-       s.events <- Event{
-               Host: s.host,
-               Log: Log{
-                       Level: level,
-                       Text:  msg,
-               },
-               Escaped: escaped,
-       }
+       s.logFunc(level, escaped, msg)
 }
 func (s *Sync) logDebugf(format string, a ...interface{}) {
        s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...))
@@ -428,25 +249,3 @@ func (s *Sync) logDebugf(format string, a ...interface{}) {
 func (s *Sync) logVerbosef(format string, a ...interface{}) {
        s.log(safcm.LogVerbose, false, fmt.Sprintf(format, a...))
 }
-
-// sendRecv sends a message over conn and waits for the response. Any MsgLog
-// messages received before the final (non MsgLog) response are passed to
-// s.log.
-func (s *Sync) sendRecv(conn *rpc.Conn, msg safcm.Msg) (safcm.Msg, error) {
-       err := conn.Send(msg)
-       if err != nil {
-               return nil, err
-       }
-       for {
-               x, err := conn.Recv()
-               if err != nil {
-                       return nil, err
-               }
-               log, ok := x.(safcm.MsgLog)
-               if ok {
-                       s.log(log.Level, false, log.Text)
-                       continue
-               }
-               return x, nil
-       }
-}