]> ruderich.org/simon Gitweb - safcm/safcm.git/blobdiff - rpc/conn.go
First working version
[safcm/safcm.git] / rpc / conn.go
diff --git a/rpc/conn.go b/rpc/conn.go
new file mode 100644 (file)
index 0000000..c59bbd3
--- /dev/null
@@ -0,0 +1,164 @@
+// Simple RPC-like protocol: implementation of connection and basic actions
+
+// Copyright (C) 2021  Simon Ruderich
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+package rpc
+
+import (
+       "bufio"
+       "fmt"
+       "os/exec"
+       "strings"
+       "sync"
+
+       "ruderich.org/simon/safcm"
+)
+
+type Conn struct {
+       Events   <-chan ConnEvent
+       events   chan<- ConnEvent // same as Events, to publish events
+       eventsWg sync.WaitGroup
+
+       debug  bool
+       remote string
+
+       cmd  *exec.Cmd
+       conn *safcm.GobConn
+}
+
+type ConnEventType int
+
+const (
+       _ ConnEventType = iota
+       ConnEventStderr
+       ConnEventDebug
+       ConnEventUpload
+)
+
+type ConnEvent struct {
+       Type ConnEventType
+       Data string
+}
+
+// NewConn creates a new connection. Events in the returned struct must be
+// regularly read or the connection will stall. This must be done before
+// DialSSH is called to open a connection.
+func NewConn(debug bool) *Conn {
+       ch := make(chan ConnEvent)
+       return &Conn{
+               Events: ch,
+               events: ch,
+               debug:  debug,
+       }
+}
+
+func (c *Conn) debugf(format string, a ...interface{}) {
+       if !c.debug {
+               return
+       }
+       c.events <- ConnEvent{
+               Type: ConnEventDebug,
+               Data: fmt.Sprintf(format, a...),
+       }
+}
+
+// Wrap safcm.GobConn's Send() and Recv() to provide debug output.
+
+// Send sends a single message to the remote.
+func (c *Conn) Send(m safcm.Msg) error {
+       // No checks for invalid Conn, a stacktrace is more helpful
+
+       c.debugf("Send: sending %#v", m)
+       return c.conn.Send(m)
+}
+
+// Recv waits for a single message from the remote.
+func (c *Conn) Recv() (safcm.Msg, error) {
+       // No checks for invalid Conn, a stacktrace is more helpful
+
+       c.debugf("Recv: waiting for message")
+       m, err := c.conn.Recv()
+       c.debugf("Recv: received msg=%#v err=%#v", m, err)
+       return m, err
+}
+
+// Wait waits for the connection to terminate. It's safe to call Wait (and
+// Kill) multiple times.
+func (c *Conn) Wait() error {
+       // But check here because Wait() can be called multiple times
+       if c.cmd == nil {
+               return fmt.Errorf("Dial*() not called or already terminated")
+       }
+
+       c.debugf("Wait: waiting for connection to terminate")
+       return c.wait()
+}
+func (c *Conn) wait() error {
+       err := c.cmd.Wait()
+       c.cmd = nil
+
+       // Wait until we've received all events from the program's stderr.
+       c.eventsWg.Wait()
+       // Notify consumers that no more events will occur.
+       close(c.events)
+       // We cannot reuse this channel.
+       c.events = nil
+       // Don't set c.Events to nil because this creates a data race when
+       // another thread is still waiting.
+
+       return err
+}
+
+// Kill forcefully terminates the connection. It's safe to call Kill (and
+// Wait) multiple times.
+func (c *Conn) Kill() error {
+       if c.cmd == nil {
+               return fmt.Errorf("Dial*() not called or already terminated")
+       }
+
+       c.debugf("Kill: killing connection")
+
+       c.cmd.Process.Kill()
+       return c.wait()
+}
+
+func (c *Conn) handleStderrAsEvents(cmd *exec.Cmd) error {
+       // cmd may differ from c.cmd here!
+       stderr, err := cmd.StderrPipe()
+       if err != nil {
+               return err
+       }
+
+       c.eventsWg.Add(1)
+       go func() {
+               r := bufio.NewReader(stderr)
+               for {
+                       x, err := r.ReadString('\n')
+                       if err != nil {
+                               break
+                       }
+                       x = strings.TrimRight(x, "\n")
+
+                       c.events <- ConnEvent{
+                               Type: ConnEventStderr,
+                               Data: x,
+                       }
+               }
+               c.eventsWg.Done()
+       }()
+
+       return nil
+}