Skip to content

Commit 1850e8d

Browse files
committed
Add --stream-logs flag to run-errand for live log tailing via SSH
When --stream-logs is set, the errand is started asynchronously and task events are polled to discover running instances. SSH sessions are established to each instance to tail errand log files in real time, interleaved with the standard task event output. Requirements: - The CLI must have SSH access to the BOSH VMs where errands run (directly or via a gateway configured with --gw-* flags). - Errands must write output to log files under /var/vcap/sys/log/. By default, the CLI tails /var/vcap/sys/log/<errand>/<errand>.{stdout,stderr}.log. Errands that write to non-standard paths will not have their output streamed unless --stream-log-path is specified. - Use --stream-log-path <path> to override the default log location, e.g. --stream-log-path "my-job/*.log". The path is relative to /var/vcap/sys/log/ and supports shell glob and brace expansion.
1 parent fbff773 commit 1850e8d

12 files changed

Lines changed: 1741 additions & 23 deletions

cmd/cmd.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,14 @@ func (c Cmd) Execute() (cmdErr error) {
226226
case *RunErrandOpts:
227227
director, deployment := c.directorAndDeployment()
228228
downloader := NewUIDownloader(director, deps.Time, deps.FS, deps.UI)
229-
return NewRunErrandCmd(deployment, downloader, deps.UI).Run(*opts)
229+
var nonIntSSHRunner boshssh.Runner
230+
var taskReporter boshdir.TaskReporter
231+
if opts.StreamLogs {
232+
sshProvider := boshssh.NewProvider(deps.CmdRunner, deps.FS, deps.UI, deps.Logger)
233+
nonIntSSHRunner = sshProvider.NewSSHRunner(false)
234+
taskReporter = boshuit.NewReporter(deps.UI, true)
235+
}
236+
return NewRunErrandCmd(deployment, downloader, deps.UI, nonIntSSHRunner, taskReporter, nil).Run(*opts)
230237

231238
case *AttachDiskOpts:
232239
return NewAttachDiskCmd(c.deployment()).Run(*opts)

cmd/errand_event_watcher.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package cmd
2+
3+
import (
4+
"encoding/json"
5+
"strings"
6+
"time"
7+
8+
boshdir "github.com/cloudfoundry/bosh-cli/v7/director"
9+
)
10+
11+
type taskEvent struct {
12+
Stage string `json:"stage"`
13+
State string `json:"state"`
14+
Task string `json:"task"`
15+
Time int64 `json:"time"`
16+
Index int `json:"index"`
17+
Total int `json:"total"`
18+
Progress int `json:"progress"`
19+
}
20+
21+
type ErrandEventWatcher struct {
22+
deployment boshdir.Deployment
23+
taskID int
24+
pollDelay time.Duration
25+
taskReporter boshdir.TaskReporter
26+
}
27+
28+
func NewErrandEventWatcher(deployment boshdir.Deployment, taskID int, pollDelay time.Duration) *ErrandEventWatcher {
29+
return &ErrandEventWatcher{
30+
deployment: deployment,
31+
taskID: taskID,
32+
pollDelay: pollDelay,
33+
}
34+
}
35+
36+
func (w *ErrandEventWatcher) WithTaskReporter(reporter boshdir.TaskReporter) *ErrandEventWatcher {
37+
w.taskReporter = reporter
38+
return w
39+
}
40+
41+
// Watch polls the task event stream and sends discovered instance slugs
42+
// (e.g. "smoke-tests/abc-123") on the returned channel. The channel is closed
43+
// when the task is no longer running. If a TaskReporter is set, event chunks
44+
// are also fed to it for real-time formatted output.
45+
func (w *ErrandEventWatcher) Watch(stopCh <-chan struct{}) <-chan string {
46+
slugCh := make(chan string, 16)
47+
48+
if w.taskReporter != nil {
49+
w.taskReporter.TaskStarted(w.taskID)
50+
}
51+
52+
go func() {
53+
defer close(slugCh)
54+
55+
var offset int
56+
seen := map[string]bool{}
57+
var lastState string
58+
59+
for {
60+
select {
61+
case <-stopCh:
62+
return
63+
default:
64+
}
65+
66+
chunk, newOffset, err := w.deployment.FetchTaskOutputChunk(w.taskID, offset, "event")
67+
if err == nil && len(chunk) > 0 {
68+
offset = newOffset
69+
if w.taskReporter != nil {
70+
w.taskReporter.TaskOutputChunk(w.taskID, chunk)
71+
}
72+
for _, slug := range parseEventChunk(chunk) {
73+
if !seen[slug] {
74+
seen[slug] = true
75+
select {
76+
case slugCh <- slug:
77+
case <-stopCh:
78+
return
79+
}
80+
}
81+
}
82+
}
83+
84+
state, err := w.deployment.TaskState(w.taskID)
85+
if err != nil || !isTaskRunning(state) {
86+
if err == nil {
87+
lastState = state
88+
}
89+
// One final fetch to catch any remaining events
90+
chunk, _, err = w.deployment.FetchTaskOutputChunk(w.taskID, offset, "event")
91+
if err == nil && len(chunk) > 0 {
92+
if w.taskReporter != nil {
93+
w.taskReporter.TaskOutputChunk(w.taskID, chunk)
94+
}
95+
for _, slug := range parseEventChunk(chunk) {
96+
if !seen[slug] {
97+
seen[slug] = true
98+
select {
99+
case slugCh <- slug:
100+
case <-stopCh:
101+
return
102+
}
103+
}
104+
}
105+
}
106+
if w.taskReporter != nil {
107+
w.taskReporter.TaskFinished(w.taskID, lastState)
108+
}
109+
return
110+
}
111+
112+
select {
113+
case <-time.After(w.pollDelay):
114+
case <-stopCh:
115+
return
116+
}
117+
}
118+
}()
119+
120+
return slugCh
121+
}
122+
123+
func parseEventChunk(chunk []byte) []string {
124+
var slugs []string
125+
126+
for _, line := range strings.Split(string(chunk), "\n") {
127+
line = strings.TrimSpace(line)
128+
if line == "" {
129+
continue
130+
}
131+
132+
slug := parseErrandEventLine(line)
133+
if slug != "" {
134+
slugs = append(slugs, slug)
135+
}
136+
}
137+
138+
return slugs
139+
}
140+
141+
func parseErrandEventLine(line string) string {
142+
var ev taskEvent
143+
if err := json.Unmarshal([]byte(line), &ev); err != nil {
144+
return ""
145+
}
146+
147+
if ev.Stage != "Running errand" || ev.State != "started" {
148+
return ""
149+
}
150+
151+
return ParseInstanceSlug(ev.Task)
152+
}
153+
154+
// ParseInstanceSlug extracts "group/uuid" from a task field like "group/uuid (idx)".
155+
func ParseInstanceSlug(task string) string {
156+
task = strings.TrimSpace(task)
157+
if task == "" {
158+
return ""
159+
}
160+
161+
// Strip trailing " (N)" index suffix if present
162+
if idx := strings.LastIndex(task, " ("); idx >= 0 {
163+
task = task[:idx]
164+
}
165+
166+
if !strings.Contains(task, "/") {
167+
return ""
168+
}
169+
170+
return task
171+
}
172+
173+
func isTaskRunning(state string) bool {
174+
return state == "queued" || state == "processing" || state == "cancelling"
175+
}

0 commit comments

Comments
 (0)