X-Git-Url: https://ruderich.org/simon/gitweb/?a=blobdiff_plain;f=frontend%2Floop.go;fp=frontend%2Floop.go;h=5e232a3aaad6165d4d7305fb1b116056abca5ef9;hb=c899e17495d4eb932e0b4f428ec91882d845f1bc;hp=0000000000000000000000000000000000000000;hpb=9269fa3c94e700afc0be823f58ea473a2db8f3dc;p=safcm%2Fsafcm.git diff --git a/frontend/loop.go b/frontend/loop.go new file mode 100644 index 0000000..5e232a3 --- /dev/null +++ b/frontend/loop.go @@ -0,0 +1,172 @@ +// Frontend: Host synchronization and logging event loop for programs using +// the safcm library + +// Copyright (C) 2021 Simon Ruderich +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package frontend + +import ( + "log" + "os" + "os/signal" + "sort" + "strings" + "sync" + + "ruderich.org/simon/safcm" + "ruderich.org/simon/safcm/rpc" +) + +type Host interface { + Name() string + Dial(*rpc.Conn) error +} + +type Loop struct { + DebugConn bool + + LogEventFunc func(e Event, failed *bool) + SyncHostFunc func(*rpc.Conn, Host) error + + events chan Event +} + +func (l *Loop) Run(hosts []Host) bool { + done := make(chan bool) + // Collect events from all hosts and print them + events := make(chan Event) + go func() { + var failed bool + for { + x := <-events + if x.Host == nil { + break + } + l.LogEventFunc(x, &failed) + } + done <- failed + }() + + hostsLeft := make(map[string]bool) + for _, x := range hosts { + hostsLeft[x.Name()] = true + } + var hostsLeftMutex sync.Mutex // protects hostsLeft + + // Show unfinished hosts on Ctrl-C + sigint := make(chan os.Signal, 1) // buffered for Notify() + signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C + go func() { + // Running `ssh` processes get killed by SIGINT which is sent + // to all processes + + <-sigint + log.Print("Received SIGINT, aborting ...") + + // Print all queued events + events <- Event{} // poison pill + <-done + // "races" with <-done in the main function and will hang here + // if the other is faster. This is fine because then all hosts + // were synced successfully. + + hostsLeftMutex.Lock() + var hosts []string + for x := range hostsLeft { + hosts = append(hosts, x) + } + sort.Strings(hosts) + log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) + }() + + l.events = events // used by l.syncHost() and l.Log() + + // Sync all hosts concurrently + var wg sync.WaitGroup + for _, x := range hosts { + x := x + + // Once in sync.Host() and once in the go func below + wg.Add(2) + + go func() { + err := l.syncHost(&wg, x) + if err != nil { + events <- Event{ + Host: x, + Error: err, + } + } + wg.Done() + + hostsLeftMutex.Lock() + defer hostsLeftMutex.Unlock() + delete(hostsLeft, x.Name()) + }() + } + + wg.Wait() + events <- Event{} // poison pill + failed := <-done + + return !failed +} + +func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error { + conn := rpc.NewConn(l.DebugConn) + // Pass all connection events to main loop + go func() { + for { + x, ok := <-conn.Events + if !ok { + break + } + l.events <- Event{ + Host: host, + ConnEvent: x, + } + } + wg.Done() + }() + + err := host.Dial(conn) + if err != nil { + conn.Kill() + return err + } + defer conn.Kill() + + err = l.SyncHostFunc(conn, host) + if err != nil { + return err + } + + // Terminate connection to remote host + err = conn.Send(safcm.MsgQuitReq{}) + if err != nil { + return err + } + _, err = conn.Recv() + if err != nil { + return err + } + err = conn.Wait() + if err != nil { + return err + } + + return nil +}