]> ruderich.org/simon Gitweb - safcm/safcm.git/commitdiff
Move synchronization loop into new package frontend
authorSimon Ruderich <simon@ruderich.org>
Tue, 18 May 2021 15:46:56 +0000 (17:46 +0200)
committerSimon Ruderich <simon@ruderich.org>
Tue, 18 May 2021 16:23:47 +0000 (18:23 +0200)
This is in preparation for other programs (besides cmd/safcm) using the
safcm library. To reduce code duplication useful functions will be
provided by the frontend package. Its use is optional. All core
functionality is provided by the regular safcm packages.

The logging setup was slightly modified. Log messages are now no longer
filtered by Sync.log() but by the new log function Loop.LogEventFunc (or
its implementation logEvent()). This is also the reason why one test was
removed from sync_sync_test.go which is no longer relevant.

cmd/safcm/sync.go
cmd/safcm/sync_sync_test.go
cmd/safcm/sync_test.go
frontend/log.go [new file with mode: 0644]
frontend/loop.go [new file with mode: 0644]

index bee0133e4fd4326ef1610325fc6ae8649147da29..c1ecc965eec221828788f2bf68acd1481f08d78e 100644 (file)
@@ -23,16 +23,15 @@ import (
        "io/fs"
        "log"
        "os"
-       "os/signal"
        "runtime"
        "sort"
        "strings"
-       "sync"
 
        "golang.org/x/term"
 
        "ruderich.org/simon/safcm"
        "ruderich.org/simon/safcm/cmd/safcm/config"
+       "ruderich.org/simon/safcm/frontend"
        "ruderich.org/simon/safcm/rpc"
 )
 
@@ -43,25 +42,9 @@ type Sync struct {
        allHosts  *config.Hosts       // known hosts
        allGroups map[string][]string // known groups
 
-       events chan<- Event // all events generated by/for this host
-
        isTTY bool
-}
-
-type Event struct {
-       Host *config.Host
-
-       // Only one of Error, Log and ConnEvent is set in a single event
-       Error     error
-       Log       Log
-       ConnEvent rpc.ConnEvent
-
-       Escaped bool // true if untrusted input is already escaped
-}
 
-type Log struct {
-       Level safcm.LogLevel
-       Text  string
+       logFunc func(level safcm.LogLevel, escaped bool, msg string)
 }
 
 func MainSync(args []string) error {
@@ -131,90 +114,35 @@ func MainSync(args []string) error {
        isTTY := term.IsTerminal(int(os.Stdout.Fd())) &&
                term.IsTerminal(int(os.Stderr.Fd()))
 
-       done := make(chan bool)
-       // Collect events from all hosts and print them
-       events := make(chan Event)
-       go func() {
-               var failed bool
-               for {
-                       x := <-events
-                       if x.Host == nil {
-                               break
-                       }
-                       logEvent(x, cfg.LogLevel, isTTY, &failed)
-               }
-               done <- failed
-       }()
-
-       hostsLeft := make(map[string]bool)
-       for _, x := range toSync {
-               hostsLeft[x.Name] = true
+       loop := &frontend.Loop{
+               DebugConn: cfg.LogLevel >= safcm.LogDebug3,
+               LogEventFunc: func(x frontend.Event, failed *bool) {
+                       logEvent(x, cfg.LogLevel, isTTY, failed)
+               },
+               SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error {
+                       return host.(*Sync).Host(conn)
+               },
        }
-       var hostsLeftMutex sync.Mutex // protects hostsLeft
-
-       // Show unfinished hosts on Ctrl-C
-       sigint := make(chan os.Signal, 1)   // buffered for Notify()
-       signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
-       go func() {
-               // Running `ssh` processes get killed by SIGINT which is sent
-               // to all processes
-
-               <-sigint
-               log.Print("Received SIGINT, aborting ...")
-
-               // Print all queued events
-               events <- Event{} // poison pill
-               <-done
-               // "races" with <-done in the main function and will hang here
-               // if the other is faster. This is fine because then all hosts
-               // were synced successfully.
-
-               hostsLeftMutex.Lock()
-               var hosts []string
-               for x := range hostsLeft {
-                       hosts = append(hosts, x)
-               }
-               sort.Strings(hosts)
-               log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
-       }()
 
-       // Sync all hosts concurrently
-       var wg sync.WaitGroup
+       var hosts []frontend.Host
        for _, x := range toSync {
-               x := x
-
-               // Once in sync.Host() and once in the go func below
-               wg.Add(2)
-
-               go func() {
-                       sync := Sync{
+               s := &Sync{
                                host:      x,
                                config:    cfg,
                                allHosts:  allHosts,
                                allGroups: allGroups,
-                               events:    events,
                                isTTY:     isTTY,
                        }
-                       err := sync.Host(&wg)
-                       if err != nil {
-                               events <- Event{
-                                       Host:  x,
-                                       Error: err,
-                               }
-                       }
-                       wg.Done()
-
-                       hostsLeftMutex.Lock()
-                       defer hostsLeftMutex.Unlock()
-                       delete(hostsLeft, x.Name)
-               }()
+               s.logFunc = func(level safcm.LogLevel, escaped bool,
+                       msg string) {
+                       loop.Log(s, level, escaped, msg)
+               }
+               hosts = append(hosts, s)
        }
 
-       wg.Wait()
-       events <- Event{} // poison pill
-       failed := <-done
+       succ := loop.Run(hosts)
 
-       if failed {
+       if !succ {
                // Exit instead of returning an error to prevent an extra log
                // message from main()
                os.Exit(1)
@@ -296,7 +224,7 @@ are only available after the hosts were contacted.
        return res, nil
 }
 
-func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
+func logEvent(x frontend.Event, level safcm.LogLevel, isTTY bool, failed *bool) {
        // We have multiple event sources so this is somewhat ugly.
        var prefix, data string
        var color Color
@@ -307,6 +235,9 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
                // We logged an error, tell the caller
                *failed = true
        } else if x.Log.Level != 0 {
+               if level < x.Log.Level {
+                       return
+               }
                // LogError and LogDebug3 should not occur here
                switch x.Log.Level {
                case safcm.LogInfo:
@@ -341,7 +272,7 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
                data = x.ConnEvent.Data
        }
 
-       host := x.Host.Name
+       host := x.Host.Name()
        if color != 0 {
                host = ColorString(isTTY, color, host)
        }
@@ -353,26 +284,13 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) {
        log.Printf("%-9s [%s] %s", prefix, host, data)
 }
 
-func (s *Sync) Host(wg *sync.WaitGroup) error {
-       conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3)
-       // Pass all connection events to main loop
-       go func() {
-               for {
-                       x, ok := <-conn.Events
-                       if !ok {
-                               break
-                       }
-                       s.events <- Event{
-                               Host:      s.host,
-                               ConnEvent: x,
-                       }
-               }
-               wg.Done()
-       }()
+func (s *Sync) Name() string {
+       return s.host.Name
+}
 
+func (s *Sync) Dial(conn *rpc.Conn) error {
        helpers, err := fs.Sub(RemoteHelpers, "remote")
        if err != nil {
-               conn.Kill()
                return err
        }
 
@@ -381,18 +299,15 @@ func (s *Sync) Host(wg *sync.WaitGroup) error {
        if user == "" {
                user = s.config.SshUser
        }
-       err = conn.DialSSH(rpc.SSHConfig{
+       return conn.DialSSH(rpc.SSHConfig{
                Host:          s.host.Name,
                User:          user,
                SshConfig:     s.config.SshConfig,
                RemoteHelpers: helpers,
        })
-       if err != nil {
-               conn.Kill()
-               return err
-       }
-       defer conn.Kill()
+}
 
+func (s *Sync) Host(conn *rpc.Conn) error {
        // Collect information about remote host
        detectedGroups, err := s.hostInfo(conn)
        if err != nil {
@@ -405,35 +320,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error {
                return err
        }
 
-       // Terminate connection to remote host
-       err = conn.Send(safcm.MsgQuitReq{})
-       if err != nil {
-               return err
-       }
-       _, err = conn.Recv()
-       if err != nil {
-               return err
-       }
-       err = conn.Wait()
-       if err != nil {
-               return err
-       }
-
        return nil
 }
 
 func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) {
-       if s.config.LogLevel < level {
-               return
-       }
-       s.events <- Event{
-               Host: s.host,
-               Log: Log{
-                       Level: level,
-                       Text:  msg,
-               },
-               Escaped: escaped,
-       }
+       s.logFunc(level, escaped, msg)
 }
 func (s *Sync) logDebugf(format string, a ...interface{}) {
        s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...))
index 889aa2f21ad0098e2c14e29f659ee23f5c05a6f6..40c5c556eccd5961513abe52c4671ed467a6b7db 100644 (file)
@@ -40,7 +40,6 @@ func TestHostSyncReq(t *testing.T) {
                project   string
                host      string
                detected  []string
-               level     safcm.LogLevel
                exp       safcm.MsgSyncReq
                expEvents []string
                expErr    error
@@ -54,7 +53,6 @@ func TestHostSyncReq(t *testing.T) {
                        "project",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{
                                Groups: []string{
                                        "all",
@@ -148,124 +146,21 @@ func TestHostSyncReq(t *testing.T) {
                                },
                        },
                        []string{
-                               "host1.example.org: <nil> 3 host groups: all group group3 host1.example.org remove",
-                               "host1.example.org: <nil> 3 host group priorities (descending): host1.example.org",
+                               "3 false host groups: all group group3 host1.example.org remove",
+                               "3 false host group priorities (descending): host1.example.org",
                        },
                        nil,
                },
 
-               {
-                       "project: host1 (log level info)",
-                       "project",
-                       "host1.example.org",
-                       nil,
-                       safcm.LogInfo,
-                       safcm.MsgSyncReq{
-                               Groups: []string{
-                                       "all",
-                                       "group",
-                                       "group3",
-                                       "remove",
-                                       "host1.example.org",
-                               },
-                               Files: map[string]*safcm.File{
-                                       "/": &safcm.File{Path: "/",
-                                               OrigGroup: "group",
-                                               Mode:      fs.ModeDir | 0755 | fs.ModeSetgid,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                               TriggerCommands: []string{
-                                                       "touch /.update",
-                                               },
-                                       },
-                                       "/etc": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc",
-                                               Mode:      fs.ModeDir | 0755,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                       },
-                                       "/etc/.hidden": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc/.hidden",
-                                               Mode:      0100 | fs.ModeSetuid | fs.ModeSetgid | fs.ModeSticky,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                               Data:      []byte("..."),
-                                       },
-                                       "/etc/motd": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc/motd",
-                                               Mode:      0644,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                               Data:      []byte("Welcome to Host ONE\n\n\n\n\n\nall\n\n\nhost1.example.org\n\n\n\n"),
-                                       },
-                                       "/etc/rc.local": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc/rc.local",
-                                               Mode:      0700,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                               Data:      []byte("#!/bin/sh\n"),
-                                               TriggerCommands: []string{
-                                                       "/etc/rc.local",
-                                               },
-                                       },
-                                       "/etc/resolv.conf": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc/resolv.conf",
-                                               Mode:      0641,
-                                               User:      "user",
-                                               Uid:       -1,
-                                               Group:     "group",
-                                               Gid:       -1,
-                                               Data:      []byte("nameserver ::1\n"),
-                                               TriggerCommands: []string{
-                                                       "echo resolv.conf updated",
-                                               },
-                                       },
-                                       "/etc/test": &safcm.File{
-                                               OrigGroup: "group",
-                                               Path:      "/etc/test",
-                                               Mode:      os.ModeSymlink | 0777,
-                                               Uid:       -1,
-                                               Gid:       -1,
-                                               Data:      []byte("doesnt-exist"),
-                                       },
-                               },
-                               Packages: []string{
-                                       "unbound",
-                                       "unbound-anchor",
-                               },
-                               Services: []string{
-                                       "unbound",
-                               },
-                               Commands: []*safcm.Command{
-                                       {
-                                               OrigGroup: "group",
-                                               Cmd:       "echo command one",
-                                       },
-                                       {
-                                               OrigGroup: "group",
-                                               Cmd:       "echo -n command two",
-                                       },
-                               },
-                       },
-                       nil,
-                       nil,
-               },
-
                {
                        "conflict: file",
                        "project-conflict-file",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{},
                        []string{
-                               "host1.example.org: <nil> 3 host groups: all dns host1.example.org",
-                               "host1.example.org: <nil> 3 host group priorities (descending): host1.example.org",
+                               "3 false host groups: all dns host1.example.org",
+                               "3 false host group priorities (descending): host1.example.org",
                        },
                        fmt.Errorf("groups dns and all both provide \"/etc/resolv.conf\"\nUse 'group_priority' in config.yaml to declare preference"),
                },
@@ -276,11 +171,10 @@ func TestHostSyncReq(t *testing.T) {
                        []string{
                                "detected_other",
                        },
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{},
                        []string{
-                               "host2.example.org: <nil> 3 host groups: all detected_other host2.example.org other",
-                               "host2.example.org: <nil> 3 host group priorities (descending): host2.example.org",
+                               "3 false host groups: all detected_other host2.example.org other",
+                               "3 false host group priorities (descending): host2.example.org",
                        },
                        fmt.Errorf("groups other and all both provide \"/etc/resolv.conf\"\nUse 'group_priority' in config.yaml to declare preference"),
                },
@@ -290,11 +184,10 @@ func TestHostSyncReq(t *testing.T) {
                        "project-conflict-dir",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{},
                        []string{
-                               "host1.example.org: <nil> 3 host groups: all dns host1.example.org",
-                               "host1.example.org: <nil> 3 host group priorities (descending): host1.example.org",
+                               "3 false host groups: all dns host1.example.org",
+                               "3 false host group priorities (descending): host1.example.org",
                        },
                        fmt.Errorf("groups dns and all both provide \"/etc\"\nUse 'group_priority' in config.yaml to declare preference"),
                },
@@ -305,11 +198,10 @@ func TestHostSyncReq(t *testing.T) {
                        []string{
                                "detected_other",
                        },
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{},
                        []string{
-                               "host2.example.org: <nil> 3 host groups: all detected_other host2.example.org other",
-                               "host2.example.org: <nil> 3 host group priorities (descending): host2.example.org",
+                               "3 false host groups: all detected_other host2.example.org other",
+                               "3 false host group priorities (descending): host2.example.org",
                        },
                        fmt.Errorf("groups other and all both provide \"/etc\"\nUse 'group_priority' in config.yaml to declare preference"),
                },
@@ -319,7 +211,6 @@ func TestHostSyncReq(t *testing.T) {
                        "project-group-cycle",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{},
                        nil,
                        fmt.Errorf("groups.yaml: cycle while expanding group \"group-b\""),
@@ -330,7 +221,6 @@ func TestHostSyncReq(t *testing.T) {
                        "project-group_priority",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{
                                Groups: []string{"all", "group-b", "group-a", "host1.example.org"},
                                Files: map[string]*safcm.File{
@@ -421,10 +311,10 @@ func TestHostSyncReq(t *testing.T) {
                                },
                        },
                        []string{
-                               "host1.example.org: <nil> 3 host groups: all group-a group-b host1.example.org",
-                               "host1.example.org: <nil> 3 host group priorities (descending): host1.example.org group-a group-b all",
-                               `host1.example.org: <nil> 4 files: "/etc": group group-a overwrites triggers from group group-b`,
-                               `host1.example.org: <nil> 4 files: "/etc": group host1.example.org overwrites triggers from group group-a`,
+                               "3 false host groups: all group-a group-b host1.example.org",
+                               "3 false host group priorities (descending): host1.example.org group-a group-b all",
+                               `4 false files: "/etc": group group-a overwrites triggers from group group-b`,
+                               `4 false files: "/etc": group host1.example.org overwrites triggers from group group-a`,
                        },
                        nil,
                },
@@ -434,7 +324,6 @@ func TestHostSyncReq(t *testing.T) {
                        "project-group_priority-single",
                        "host1.example.org",
                        nil,
-                       safcm.LogDebug3,
                        safcm.MsgSyncReq{
                                Groups: []string{"all", "group-b", "group-a", "host1.example.org"},
                                Files: map[string]*safcm.File{
@@ -456,8 +345,8 @@ func TestHostSyncReq(t *testing.T) {
                                },
                        },
                        []string{
-                               "host1.example.org: <nil> 3 host groups: all group-a group-b host1.example.org",
-                               "host1.example.org: <nil> 3 host group priorities (descending): host1.example.org group-a",
+                               "3 false host groups: all group-a group-b host1.example.org",
+                               "3 false host group priorities (descending): host1.example.org group-a",
                        },
                        nil,
                },
@@ -483,43 +372,22 @@ func TestHostSyncReq(t *testing.T) {
                        if err != nil {
                                t.Fatal(err)
                        }
-                       cfg.LogLevel = tc.level
 
                        var events []string
-                       ch := make(chan Event)
-                       done := make(chan struct{})
-                       go func() {
-                               for {
-                                       x, ok := <-ch
-                                       if !ok {
-                                               break
-                                       }
-                                       if x.ConnEvent.Type != 0 {
-                                               panic("unexpected ConnEvent")
-                                       }
-                                       events = append(events,
-                                               fmt.Sprintf("%s: %v %d %s",
-                                                       x.Host.Name,
-                                                       x.Error, x.Log.Level,
-                                                       x.Log.Text))
-                               }
-                               done <- struct{}{}
-                       }()
-
                        s := &Sync{
                                host:      allHosts.Map[tc.host],
                                config:    cfg,
                                allHosts:  allHosts,
                                allGroups: allGroups,
-                               events:    ch,
+                               logFunc: func(level safcm.LogLevel, escaped bool, msg string) {
+                                       events = append(events,
+                                               fmt.Sprintf("%d %v %s", level, escaped, msg))
+                               },
                        }
 
                        res, err := s.hostSyncReq(tc.detected)
                        testutil.AssertEqual(t, "res", res, tc.exp)
                        testutil.AssertErrorEqual(t, "err", err, tc.expErr)
-
-                       close(ch)
-                       <-done
                        testutil.AssertEqual(t, "events",
                                events, tc.expEvents)
                })
index 80b1589072ace73792c3bd51723b02320d996c61..4744ad553cd45715bcb08313842ad2700d5af800 100644 (file)
@@ -24,6 +24,7 @@ import (
 
        "ruderich.org/simon/safcm"
        "ruderich.org/simon/safcm/cmd/safcm/config"
+       "ruderich.org/simon/safcm/frontend"
        "ruderich.org/simon/safcm/rpc"
        "ruderich.org/simon/safcm/testutil"
 )
@@ -202,7 +203,7 @@ func TestLogEvent(t *testing.T) {
 
        tests := []struct {
                name      string
-               event     Event
+               event     frontend.Event
                level     safcm.LogLevel
                isTTY     bool
                exp       string
@@ -211,7 +212,7 @@ func TestLogEvent(t *testing.T) {
 
                {
                        "Error",
-                       Event{
+                       frontend.Event{
                                Error: fmt.Errorf("fake error"),
                        },
                        safcm.LogDebug3,
@@ -221,7 +222,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Error (tty)",
-                       Event{
+                       frontend.Event{
                                Error: fmt.Errorf("fake error"),
                        },
                        safcm.LogDebug3,
@@ -231,7 +232,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Error: escape",
-                       Event{
+                       frontend.Event{
                                Error: fmt.Errorf("\x00"),
                        },
                        safcm.LogDebug3,
@@ -241,7 +242,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Error: escape (tty)",
-                       Event{
+                       frontend.Event{
                                Error: fmt.Errorf("\x00"),
                        },
                        safcm.LogDebug3,
@@ -252,8 +253,8 @@ func TestLogEvent(t *testing.T) {
 
                {
                        "Log: info",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "info log",
                                },
@@ -265,8 +266,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: info (tty)",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "info log",
                                },
@@ -278,8 +279,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: verbose",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogVerbose,
                                        Text:  "verbose log",
                                },
@@ -291,8 +292,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: debug",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogDebug,
                                        Text:  "debug log",
                                },
@@ -304,8 +305,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: debug2",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogDebug2,
                                        Text:  "debug2 log",
                                },
@@ -317,8 +318,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: debug3",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogDebug3,
                                        Text:  "debug3 log",
                                },
@@ -331,8 +332,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: debug3 (tty)",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogDebug3,
                                        Text:  "debug3 log",
                                },
@@ -345,8 +346,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: escape",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "\x00",
                                },
@@ -358,8 +359,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Log: escape (tty)",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "\x00",
                                },
@@ -372,7 +373,7 @@ func TestLogEvent(t *testing.T) {
 
                {
                        "ConnEvent: stderr",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventStderr,
                                        Data: "fake stderr",
@@ -385,7 +386,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: stderr (tty)",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventStderr,
                                        Data: "fake stderr",
@@ -398,7 +399,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: debug",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventDebug,
                                        Data: "conn debug",
@@ -411,7 +412,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: upload",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventUpload,
                                },
@@ -423,7 +424,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: upload (ignored)",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventUpload,
                                },
@@ -435,7 +436,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: invalid",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: 42,
                                        Data: "invalid",
@@ -448,7 +449,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: invalid (tty)",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: 42,
                                        Data: "invalid",
@@ -461,7 +462,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: escape",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventStderr,
                                        Data: "\x00",
@@ -474,7 +475,7 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "ConnEvent: escape (tty)",
-                       Event{
+                       frontend.Event{
                                ConnEvent: rpc.ConnEvent{
                                        Type: rpc.ConnEventDebug,
                                        Data: "\x01",
@@ -488,8 +489,8 @@ func TestLogEvent(t *testing.T) {
 
                {
                        "Escaped",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "\x00",
                                },
@@ -502,8 +503,8 @@ func TestLogEvent(t *testing.T) {
                },
                {
                        "Escaped (tty)",
-                       Event{
-                               Log: Log{
+                       frontend.Event{
+                               Log: frontend.Log{
                                        Level: safcm.LogInfo,
                                        Text:  "\x00",
                                },
@@ -517,7 +518,7 @@ func TestLogEvent(t *testing.T) {
 
                {
                        "empty (invalid)",
-                       Event{},
+                       frontend.Event{},
                        safcm.LogDebug3,
                        false,
                        "[INVALID=0] [fake-host] \n",
@@ -527,8 +528,10 @@ func TestLogEvent(t *testing.T) {
 
        for _, tc := range tests {
                t.Run(tc.name, func(t *testing.T) {
-                       tc.event.Host = &config.Host{
-                               Name: "fake-host",
+                       tc.event.Host = &Sync{
+                               host: &config.Host{
+                                       Name: "fake-host",
+                               },
                        }
 
                        var buf bytes.Buffer
diff --git a/frontend/log.go b/frontend/log.go
new file mode 100644 (file)
index 0000000..33fa1d0
--- /dev/null
@@ -0,0 +1,52 @@
+// Frontend: Logging functions for programs using the safcm library
+
+// Copyright (C) 2021  Simon Ruderich
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package frontend
+
+import (
+       "ruderich.org/simon/safcm"
+       "ruderich.org/simon/safcm/rpc"
+)
+
+type Event struct {
+       Host Host
+
+       // Only one of Error, Log and ConnEvent is set in a single event
+       Error     error
+       Log       Log
+       ConnEvent rpc.ConnEvent
+
+       Escaped bool // true if untrusted input is already escaped
+}
+
+type Log struct {
+       Level safcm.LogLevel
+       Text  string
+}
+
+func (l *Loop) Log(host Host, level safcm.LogLevel, escaped bool,
+       msg string) {
+
+       l.events <- Event{
+               Host: host,
+               Log: Log{
+                       Level: level,
+                       Text:  msg,
+               },
+               Escaped: escaped,
+       }
+}
diff --git a/frontend/loop.go b/frontend/loop.go
new file mode 100644 (file)
index 0000000..5e232a3
--- /dev/null
@@ -0,0 +1,172 @@
+// Frontend: Host synchronization and logging event loop for programs using
+// the safcm library
+
+// Copyright (C) 2021  Simon Ruderich
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package frontend
+
+import (
+       "log"
+       "os"
+       "os/signal"
+       "sort"
+       "strings"
+       "sync"
+
+       "ruderich.org/simon/safcm"
+       "ruderich.org/simon/safcm/rpc"
+)
+
+type Host interface {
+       Name() string
+       Dial(*rpc.Conn) error
+}
+
+type Loop struct {
+       DebugConn bool
+
+       LogEventFunc func(e Event, failed *bool)
+       SyncHostFunc func(*rpc.Conn, Host) error
+
+       events chan Event
+}
+
+func (l *Loop) Run(hosts []Host) bool {
+       done := make(chan bool)
+       // Collect events from all hosts and print them
+       events := make(chan Event)
+       go func() {
+               var failed bool
+               for {
+                       x := <-events
+                       if x.Host == nil {
+                               break
+                       }
+                       l.LogEventFunc(x, &failed)
+               }
+               done <- failed
+       }()
+
+       hostsLeft := make(map[string]bool)
+       for _, x := range hosts {
+               hostsLeft[x.Name()] = true
+       }
+       var hostsLeftMutex sync.Mutex // protects hostsLeft
+
+       // Show unfinished hosts on Ctrl-C
+       sigint := make(chan os.Signal, 1)   // buffered for Notify()
+       signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
+       go func() {
+               // Running `ssh` processes get killed by SIGINT which is sent
+               // to all processes
+
+               <-sigint
+               log.Print("Received SIGINT, aborting ...")
+
+               // Print all queued events
+               events <- Event{} // poison pill
+               <-done
+               // "races" with <-done in the main function and will hang here
+               // if the other is faster. This is fine because then all hosts
+               // were synced successfully.
+
+               hostsLeftMutex.Lock()
+               var hosts []string
+               for x := range hostsLeft {
+                       hosts = append(hosts, x)
+               }
+               sort.Strings(hosts)
+               log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
+       }()
+
+       l.events = events // used by l.syncHost() and l.Log()
+
+       // Sync all hosts concurrently
+       var wg sync.WaitGroup
+       for _, x := range hosts {
+               x := x
+
+               // Once in sync.Host() and once in the go func below
+               wg.Add(2)
+
+               go func() {
+                       err := l.syncHost(&wg, x)
+                       if err != nil {
+                               events <- Event{
+                                       Host:  x,
+                                       Error: err,
+                               }
+                       }
+                       wg.Done()
+
+                       hostsLeftMutex.Lock()
+                       defer hostsLeftMutex.Unlock()
+                       delete(hostsLeft, x.Name())
+               }()
+       }
+
+       wg.Wait()
+       events <- Event{} // poison pill
+       failed := <-done
+
+       return !failed
+}
+
+func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error {
+       conn := rpc.NewConn(l.DebugConn)
+       // Pass all connection events to main loop
+       go func() {
+               for {
+                       x, ok := <-conn.Events
+                       if !ok {
+                               break
+                       }
+                       l.events <- Event{
+                               Host:      host,
+                               ConnEvent: x,
+                       }
+               }
+               wg.Done()
+       }()
+
+       err := host.Dial(conn)
+       if err != nil {
+               conn.Kill()
+               return err
+       }
+       defer conn.Kill()
+
+       err = l.SyncHostFunc(conn, host)
+       if err != nil {
+               return err
+       }
+
+       // Terminate connection to remote host
+       err = conn.Send(safcm.MsgQuitReq{})
+       if err != nil {
+               return err
+       }
+       _, err = conn.Recv()
+       if err != nil {
+               return err
+       }
+       err = conn.Wait()
+       if err != nil {
+               return err
+       }
+
+       return nil
+}