]> ruderich.org/simon Gitweb - safcm/safcm.git/blob - frontend/loop.go
Use SPDX license identifiers
[safcm/safcm.git] / frontend / loop.go
1 // Frontend: Host synchronization and logging event loop for programs using
2 // the safcm library
3
4 // SPDX-License-Identifier: GPL-3.0-or-later
5 // Copyright (C) 2021-2024  Simon Ruderich
6
7 package frontend
8
9 import (
10         "log"
11         "os"
12         "os/signal"
13         "sort"
14         "strings"
15         "sync"
16
17         "ruderich.org/simon/safcm"
18         "ruderich.org/simon/safcm/rpc"
19 )
20
21 type Host interface {
22         Name() string
23         Dial(*rpc.Conn) error
24 }
25
26 type Loop struct {
27         DebugConn bool
28
29         LogEventFunc func(e Event, failed *bool)
30         SyncHostFunc func(*rpc.Conn, Host) error
31
32         events chan Event
33 }
34
35 func (l *Loop) Run(hosts []Host) bool {
36         done := make(chan bool)
37         // Collect events from all hosts and print them
38         events := make(chan Event)
39         go func() {
40                 var failed bool
41                 for {
42                         x := <-events
43                         if x.Host == nil {
44                                 break
45                         }
46                         l.LogEventFunc(x, &failed)
47                 }
48                 done <- failed
49         }()
50
51         hostsLeft := make(map[string]bool)
52         for _, x := range hosts {
53                 hostsLeft[x.Name()] = true
54         }
55         var hostsLeftMutex sync.Mutex // protects hostsLeft
56
57         // Show unfinished hosts on Ctrl-C
58         sigint := make(chan os.Signal, 1)   // buffered for Notify()
59         signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
60         go func() {
61                 // Running `ssh` processes get killed by SIGINT which is sent
62                 // to all processes
63
64                 <-sigint
65                 log.Print("Received SIGINT, aborting ...")
66
67                 // Print all queued events
68                 events <- Event{} // poison pill
69                 <-done
70                 // "races" with <-done in the main function and will hang here
71                 // if the other is faster. This is fine because then all hosts
72                 // were synced successfully.
73
74                 hostsLeftMutex.Lock()
75                 var hosts []string
76                 for x := range hostsLeft {
77                         hosts = append(hosts, x)
78                 }
79                 sort.Strings(hosts)
80                 log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
81         }()
82
83         l.events = events // used by l.syncHost() and l.Log()
84
85         // Sync all hosts concurrently
86         var wg sync.WaitGroup
87         for _, x := range hosts {
88                 x := x
89
90                 // Once in sync.Host() and once in the go func below
91                 wg.Add(2)
92
93                 go func() {
94                         err := l.syncHost(&wg, x)
95                         if err != nil {
96                                 events <- Event{
97                                         Host:  x,
98                                         Error: err,
99                                 }
100                         }
101                         wg.Done()
102
103                         hostsLeftMutex.Lock()
104                         defer hostsLeftMutex.Unlock()
105                         delete(hostsLeft, x.Name())
106                 }()
107         }
108
109         wg.Wait()
110         events <- Event{} // poison pill
111         failed := <-done
112
113         return !failed
114 }
115
116 func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error {
117         conn := rpc.NewConn(l.DebugConn)
118         // Pass all connection events to main loop
119         go func() {
120                 for {
121                         x, ok := <-conn.Events
122                         if !ok {
123                                 break
124                         }
125                         l.events <- Event{
126                                 Host:      host,
127                                 ConnEvent: x,
128                         }
129                 }
130                 wg.Done()
131         }()
132
133         err := host.Dial(conn)
134         if err != nil {
135                 conn.Kill() //nolint:errcheck
136                 return err
137         }
138         defer conn.Kill() //nolint:errcheck
139
140         err = l.SyncHostFunc(conn, host)
141         if err != nil {
142                 return err
143         }
144
145         // Terminate connection to remote host
146         err = conn.Send(safcm.MsgQuitReq{})
147         if err != nil {
148                 return err
149         }
150         _, err = conn.Recv()
151         if err != nil {
152                 return err
153         }
154         err = conn.Wait()
155         if err != nil {
156                 return err
157         }
158
159         return nil
160 }