]> ruderich.org/simon Gitweb - safcm/safcm.git/blob - frontend/loop.go
safcm: add commit date to version output
[safcm/safcm.git] / frontend / loop.go
1 // Frontend: Host synchronization and logging event loop for programs using
2 // the safcm library
3
4 // Copyright (C) 2021  Simon Ruderich
5 //
6 // This program is free software: you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation, either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 package frontend
20
21 import (
22         "log"
23         "os"
24         "os/signal"
25         "sort"
26         "strings"
27         "sync"
28
29         "ruderich.org/simon/safcm"
30         "ruderich.org/simon/safcm/rpc"
31 )
32
33 type Host interface {
34         Name() string
35         Dial(*rpc.Conn) error
36 }
37
38 type Loop struct {
39         DebugConn bool
40
41         LogEventFunc func(e Event, failed *bool)
42         SyncHostFunc func(*rpc.Conn, Host) error
43
44         events chan Event
45 }
46
47 func (l *Loop) Run(hosts []Host) bool {
48         done := make(chan bool)
49         // Collect events from all hosts and print them
50         events := make(chan Event)
51         go func() {
52                 var failed bool
53                 for {
54                         x := <-events
55                         if x.Host == nil {
56                                 break
57                         }
58                         l.LogEventFunc(x, &failed)
59                 }
60                 done <- failed
61         }()
62
63         hostsLeft := make(map[string]bool)
64         for _, x := range hosts {
65                 hostsLeft[x.Name()] = true
66         }
67         var hostsLeftMutex sync.Mutex // protects hostsLeft
68
69         // Show unfinished hosts on Ctrl-C
70         sigint := make(chan os.Signal, 1)   // buffered for Notify()
71         signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C
72         go func() {
73                 // Running `ssh` processes get killed by SIGINT which is sent
74                 // to all processes
75
76                 <-sigint
77                 log.Print("Received SIGINT, aborting ...")
78
79                 // Print all queued events
80                 events <- Event{} // poison pill
81                 <-done
82                 // "races" with <-done in the main function and will hang here
83                 // if the other is faster. This is fine because then all hosts
84                 // were synced successfully.
85
86                 hostsLeftMutex.Lock()
87                 var hosts []string
88                 for x := range hostsLeft {
89                         hosts = append(hosts, x)
90                 }
91                 sort.Strings(hosts)
92                 log.Fatalf("Failed to sync %s", strings.Join(hosts, ", "))
93         }()
94
95         l.events = events // used by l.syncHost() and l.Log()
96
97         // Sync all hosts concurrently
98         var wg sync.WaitGroup
99         for _, x := range hosts {
100                 x := x
101
102                 // Once in sync.Host() and once in the go func below
103                 wg.Add(2)
104
105                 go func() {
106                         err := l.syncHost(&wg, x)
107                         if err != nil {
108                                 events <- Event{
109                                         Host:  x,
110                                         Error: err,
111                                 }
112                         }
113                         wg.Done()
114
115                         hostsLeftMutex.Lock()
116                         defer hostsLeftMutex.Unlock()
117                         delete(hostsLeft, x.Name())
118                 }()
119         }
120
121         wg.Wait()
122         events <- Event{} // poison pill
123         failed := <-done
124
125         return !failed
126 }
127
128 func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error {
129         conn := rpc.NewConn(l.DebugConn)
130         // Pass all connection events to main loop
131         go func() {
132                 for {
133                         x, ok := <-conn.Events
134                         if !ok {
135                                 break
136                         }
137                         l.events <- Event{
138                                 Host:      host,
139                                 ConnEvent: x,
140                         }
141                 }
142                 wg.Done()
143         }()
144
145         err := host.Dial(conn)
146         if err != nil {
147                 conn.Kill()
148                 return err
149         }
150         defer conn.Kill()
151
152         err = l.SyncHostFunc(conn, host)
153         if err != nil {
154                 return err
155         }
156
157         // Terminate connection to remote host
158         err = conn.Send(safcm.MsgQuitReq{})
159         if err != nil {
160                 return err
161         }
162         _, err = conn.Recv()
163         if err != nil {
164                 return err
165         }
166         err = conn.Wait()
167         if err != nil {
168                 return err
169         }
170
171         return nil
172 }