]> ruderich.org/simon Gitweb - safcm/safcm.git/blob - rpc/conn.go
8b63477fc624d82119f1459ce0e509de03a76f23
[safcm/safcm.git] / rpc / conn.go
1 // Simple RPC-like protocol: implementation of connection and basic actions
2
3 // Copyright (C) 2021-2024  Simon Ruderich
4 //
5 // This program is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 package rpc
19
20 import (
21         "bufio"
22         "fmt"
23         "io/fs"
24         "os/exec"
25         "strings"
26         "sync"
27
28         "ruderich.org/simon/safcm"
29 )
30
31 type Conn struct {
32         Events   <-chan ConnEvent
33         events   chan<- ConnEvent // same as Events, to publish events
34         eventsWg sync.WaitGroup
35
36         debug     bool
37         sshRemote string
38         sshOpts   []string
39
40         remoteHelpers fs.FS
41
42         cmd  *exec.Cmd
43         conn *safcm.GobConn
44 }
45
46 type ConnEventType int
47
48 const (
49         _               ConnEventType = iota
50         ConnEventStderr               // stderr from spawned process
51         ConnEventDebug                // debug message
52         ConnEventUpload               // remote helper upload in progress
53 )
54
55 type ConnEvent struct {
56         Type ConnEventType
57         Data string
58 }
59
60 // NewConn creates a new connection. Events in the returned struct must be
61 // regularly read or the connection will hang. This must be done before
62 // DialSSH is called to open a connection.
63 func NewConn(debug bool) *Conn {
64         ch := make(chan ConnEvent)
65         return &Conn{
66                 Events: ch,
67                 events: ch,
68                 debug:  debug,
69         }
70 }
71
72 func (c *Conn) debugf(format string, a ...interface{}) {
73         if !c.debug {
74                 return
75         }
76         c.events <- ConnEvent{
77                 Type: ConnEventDebug,
78                 Data: fmt.Sprintf(format, a...),
79         }
80 }
81
82 // Wrap safcm.GobConn's Send() and Recv() to provide debug output.
83
84 // Send sends a single message to the remote.
85 func (c *Conn) Send(m safcm.Msg) error {
86         // No checks for invalid Conn, a stacktrace is more helpful
87
88         c.debugf("Send: sending %#v", m)
89         return c.conn.Send(m)
90 }
91
92 // Recv waits for a single message from the remote.
93 func (c *Conn) Recv() (safcm.Msg, error) {
94         // No checks for invalid Conn, a stacktrace is more helpful
95
96         c.debugf("Recv: waiting for message")
97         m, err := c.conn.Recv()
98         c.debugf("Recv: received msg=%#v err=%#v", m, err)
99         return m, err
100 }
101
102 // Wait waits for the connection to terminate. It's safe to call Wait (and
103 // Kill) multiple times.
104 func (c *Conn) Wait() error {
105         // But check here because Wait() can be called multiple times
106         if c.cmd == nil {
107                 return fmt.Errorf("Dial*() not called or already terminated")
108         }
109
110         c.debugf("Wait: waiting for connection to terminate")
111         return c.wait()
112 }
113 func (c *Conn) wait() error {
114         err := c.cmd.Wait()
115         c.cmd = nil
116
117         // Wait until we've received all events from the program's stderr.
118         c.eventsWg.Wait()
119         // Notify consumers that no more events will occur.
120         close(c.events)
121         // We cannot reuse this channel.
122         c.events = nil
123         // Don't set c.Events to nil because this creates a data race when
124         // another thread is still waiting.
125
126         return err
127 }
128
129 // Kill forcefully terminates the connection. It's safe to call Kill (and
130 // Wait) multiple times. Calling it before Dial*() was called will only close
131 // the Events channel.
132 func (c *Conn) Kill() error {
133         if c.cmd == nil {
134                 if c.events != nil {
135                         close(c.events)
136                         c.events = nil
137                 }
138                 return fmt.Errorf("Dial*() not called or already terminated")
139         }
140
141         c.debugf("Kill: killing connection")
142
143         c.cmd.Process.Kill() //nolint:errcheck
144         return c.wait()
145 }
146
147 func (c *Conn) handleStderrAsEvents(cmd *exec.Cmd) error {
148         // cmd may differ from c.cmd here!
149         stderr, err := cmd.StderrPipe()
150         if err != nil {
151                 return err
152         }
153
154         c.eventsWg.Add(1)
155         go func() {
156                 r := bufio.NewReader(stderr)
157                 for {
158                         x, err := r.ReadString('\n')
159                         if err != nil {
160                                 break
161                         }
162                         x = strings.TrimRight(x, "\n")
163
164                         c.events <- ConnEvent{
165                                 Type: ConnEventStderr,
166                                 Data: x,
167                         }
168                 }
169                 c.eventsWg.Done()
170         }()
171
172         return nil
173 }