// Frontend: Host synchronization and logging event loop for programs using // the safcm library // SPDX-License-Identifier: GPL-3.0-or-later // Copyright (C) 2021-2024 Simon Ruderich package frontend import ( "log" "os" "os/signal" "sort" "strings" "sync" "ruderich.org/simon/safcm" "ruderich.org/simon/safcm/rpc" ) type Host interface { Name() string Dial(*rpc.Conn) error } type Loop struct { DebugConn bool LogEventFunc func(e Event, failed *bool) SyncHostFunc func(*rpc.Conn, Host) error events chan Event } func (l *Loop) Run(hosts []Host) bool { done := make(chan bool) // Collect events from all hosts and print them events := make(chan Event) go func() { var failed bool for { x := <-events if x.Host == nil { break } l.LogEventFunc(x, &failed) } done <- failed }() hostsLeft := make(map[string]bool) for _, x := range hosts { hostsLeft[x.Name()] = true } var hostsLeftMutex sync.Mutex // protects hostsLeft // Show unfinished hosts on Ctrl-C sigint := make(chan os.Signal, 1) // buffered for Notify() signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C go func() { // Running `ssh` processes get killed by SIGINT which is sent // to all processes <-sigint log.Print("Received SIGINT, aborting ...") // Print all queued events events <- Event{} // poison pill <-done // "races" with <-done in the main function and will hang here // if the other is faster. This is fine because then all hosts // were synced successfully. hostsLeftMutex.Lock() var hosts []string for x := range hostsLeft { hosts = append(hosts, x) } sort.Strings(hosts) log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) }() l.events = events // used by l.syncHost() and l.Log() // Sync all hosts concurrently var wg sync.WaitGroup for _, x := range hosts { x := x // Once in sync.Host() and once in the go func below wg.Add(2) go func() { err := l.syncHost(&wg, x) if err != nil { events <- Event{ Host: x, Error: err, } } wg.Done() hostsLeftMutex.Lock() defer hostsLeftMutex.Unlock() delete(hostsLeft, x.Name()) }() } wg.Wait() events <- Event{} // poison pill failed := <-done return !failed } func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error { conn := rpc.NewConn(l.DebugConn) // Pass all connection events to main loop go func() { for { x, ok := <-conn.Events if !ok { break } l.events <- Event{ Host: host, ConnEvent: x, } } wg.Done() }() err := host.Dial(conn) if err != nil { conn.Kill() //nolint:errcheck return err } defer conn.Kill() //nolint:errcheck err = l.SyncHostFunc(conn, host) if err != nil { return err } // Terminate connection to remote host err = conn.Send(safcm.MsgQuitReq{}) if err != nil { return err } _, err = conn.Recv() if err != nil { return err } err = conn.Wait() if err != nil { return err } return nil }