// Simple RPC-like protocol: implementation of connection and basic actions // SPDX-License-Identifier: GPL-3.0-or-later // Copyright (C) 2021-2024 Simon Ruderich package rpc import ( "bufio" "fmt" "io/fs" "os/exec" "strings" "sync" "ruderich.org/simon/safcm" ) type Conn struct { Events <-chan ConnEvent events chan<- ConnEvent // same as Events, to publish events eventsWg sync.WaitGroup debug bool sshRemote string sshOpts []string remoteHelpers fs.FS cmd *exec.Cmd conn *safcm.GobConn } type ConnEventType int const ( _ ConnEventType = iota ConnEventStderr // stderr from spawned process ConnEventDebug // debug message ConnEventUpload // remote helper upload in progress ) type ConnEvent struct { Type ConnEventType Data string } // NewConn creates a new connection. Events in the returned struct must be // regularly read or the connection will hang. This must be done before // DialSSH is called to open a connection. func NewConn(debug bool) *Conn { ch := make(chan ConnEvent) return &Conn{ Events: ch, events: ch, debug: debug, } } func (c *Conn) debugf(format string, a ...interface{}) { if !c.debug { return } c.events <- ConnEvent{ Type: ConnEventDebug, Data: fmt.Sprintf(format, a...), } } // Wrap safcm.GobConn's Send() and Recv() to provide debug output. // Send sends a single message to the remote. func (c *Conn) Send(m safcm.Msg) error { // No checks for invalid Conn, a stacktrace is more helpful c.debugf("Send: sending %#v", m) return c.conn.Send(m) } // Recv waits for a single message from the remote. func (c *Conn) Recv() (safcm.Msg, error) { // No checks for invalid Conn, a stacktrace is more helpful c.debugf("Recv: waiting for message") m, err := c.conn.Recv() c.debugf("Recv: received msg=%#v err=%#v", m, err) return m, err } // Wait waits for the connection to terminate. It's safe to call Wait (and // Kill) multiple times. func (c *Conn) Wait() error { // But check here because Wait() can be called multiple times if c.cmd == nil { return fmt.Errorf("Dial*() not called or already terminated") } c.debugf("Wait: waiting for connection to terminate") return c.wait() } func (c *Conn) wait() error { err := c.cmd.Wait() c.cmd = nil // Wait until we've received all events from the program's stderr. c.eventsWg.Wait() // Notify consumers that no more events will occur. close(c.events) // We cannot reuse this channel. c.events = nil // Don't set c.Events to nil because this creates a data race when // another thread is still waiting. return err } // Kill forcefully terminates the connection. It's safe to call Kill (and // Wait) multiple times. Calling it before Dial*() was called will only close // the Events channel. func (c *Conn) Kill() error { if c.cmd == nil { if c.events != nil { close(c.events) c.events = nil } return fmt.Errorf("Dial*() not called or already terminated") } c.debugf("Kill: killing connection") c.cmd.Process.Kill() //nolint:errcheck return c.wait() } func (c *Conn) handleStderrAsEvents(cmd *exec.Cmd) error { // cmd may differ from c.cmd here! stderr, err := cmd.StderrPipe() if err != nil { return err } c.eventsWg.Add(1) go func() { r := bufio.NewReader(stderr) for { x, err := r.ReadString('\n') if err != nil { break } x = strings.TrimRight(x, "\n") c.events <- ConnEvent{ Type: ConnEventStderr, Data: x, } } c.eventsWg.Done() }() return nil }