X-Git-Url: https://ruderich.org/simon/gitweb/?a=blobdiff_plain;f=cmd%2Fsafcm%2Fsync.go;h=69431435e5e9f3066ebf82aa4ebb8add1afbacb0;hb=HEAD;hp=bee0133e4fd4326ef1610325fc6ae8649147da29;hpb=fd97e8019e2ab166d9475ed59782c86247d8430b;p=safcm%2Fsafcm.git diff --git a/cmd/safcm/sync.go b/cmd/safcm/sync.go index bee0133..6943143 100644 --- a/cmd/safcm/sync.go +++ b/cmd/safcm/sync.go @@ -1,19 +1,7 @@ // "sync" sub-command: sync data to remote hosts -// Copyright (C) 2021 Simon Ruderich -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . +// SPDX-License-Identifier: GPL-3.0-or-later +// Copyright (C) 2021-2024 Simon Ruderich package main @@ -23,16 +11,15 @@ import ( "io/fs" "log" "os" - "os/signal" "runtime" "sort" "strings" - "sync" "golang.org/x/term" "ruderich.org/simon/safcm" "ruderich.org/simon/safcm/cmd/safcm/config" + "ruderich.org/simon/safcm/frontend" "ruderich.org/simon/safcm/rpc" ) @@ -43,25 +30,10 @@ type Sync struct { allHosts *config.Hosts // known hosts allGroups map[string][]string // known groups - events chan<- Event // all events generated by/for this host - isTTY bool -} - -type Event struct { - Host *config.Host - - // Only one of Error, Log and ConnEvent is set in a single event - Error error - Log Log - ConnEvent rpc.ConnEvent - - Escaped bool // true if untrusted input is already escaped -} -type Log struct { - Level safcm.LogLevel - Text string + loop *frontend.Loop + logFunc func(level safcm.LogLevel, escaped bool, msg string) } func MainSync(args []string) error { @@ -81,24 +53,11 @@ func MainSync(args []string) error { optionSshConfig := flag.String("sshconfig", "", "`path` to ssh configuration file; used for tests") - flag.CommandLine.Parse(args[2:]) - - var level safcm.LogLevel - switch *optionLog { - case "error": - level = safcm.LogError - case "info": - level = safcm.LogInfo - case "verbose": - level = safcm.LogVerbose - case "debug": - level = safcm.LogDebug - case "debug2": - level = safcm.LogDebug2 - case "debug3": - level = safcm.LogDebug3 - default: - return fmt.Errorf("invalid -log value %q", *optionLog) + flag.CommandLine.Parse(args[2:]) //nolint:errcheck + + level, err := safcm.ParseLogLevel(*optionLog) + if err != nil { + return fmt.Errorf("-log: %v", err) } names := flag.Args() @@ -131,90 +90,36 @@ func MainSync(args []string) error { isTTY := term.IsTerminal(int(os.Stdout.Fd())) && term.IsTerminal(int(os.Stderr.Fd())) - done := make(chan bool) - // Collect events from all hosts and print them - events := make(chan Event) - go func() { - var failed bool - for { - x := <-events - if x.Host == nil { - break - } - logEvent(x, cfg.LogLevel, isTTY, &failed) - } - done <- failed - }() - - hostsLeft := make(map[string]bool) - for _, x := range toSync { - hostsLeft[x.Name] = true + loop := &frontend.Loop{ + DebugConn: cfg.LogLevel >= safcm.LogDebug3, + LogEventFunc: func(x frontend.Event, failed *bool) { + frontend.LogEvent(x, cfg.LogLevel, isTTY, failed) + }, + SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error { + return host.(*Sync).Host(conn) + }, } - var hostsLeftMutex sync.Mutex // protects hostsLeft - - // Show unfinished hosts on Ctrl-C - sigint := make(chan os.Signal, 1) // buffered for Notify() - signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C - go func() { - // Running `ssh` processes get killed by SIGINT which is sent - // to all processes - - <-sigint - log.Print("Received SIGINT, aborting ...") - - // Print all queued events - events <- Event{} // poison pill - <-done - // "races" with <-done in the main function and will hang here - // if the other is faster. This is fine because then all hosts - // were synced successfully. - - hostsLeftMutex.Lock() - var hosts []string - for x := range hostsLeft { - hosts = append(hosts, x) - } - sort.Strings(hosts) - log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) - }() - // Sync all hosts concurrently - var wg sync.WaitGroup + var hosts []frontend.Host for _, x := range toSync { - x := x - - // Once in sync.Host() and once in the go func below - wg.Add(2) - - go func() { - sync := Sync{ - host: x, - config: cfg, - allHosts: allHosts, - allGroups: allGroups, - events: events, - isTTY: isTTY, - } - err := sync.Host(&wg) - if err != nil { - events <- Event{ - Host: x, - Error: err, - } - } - wg.Done() - - hostsLeftMutex.Lock() - defer hostsLeftMutex.Unlock() - delete(hostsLeft, x.Name) - }() + s := &Sync{ + host: x, + config: cfg, + allHosts: allHosts, + allGroups: allGroups, + isTTY: isTTY, + loop: loop, + } + s.logFunc = func(level safcm.LogLevel, escaped bool, + msg string) { + s.loop.Log(s, level, escaped, msg) + } + hosts = append(hosts, s) } - wg.Wait() - events <- Event{} // poison pill - failed := <-done + succ := loop.Run(hosts) - if failed { + if !succ { // Exit instead of returning an error to prevent an extra log // message from main() os.Exit(1) @@ -296,83 +201,13 @@ are only available after the hosts were contacted. return res, nil } -func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { - // We have multiple event sources so this is somewhat ugly. - var prefix, data string - var color Color - if x.Error != nil { - prefix = "[error]" - data = x.Error.Error() - color = ColorRed - // We logged an error, tell the caller - *failed = true - } else if x.Log.Level != 0 { - // LogError and LogDebug3 should not occur here - switch x.Log.Level { - case safcm.LogInfo: - prefix = "[info]" - case safcm.LogVerbose: - prefix = "[verbose]" - case safcm.LogDebug: - prefix = "[debug]" - case safcm.LogDebug2: - prefix = "[debug2]" - default: - prefix = fmt.Sprintf("[INVALID=%d]", x.Log.Level) - color = ColorRed - } - data = x.Log.Text - } else { - switch x.ConnEvent.Type { - case rpc.ConnEventStderr: - prefix = "[stderr]" - case rpc.ConnEventDebug: - prefix = "[debug3]" - case rpc.ConnEventUpload: - if level < safcm.LogInfo { - return - } - prefix = "[info]" - x.ConnEvent.Data = "remote helper upload in progress" - default: - prefix = fmt.Sprintf("[INVALID=%d]", x.ConnEvent.Type) - color = ColorRed - } - data = x.ConnEvent.Data - } - - host := x.Host.Name - if color != 0 { - host = ColorString(isTTY, color, host) - } - // Make sure to escape control characters to prevent terminal - // injection attacks - if !x.Escaped { - data = EscapeControlCharacters(isTTY, data) - } - log.Printf("%-9s [%s] %s", prefix, host, data) +func (s *Sync) Name() string { + return s.host.Name } -func (s *Sync) Host(wg *sync.WaitGroup) error { - conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3) - // Pass all connection events to main loop - go func() { - for { - x, ok := <-conn.Events - if !ok { - break - } - s.events <- Event{ - Host: s.host, - ConnEvent: x, - } - } - wg.Done() - }() - +func (s *Sync) Dial(conn *rpc.Conn) error { helpers, err := fs.Sub(RemoteHelpers, "remote") if err != nil { - conn.Kill() return err } @@ -381,18 +216,15 @@ func (s *Sync) Host(wg *sync.WaitGroup) error { if user == "" { user = s.config.SshUser } - err = conn.DialSSH(rpc.SSHConfig{ + return conn.DialSSH(rpc.SSHConfig{ Host: s.host.Name, User: user, SshConfig: s.config.SshConfig, RemoteHelpers: helpers, }) - if err != nil { - conn.Kill() - return err - } - defer conn.Kill() +} +func (s *Sync) Host(conn *rpc.Conn) error { // Collect information about remote host detectedGroups, err := s.hostInfo(conn) if err != nil { @@ -405,35 +237,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error { return err } - // Terminate connection to remote host - err = conn.Send(safcm.MsgQuitReq{}) - if err != nil { - return err - } - _, err = conn.Recv() - if err != nil { - return err - } - err = conn.Wait() - if err != nil { - return err - } - return nil } func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) { - if s.config.LogLevel < level { - return - } - s.events <- Event{ - Host: s.host, - Log: Log{ - Level: level, - Text: msg, - }, - Escaped: escaped, - } + s.logFunc(level, escaped, msg) } func (s *Sync) logDebugf(format string, a ...interface{}) { s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...)) @@ -441,25 +249,3 @@ func (s *Sync) logDebugf(format string, a ...interface{}) { func (s *Sync) logVerbosef(format string, a ...interface{}) { s.log(safcm.LogVerbose, false, fmt.Sprintf(format, a...)) } - -// sendRecv sends a message over conn and waits for the response. Any MsgLog -// messages received before the final (non MsgLog) response are passed to -// s.log. -func (s *Sync) sendRecv(conn *rpc.Conn, msg safcm.Msg) (safcm.Msg, error) { - err := conn.Send(msg) - if err != nil { - return nil, err - } - for { - x, err := conn.Recv() - if err != nil { - return nil, err - } - log, ok := x.(safcm.MsgLog) - if ok { - s.log(log.Level, false, log.Text) - continue - } - return x, nil - } -}