diff --git a/arthas/README.md b/arthas/README.md index e61ed70..507f070 100644 --- a/arthas/README.md +++ b/arthas/README.md @@ -21,13 +21,14 @@ The Arthas plugin adds JVM diagnostics to Mission Control for Kubernetes workloa ## Operations -| Operation | Purpose | -| ---------------- | --------------------------------------------------------------- | -| `sessions-list` | List active Arthas sessions. | -| `session-create` | Attach Arthas to the selected workload or pod. | -| `session-delete` | Stop and remove a plugin session and close port-forwards. | -| `pods-list` | List ready pods that can be targeted for the selected workload. | -| `exec` | Execute an Arthas command through the Arthas HTTP API. | +| Operation | Purpose | +| ----------------------- | --------------------------------------------------------------------- | +| `sessions-list` | List active Arthas sessions. | +| `session-create` | Start attaching Arthas to the selected workload or pod asynchronously. | +| `session-creation-status` | Poll the status of an asynchronous session creation job. | +| `session-delete` | Stop and remove a plugin session and close port-forwards. | +| `pods-list` | List ready pods that can be targeted for the selected workload. | +| `exec` | Execute an Arthas command through the Arthas HTTP API. | ## Kubernetes access diff --git a/arthas/internal/arthas/session.go b/arthas/internal/arthas/session.go index 28310dd..38600fc 100644 --- a/arthas/internal/arthas/session.go +++ b/arthas/internal/arthas/session.go @@ -72,6 +72,17 @@ func (r *SessionRegistry) Get(id string) (*Session, bool) { return s, ok } +func (r *SessionRegistry) FindByTarget(namespace, pod, container string) (*Session, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + for _, s := range r.sessions { + if s.Namespace == namespace && s.Pod == pod && s.Container == container { + return s, true + } + } + return nil, false +} + // List returns a snapshot sorted by start time (oldest first). func (r *SessionRegistry) List() []*Session { r.mu.RLock() diff --git a/arthas/jobs.go b/arthas/jobs.go new file mode 100644 index 0000000..c2910e4 --- /dev/null +++ b/arthas/jobs.go @@ -0,0 +1,157 @@ +package main + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "sort" + "sync" + "time" + + "github.com/flanksource/mission-control-plugins/arthas/internal/arthas" +) + +const ( + SessionCreatePending = "pending" + SessionCreateRunning = "running" + SessionCreateFailed = "failed" + + sessionCreateJobTTL = 30 * time.Minute +) + +type SessionCreateJob struct { + ID string `json:"jobId,omitempty"` + TargetKey string `json:"targetKey,omitempty"` + Status string `json:"status"` + SessionID string `json:"sessionId,omitempty"` + Session *arthas.Session `json:"session,omitempty"` + Error string `json:"error,omitempty"` + StartedAt time.Time `json:"startedAt"` + FinishedAt *time.Time `json:"finishedAt,omitempty"` +} + +type SessionCreateJobRegistry struct { + mu sync.RWMutex + jobs map[string]*SessionCreateJob +} + +func NewSessionCreateJobRegistry() *SessionCreateJobRegistry { + return &SessionCreateJobRegistry{jobs: make(map[string]*SessionCreateJob)} +} + +func (r *SessionCreateJobRegistry) Start(targetKey string) (*SessionCreateJob, bool) { + r.mu.Lock() + defer r.mu.Unlock() + r.cleanupLocked(time.Now()) + for _, job := range r.jobs { + if job.TargetKey == targetKey && job.Status == SessionCreatePending { + return cloneSessionCreateJob(job), false + } + } + job := &SessionCreateJob{ + ID: newJobID(), + TargetKey: targetKey, + Status: SessionCreatePending, + StartedAt: time.Now().UTC(), + } + r.jobs[job.ID] = job + return cloneSessionCreateJob(job), true +} + +func CompletedSessionCreateJob(targetKey string, session *arthas.Session) *SessionCreateJob { + now := time.Now().UTC() + job := &SessionCreateJob{ + TargetKey: targetKey, + Status: SessionCreateRunning, + Session: session, + StartedAt: now, + FinishedAt: &now, + } + if session != nil { + job.SessionID = session.ID + job.StartedAt = session.StartedAt + } + return job +} + +func (r *SessionCreateJobRegistry) Succeed(id string, session *arthas.Session) { + r.finish(id, func(job *SessionCreateJob) { + job.Status = SessionCreateRunning + job.Session = session + if session != nil { + job.SessionID = session.ID + } + }) +} + +func (r *SessionCreateJobRegistry) Fail(id string, err error) { + r.finish(id, func(job *SessionCreateJob) { + job.Status = SessionCreateFailed + if err != nil { + job.Error = err.Error() + } + }) +} + +func (r *SessionCreateJobRegistry) finish(id string, update func(*SessionCreateJob)) { + r.mu.Lock() + defer r.mu.Unlock() + job, ok := r.jobs[id] + if !ok { + return + } + update(job) + now := time.Now().UTC() + job.FinishedAt = &now +} + +func (r *SessionCreateJobRegistry) Get(id string) (*SessionCreateJob, bool) { + r.mu.Lock() + defer r.mu.Unlock() + r.cleanupLocked(time.Now()) + job, ok := r.jobs[id] + if !ok { + return nil, false + } + return cloneSessionCreateJob(job), true +} + +func (r *SessionCreateJobRegistry) List() []*SessionCreateJob { + r.mu.Lock() + defer r.mu.Unlock() + r.cleanupLocked(time.Now()) + out := make([]*SessionCreateJob, 0, len(r.jobs)) + for _, job := range r.jobs { + out = append(out, cloneSessionCreateJob(job)) + } + sort.Slice(out, func(i, j int) bool { return out[i].StartedAt.Before(out[j].StartedAt) }) + return out +} + +func (r *SessionCreateJobRegistry) cleanupLocked(now time.Time) { + for id, job := range r.jobs { + if job.FinishedAt != nil && now.Sub(*job.FinishedAt) > sessionCreateJobTTL { + delete(r.jobs, id) + } + } +} + +func cloneSessionCreateJob(job *SessionCreateJob) *SessionCreateJob { + if job == nil { + return nil + } + copy := *job + if job.FinishedAt != nil { + finished := *job.FinishedAt + copy.FinishedAt = &finished + } + return © +} + +func newJobID() string { + var b [8]byte + if _, err := rand.Read(b[:]); err != nil { + return fmt.Sprintf("j%d", time.Now().UnixNano()) + } + return hex.EncodeToString(b[:]) +} diff --git a/arthas/main.go b/arthas/main.go index a3e8dfd..eb2a1d5 100644 --- a/arthas/main.go +++ b/arthas/main.go @@ -17,11 +17,12 @@ import ( ) const ( - OpSessionsList = "sessions-list" - OpSessionCreate = "session-create" - OpSessionDelete = "session-delete" - OpPodsList = "pods-list" - OpExec = "exec" + OpSessionsList = "sessions-list" + OpSessionCreate = "session-create" + OpSessionCreationStatus = "session-creation-status" + OpSessionDelete = "session-delete" + OpPodsList = "pods-list" + OpExec = "exec" ) //go:generate go run ./internal/gen-checksum @@ -43,12 +44,13 @@ func main() { } type ArthasPlugin struct { - clients clientCache - sessions *arthas.SessionRegistry + clients clientCache + sessions *arthas.SessionRegistry + sessionCreates *SessionCreateJobRegistry } func newPlugin() *ArthasPlugin { - return &ArthasPlugin{sessions: arthas.NewSessionRegistry()} + return &ArthasPlugin{sessions: arthas.NewSessionRegistry(), sessionCreates: NewSessionCreateJobRegistry()} } func (p *ArthasPlugin) Manifest() *pluginpb.PluginManifest { @@ -73,6 +75,9 @@ func (p *ArthasPlugin) Operations() []sdk.Operation { handlers := map[string]func(context.Context, sdk.InvokeCtx) (any, error){ OpSessionsList: p.sessionsList, OpSessionCreate: p.sessionCreate, + OpSessionCreationStatus: func(_ context.Context, req sdk.InvokeCtx) (any, error) { + return p.sessionCreationStatus(req) + }, OpSessionDelete: p.sessionDelete, OpPodsList: p.podsList, OpExec: p.exec, @@ -98,7 +103,8 @@ func operationDefs() []*pluginpb.OperationDef { } return []*pluginpb.OperationDef{ mk(OpSessionsList, "List active Arthas sessions in this plugin process."), - mk(OpSessionCreate, "Attach Arthas to the selected Kubernetes workload or pod."), + mk(OpSessionCreate, "Start attaching Arthas to the selected Kubernetes workload or pod asynchronously."), + mk(OpSessionCreationStatus, "Get the status of an asynchronous Arthas session creation."), mk(OpSessionDelete, "Stop and remove an Arthas session."), mk(OpPodsList, "List ready target pods for the selected Kubernetes workload."), mk(OpExec, "Execute one Arthas command through the session HTTP API."), diff --git a/arthas/ops.go b/arthas/ops.go index b175ff1..62dcb40 100644 --- a/arthas/ops.go +++ b/arthas/ops.go @@ -21,6 +21,8 @@ import ( "k8s.io/client-go/kubernetes" ) +const sessionCreationTimeout = 5 * time.Minute + type RunningPod struct { Namespace string `json:"namespace"` Name string `json:"name"` @@ -46,6 +48,10 @@ type SessionCreateParams struct { SkipJDKInstall bool `json:"skipJdkInstall,omitempty"` } +type SessionCreationStatusParams struct { + JobID string `json:"jobId"` +} + type SessionDeleteParams struct { ID string `json:"id"` } @@ -91,6 +97,90 @@ func (p *ArthasPlugin) podsList(ctx context.Context, req sdk.InvokeCtx) (any, er } func (p *ArthasPlugin) sessionCreate(ctx context.Context, req sdk.InvokeCtx) (any, error) { + if len(strings.TrimSpace(string(req.ParamsJSON))) == 0 { + req.ParamsJSON = []byte("{}") + } else { + req.ParamsJSON = append([]byte(nil), req.ParamsJSON...) + } + + var params SessionCreateParams + if err := json.Unmarshal(req.ParamsJSON, ¶ms); err != nil { + return nil, fmt.Errorf("decode params: %w", err) + } + target, err := p.createTarget(ctx, req, params) + if err != nil { + return nil, err + } + cfg, err := p.clients.RESTConfig(ctx, req.Host) + if err != nil { + return nil, err + } + pod, container, err := arthask8s.ResolvePod(ctx, cfg, target.Namespace, target.Kind, target.Name, params.Container) + if err != nil { + return nil, fmt.Errorf("resolve pod: %w", err) + } + + targetKey := sessionTargetKey(target.Namespace, pod, container) + if sess, ok := p.sessions.FindByTarget(target.Namespace, pod, container); ok { + return CompletedSessionCreateJob(targetKey, sess), nil + } + + params.Namespace = target.Namespace + params.Kind = "pod" + params.Name = pod + params.Pod = pod + params.Container = container + resolvedParams, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("encode resolved params: %w", err) + } + req.ParamsJSON = resolvedParams + + job, started := p.sessionCreates.Start(targetKey) + if !started { + return job, nil + } + if sess, ok := p.sessions.FindByTarget(target.Namespace, pod, container); ok { + p.sessionCreates.Succeed(job.ID, sess) + return CompletedSessionCreateJob(targetKey, sess), nil + } + + go func(jobID string, req sdk.InvokeCtx) { + // Do not use the request context here: the HTTP request returns as soon as + // this job is registered, which cancels r.Context(). + ctx, cancel := context.WithTimeout(context.Background(), sessionCreationTimeout) + defer cancel() + sess, err := p.sessionCreateSync(ctx, req) + if err != nil { + p.sessionCreates.Fail(jobID, err) + return + } + p.sessionCreates.Succeed(jobID, sess) + }(job.ID, req) + + return job, nil +} + +func (p *ArthasPlugin) sessionCreationStatus(req sdk.InvokeCtx) (any, error) { + var params SessionCreationStatusParams + if err := json.Unmarshal(req.ParamsJSON, ¶ms); err != nil { + return nil, fmt.Errorf("decode params: %w", err) + } + if params.JobID == "" { + return nil, fmt.Errorf("jobId is required") + } + job, ok := p.sessionCreates.Get(params.JobID) + if !ok { + return nil, fmt.Errorf("session create job %q not found", params.JobID) + } + return job, nil +} + +func sessionTargetKey(namespace, pod, container string) string { + return namespace + "/" + pod + "/" + container +} + +func (p *ArthasPlugin) sessionCreateSync(ctx context.Context, req sdk.InvokeCtx) (*arthas.Session, error) { var params SessionCreateParams if len(req.ParamsJSON) > 0 { if err := json.Unmarshal(req.ParamsJSON, ¶ms); err != nil { @@ -112,9 +202,7 @@ func (p *ArthasPlugin) sessionCreate(ctx context.Context, req sdk.InvokeCtx) (an return nil, fmt.Errorf("resolve pod: %w", err) } - startCtx, cancel := context.WithTimeout(ctx, 90*time.Second) - defer cancel() - sess, err := arthas.Start(startCtx, cfg, arthas.StartOptions{ + sess, err := arthas.Start(ctx, cfg, arthas.StartOptions{ Namespace: target.Namespace, Kind: target.Kind, Name: target.Name, diff --git a/arthas/ui-src/src/pages/ArthasPage.tsx b/arthas/ui-src/src/pages/ArthasPage.tsx index c1375ef..235b30b 100644 --- a/arthas/ui-src/src/pages/ArthasPage.tsx +++ b/arthas/ui-src/src/pages/ArthasPage.tsx @@ -33,11 +33,22 @@ interface RunningPod { ownerName?: string; } +interface SessionCreateJob { + jobId: string; + status: "pending" | "running" | "failed"; + sessionId?: string; + session?: ArthasSession; + error?: string; + startedAt: string; + finishedAt?: string; +} + const SESSIONS_KEY = ["arthas", "sessions"] as const; export function ArthasPage() { const configID = configIDFromURL(); const [selectedId, setSelectedId] = useState(null); + const [createJobId, setCreateJobId] = useState(null); const qc = useQueryClient(); const sessionsQ = useQuery({ @@ -54,14 +65,40 @@ export function ArthasPage() { }); const create = useMutation({ - mutationFn: (body: Record) => callOp("session-create", body), - onSuccess: (sess) => { - qc.invalidateQueries({ queryKey: SESSIONS_KEY }); - setSelectedId(sess.id); - toastManager.add({ title: `Arthas session started on ${sess.pod}`, type: "success" }); + mutationFn: (body: Record) => callOp("session-create", body), + onSuccess: (job) => { + if (job.status === "running" && job.session) { + qc.invalidateQueries({ queryKey: SESSIONS_KEY }); + setSelectedId(job.session.id); + toastManager.add({ title: `Arthas session started on ${job.session.pod}`, type: "success" }); + return; + } + setCreateJobId(job.jobId); + toastManager.add({ title: "Starting Arthas session…", type: "info" }); }, }); + const createStatusQ = useQuery({ + queryKey: ["arthas", "session-creation-status", createJobId], + queryFn: () => callOp("session-creation-status", { jobId: createJobId }), + enabled: !!createJobId, + refetchInterval: createJobId ? 2_000 : false, + }); + + useEffect(() => { + const job = createStatusQ.data; + if (!job || !createJobId) return; + if (job.status === "running" && job.session) { + qc.invalidateQueries({ queryKey: SESSIONS_KEY }); + setSelectedId(job.session.id); + setCreateJobId(null); + toastManager.add({ title: `Arthas session started on ${job.session.pod}`, type: "success" }); + } else if (job.status === "failed") { + setCreateJobId(null); + toastManager.add({ title: job.error ? `Failed to start Arthas session: ${job.error}` : "Failed to start Arthas session", type: "error" }); + } + }, [createJobId, createStatusQ.data, qc]); + const del = useMutation({ mutationFn: (id: string) => callOp("session-delete", { id }), onSuccess: (_, id) => { @@ -97,7 +134,7 @@ export function ArthasPage() { pods={podsQ.data ?? []} podsLoading={podsQ.isLoading} podsError={podsQ.error} - creating={create.isPending} + creating={create.isPending || !!createJobId} deletingId={del.isPending ? String(del.variables ?? "") || null : null} onSelectSession={setSelectedId} onCreateSession={(body) => create.mutate(body)} @@ -111,7 +148,7 @@ export function ArthasPage() { pods={podsQ.data ?? []} podsLoading={podsQ.isLoading} podsError={podsQ.error} - creating={create.isPending} + creating={create.isPending || !!createJobId} deletingId={del.isPending ? String(del.variables ?? "") || null : null} onSelectSession={setSelectedId} onCreateSession={(body) => create.mutate(body)} diff --git a/arthas/ui/index.html b/arthas/ui/index.html index 1f49855..ee3fd9b 100644 --- a/arthas/ui/index.html +++ b/arthas/ui/index.html @@ -29,7 +29,7 @@ color: rgb(100 116 139); } - + diff --git a/arthas/ui_checksum.go b/arthas/ui_checksum.go index b966938..0c1a161 100644 --- a/arthas/ui_checksum.go +++ b/arthas/ui_checksum.go @@ -5,4 +5,4 @@ package main // uiChecksum is the sha256 of every file embedded under ui/. // Regenerated on every `task build:plugin:arthas` or // `go generate ./plugins/arthas/...`. -const uiChecksum = "371c753ebb8ac1c14068752a03b33e3bed71fa39043101c4cddbcf3ef754bc27" +const uiChecksum = "0b47ce51950a3b2065184e08971a7efd1aa0fe212a8ce073364553dfd7ece6a8"