From c899e17495d4eb932e0b4f428ec91882d845f1bc Mon Sep 17 00:00:00 2001 From: Simon Ruderich Date: Tue, 18 May 2021 17:46:56 +0200 Subject: [PATCH] Move synchronization loop into new package frontend This is in preparation for other programs (besides cmd/safcm) using the safcm library. To reduce code duplication useful functions will be provided by the frontend package. Its use is optional. All core functionality is provided by the regular safcm packages. The logging setup was slightly modified. Log messages are now no longer filtered by Sync.log() but by the new log function Loop.LogEventFunc (or its implementation logEvent()). This is also the reason why one test was removed from sync_sync_test.go which is no longer relevant. --- cmd/safcm/sync.go | 173 +++++++----------------------------- cmd/safcm/sync_sync_test.go | 172 +++++------------------------------ cmd/safcm/sync_test.go | 81 +++++++++-------- frontend/log.go | 52 +++++++++++ frontend/loop.go | 172 +++++++++++++++++++++++++++++++++++ 5 files changed, 318 insertions(+), 332 deletions(-) create mode 100644 frontend/log.go create mode 100644 frontend/loop.go diff --git a/cmd/safcm/sync.go b/cmd/safcm/sync.go index bee0133..c1ecc96 100644 --- a/cmd/safcm/sync.go +++ b/cmd/safcm/sync.go @@ -23,16 +23,15 @@ import ( "io/fs" "log" "os" - "os/signal" "runtime" "sort" "strings" - "sync" "golang.org/x/term" "ruderich.org/simon/safcm" "ruderich.org/simon/safcm/cmd/safcm/config" + "ruderich.org/simon/safcm/frontend" "ruderich.org/simon/safcm/rpc" ) @@ -43,25 +42,9 @@ type Sync struct { allHosts *config.Hosts // known hosts allGroups map[string][]string // known groups - events chan<- Event // all events generated by/for this host - isTTY bool -} - -type Event struct { - Host *config.Host - - // Only one of Error, Log and ConnEvent is set in a single event - Error error - Log Log - ConnEvent rpc.ConnEvent - - Escaped bool // true if untrusted input is already escaped -} -type Log struct { - Level safcm.LogLevel - Text string + logFunc func(level safcm.LogLevel, escaped bool, msg string) } func MainSync(args []string) error { @@ -131,90 +114,35 @@ func MainSync(args []string) error { isTTY := term.IsTerminal(int(os.Stdout.Fd())) && term.IsTerminal(int(os.Stderr.Fd())) - done := make(chan bool) - // Collect events from all hosts and print them - events := make(chan Event) - go func() { - var failed bool - for { - x := <-events - if x.Host == nil { - break - } - logEvent(x, cfg.LogLevel, isTTY, &failed) - } - done <- failed - }() - - hostsLeft := make(map[string]bool) - for _, x := range toSync { - hostsLeft[x.Name] = true + loop := &frontend.Loop{ + DebugConn: cfg.LogLevel >= safcm.LogDebug3, + LogEventFunc: func(x frontend.Event, failed *bool) { + logEvent(x, cfg.LogLevel, isTTY, failed) + }, + SyncHostFunc: func(conn *rpc.Conn, host frontend.Host) error { + return host.(*Sync).Host(conn) + }, } - var hostsLeftMutex sync.Mutex // protects hostsLeft - - // Show unfinished hosts on Ctrl-C - sigint := make(chan os.Signal, 1) // buffered for Notify() - signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C - go func() { - // Running `ssh` processes get killed by SIGINT which is sent - // to all processes - - <-sigint - log.Print("Received SIGINT, aborting ...") - - // Print all queued events - events <- Event{} // poison pill - <-done - // "races" with <-done in the main function and will hang here - // if the other is faster. This is fine because then all hosts - // were synced successfully. - - hostsLeftMutex.Lock() - var hosts []string - for x := range hostsLeft { - hosts = append(hosts, x) - } - sort.Strings(hosts) - log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) - }() - // Sync all hosts concurrently - var wg sync.WaitGroup + var hosts []frontend.Host for _, x := range toSync { - x := x - - // Once in sync.Host() and once in the go func below - wg.Add(2) - - go func() { - sync := Sync{ + s := &Sync{ host: x, config: cfg, allHosts: allHosts, allGroups: allGroups, - events: events, isTTY: isTTY, } - err := sync.Host(&wg) - if err != nil { - events <- Event{ - Host: x, - Error: err, - } - } - wg.Done() - - hostsLeftMutex.Lock() - defer hostsLeftMutex.Unlock() - delete(hostsLeft, x.Name) - }() + s.logFunc = func(level safcm.LogLevel, escaped bool, + msg string) { + loop.Log(s, level, escaped, msg) + } + hosts = append(hosts, s) } - wg.Wait() - events <- Event{} // poison pill - failed := <-done + succ := loop.Run(hosts) - if failed { + if !succ { // Exit instead of returning an error to prevent an extra log // message from main() os.Exit(1) @@ -296,7 +224,7 @@ are only available after the hosts were contacted. return res, nil } -func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { +func logEvent(x frontend.Event, level safcm.LogLevel, isTTY bool, failed *bool) { // We have multiple event sources so this is somewhat ugly. var prefix, data string var color Color @@ -307,6 +235,9 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { // We logged an error, tell the caller *failed = true } else if x.Log.Level != 0 { + if level < x.Log.Level { + return + } // LogError and LogDebug3 should not occur here switch x.Log.Level { case safcm.LogInfo: @@ -341,7 +272,7 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { data = x.ConnEvent.Data } - host := x.Host.Name + host := x.Host.Name() if color != 0 { host = ColorString(isTTY, color, host) } @@ -353,26 +284,13 @@ func logEvent(x Event, level safcm.LogLevel, isTTY bool, failed *bool) { log.Printf("%-9s [%s] %s", prefix, host, data) } -func (s *Sync) Host(wg *sync.WaitGroup) error { - conn := rpc.NewConn(s.config.LogLevel >= safcm.LogDebug3) - // Pass all connection events to main loop - go func() { - for { - x, ok := <-conn.Events - if !ok { - break - } - s.events <- Event{ - Host: s.host, - ConnEvent: x, - } - } - wg.Done() - }() +func (s *Sync) Name() string { + return s.host.Name +} +func (s *Sync) Dial(conn *rpc.Conn) error { helpers, err := fs.Sub(RemoteHelpers, "remote") if err != nil { - conn.Kill() return err } @@ -381,18 +299,15 @@ func (s *Sync) Host(wg *sync.WaitGroup) error { if user == "" { user = s.config.SshUser } - err = conn.DialSSH(rpc.SSHConfig{ + return conn.DialSSH(rpc.SSHConfig{ Host: s.host.Name, User: user, SshConfig: s.config.SshConfig, RemoteHelpers: helpers, }) - if err != nil { - conn.Kill() - return err - } - defer conn.Kill() +} +func (s *Sync) Host(conn *rpc.Conn) error { // Collect information about remote host detectedGroups, err := s.hostInfo(conn) if err != nil { @@ -405,35 +320,11 @@ func (s *Sync) Host(wg *sync.WaitGroup) error { return err } - // Terminate connection to remote host - err = conn.Send(safcm.MsgQuitReq{}) - if err != nil { - return err - } - _, err = conn.Recv() - if err != nil { - return err - } - err = conn.Wait() - if err != nil { - return err - } - return nil } func (s *Sync) log(level safcm.LogLevel, escaped bool, msg string) { - if s.config.LogLevel < level { - return - } - s.events <- Event{ - Host: s.host, - Log: Log{ - Level: level, - Text: msg, - }, - Escaped: escaped, - } + s.logFunc(level, escaped, msg) } func (s *Sync) logDebugf(format string, a ...interface{}) { s.log(safcm.LogDebug, false, fmt.Sprintf(format, a...)) diff --git a/cmd/safcm/sync_sync_test.go b/cmd/safcm/sync_sync_test.go index 889aa2f..40c5c55 100644 --- a/cmd/safcm/sync_sync_test.go +++ b/cmd/safcm/sync_sync_test.go @@ -40,7 +40,6 @@ func TestHostSyncReq(t *testing.T) { project string host string detected []string - level safcm.LogLevel exp safcm.MsgSyncReq expEvents []string expErr error @@ -54,7 +53,6 @@ func TestHostSyncReq(t *testing.T) { "project", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{ Groups: []string{ "all", @@ -148,124 +146,21 @@ func TestHostSyncReq(t *testing.T) { }, }, []string{ - "host1.example.org: 3 host groups: all group group3 host1.example.org remove", - "host1.example.org: 3 host group priorities (descending): host1.example.org", + "3 false host groups: all group group3 host1.example.org remove", + "3 false host group priorities (descending): host1.example.org", }, nil, }, - { - "project: host1 (log level info)", - "project", - "host1.example.org", - nil, - safcm.LogInfo, - safcm.MsgSyncReq{ - Groups: []string{ - "all", - "group", - "group3", - "remove", - "host1.example.org", - }, - Files: map[string]*safcm.File{ - "/": &safcm.File{Path: "/", - OrigGroup: "group", - Mode: fs.ModeDir | 0755 | fs.ModeSetgid, - Uid: -1, - Gid: -1, - TriggerCommands: []string{ - "touch /.update", - }, - }, - "/etc": &safcm.File{ - OrigGroup: "group", - Path: "/etc", - Mode: fs.ModeDir | 0755, - Uid: -1, - Gid: -1, - }, - "/etc/.hidden": &safcm.File{ - OrigGroup: "group", - Path: "/etc/.hidden", - Mode: 0100 | fs.ModeSetuid | fs.ModeSetgid | fs.ModeSticky, - Uid: -1, - Gid: -1, - Data: []byte("..."), - }, - "/etc/motd": &safcm.File{ - OrigGroup: "group", - Path: "/etc/motd", - Mode: 0644, - Uid: -1, - Gid: -1, - Data: []byte("Welcome to Host ONE\n\n\n\n\n\nall\n\n\nhost1.example.org\n\n\n\n"), - }, - "/etc/rc.local": &safcm.File{ - OrigGroup: "group", - Path: "/etc/rc.local", - Mode: 0700, - Uid: -1, - Gid: -1, - Data: []byte("#!/bin/sh\n"), - TriggerCommands: []string{ - "/etc/rc.local", - }, - }, - "/etc/resolv.conf": &safcm.File{ - OrigGroup: "group", - Path: "/etc/resolv.conf", - Mode: 0641, - User: "user", - Uid: -1, - Group: "group", - Gid: -1, - Data: []byte("nameserver ::1\n"), - TriggerCommands: []string{ - "echo resolv.conf updated", - }, - }, - "/etc/test": &safcm.File{ - OrigGroup: "group", - Path: "/etc/test", - Mode: os.ModeSymlink | 0777, - Uid: -1, - Gid: -1, - Data: []byte("doesnt-exist"), - }, - }, - Packages: []string{ - "unbound", - "unbound-anchor", - }, - Services: []string{ - "unbound", - }, - Commands: []*safcm.Command{ - { - OrigGroup: "group", - Cmd: "echo command one", - }, - { - OrigGroup: "group", - Cmd: "echo -n command two", - }, - }, - }, - nil, - nil, - }, - { "conflict: file", "project-conflict-file", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{}, []string{ - "host1.example.org: 3 host groups: all dns host1.example.org", - "host1.example.org: 3 host group priorities (descending): host1.example.org", + "3 false host groups: all dns host1.example.org", + "3 false host group priorities (descending): host1.example.org", }, fmt.Errorf("groups dns and all both provide \"/etc/resolv.conf\"\nUse 'group_priority' in config.yaml to declare preference"), }, @@ -276,11 +171,10 @@ func TestHostSyncReq(t *testing.T) { []string{ "detected_other", }, - safcm.LogDebug3, safcm.MsgSyncReq{}, []string{ - "host2.example.org: 3 host groups: all detected_other host2.example.org other", - "host2.example.org: 3 host group priorities (descending): host2.example.org", + "3 false host groups: all detected_other host2.example.org other", + "3 false host group priorities (descending): host2.example.org", }, fmt.Errorf("groups other and all both provide \"/etc/resolv.conf\"\nUse 'group_priority' in config.yaml to declare preference"), }, @@ -290,11 +184,10 @@ func TestHostSyncReq(t *testing.T) { "project-conflict-dir", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{}, []string{ - "host1.example.org: 3 host groups: all dns host1.example.org", - "host1.example.org: 3 host group priorities (descending): host1.example.org", + "3 false host groups: all dns host1.example.org", + "3 false host group priorities (descending): host1.example.org", }, fmt.Errorf("groups dns and all both provide \"/etc\"\nUse 'group_priority' in config.yaml to declare preference"), }, @@ -305,11 +198,10 @@ func TestHostSyncReq(t *testing.T) { []string{ "detected_other", }, - safcm.LogDebug3, safcm.MsgSyncReq{}, []string{ - "host2.example.org: 3 host groups: all detected_other host2.example.org other", - "host2.example.org: 3 host group priorities (descending): host2.example.org", + "3 false host groups: all detected_other host2.example.org other", + "3 false host group priorities (descending): host2.example.org", }, fmt.Errorf("groups other and all both provide \"/etc\"\nUse 'group_priority' in config.yaml to declare preference"), }, @@ -319,7 +211,6 @@ func TestHostSyncReq(t *testing.T) { "project-group-cycle", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{}, nil, fmt.Errorf("groups.yaml: cycle while expanding group \"group-b\""), @@ -330,7 +221,6 @@ func TestHostSyncReq(t *testing.T) { "project-group_priority", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{ Groups: []string{"all", "group-b", "group-a", "host1.example.org"}, Files: map[string]*safcm.File{ @@ -421,10 +311,10 @@ func TestHostSyncReq(t *testing.T) { }, }, []string{ - "host1.example.org: 3 host groups: all group-a group-b host1.example.org", - "host1.example.org: 3 host group priorities (descending): host1.example.org group-a group-b all", - `host1.example.org: 4 files: "/etc": group group-a overwrites triggers from group group-b`, - `host1.example.org: 4 files: "/etc": group host1.example.org overwrites triggers from group group-a`, + "3 false host groups: all group-a group-b host1.example.org", + "3 false host group priorities (descending): host1.example.org group-a group-b all", + `4 false files: "/etc": group group-a overwrites triggers from group group-b`, + `4 false files: "/etc": group host1.example.org overwrites triggers from group group-a`, }, nil, }, @@ -434,7 +324,6 @@ func TestHostSyncReq(t *testing.T) { "project-group_priority-single", "host1.example.org", nil, - safcm.LogDebug3, safcm.MsgSyncReq{ Groups: []string{"all", "group-b", "group-a", "host1.example.org"}, Files: map[string]*safcm.File{ @@ -456,8 +345,8 @@ func TestHostSyncReq(t *testing.T) { }, }, []string{ - "host1.example.org: 3 host groups: all group-a group-b host1.example.org", - "host1.example.org: 3 host group priorities (descending): host1.example.org group-a", + "3 false host groups: all group-a group-b host1.example.org", + "3 false host group priorities (descending): host1.example.org group-a", }, nil, }, @@ -483,43 +372,22 @@ func TestHostSyncReq(t *testing.T) { if err != nil { t.Fatal(err) } - cfg.LogLevel = tc.level var events []string - ch := make(chan Event) - done := make(chan struct{}) - go func() { - for { - x, ok := <-ch - if !ok { - break - } - if x.ConnEvent.Type != 0 { - panic("unexpected ConnEvent") - } - events = append(events, - fmt.Sprintf("%s: %v %d %s", - x.Host.Name, - x.Error, x.Log.Level, - x.Log.Text)) - } - done <- struct{}{} - }() - s := &Sync{ host: allHosts.Map[tc.host], config: cfg, allHosts: allHosts, allGroups: allGroups, - events: ch, + logFunc: func(level safcm.LogLevel, escaped bool, msg string) { + events = append(events, + fmt.Sprintf("%d %v %s", level, escaped, msg)) + }, } res, err := s.hostSyncReq(tc.detected) testutil.AssertEqual(t, "res", res, tc.exp) testutil.AssertErrorEqual(t, "err", err, tc.expErr) - - close(ch) - <-done testutil.AssertEqual(t, "events", events, tc.expEvents) }) diff --git a/cmd/safcm/sync_test.go b/cmd/safcm/sync_test.go index 80b1589..4744ad5 100644 --- a/cmd/safcm/sync_test.go +++ b/cmd/safcm/sync_test.go @@ -24,6 +24,7 @@ import ( "ruderich.org/simon/safcm" "ruderich.org/simon/safcm/cmd/safcm/config" + "ruderich.org/simon/safcm/frontend" "ruderich.org/simon/safcm/rpc" "ruderich.org/simon/safcm/testutil" ) @@ -202,7 +203,7 @@ func TestLogEvent(t *testing.T) { tests := []struct { name string - event Event + event frontend.Event level safcm.LogLevel isTTY bool exp string @@ -211,7 +212,7 @@ func TestLogEvent(t *testing.T) { { "Error", - Event{ + frontend.Event{ Error: fmt.Errorf("fake error"), }, safcm.LogDebug3, @@ -221,7 +222,7 @@ func TestLogEvent(t *testing.T) { }, { "Error (tty)", - Event{ + frontend.Event{ Error: fmt.Errorf("fake error"), }, safcm.LogDebug3, @@ -231,7 +232,7 @@ func TestLogEvent(t *testing.T) { }, { "Error: escape", - Event{ + frontend.Event{ Error: fmt.Errorf("\x00"), }, safcm.LogDebug3, @@ -241,7 +242,7 @@ func TestLogEvent(t *testing.T) { }, { "Error: escape (tty)", - Event{ + frontend.Event{ Error: fmt.Errorf("\x00"), }, safcm.LogDebug3, @@ -252,8 +253,8 @@ func TestLogEvent(t *testing.T) { { "Log: info", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "info log", }, @@ -265,8 +266,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: info (tty)", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "info log", }, @@ -278,8 +279,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: verbose", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogVerbose, Text: "verbose log", }, @@ -291,8 +292,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: debug", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogDebug, Text: "debug log", }, @@ -304,8 +305,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: debug2", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogDebug2, Text: "debug2 log", }, @@ -317,8 +318,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: debug3", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogDebug3, Text: "debug3 log", }, @@ -331,8 +332,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: debug3 (tty)", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogDebug3, Text: "debug3 log", }, @@ -345,8 +346,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: escape", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "\x00", }, @@ -358,8 +359,8 @@ func TestLogEvent(t *testing.T) { }, { "Log: escape (tty)", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "\x00", }, @@ -372,7 +373,7 @@ func TestLogEvent(t *testing.T) { { "ConnEvent: stderr", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventStderr, Data: "fake stderr", @@ -385,7 +386,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: stderr (tty)", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventStderr, Data: "fake stderr", @@ -398,7 +399,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: debug", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventDebug, Data: "conn debug", @@ -411,7 +412,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: upload", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventUpload, }, @@ -423,7 +424,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: upload (ignored)", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventUpload, }, @@ -435,7 +436,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: invalid", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: 42, Data: "invalid", @@ -448,7 +449,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: invalid (tty)", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: 42, Data: "invalid", @@ -461,7 +462,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: escape", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventStderr, Data: "\x00", @@ -474,7 +475,7 @@ func TestLogEvent(t *testing.T) { }, { "ConnEvent: escape (tty)", - Event{ + frontend.Event{ ConnEvent: rpc.ConnEvent{ Type: rpc.ConnEventDebug, Data: "\x01", @@ -488,8 +489,8 @@ func TestLogEvent(t *testing.T) { { "Escaped", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "\x00", }, @@ -502,8 +503,8 @@ func TestLogEvent(t *testing.T) { }, { "Escaped (tty)", - Event{ - Log: Log{ + frontend.Event{ + Log: frontend.Log{ Level: safcm.LogInfo, Text: "\x00", }, @@ -517,7 +518,7 @@ func TestLogEvent(t *testing.T) { { "empty (invalid)", - Event{}, + frontend.Event{}, safcm.LogDebug3, false, "[INVALID=0] [fake-host] \n", @@ -527,8 +528,10 @@ func TestLogEvent(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.event.Host = &config.Host{ - Name: "fake-host", + tc.event.Host = &Sync{ + host: &config.Host{ + Name: "fake-host", + }, } var buf bytes.Buffer diff --git a/frontend/log.go b/frontend/log.go new file mode 100644 index 0000000..33fa1d0 --- /dev/null +++ b/frontend/log.go @@ -0,0 +1,52 @@ +// Frontend: Logging functions for programs using the safcm library + +// Copyright (C) 2021 Simon Ruderich +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package frontend + +import ( + "ruderich.org/simon/safcm" + "ruderich.org/simon/safcm/rpc" +) + +type Event struct { + Host Host + + // Only one of Error, Log and ConnEvent is set in a single event + Error error + Log Log + ConnEvent rpc.ConnEvent + + Escaped bool // true if untrusted input is already escaped +} + +type Log struct { + Level safcm.LogLevel + Text string +} + +func (l *Loop) Log(host Host, level safcm.LogLevel, escaped bool, + msg string) { + + l.events <- Event{ + Host: host, + Log: Log{ + Level: level, + Text: msg, + }, + Escaped: escaped, + } +} diff --git a/frontend/loop.go b/frontend/loop.go new file mode 100644 index 0000000..5e232a3 --- /dev/null +++ b/frontend/loop.go @@ -0,0 +1,172 @@ +// Frontend: Host synchronization and logging event loop for programs using +// the safcm library + +// Copyright (C) 2021 Simon Ruderich +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package frontend + +import ( + "log" + "os" + "os/signal" + "sort" + "strings" + "sync" + + "ruderich.org/simon/safcm" + "ruderich.org/simon/safcm/rpc" +) + +type Host interface { + Name() string + Dial(*rpc.Conn) error +} + +type Loop struct { + DebugConn bool + + LogEventFunc func(e Event, failed *bool) + SyncHostFunc func(*rpc.Conn, Host) error + + events chan Event +} + +func (l *Loop) Run(hosts []Host) bool { + done := make(chan bool) + // Collect events from all hosts and print them + events := make(chan Event) + go func() { + var failed bool + for { + x := <-events + if x.Host == nil { + break + } + l.LogEventFunc(x, &failed) + } + done <- failed + }() + + hostsLeft := make(map[string]bool) + for _, x := range hosts { + hostsLeft[x.Name()] = true + } + var hostsLeftMutex sync.Mutex // protects hostsLeft + + // Show unfinished hosts on Ctrl-C + sigint := make(chan os.Signal, 1) // buffered for Notify() + signal.Notify(sigint, os.Interrupt) // = SIGINT = Ctrl-C + go func() { + // Running `ssh` processes get killed by SIGINT which is sent + // to all processes + + <-sigint + log.Print("Received SIGINT, aborting ...") + + // Print all queued events + events <- Event{} // poison pill + <-done + // "races" with <-done in the main function and will hang here + // if the other is faster. This is fine because then all hosts + // were synced successfully. + + hostsLeftMutex.Lock() + var hosts []string + for x := range hostsLeft { + hosts = append(hosts, x) + } + sort.Strings(hosts) + log.Fatalf("Failed to sync %s", strings.Join(hosts, ", ")) + }() + + l.events = events // used by l.syncHost() and l.Log() + + // Sync all hosts concurrently + var wg sync.WaitGroup + for _, x := range hosts { + x := x + + // Once in sync.Host() and once in the go func below + wg.Add(2) + + go func() { + err := l.syncHost(&wg, x) + if err != nil { + events <- Event{ + Host: x, + Error: err, + } + } + wg.Done() + + hostsLeftMutex.Lock() + defer hostsLeftMutex.Unlock() + delete(hostsLeft, x.Name()) + }() + } + + wg.Wait() + events <- Event{} // poison pill + failed := <-done + + return !failed +} + +func (l *Loop) syncHost(wg *sync.WaitGroup, host Host) error { + conn := rpc.NewConn(l.DebugConn) + // Pass all connection events to main loop + go func() { + for { + x, ok := <-conn.Events + if !ok { + break + } + l.events <- Event{ + Host: host, + ConnEvent: x, + } + } + wg.Done() + }() + + err := host.Dial(conn) + if err != nil { + conn.Kill() + return err + } + defer conn.Kill() + + err = l.SyncHostFunc(conn, host) + if err != nil { + return err + } + + // Terminate connection to remote host + err = conn.Send(safcm.MsgQuitReq{}) + if err != nil { + return err + } + _, err = conn.Recv() + if err != nil { + return err + } + err = conn.Wait() + if err != nil { + return err + } + + return nil +} -- 2.45.2