Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions arthas/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ The Arthas plugin adds JVM diagnostics to Mission Control for Kubernetes workloa

## Operations

| Operation | Purpose |
| ---------------- | --------------------------------------------------------------- |
| `sessions-list` | List active Arthas sessions. |
| `session-create` | Attach Arthas to the selected workload or pod. |
| `session-delete` | Stop and remove a plugin session and close port-forwards. |
| `pods-list` | List ready pods that can be targeted for the selected workload. |
| `exec` | Execute an Arthas command through the Arthas HTTP API. |
| Operation | Purpose |
| ----------------------- | --------------------------------------------------------------------- |
| `sessions-list` | List active Arthas sessions. |
| `session-create` | Start attaching Arthas to the selected workload or pod asynchronously. |
| `session-creation-status` | Poll the status of an asynchronous session creation job. |
| `session-delete` | Stop and remove a plugin session and close port-forwards. |
| `pods-list` | List ready pods that can be targeted for the selected workload. |
| `exec` | Execute an Arthas command through the Arthas HTTP API. |

## Kubernetes access

Expand Down
11 changes: 11 additions & 0 deletions arthas/internal/arthas/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ func (r *SessionRegistry) Get(id string) (*Session, bool) {
return s, ok
}

func (r *SessionRegistry) FindByTarget(namespace, pod, container string) (*Session, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
for _, s := range r.sessions {
if s.Namespace == namespace && s.Pod == pod && s.Container == container {
return s, true
}
}
return nil, false
}

// List returns a snapshot sorted by start time (oldest first).
func (r *SessionRegistry) List() []*Session {
r.mu.RLock()
Expand Down
157 changes: 157 additions & 0 deletions arthas/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package main

import (
"crypto/rand"
"encoding/hex"
"fmt"
"sort"
"sync"
"time"

"github.com/flanksource/mission-control-plugins/arthas/internal/arthas"
)

const (
SessionCreatePending = "pending"
SessionCreateRunning = "running"
SessionCreateFailed = "failed"

sessionCreateJobTTL = 30 * time.Minute
)

type SessionCreateJob struct {
ID string `json:"jobId,omitempty"`
TargetKey string `json:"targetKey,omitempty"`
Status string `json:"status"`
SessionID string `json:"sessionId,omitempty"`
Session *arthas.Session `json:"session,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"startedAt"`
FinishedAt *time.Time `json:"finishedAt,omitempty"`
}

type SessionCreateJobRegistry struct {
mu sync.RWMutex
jobs map[string]*SessionCreateJob
}

func NewSessionCreateJobRegistry() *SessionCreateJobRegistry {
return &SessionCreateJobRegistry{jobs: make(map[string]*SessionCreateJob)}
}

func (r *SessionCreateJobRegistry) Start(targetKey string) (*SessionCreateJob, bool) {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupLocked(time.Now())
for _, job := range r.jobs {
if job.TargetKey == targetKey && job.Status == SessionCreatePending {
return cloneSessionCreateJob(job), false
}
}
job := &SessionCreateJob{
ID: newJobID(),
TargetKey: targetKey,
Status: SessionCreatePending,
StartedAt: time.Now().UTC(),
}
r.jobs[job.ID] = job
return cloneSessionCreateJob(job), true
}

func CompletedSessionCreateJob(targetKey string, session *arthas.Session) *SessionCreateJob {
now := time.Now().UTC()
job := &SessionCreateJob{
TargetKey: targetKey,
Status: SessionCreateRunning,
Session: session,
StartedAt: now,
FinishedAt: &now,
}
if session != nil {
job.SessionID = session.ID
job.StartedAt = session.StartedAt
}
return job
}

func (r *SessionCreateJobRegistry) Succeed(id string, session *arthas.Session) {
r.finish(id, func(job *SessionCreateJob) {
job.Status = SessionCreateRunning
job.Session = session
if session != nil {
job.SessionID = session.ID
}
})
}

func (r *SessionCreateJobRegistry) Fail(id string, err error) {
r.finish(id, func(job *SessionCreateJob) {
job.Status = SessionCreateFailed
if err != nil {
job.Error = err.Error()
}
})
}

func (r *SessionCreateJobRegistry) finish(id string, update func(*SessionCreateJob)) {
r.mu.Lock()
defer r.mu.Unlock()
job, ok := r.jobs[id]
if !ok {
return
}
update(job)
now := time.Now().UTC()
job.FinishedAt = &now
}

func (r *SessionCreateJobRegistry) Get(id string) (*SessionCreateJob, bool) {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupLocked(time.Now())
job, ok := r.jobs[id]
if !ok {
return nil, false
}
return cloneSessionCreateJob(job), true
}

func (r *SessionCreateJobRegistry) List() []*SessionCreateJob {
r.mu.Lock()
defer r.mu.Unlock()
r.cleanupLocked(time.Now())
out := make([]*SessionCreateJob, 0, len(r.jobs))
for _, job := range r.jobs {
out = append(out, cloneSessionCreateJob(job))
}
sort.Slice(out, func(i, j int) bool { return out[i].StartedAt.Before(out[j].StartedAt) })
return out
}

func (r *SessionCreateJobRegistry) cleanupLocked(now time.Time) {
for id, job := range r.jobs {
if job.FinishedAt != nil && now.Sub(*job.FinishedAt) > sessionCreateJobTTL {
delete(r.jobs, id)
}
}
}

func cloneSessionCreateJob(job *SessionCreateJob) *SessionCreateJob {
if job == nil {
return nil
}
copy := *job
if job.FinishedAt != nil {
finished := *job.FinishedAt
copy.FinishedAt = &finished
}
return &copy
}

func newJobID() string {
var b [8]byte
if _, err := rand.Read(b[:]); err != nil {
return fmt.Sprintf("j%d", time.Now().UnixNano())
}
return hex.EncodeToString(b[:])
}
24 changes: 15 additions & 9 deletions arthas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
)

const (
OpSessionsList = "sessions-list"
OpSessionCreate = "session-create"
OpSessionDelete = "session-delete"
OpPodsList = "pods-list"
OpExec = "exec"
OpSessionsList = "sessions-list"
OpSessionCreate = "session-create"
OpSessionCreationStatus = "session-creation-status"
OpSessionDelete = "session-delete"
OpPodsList = "pods-list"
OpExec = "exec"
)

//go:generate go run ./internal/gen-checksum
Expand All @@ -43,12 +44,13 @@ func main() {
}

type ArthasPlugin struct {
clients clientCache
sessions *arthas.SessionRegistry
clients clientCache
sessions *arthas.SessionRegistry
sessionCreates *SessionCreateJobRegistry
}

func newPlugin() *ArthasPlugin {
return &ArthasPlugin{sessions: arthas.NewSessionRegistry()}
return &ArthasPlugin{sessions: arthas.NewSessionRegistry(), sessionCreates: NewSessionCreateJobRegistry()}
}

func (p *ArthasPlugin) Manifest() *pluginpb.PluginManifest {
Expand All @@ -73,6 +75,9 @@ func (p *ArthasPlugin) Operations() []sdk.Operation {
handlers := map[string]func(context.Context, sdk.InvokeCtx) (any, error){
OpSessionsList: p.sessionsList,
OpSessionCreate: p.sessionCreate,
OpSessionCreationStatus: func(_ context.Context, req sdk.InvokeCtx) (any, error) {
return p.sessionCreationStatus(req)
},
OpSessionDelete: p.sessionDelete,
OpPodsList: p.podsList,
OpExec: p.exec,
Expand All @@ -98,7 +103,8 @@ func operationDefs() []*pluginpb.OperationDef {
}
return []*pluginpb.OperationDef{
mk(OpSessionsList, "List active Arthas sessions in this plugin process."),
mk(OpSessionCreate, "Attach Arthas to the selected Kubernetes workload or pod."),
mk(OpSessionCreate, "Start attaching Arthas to the selected Kubernetes workload or pod asynchronously."),
mk(OpSessionCreationStatus, "Get the status of an asynchronous Arthas session creation."),
mk(OpSessionDelete, "Stop and remove an Arthas session."),
mk(OpPodsList, "List ready target pods for the selected Kubernetes workload."),
mk(OpExec, "Execute one Arthas command through the session HTTP API."),
Expand Down
94 changes: 91 additions & 3 deletions arthas/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"k8s.io/client-go/kubernetes"
)

const sessionCreationTimeout = 5 * time.Minute

type RunningPod struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Expand All @@ -46,6 +48,10 @@ type SessionCreateParams struct {
SkipJDKInstall bool `json:"skipJdkInstall,omitempty"`
}

type SessionCreationStatusParams struct {
JobID string `json:"jobId"`
}

type SessionDeleteParams struct {
ID string `json:"id"`
}
Expand Down Expand Up @@ -91,6 +97,90 @@ func (p *ArthasPlugin) podsList(ctx context.Context, req sdk.InvokeCtx) (any, er
}

func (p *ArthasPlugin) sessionCreate(ctx context.Context, req sdk.InvokeCtx) (any, error) {
if len(strings.TrimSpace(string(req.ParamsJSON))) == 0 {
req.ParamsJSON = []byte("{}")
} else {
req.ParamsJSON = append([]byte(nil), req.ParamsJSON...)
}

var params SessionCreateParams
if err := json.Unmarshal(req.ParamsJSON, &params); err != nil {
return nil, fmt.Errorf("decode params: %w", err)
}
target, err := p.createTarget(ctx, req, params)
if err != nil {
return nil, err
}
cfg, err := p.clients.RESTConfig(ctx, req.Host)
if err != nil {
return nil, err
}
pod, container, err := arthask8s.ResolvePod(ctx, cfg, target.Namespace, target.Kind, target.Name, params.Container)
if err != nil {
return nil, fmt.Errorf("resolve pod: %w", err)
}

targetKey := sessionTargetKey(target.Namespace, pod, container)
if sess, ok := p.sessions.FindByTarget(target.Namespace, pod, container); ok {
return CompletedSessionCreateJob(targetKey, sess), nil
}

params.Namespace = target.Namespace
params.Kind = "pod"
params.Name = pod
params.Pod = pod
params.Container = container
resolvedParams, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("encode resolved params: %w", err)
}
req.ParamsJSON = resolvedParams

job, started := p.sessionCreates.Start(targetKey)
if !started {
return job, nil
}
if sess, ok := p.sessions.FindByTarget(target.Namespace, pod, container); ok {
p.sessionCreates.Succeed(job.ID, sess)
return CompletedSessionCreateJob(targetKey, sess), nil
}

go func(jobID string, req sdk.InvokeCtx) {
// Do not use the request context here: the HTTP request returns as soon as
// this job is registered, which cancels r.Context().
ctx, cancel := context.WithTimeout(context.Background(), sessionCreationTimeout)
defer cancel()
sess, err := p.sessionCreateSync(ctx, req)
if err != nil {
p.sessionCreates.Fail(jobID, err)
return
}
p.sessionCreates.Succeed(jobID, sess)
}(job.ID, req)

return job, nil
}

func (p *ArthasPlugin) sessionCreationStatus(req sdk.InvokeCtx) (any, error) {
var params SessionCreationStatusParams
if err := json.Unmarshal(req.ParamsJSON, &params); err != nil {
return nil, fmt.Errorf("decode params: %w", err)
}
if params.JobID == "" {
return nil, fmt.Errorf("jobId is required")
}
job, ok := p.sessionCreates.Get(params.JobID)
if !ok {
return nil, fmt.Errorf("session create job %q not found", params.JobID)
}
return job, nil
}

func sessionTargetKey(namespace, pod, container string) string {
return namespace + "/" + pod + "/" + container
}

func (p *ArthasPlugin) sessionCreateSync(ctx context.Context, req sdk.InvokeCtx) (*arthas.Session, error) {
var params SessionCreateParams
if len(req.ParamsJSON) > 0 {
if err := json.Unmarshal(req.ParamsJSON, &params); err != nil {
Expand All @@ -112,9 +202,7 @@ func (p *ArthasPlugin) sessionCreate(ctx context.Context, req sdk.InvokeCtx) (an
return nil, fmt.Errorf("resolve pod: %w", err)
}

startCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()
sess, err := arthas.Start(startCtx, cfg, arthas.StartOptions{
sess, err := arthas.Start(ctx, cfg, arthas.StartOptions{
Namespace: target.Namespace,
Kind: target.Kind,
Name: target.Name,
Expand Down
Loading
Loading