]> ruderich.org/simon Gitweb - safcm/safcm.git/blob - rpc/conn.go
03b748545f12c1c588acb24bac2761e0b439b25c
[safcm/safcm.git] / rpc / conn.go
1 // Simple RPC-like protocol: implementation of connection and basic actions
2
3 // Copyright (C) 2021  Simon Ruderich
4 //
5 // This program is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 package rpc
19
20 import (
21         "bufio"
22         "fmt"
23         "os/exec"
24         "strings"
25         "sync"
26
27         "ruderich.org/simon/safcm"
28 )
29
30 type Conn struct {
31         Events   <-chan ConnEvent
32         events   chan<- ConnEvent // same as Events, to publish events
33         eventsWg sync.WaitGroup
34
35         debug     bool
36         sshRemote string
37         sshOpts   []string
38
39         cmd  *exec.Cmd
40         conn *safcm.GobConn
41 }
42
43 type ConnEventType int
44
45 const (
46         _               ConnEventType = iota
47         ConnEventStderr               // stderr from spawned process
48         ConnEventDebug                // debug message
49         ConnEventUpload               // remote helper upload in progress
50 )
51
52 type ConnEvent struct {
53         Type ConnEventType
54         Data string
55 }
56
57 // NewConn creates a new connection. Events in the returned struct must be
58 // regularly read or the connection will hang. This must be done before
59 // DialSSH is called to open a connection.
60 func NewConn(debug bool) *Conn {
61         ch := make(chan ConnEvent)
62         return &Conn{
63                 Events: ch,
64                 events: ch,
65                 debug:  debug,
66         }
67 }
68
69 func (c *Conn) debugf(format string, a ...interface{}) {
70         if !c.debug {
71                 return
72         }
73         c.events <- ConnEvent{
74                 Type: ConnEventDebug,
75                 Data: fmt.Sprintf(format, a...),
76         }
77 }
78
79 // Wrap safcm.GobConn's Send() and Recv() to provide debug output.
80
81 // Send sends a single message to the remote.
82 func (c *Conn) Send(m safcm.Msg) error {
83         // No checks for invalid Conn, a stacktrace is more helpful
84
85         c.debugf("Send: sending %#v", m)
86         return c.conn.Send(m)
87 }
88
89 // Recv waits for a single message from the remote.
90 func (c *Conn) Recv() (safcm.Msg, error) {
91         // No checks for invalid Conn, a stacktrace is more helpful
92
93         c.debugf("Recv: waiting for message")
94         m, err := c.conn.Recv()
95         c.debugf("Recv: received msg=%#v err=%#v", m, err)
96         return m, err
97 }
98
99 // Wait waits for the connection to terminate. It's safe to call Wait (and
100 // Kill) multiple times.
101 func (c *Conn) Wait() error {
102         // But check here because Wait() can be called multiple times
103         if c.cmd == nil {
104                 return fmt.Errorf("Dial*() not called or already terminated")
105         }
106
107         c.debugf("Wait: waiting for connection to terminate")
108         return c.wait()
109 }
110 func (c *Conn) wait() error {
111         err := c.cmd.Wait()
112         c.cmd = nil
113
114         // Wait until we've received all events from the program's stderr.
115         c.eventsWg.Wait()
116         // Notify consumers that no more events will occur.
117         close(c.events)
118         // We cannot reuse this channel.
119         c.events = nil
120         // Don't set c.Events to nil because this creates a data race when
121         // another thread is still waiting.
122
123         return err
124 }
125
126 // Kill forcefully terminates the connection. It's safe to call Kill (and
127 // Wait) multiple times. Calling it before Dial*() was called will only close
128 // the Events channel.
129 func (c *Conn) Kill() error {
130         if c.cmd == nil {
131                 if c.events != nil {
132                         close(c.events)
133                         c.events = nil
134                 }
135                 return fmt.Errorf("Dial*() not called or already terminated")
136         }
137
138         c.debugf("Kill: killing connection")
139
140         c.cmd.Process.Kill()
141         return c.wait()
142 }
143
144 func (c *Conn) handleStderrAsEvents(cmd *exec.Cmd) error {
145         // cmd may differ from c.cmd here!
146         stderr, err := cmd.StderrPipe()
147         if err != nil {
148                 return err
149         }
150
151         c.eventsWg.Add(1)
152         go func() {
153                 r := bufio.NewReader(stderr)
154                 for {
155                         x, err := r.ReadString('\n')
156                         if err != nil {
157                                 break
158                         }
159                         x = strings.TrimRight(x, "\n")
160
161                         c.events <- ConnEvent{
162                                 Type: ConnEventStderr,
163                                 Data: x,
164                         }
165                 }
166                 c.eventsWg.Done()
167         }()
168
169         return nil
170 }