]> ruderich.org/simon Gitweb - safcm/safcm.git/blobdiff - cmd/safcm/sync.go
Move synchronization loop into new package frontend
[safcm/safcm.git] / cmd / safcm / sync.go
index bee0133e4fd4326ef1610325fc6ae8649147da29..c1ecc965eec221828788f2bf68acd1481f08d78e 100644 (file)
@@ -23,16 +23,15 @@ import (
        "io/fs"
        "log"
        "os"
-       "os/signal"
        "runtime"
        "sort"
        "strings"
-       "sync"
 
        "golang.org/x/term"
 
        "ruderich.org/simon/safcm"
        "ruderich.org/simon/safcm/cmd/safcm/config"
+       "ruderich.org/simon/safcm/frontend"
        "ruderich.org/simon/safcm/rpc"
 )
 
@@ -43,25 +42,9 @@ type Sync struct {
        allHosts  *config.Hosts       // known hosts
        allGroups map[string][]string // known groups
 
-       events chan<- Event // all events generated by/for this host
-
        isTTY bool
-}
-
-type Event struct {
-       Host *config.Host
-
-       // Only one of Error, Log and ConnEvent is set in a single event
-       Error     error
-       Log       Log
-       ConnEvent rpc.ConnEvent
-
-       Escaped bool // true if untrusted input is already escaped
-}
 
-type Log struct {
-       Level safcm.LogLevel
-       Text  string
+       logFunc func(level safcm.LogLevel, escaped bool, msg string)
 }
 
 func MainSync(args []string) error {
@@ -131,90 +114,35 @@ func MainSync(args []string) error {
        isTTY := term.IsTerminal(int(os.Stdout.Fd())) &&
                term.IsTerminal(int(os.Stderr.Fd()))
 
-       done := make(chan bool)
-       // Collect events from all hosts and print them
-       events := make(chan Event)
-       go func() {
-               var failed bool
-               for {
-                       x := <-events
-                       if x.Host == nil {
-                               break
-                       }
-                       logEvent(x, cfg.LogLevel, isTTY, &failed)
-               }
-               done <- failed
-       }()
-
-       hostsLeft := make(map[string]bool)
-       for _, x := range toSync {
-               hostsLeft[x.Name] = true
+       loop := &frontend.Loop{
+               DebugConn: cfg.LogLevel >= safcm.LogDebug3,
+               LogEventFunc: func(x frontend.Event, failed *bool) {
+                       logEvent(x, cfg.LogLevel, isTTY, failed)
+               },
+               SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error {
+                       return host.(*Sync).Host(conn)
+               },
        }
-       var hostsLeftMutex sync.Mutex // protects hostsLeft
-
-       // Show unfinished hosts on Ctrl-C
-       sigint := make(chan os.Signal, 1)   // buffered for Notify()
-       signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
-       go func() {
-               // Running `ssh` processes get killed by SIGINT which is sent
-               // to all processes
-
-               <-sigint
-               log.Print("Received SIGINT, aborting ...")
-
-               // Print all queued events
-               events <- Event{} // poison pill
-               <-done
-               // "races" with <-done in the main function and will hang here
-               // if the other is faster. This is fine because then all hosts
-               // were synced successfully.
-
-               hostsLeftMutex.Lock()
-               var hosts []string
-               for x := range hostsLeft {
-                       hosts = append(hosts, x)
-               }
-               sort.Strings(hosts)
-               log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
-       }()
 
-       // Sync all hosts concurrently
-       var wg sync.WaitGroup
+       var hosts []frontend.Host
        for _, x := range toSync {
-               x := x
-
-               // Once in sync.Host() and once in the go func below
-               wg.Add(2)
-
-               go func() {
-                       sync := Sync{
+               s := &Sync{
                                host:      x,
                                config:    cfg,
                                allHosts:  allHosts,
                                allGroups: allGroups,
-                               events:    events,
                                isTTY:     isTTY,
                        }
-                       err := sync.Host(&wg)
-                       if err != nil {
-                               events <- Event{
-                                       Host:  x,
-                                       Error: err,
-                               }
-                       }
-                       wg.Done()
-
-                       hostsLeftMutex.Lock()
-                       defer hostsLeftMutex.Unlock()
-                       delete(hostsLeft, x.Name)
-               }()
+               s.logFunc = func(level safcm.LogLevel, escaped bool,
+                       msg string) {
+                       loop.Log(s, level, escaped, msg)
+               }
+               hosts = append(hosts, s)
        }
 
-       wg.Wait()
-       events <- Event{} // poison pill
-       failed := <-done
+       succ := loop.Run(hosts)
 
-       if failed {
+       if !succ {
                // Exit instead of returning an error to prevent an extra log
                // message from main()
                os.Exit(1)
@@ -296,7 +224,7 @@ are only available after the hosts were contacted.
        return res, nil
 }
 
-func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
+func logEvent(x frontend.Event, level safcm.LogLevel, isTTY bool, failed *bool) {
        // We have multiple event sources so this is somewhat ugly.
        var prefix, data string
        var color Color
@@ -307,6 +235,9 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
                // We logged an error, tell the caller
                *failed = true
        } else if x.Log.Level != 0 {
+               if level < x.Log.Level {
+                       return
+               }
                // LogError and LogDebug3 should not occur here
                switch x.Log.Level {
                case safcm.LogInfo:
@@ -341,7 +272,7 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
                data = x.ConnEvent.Data
        }
 
-       host := x.Host.Name
+       host := x.Host.Name()
        if color != 0 {
                host = ColorString(isTTY, color, host)
        }
@@ -353,26 +284,13 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
        log.Printf("%-9s [%s] %s", prefix, host, data)
 }
 
-func (s *Sync) Host(wg *sync.WaitGroup) error {
-       conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3)
-       // Pass all connection events to main loop
-       go func() {
-               for {
-                       x, ok := <-conn.Events
-                       if !ok {
-                               break
-                       }
-                       s.events <- Event{
-                               Host:      s.host,
-                               ConnEvent: x,
-                       }
-               }
-               wg.Done()
-       }()
+func (s *Sync) Name() string {
+       return s.host.Name
+}
 
+func (s *Sync) Dial(conn *rpc.Conn) error {
        helpers, err := fs.Sub(RemoteHelpers, "remote")
        if err != nil {
-               conn.Kill()
                return err
        }
 
@@ -381,18 +299,15 @@ func (s *Sync) Host(wg *sync.WaitGroup) error {
        if user == "" {
                user = s.config.SshUser
        }
-       err = conn.DialSSH(rpc.SSHConfig{
+       return conn.DialSSH(rpc.SSHConfig{
                Host:          s.host.Name,
                User:          user,
                SshConfig:     s.config.SshConfig,
                RemoteHelpers: helpers,
        })
-       if err != nil {
-               conn.Kill()
-               return err
-       }
-       defer conn.Kill()
+}
 
+func (s *Sync) Host(conn *rpc.Conn) error {
        // Collect information about remote host
        detectedGroups, err := s.hostInfo(conn)
        if err != nil {
@@ -405,35 +320,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error {
                return err
        }
 
-       // Terminate connection to remote host
-       err = conn.Send(safcm.MsgQuitReq{})
-       if err != nil {
-               return err
-       }
-       _, err = conn.Recv()
-       if err != nil {
-               return err
-       }
-       err = conn.Wait()
-       if err != nil {
-               return err
-       }
-
        return nil
 }
 
 func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) {
-       if s.config.LogLevel < level {
-               return
-       }
-       s.events <- Event{
-               Host: s.host,
-               Log: Log{
-                       Level: level,
-                       Text:  msg,
-               },
-               Escaped: escaped,
-       }
+       s.logFunc(level, escaped, msg)
 }
 func (s *Sync) logDebugf(format string, a ...interface{}) {
        s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...))