]> ruderich.org/simon Gitweb - safcm/safcm.git/blobdiff - frontend/loop.go
Move synchronization loop into new package frontend
[safcm/safcm.git] / frontend / loop.go
diff --git a/frontend/loop.go b/frontend/loop.go
new file mode 100644 (file)
index 0000000..5e232a3
--- /dev/null
@@ -0,0 +1,172 @@
+// Frontend: Host synchronization and logging event loop for programs using
+// the safcm library
+
+// Copyright (C) 2021  Simon Ruderich
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package frontend
+
+import (
+       "log"
+       "os"
+       "os/signal"
+       "sort"
+       "strings"
+       "sync"
+
+       "ruderich.org/simon/safcm"
+       "ruderich.org/simon/safcm/rpc"
+)
+
+type Host interface {
+       Name() string
+       Dial(*rpc.Conn) error
+}
+
+type Loop struct {
+       DebugConn bool
+
+       LogEventFunc func(e Event, failed *bool)
+       SyncHostFunc func(*rpc.Conn, Host) error
+
+       events chan Event
+}
+
+func (l *Loop) Run(hosts []Host) bool {
+       done := make(chan bool)
+       // Collect events from all hosts and print them
+       events := make(chan Event)
+       go func() {
+               var failed bool
+               for {
+                       x := <-events
+                       if x.Host == nil {
+                               break
+                       }
+                       l.LogEventFunc(x, &failed)
+               }
+               done <- failed
+       }()
+
+       hostsLeft := make(map[string]bool)
+       for _, x := range hosts {
+               hostsLeft[x.Name()] = true
+       }
+       var hostsLeftMutex sync.Mutex // protects hostsLeft
+
+       // Show unfinished hosts on Ctrl-C
+       sigint := make(chan os.Signal, 1)   // buffered for Notify()
+       signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
+       go func() {
+               // Running `ssh` processes get killed by SIGINT which is sent
+               // to all processes
+
+               <-sigint
+               log.Print("Received SIGINT, aborting ...")
+
+               // Print all queued events
+               events <- Event{} // poison pill
+               <-done
+               // "races" with <-done in the main function and will hang here
+               // if the other is faster. This is fine because then all hosts
+               // were synced successfully.
+
+               hostsLeftMutex.Lock()
+               var hosts []string
+               for x := range hostsLeft {
+                       hosts = append(hosts, x)
+               }
+               sort.Strings(hosts)
+               log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
+       }()
+
+       l.events = events // used by l.syncHost() and l.Log()
+
+       // Sync all hosts concurrently
+       var wg sync.WaitGroup
+       for _, x := range hosts {
+               x := x
+
+               // Once in sync.Host() and once in the go func below
+               wg.Add(2)
+
+               go func() {
+                       err := l.syncHost(&wg, x)
+                       if err != nil {
+                               events <- Event{
+                                       Host:  x,
+                                       Error: err,
+                               }
+                       }
+                       wg.Done()
+
+                       hostsLeftMutex.Lock()
+                       defer hostsLeftMutex.Unlock()
+                       delete(hostsLeft, x.Name())
+               }()
+       }
+
+       wg.Wait()
+       events <- Event{} // poison pill
+       failed := <-done
+
+       return !failed
+}
+
+func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error {
+       conn := rpc.NewConn(l.DebugConn)
+       // Pass all connection events to main loop
+       go func() {
+               for {
+                       x, ok := <-conn.Events
+                       if !ok {
+                               break
+                       }
+                       l.events <- Event{
+                               Host:      host,
+                               ConnEvent: x,
+                       }
+               }
+               wg.Done()
+       }()
+
+       err := host.Dial(conn)
+       if err != nil {
+               conn.Kill()
+               return err
+       }
+       defer conn.Kill()
+
+       err = l.SyncHostFunc(conn, host)
+       if err != nil {
+               return err
+       }
+
+       // Terminate connection to remote host
+       err = conn.Send(safcm.MsgQuitReq{})
+       if err != nil {
+               return err
+       }
+       _, err = conn.Recv()
+       if err != nil {
+               return err
+       }
+       err = conn.Wait()
+       if err != nil {
+               return err
+       }
+
+       return nil
+}