From a89abc4a9d59fdf0ee423485a06cc5cae50eb541 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Thu, 21 May 2026 22:07:01 +0545 Subject: [PATCH] fix: scope plugin sessions to active config targets Arthas, Inspektor Gadget, and SQL Server trace sessions could leak across config pages because their in-process registries were listed globally. Scope Arthas and Inspektor sessions by the pods resolved from the active config item, validate session actions against that target set, and use SQL Server trace configItemId for trace list/get/stop/delete. Update UI cache keys and selection behavior so stale sessions are not shown after switching configs. --- arthas/ops.go | 66 ++++++++++++++++++- arthas/ui-src/src/pages/ArthasPage.tsx | 29 +++++---- arthas/ui/index.html | 2 +- arthas/ui_checksum.go | 2 +- inspektor-gadget/http.go | 16 ++++- inspektor-gadget/ops.go | 82 +++++++++++++++++++++--- inspektor-gadget/ui-src/src/main.tsx | 12 +++- inspektor-gadget/ui/index.html | 2 +- inspektor-gadget/ui_checksum.go | 2 +- sql-server/internal/sqltrace/registry.go | 51 ++++++++++++--- sql-server/ops_trace.go | 15 ++--- 11 files changed, 227 insertions(+), 52 deletions(-) diff --git a/arthas/ops.go b/arthas/ops.go index 62dcb40..664d11f 100644 --- a/arthas/ops.go +++ b/arthas/ops.go @@ -80,8 +80,18 @@ type ExecResponse struct { Duration time.Duration `json:"-"` } -func (p *ArthasPlugin) sessionsList(_ context.Context, _ sdk.InvokeCtx) (any, error) { - return p.sessions.List(), nil +func (p *ArthasPlugin) sessionsList(ctx context.Context, req sdk.InvokeCtx) (any, error) { + pods, err := p.currentPods(ctx, req) + if err != nil { + return nil, err + } + out := make([]*arthas.Session, 0) + for _, sess := range p.sessions.List() { + if sessionInPods(sess.Namespace, sess.Pod, sess.Container, pods) { + out = append(out, sess) + } + } + return out, nil } func (p *ArthasPlugin) podsList(ctx context.Context, req sdk.InvokeCtx) (any, error) { @@ -251,7 +261,7 @@ func (p *ArthasPlugin) 
createTarget(ctx context.Context, req sdk.InvokeCtx, para return base, nil } -func (p *ArthasPlugin) sessionDelete(_ context.Context, req sdk.InvokeCtx) (any, error) { +func (p *ArthasPlugin) sessionDelete(ctx context.Context, req sdk.InvokeCtx) (any, error) { var params SessionDeleteParams if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, fmt.Errorf("decode params: %w", err) @@ -259,6 +269,9 @@ func (p *ArthasPlugin) sessionDelete(_ context.Context, req sdk.InvokeCtx) (any, if params.ID == "" { return nil, fmt.Errorf("id is required") } + if _, err := p.sessionForConfig(ctx, req, params.ID); err != nil { + return nil, err + } removed, err := p.sessions.Remove(params.ID) if !removed { return nil, fmt.Errorf("session %q not found", params.ID) @@ -274,6 +287,9 @@ func (p *ArthasPlugin) exec(ctx context.Context, req sdk.InvokeCtx) (any, error) if params.SessionID == "" || params.Command == "" { return nil, fmt.Errorf("sessionId and command are required") } + if _, err := p.sessionForConfig(ctx, req, params.SessionID); err != nil { + return nil, err + } return p.execCommand(ctx, params.SessionID, params.Command) } @@ -324,6 +340,50 @@ func (p *ArthasPlugin) execCommand(ctx context.Context, sessionID, command strin return out, nil } +func (p *ArthasPlugin) sessionForConfig(ctx context.Context, req sdk.InvokeCtx, id string) (*arthas.Session, error) { + sess, ok := p.sessions.Get(id) + if !ok { + return nil, fmt.Errorf("session %q not found", id) + } + pods, err := p.currentPods(ctx, req) + if err != nil { + return nil, err + } + if !sessionInPods(sess.Namespace, sess.Pod, sess.Container, pods) { + return nil, fmt.Errorf("session %q does not belong to the current config item", id) + } + return sess, nil +} + +func (p *ArthasPlugin) currentPods(ctx context.Context, req sdk.InvokeCtx) ([]RunningPod, error) { + target, err := targetFromConfig(ctx, req.Host, req.ConfigItemID) + if err != nil { + return nil, err + } + cli, err := p.clients.Client(ctx, 
req.Host) + if err != nil { + return nil, err + } + return listRunningPodsForTarget(ctx, cli, target) +} + +func sessionInPods(namespace, pod, container string, pods []RunningPod) bool { + for _, p := range pods { + if p.Namespace != namespace || p.Name != pod { + continue + } + if container == "" { + return true + } + for _, c := range p.Containers { + if c == container { + return true + } + } + } + return false +} + func targetFromConfig(ctx context.Context, host sdk.HostClient, configID string) (TargetRef, error) { if host == nil || configID == "" { return TargetRef{}, fmt.Errorf("config_id is required") diff --git a/arthas/ui-src/src/pages/ArthasPage.tsx b/arthas/ui-src/src/pages/ArthasPage.tsx index 235b30b..78fd494 100644 --- a/arthas/ui-src/src/pages/ArthasPage.tsx +++ b/arthas/ui-src/src/pages/ArthasPage.tsx @@ -43,7 +43,7 @@ interface SessionCreateJob { finishedAt?: string; } -const SESSIONS_KEY = ["arthas", "sessions"] as const; +const sessionsKey = (configID: string) => ["arthas", configID, "sessions"] as const; export function ArthasPage() { const configID = configIDFromURL(); @@ -52,8 +52,9 @@ export function ArthasPage() { const qc = useQueryClient(); const sessionsQ = useQuery({ - queryKey: SESSIONS_KEY, + queryKey: sessionsKey(configID), queryFn: () => callOp("sessions-list"), + enabled: !!configID, refetchInterval: 5_000, }); @@ -68,7 +69,7 @@ export function ArthasPage() { mutationFn: (body: Record) => callOp("session-create", body), onSuccess: (job) => { if (job.status === "running" && job.session) { - qc.invalidateQueries({ queryKey: SESSIONS_KEY }); + qc.invalidateQueries({ queryKey: sessionsKey(configID) }); setSelectedId(job.session.id); toastManager.add({ title: `Arthas session started on ${job.session.pod}`, type: "success" }); return; @@ -89,7 +90,7 @@ export function ArthasPage() { const job = createStatusQ.data; if (!job || !createJobId) return; if (job.status === "running" && job.session) { - qc.invalidateQueries({ queryKey: 
SESSIONS_KEY }); + qc.invalidateQueries({ queryKey: sessionsKey(configID) }); setSelectedId(job.session.id); setCreateJobId(null); toastManager.add({ title: `Arthas session started on ${job.session.pod}`, type: "success" }); @@ -97,12 +98,12 @@ export function ArthasPage() { setCreateJobId(null); toastManager.add({ title: job.error ? `Failed to start Arthas session: ${job.error}` : "Failed to start Arthas session", type: "error" }); } - }, [createJobId, createStatusQ.data, qc]); + }, [configID, createJobId, createStatusQ.data, qc]); const del = useMutation({ mutationFn: (id: string) => callOp("session-delete", { id }), onSuccess: (_, id) => { - qc.invalidateQueries({ queryKey: SESSIONS_KEY }); + qc.invalidateQueries({ queryKey: sessionsKey(configID) }); if (selectedId === id) setSelectedId(null); }, }); @@ -232,7 +233,7 @@ function SessionMenu({
{targets.map((target) => (
(); + const sessionsByTarget = new Map(); for (const session of sessions) { - if (!sessionsByPod.has(session.pod)) sessionsByPod.set(session.pod, session); + sessionsByTarget.set(targetKey(session.namespace, session.pod, session.container), session); } return pods.flatMap((pod) => { - const session = sessionsByPod.get(pod.name); - if (session) { - return [{ pod, container: session.container, session }]; - } - const containers = pod.containers.length > 0 ? pod.containers : [""]; return containers.map((container) => ({ pod, container, + session: sessionsByTarget.get(targetKey(pod.namespace, pod.name, container)), })); }); } +function targetKey(namespace: string, pod: string, container: string): string { + return `${namespace}/${pod}/${container}`; +} + function formatSessionTime(startedAt: string): string { const date = new Date(startedAt); if (Number.isNaN(date.getTime())) return ""; diff --git a/arthas/ui/index.html b/arthas/ui/index.html index ee3fd9b..ad649b1 100644 --- a/arthas/ui/index.html +++ b/arthas/ui/index.html @@ -29,7 +29,7 @@ color: rgb(100 116 139); } - + diff --git a/arthas/ui_checksum.go b/arthas/ui_checksum.go index 0c1a161..0756fe2 100644 --- a/arthas/ui_checksum.go +++ b/arthas/ui_checksum.go @@ -5,4 +5,4 @@ package main // uiChecksum is the sha256 of every file embedded under ui/. // Regenerated on every `task build:plugin:arthas` or // `go generate ./plugins/arthas/...`. 
-const uiChecksum = "0b47ce51950a3b2065184e08971a7efd1aa0fe212a8ce073364553dfd7ece6a8" +const uiChecksum = "aa3de343da8b23fc4b8d49b7bf5aeebc591ed3a53539889db0bb1c68c0592666" diff --git a/inspektor-gadget/http.go b/inspektor-gadget/http.go index f26367f..945e82d 100644 --- a/inspektor-gadget/http.go +++ b/inspektor-gadget/http.go @@ -58,9 +58,12 @@ func (p *InspektorGadgetPlugin) httpSession(w http.ResponseWriter, r *http.Reque http.Error(w, "missing session id", http.StatusBadRequest) return } - sess, ok := p.sessions.Get(id) - if !ok { - http.Error(w, "session not found", http.StatusNotFound) + sess, err := p.sessionForConfig(r.Context(), sdk.InvokeCtx{ + ConfigItemID: configItemIDFromRequest(r), + Host: sdk.HostClientFromContext(r.Context()), + }, id) + if err != nil { + http.Error(w, err.Error(), http.StatusForbidden) return } switch tail { @@ -80,6 +83,13 @@ func (p *InspektorGadgetPlugin) httpSession(w http.ResponseWriter, r *http.Reque } } +func configItemIDFromRequest(r *http.Request) string { + if id := sdk.ConfigItemIDFromContext(r.Context()); id != "" { + return id + } + return r.URL.Query().Get("config_id") +} + func streamSessionEvents(w http.ResponseWriter, r *http.Request, sess *TraceSession) { flusher, ok := w.(http.Flusher) if !ok { diff --git a/inspektor-gadget/ops.go b/inspektor-gadget/ops.go index fc5f10c..6775526 100644 --- a/inspektor-gadget/ops.go +++ b/inspektor-gadget/ops.go @@ -36,11 +36,22 @@ func (p *InspektorGadgetPlugin) tracesList(_ context.Context, _ sdk.InvokeCtx) ( return supportedGadgets(p.settings.GadgetTag), nil } -func (p *InspektorGadgetPlugin) traceList(_ context.Context, _ sdk.InvokeCtx) (any, error) { - return p.sessions.List(), nil +func (p *InspektorGadgetPlugin) traceList(ctx context.Context, req sdk.InvokeCtx) (any, error) { + pods, err := p.currentPods(ctx, req) + if err != nil { + return nil, err + } + sessions := p.sessions.List() + out := make([]*TraceSession, 0, len(sessions)) + for i := range sessions { + if 
traceTargetInPods(sessions[i].Target, pods) { + out = append(out, &sessions[i]) + } + } + return out, nil } -func (p *InspektorGadgetPlugin) traceEvents(_ context.Context, req sdk.InvokeCtx) (any, error) { +func (p *InspektorGadgetPlugin) traceEvents(ctx context.Context, req sdk.InvokeCtx) (any, error) { var params TraceEventsParams if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, fmt.Errorf("decode params: %w", err) @@ -48,14 +59,14 @@ func (p *InspektorGadgetPlugin) traceEvents(_ context.Context, req sdk.InvokeCtx if params.ID == "" { return nil, fmt.Errorf("id is required") } - sess, ok := p.sessions.Get(params.ID) - if !ok { - return nil, fmt.Errorf("session %q not found", params.ID) + sess, err := p.sessionForConfig(ctx, req, params.ID) + if err != nil { + return nil, err } return sess.Events(), nil } -func (p *InspektorGadgetPlugin) traceStop(_ context.Context, req sdk.InvokeCtx) (any, error) { +func (p *InspektorGadgetPlugin) traceStop(ctx context.Context, req sdk.InvokeCtx) (any, error) { var params TraceStopParams if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, fmt.Errorf("decode params: %w", err) @@ -63,9 +74,9 @@ func (p *InspektorGadgetPlugin) traceStop(_ context.Context, req sdk.InvokeCtx) if params.ID == "" { return nil, fmt.Errorf("id is required") } - sess, ok := p.sessions.Get(params.ID) - if !ok { - return nil, fmt.Errorf("session %q not found", params.ID) + sess, err := p.sessionForConfig(ctx, req, params.ID) + if err != nil { + return nil, err } sess.Stop() return sess.Snapshot(), nil @@ -161,6 +172,57 @@ func (p *InspektorGadgetPlugin) traceStart(ctx context.Context, req sdk.InvokeCt return session.Snapshot(), nil } +func (p *InspektorGadgetPlugin) sessionForConfig(ctx context.Context, req sdk.InvokeCtx, id string) (*TraceSession, error) { + sess, ok := p.sessions.Get(id) + if !ok { + return nil, fmt.Errorf("session %q not found", id) + } + if req.ConfigItemID == "" { + return sess, nil + } + pods, 
err := p.currentPods(ctx, req) + if err != nil { + return nil, err + } + if !traceTargetInPods(sess.Target, pods) { + return nil, fmt.Errorf("session %q does not belong to the current config item", id) + } + return sess, nil +} + +func (p *InspektorGadgetPlugin) currentPods(ctx context.Context, req sdk.InvokeCtx) ([]RunningPod, error) { + target, err := targetFromConfig(ctx, req.Host, req.ConfigItemID) + if err != nil { + return nil, err + } + cli, err := p.clients.Client(ctx, req.Host) + if err != nil { + return nil, err + } + pods, err := listRunningPodsForTarget(ctx, cli, target) + if err != nil { + return nil, err + } + return pods, nil +} + +func traceTargetInPods(target TraceTarget, pods []RunningPod) bool { + for _, pod := range pods { + if pod.Namespace != target.Namespace || pod.Name != target.Pod { + continue + } + if target.Container == "" { + return true + } + for _, container := range pod.Containers { + if container == target.Container { + return true + } + } + } + return false +} + func normalizeTraceOptions(params TraceStartParams) (map[string]any, error) { options := map[string]any{} for k, v := range params.Options { diff --git a/inspektor-gadget/ui-src/src/main.tsx b/inspektor-gadget/ui-src/src/main.tsx index e9b8feb..cc9286b 100644 --- a/inspektor-gadget/ui-src/src/main.tsx +++ b/inspektor-gadget/ui-src/src/main.tsx @@ -215,6 +215,8 @@ function App() { setSessions(nextSessions); if (!selectedSession && nextSessions.length > 0) { setSelectedSession(nextSessions[0].id); + } else if (selectedSession && !nextSessions.some((session) => session.id === selectedSession)) { + setSelectedSession(nextSessions[0]?.id ?? ""); } } @@ -225,7 +227,15 @@ function App() { useEffect(() => { const timer = window.setInterval(() => { - invoke("trace-list").then(setSessions).catch(() => undefined); + invoke("trace-list") + .then((nextSessions) => { + setSessions(nextSessions); + setSelectedSession((current) => { + if (!current) return nextSessions[0]?.id ?? 
""; + return nextSessions.some((session) => session.id === current) ? current : nextSessions[0]?.id ?? ""; + }); + }) + .catch(() => undefined); }, 5000); return () => window.clearInterval(timer); }, []); diff --git a/inspektor-gadget/ui/index.html b/inspektor-gadget/ui/index.html index 4853bc8..bd1a178 100644 --- a/inspektor-gadget/ui/index.html +++ b/inspektor-gadget/ui/index.html @@ -4,7 +4,7 @@ Inspektor Gadget - + diff --git a/inspektor-gadget/ui_checksum.go b/inspektor-gadget/ui_checksum.go index 11df3f0..23c9e99 100644 --- a/inspektor-gadget/ui_checksum.go +++ b/inspektor-gadget/ui_checksum.go @@ -2,4 +2,4 @@ package main -const uiChecksum = "f1c8ee07ec80cd019efd6e21eb171bc0ccb76f6cfed69a813b0819ef453304be" +const uiChecksum = "3514276d26b51e6afbb1a657b696ecb78c537f37c61cb757f7d417fce15b5683" diff --git a/sql-server/internal/sqltrace/registry.go b/sql-server/internal/sqltrace/registry.go index a6836b2..e630ef1 100644 --- a/sql-server/internal/sqltrace/registry.go +++ b/sql-server/internal/sqltrace/registry.go @@ -222,11 +222,13 @@ func (r *Registry) runDrain(ctx context.Context, trace *ActiveTrace, interval ti } func (r *Registry) Stop(id string) (*ActiveTrace, error) { - r.mu.Lock() - trace, ok := r.traces[id] - r.mu.Unlock() - if !ok { - return nil, fmt.Errorf("trace %q not found", id) + return r.StopForConfig(id, "") +} + +func (r *Registry) StopForConfig(id, configItemID string) (*ActiveTrace, error) { + trace, err := r.getForConfig(id, configItemID) + if err != nil { + return nil, err } if err := trace.stop(context.Background()); err != nil { return trace, err @@ -241,10 +243,21 @@ func (r *Registry) Get(id string) (*ActiveTrace, bool) { return t, ok } +func (r *Registry) GetForConfig(id, configItemID string) (*ActiveTrace, error) { + return r.getForConfig(id, configItemID) +} + func (r *Registry) List() []*ActiveTrace { + return r.ListForConfig("") +} + +func (r *Registry) ListForConfig(configItemID string) []*ActiveTrace { r.mu.Lock() out := 
make([]*ActiveTrace, 0, len(r.traces)) for _, t := range r.traces { + if configItemID != "" && t.ConfigItemID != configItemID { + continue + } out = append(out, t) } r.mu.Unlock() @@ -253,18 +266,38 @@ func (r *Registry) List() []*ActiveTrace { } func (r *Registry) Delete(id string) (bool, error) { + return r.DeleteForConfig(id, "") +} + +func (r *Registry) DeleteForConfig(id, configItemID string) (bool, error) { r.mu.Lock() trace, ok := r.traces[id] - if ok { - delete(r.traces, id) - } - r.mu.Unlock() if !ok { + r.mu.Unlock() return false, nil } + if configItemID != "" && trace.ConfigItemID != configItemID { + r.mu.Unlock() + return false, fmt.Errorf("trace %q does not belong to the current config item", id) + } + delete(r.traces, id) + r.mu.Unlock() return true, trace.stop(context.Background()) } +func (r *Registry) getForConfig(id, configItemID string) (*ActiveTrace, error) { + r.mu.Lock() + trace, ok := r.traces[id] + r.mu.Unlock() + if !ok { + return nil, fmt.Errorf("trace %q not found", id) + } + if configItemID != "" && trace.ConfigItemID != configItemID { + return nil, fmt.Errorf("trace %q does not belong to the current config item", id) + } + return trace, nil +} + func (r *Registry) StopAll() { r.mu.Lock() snapshot := make([]*ActiveTrace, 0, len(r.traces)) diff --git a/sql-server/ops_trace.go b/sql-server/ops_trace.go index 544b0b7..82979bf 100644 --- a/sql-server/ops_trace.go +++ b/sql-server/ops_trace.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/json" - "fmt" "time" "github.com/flanksource/incident-commander/plugin/sdk" @@ -51,9 +50,9 @@ func (p *SQLServerPlugin) traceStart(ctx context.Context, req sdk.InvokeCtx) (an }) } -func (p *SQLServerPlugin) traceList(_ context.Context, _ sdk.InvokeCtx) (any, error) { +func (p *SQLServerPlugin) traceList(_ context.Context, req sdk.InvokeCtx) (any, error) { p.traces.GC() - return p.traces.List(), nil + return p.traces.ListForConfig(req.ConfigItemID), nil } type TraceIDParams struct { @@ -66,9 +65,9 
@@ func (p *SQLServerPlugin) traceGet(_ context.Context, req sdk.InvokeCtx) (any, e if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, err } - t, ok := p.traces.Get(params.ID) - if !ok { - return nil, fmt.Errorf("trace %q not found", params.ID) + t, err := p.traces.GetForConfig(params.ID, req.ConfigItemID) + if err != nil { + return nil, err } events := t.EventsSince(params.Since) return map[string]any{ @@ -83,7 +82,7 @@ func (p *SQLServerPlugin) traceStop(_ context.Context, req sdk.InvokeCtx) (any, if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, err } - t, err := p.traces.Stop(params.ID) + t, err := p.traces.StopForConfig(params.ID, req.ConfigItemID) if err != nil { return nil, err } @@ -95,7 +94,7 @@ func (p *SQLServerPlugin) traceDelete(_ context.Context, req sdk.InvokeCtx) (any if err := json.Unmarshal(req.ParamsJSON, &params); err != nil { return nil, err } - removed, err := p.traces.Delete(params.ID) + removed, err := p.traces.DeleteForConfig(params.ID, req.ConfigItemID) if err != nil { return nil, err }