X-Git-Url: https://ruderich.org/simon/gitweb/?a=blobdiff_plain;f=cmd%2Fsafcm%2Fsync.go;h=65745af55a68367d1fd4a2625f44cf3901bdb6b2;hb=99d753741e5b6dc86b032882541c9b8f45a51bd4;hp=6fe65fae326c2b09fa74b51202a964afb44a11ce;hpb=8a3f6af248e28ea7efc1bf89751a597d28834942;p=safcm%2Fsafcm.git diff --git a/cmd/safcm/sync.go b/cmd/safcm/sync.go index 6fe65fa..65745af 100644 --- a/cmd/safcm/sync.go +++ b/cmd/safcm/sync.go @@ -20,18 +20,18 @@ package main import ( "flag" "fmt" + "io/fs" "log" "os" - "os/signal" "runtime" "sort" "strings" - "sync" "golang.org/x/term" "ruderich.org/simon/safcm" "ruderich.org/simon/safcm/cmd/safcm/config" + "ruderich.org/simon/safcm/frontend" "ruderich.org/simon/safcm/rpc" ) @@ -42,25 +42,10 @@ type Sync struct { allHosts *config.Hosts // known hosts allGroups map[string][]string // known groups - events chan<- Event // all events generated by/for this host - isTTY bool -} - -type Event struct { - Host *config.Host - - // Only one of Error, Log and ConnEvent is set in a single event - Error error - Log Log - ConnEvent rpc.ConnEvent - Escaped bool // true if untrusted input is already escaped -} - -type Log struct { - Level safcm.LogLevel - Text string + loop *frontend.Loop + logFunc func(level safcm.LogLevel, escaped bool, msg string) } func MainSync(args []string) error { @@ -82,22 +67,9 @@ func MainSync(args []string) error { flag.CommandLine.Parse(args[2:]) - var level safcm.LogLevel - switch *optionLog { - case "error": - level = safcm.LogError - case "info": - level = safcm.LogInfo - case "verbose": - level = safcm.LogVerbose - case "debug": - level = safcm.LogDebug - case "debug2": - level = safcm.LogDebug2 - case "debug3": - level = safcm.LogDebug3 - default: - return fmt.Errorf("invalid -log value %q", *optionLog) + level, err := safcm.ParseLogLevel(*optionLog) + if err != nil { + return fmt.Errorf("-log: %v", err) } names := flag.Args() @@ -130,90 +102,36 @@ func MainSync(args []string) error { isTTY := term.IsTerminal(int(os.Stdout.Fd())) && term.IsTerminal(int(os.Stderr.Fd())) - done := make(chan bool) - // Collect events from all hosts and print them - events := make(chan Event) - go func() { - var failed bool - for { - x := <-events - if x.Host == nil { - break - } - logEvent(x, cfg.LogLevel, isTTY, &failed) - } - done <- failed - }() - - hostsLeft := make(map[string]bool) - for _, x := range toSync { - hostsLeft[x.Name] = true + loop := &frontend.Loop{ + DebugConn: cfg.LogLevel >= safcm.LogDebug3, + LogEventFunc: func(x frontend.Event, failed *bool) { + frontend.LogEvent(x, cfg.LogLevel, isTTY, failed) + }, + SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error { + return host.(*Sync).Host(conn) + }, } - var hostsLeftMutex sync.Mutex // protects hostsLeft - - // Show unfinished hosts on Ctrl-C - sigint := make(chan os.Signal, 1) // buffered for Notify() - signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C - go func() { - // Running `ssh` processes get killed by SIGINT which is sent - // to all processes - - <-sigint - log.Print("Received SIGINT, aborting ...") - - // Print all queued events - events <- Event{} // poison pill - <-done - // "races" with <-done in the main function and will hang here - // if the other is faster. This is fine because then all hosts - // were synced successfully. - - hostsLeftMutex.Lock() - var hosts []string - for x := range hostsLeft { - hosts = append(hosts, x) - } - sort.Strings(hosts) - log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) - }() - // Sync all hosts concurrently - var wg sync.WaitGroup + var hosts []frontend.Host for _, x := range toSync { - x := x - - // Once in sync.Host() and once in the go func below - wg.Add(2) - - go func() { - sync := Sync{ - host: x, - config: cfg, - allHosts: allHosts, - allGroups: allGroups, - events: events, - isTTY: isTTY, - } - err := sync.Host(&wg) - if err != nil { - events <- Event{ - Host: x, - Error: err, - } - } - wg.Done() - - hostsLeftMutex.Lock() - defer hostsLeftMutex.Unlock() - delete(hostsLeft, x.Name) - }() + s := &Sync{ + host: x, + config: cfg, + allHosts: allHosts, + allGroups: allGroups, + isTTY: isTTY, + loop: loop, + } + s.logFunc = func(level safcm.LogLevel, escaped bool, + msg string) { + s.loop.Log(s, level, escaped, msg) + } + hosts = append(hosts, s) } - wg.Wait() - events <- Event{} // poison pill - failed := <-done + succ := loop.Run(hosts) - if failed { + if !succ { // Exit instead of returning an error to prevent an extra log // message from main() os.Exit(1) @@ -295,95 +213,30 @@ are only available after the hosts were contacted. return res, nil } -func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { - // We have multiple event sources so this is somewhat ugly. - var prefix, data string - var color Color - if x.Error != nil { - prefix = "[error]" - data = x.Error.Error() - color = ColorRed - // We logged an error, tell the caller - *failed = true - } else if x.Log.Level != 0 { - // LogError and LogDebug3 should not occur here - switch x.Log.Level { - case safcm.LogInfo: - prefix = "[info]" - case safcm.LogVerbose: - prefix = "[verbose]" - case safcm.LogDebug: - prefix = "[debug]" - case safcm.LogDebug2: - prefix = "[debug2]" - default: - prefix = fmt.Sprintf("[INVALID=%d]", x.Log.Level) - color = ColorRed - } - data = x.Log.Text - } else { - switch x.ConnEvent.Type { - case rpc.ConnEventStderr: - prefix = "[stderr]" - case rpc.ConnEventDebug: - prefix = "[debug3]" - case rpc.ConnEventUpload: - if level < safcm.LogInfo { - return - } - prefix = "[info]" - x.ConnEvent.Data = "remote helper upload in progress" - default: - prefix = fmt.Sprintf("[INVALID=%d]", x.ConnEvent.Type) - color = ColorRed - } - data = x.ConnEvent.Data - } - - host := x.Host.Name - if color != 0 { - host = ColorString(isTTY, color, host) - } - // Make sure to escape control characters to prevent terminal - // injection attacks - if !x.Escaped { - data = EscapeControlCharacters(isTTY, data) - } - log.Printf("%-9s [%s] %s", prefix, host, data) +func (s *Sync) Name() string { + return s.host.Name } -func (s *Sync) Host(wg *sync.WaitGroup) error { - conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3) - // Pass all connection events to main loop - go func() { - for { - x, ok := <-conn.Events - if !ok { - break - } - s.events <- Event{ - Host: s.host, - ConnEvent: x, - } - } - wg.Done() - }() +func (s *Sync) Dial(conn *rpc.Conn) error { + helpers, err := fs.Sub(RemoteHelpers, "remote") + if err != nil { + return err + } // Connect to remote host user := s.host.SshUser if user == "" { user = s.config.SshUser } - err := conn.DialSSH(rpc.SSHConfig{ - Host: s.host.Name, - User: user, - SshConfig: s.config.SshConfig, + return conn.DialSSH(rpc.SSHConfig{ + Host: s.host.Name, + User: user, + SshConfig: s.config.SshConfig, + RemoteHelpers: helpers, }) - if err != nil { - return err - } - defer conn.Kill() +} +func (s *Sync) Host(conn *rpc.Conn) error { // Collect information about remote host detectedGroups, err := s.hostInfo(conn) if err != nil { @@ -396,35 +249,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error { return err } - // Terminate connection to remote host - err = conn.Send(safcm.MsgQuitReq{}) - if err != nil { - return err - } - _, err = conn.Recv() - if err != nil { - return err - } - err = conn.Wait() - if err != nil { - return err - } - return nil } func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) { - if s.config.LogLevel < level { - return - } - s.events <- Event{ - Host: s.host, - Log: Log{ - Level: level, - Text: msg, - }, - Escaped: escaped, - } + s.logFunc(level, escaped, msg) } func (s *Sync) logDebugf(format string, a ...interface{}) { s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...)) @@ -432,25 +261,3 @@ func (s *Sync) logDebugf(format string, a ...interface{}) { func (s *Sync) logVerbosef(format string, a ...interface{}) { s.log(safcm.LogVerbose, false, fmt.Sprintf(format, a...)) } - -// sendRecv sends a message over conn and waits for the response. Any MsgLog -// messages received before the final (non MsgLog) response are passed to -// s.log. -func (s *Sync) sendRecv(conn *rpc.Conn, msg safcm.Msg) (safcm.Msg, error) { - err := conn.Send(msg) - if err != nil { - return nil, err - } - for { - x, err := conn.Recv() - if err != nil { - return nil, err - } - log, ok := x.(safcm.MsgLog) - if ok { - s.log(log.Level, false, log.Text) - continue - } - return x, nil - } -}