diff --git a/dashboard/src/components/layout/app-shell.tsx b/dashboard/src/components/layout/app-shell.tsx index d18ded4..cc6b76b 100644 --- a/dashboard/src/components/layout/app-shell.tsx +++ b/dashboard/src/components/layout/app-shell.tsx @@ -1,3 +1,4 @@ +import { useLocation } from "@tanstack/react-router"; import type { ReactNode } from "react"; import { TooltipProvider } from "@/components/ui"; import { useApplyAccent } from "@/features/settings"; @@ -5,17 +6,20 @@ import { CommandPalette } from "./command-palette"; import { Header } from "./header"; import { RouteErrorBoundary } from "./route-error-boundary"; import { Sidebar } from "./sidebar"; +import { TopProgressBar } from "./top-progress-bar"; export function AppShell({ children }: { children: ReactNode }) { useApplyAccent(); + const { pathname } = useLocation(); return (
+
-
+
{children}
diff --git a/dashboard/src/components/layout/header.tsx b/dashboard/src/components/layout/header.tsx index c5cc714..a66e8d2 100644 --- a/dashboard/src/components/layout/header.tsx +++ b/dashboard/src/components/layout/header.tsx @@ -3,7 +3,6 @@ import { Button, Kbd } from "@/components/ui"; import { useCommandPalette } from "@/providers"; import { LastRefreshed } from "./last-refreshed"; import { MobileMenu } from "./mobile-menu"; -import { RefreshControl } from "./refresh-control"; import { ThemeToggle } from "./theme-toggle"; export function Header() { @@ -37,7 +36,6 @@ export function Header() {
-
diff --git a/dashboard/src/components/layout/index.ts b/dashboard/src/components/layout/index.ts index 4da8a91..0dd0d18 100644 --- a/dashboard/src/components/layout/index.ts +++ b/dashboard/src/components/layout/index.ts @@ -7,7 +7,7 @@ export { Header } from "./header"; export { LastRefreshed } from "./last-refreshed"; export { MobileMenu } from "./mobile-menu"; export { PageHeader } from "./page-header"; -export { RefreshControl } from "./refresh-control"; export { RouteErrorBoundary } from "./route-error-boundary"; export { Sidebar } from "./sidebar"; export { ThemeToggle } from "./theme-toggle"; +export { TopProgressBar } from "./top-progress-bar"; diff --git a/dashboard/src/components/layout/page-header.tsx b/dashboard/src/components/layout/page-header.tsx index 9867135..2c84cae 100644 --- a/dashboard/src/components/layout/page-header.tsx +++ b/dashboard/src/components/layout/page-header.tsx @@ -34,8 +34,10 @@ export function PageHeader({ {eyebrow}
) : null} -

{title}

- {description ?

{description}

: null} +

{title}

+ {description ? ( +

{description}

+ ) : null}
{actions ?
{actions}
: null} diff --git a/dashboard/src/components/layout/refresh-control.tsx b/dashboard/src/components/layout/refresh-control.tsx deleted file mode 100644 index 032330e..0000000 --- a/dashboard/src/components/layout/refresh-control.tsx +++ /dev/null @@ -1,35 +0,0 @@ -import { cn } from "@/lib/cn"; -import { type RefreshOption, useRefreshInterval } from "@/providers"; - -const OPTIONS: RefreshOption[] = ["2s", "5s", "10s", "off"]; - -export function RefreshControl() { - const { option, setOption } = useRefreshInterval(); - return ( -
- {OPTIONS.map((value) => { - const active = option === value; - return ( - - ); - })} -
- ); -} diff --git a/dashboard/src/components/layout/sidebar.tsx b/dashboard/src/components/layout/sidebar.tsx index ff42312..f3ddd13 100644 --- a/dashboard/src/components/layout/sidebar.tsx +++ b/dashboard/src/components/layout/sidebar.tsx @@ -91,13 +91,20 @@ export function Sidebar() {
  • + {active ? ( + + ) : null} {label} diff --git a/dashboard/src/components/layout/top-progress-bar.tsx b/dashboard/src/components/layout/top-progress-bar.tsx new file mode 100644 index 0000000..283f612 --- /dev/null +++ b/dashboard/src/components/layout/top-progress-bar.tsx @@ -0,0 +1,36 @@ +import { useIsMutating } from "@tanstack/react-query"; +import { useEffect, useState } from "react"; + +/** + * Thin accent-colored bar that fades in while a mutation (user action) + * is in flight. Polling refetches do not trigger it — at low polling + * intervals a per-fetch indicator strobes and reads as visual jitter. + * The "Updated just now" label in the header already conveys refresh + * cadence, so the progress bar focuses on intentional state changes. + */ +export function TopProgressBar() { + const mutating = useIsMutating(); + const busy = mutating > 0; + const [visible, setVisible] = useState(false); + + useEffect(() => { + if (busy) { + setVisible(true); + return; + } + const timeout = window.setTimeout(() => setVisible(false), 400); + return () => window.clearTimeout(timeout); + }, [busy]); + + return ( +
    +
    +
    + ); +} diff --git a/dashboard/src/components/ui/index.ts b/dashboard/src/components/ui/index.ts index c8dcd7e..2697f53 100644 --- a/dashboard/src/components/ui/index.ts +++ b/dashboard/src/components/ui/index.ts @@ -81,7 +81,7 @@ export { SheetTitle, SheetTrigger, } from "./sheet"; -export { Skeleton } from "./skeleton"; +export { Skeleton, TableSkeleton } from "./skeleton"; export { StatCard } from "./stat-card"; export { Table, diff --git a/dashboard/src/components/ui/skeleton.tsx b/dashboard/src/components/ui/skeleton.tsx index b9966c2..c6206bb 100644 --- a/dashboard/src/components/ui/skeleton.tsx +++ b/dashboard/src/components/ui/skeleton.tsx @@ -6,3 +6,58 @@ export function Skeleton({ className, ...props }: HTMLAttributes
    ); } + +interface TableSkeletonProps { + rows?: number; + columns?: Array; + className?: string; +} + +/** + * Table-shaped skeleton — gives a placeholder that matches the eventual row + * grid instead of a single opaque block. Pass widths per column (Tailwind + * class like "w-32" or numeric ch units). + */ +export function TableSkeleton({ + rows = 8, + columns = ["w-24", "w-40", "w-20", "w-16", "w-28", "w-24"], + className, +}: TableSkeletonProps) { + return ( +
    +
    + {columns.map((width, idx) => ( + + ))} +
    +
    + {Array.from({ length: rows }, (_, rowIdx) => ( +
    + {columns.map((width, colIdx) => ( + + ))} +
    + ))} +
    +
    + ); +} diff --git a/dashboard/src/components/ui/stat-card-trend.test.ts b/dashboard/src/components/ui/stat-card-trend.test.ts new file mode 100644 index 0000000..1bbddd6 --- /dev/null +++ b/dashboard/src/components/ui/stat-card-trend.test.ts @@ -0,0 +1,57 @@ +import { describe, expect, it } from "vitest"; +import { computeTrend, trendToneClass } from "./stat-card-trend"; + +describe("trendToneClass", () => { + it("renders flat as muted regardless of upIsGood", () => { + expect(trendToneClass("flat", true)).toMatch(/fg-subtle/); + expect(trendToneClass("flat", false)).toMatch(/fg-subtle/); + }); + + it("treats up as success when upIsGood", () => { + expect(trendToneClass("up", true)).toBe("text-success"); + expect(trendToneClass("down", true)).toBe("text-danger"); + }); + + it("inverts when upIsGood is false", () => { + expect(trendToneClass("up", false)).toBe("text-danger"); + expect(trendToneClass("down", false)).toBe("text-success"); + }); +}); + +describe("computeTrend", () => { + it("returns null on non-finite input", () => { + expect(computeTrend(Number.NaN, 5)).toBeNull(); + expect(computeTrend(5, Number.POSITIVE_INFINITY)).toBeNull(); + }); + + it("returns flat when current and previous are both zero", () => { + expect(computeTrend(0, 0)).toEqual({ direction: "flat", label: "0%", upIsGood: undefined }); + }); + + it("returns 'new' when previous is zero and current is non-zero", () => { + const t = computeTrend(7, 0); + expect(t?.direction).toBe("up"); + expect(t?.label).toBe("new"); + }); + + it("computes positive percentage", () => { + expect(computeTrend(120, 100)).toEqual({ direction: "up", label: "+20%", upIsGood: undefined }); + }); + + it("computes negative percentage", () => { + expect(computeTrend(80, 100)).toEqual({ + direction: "down", + label: "-20%", + upIsGood: undefined, + }); + }); + + it("returns flat when rounded percentage is zero", () => { + expect(computeTrend(1001, 1000)?.direction).toBe("flat"); + }); + + it("threads upIsGood through", () => { + const t = computeTrend(120, 100, { upIsGood: false }); + expect(t?.upIsGood).toBe(false); + }); +}); diff --git a/dashboard/src/components/ui/stat-card-trend.ts b/dashboard/src/components/ui/stat-card-trend.ts new file mode 100644 index 0000000..127fc9b --- /dev/null +++ b/dashboard/src/components/ui/stat-card-trend.ts @@ -0,0 +1,44 @@ +export type TrendDirection = "up" | "down" | "flat"; + +export interface StatTrend { + direction: TrendDirection; + /** Display label (e.g. "12%", "+4 / hr"). Caller controls formatting. */ + label: string; + /** + * Whether `up` should read as positive. Defaults to true. Set false for + * metrics where rising is bad (e.g. failures, latency) so colors invert. + */ + upIsGood?: boolean; +} + +export function trendToneClass(direction: TrendDirection, upIsGood: boolean): string { + if (direction === "flat") return "text-[var(--fg-subtle)]"; + const positive = direction === "up" ? upIsGood : !upIsGood; + return positive ? "text-success" : "text-danger"; +} + +/** + * Compare current vs previous bucket totals and produce a trend descriptor. + * Returns `null` when there is no previous data to compare against — the + * caller should treat that as "no trend yet" rather than rendering "flat". + */ +export function computeTrend( + current: number, + previous: number, + options: { upIsGood?: boolean } = {}, +): StatTrend | null { + if (!Number.isFinite(current) || !Number.isFinite(previous)) return null; + if (previous === 0 && current === 0) + return { direction: "flat", label: "0%", upIsGood: options.upIsGood }; + if (previous === 0) { + return { direction: "up", label: "new", upIsGood: options.upIsGood }; + } + const delta = current - previous; + const pct = Math.round((delta / previous) * 100); + if (pct === 0) return { direction: "flat", label: "0%", upIsGood: options.upIsGood }; + return { + direction: pct > 0 ? "up" : "down", + label: `${pct > 0 ? "+" : ""}${pct}%`, + upIsGood: options.upIsGood, + }; +} diff --git a/dashboard/src/components/ui/stat-card.tsx b/dashboard/src/components/ui/stat-card.tsx index e4db07e..1443089 100644 --- a/dashboard/src/components/ui/stat-card.tsx +++ b/dashboard/src/components/ui/stat-card.tsx @@ -1,17 +1,24 @@ +import { ArrowDown, ArrowUp, Minus } from "lucide-react"; import { forwardRef, type HTMLAttributes, type ReactNode } from "react"; import { cn } from "@/lib/cn"; import { Card } from "./card"; +import { type StatTrend, trendToneClass } from "./stat-card-trend"; + +export type { StatTrend, TrendDirection } from "./stat-card-trend"; + +type StatTone = "neutral" | "accent" | "success" | "warning" | "danger" | "info"; interface StatCardProps extends HTMLAttributes { label: string; value: ReactNode; hint?: ReactNode; icon?: ReactNode; - trend?: "up" | "down" | "flat"; - tone?: "neutral" | "accent" | "success" | "warning" | "danger" | "info"; + trend?: StatTrend; + sparkline?: ReactNode; + tone?: StatTone; } -const TONE_RING: Record, string> = { +const TONE_RING: Record = { neutral: "text-[var(--fg-muted)]", accent: "text-accent", info: "text-info", @@ -20,18 +27,40 @@ const TONE_RING: Record, string> = { danger: "text-danger", }; +const TREND_ICON: Record = { + up: ArrowUp, + down: ArrowDown, + flat: Minus, +}; + export const StatCard = forwardRef( - ({ label, value, hint, icon, tone = "neutral", className, ...props }, ref) => ( - -
    -
    - {label} + ({ label, value, hint, icon, trend, sparkline, tone = "neutral", className, ...props }, ref) => { + const TrendIcon = trend ? TREND_ICON[trend.direction] : null; + const trendClass = trend ? trendToneClass(trend.direction, trend.upIsGood ?? true) : ""; + return ( + +
    +
    + {label} +
    + {icon ?
    {icon}
    : null} +
    +
    +
    {value}
    + {trend && TrendIcon ? ( + + + Trend {trend.direction}: + {trend.label} + + ) : null}
    - {icon ?
    {icon}
    : null} -
    -
    {value}
    - {hint ?
    {hint}
    : null} - - ), + {hint ?
    {hint}
    : null} + {sparkline ?
    {sparkline}
    : null} + + ); + }, ); StatCard.displayName = "StatCard"; diff --git a/dashboard/src/components/ui/tabs.tsx b/dashboard/src/components/ui/tabs.tsx index c5a26be..82acfe8 100644 --- a/dashboard/src/components/ui/tabs.tsx +++ b/dashboard/src/components/ui/tabs.tsx @@ -45,6 +45,7 @@ const TabsContent = forwardRef< ref={ref} className={cn( "mt-4 focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-[var(--color-ring)]", + "data-[state=active]:animate-fade-in", className, )} {...props} diff --git a/dashboard/src/features/circuit-breakers/components/circuit-breakers-table.tsx b/dashboard/src/features/circuit-breakers/components/circuit-breakers-table.tsx index 25985f7..cb5a7cf 100644 --- a/dashboard/src/features/circuit-breakers/components/circuit-breakers-table.tsx +++ b/dashboard/src/features/circuit-breakers/components/circuit-breakers-table.tsx @@ -1,7 +1,7 @@ import type { ColumnDef } from "@tanstack/react-table"; import { CircuitBoard } from "lucide-react"; import { useMemo } from "react"; -import { Badge, DataTable, EmptyState, ErrorState, Skeleton } from "@/components/ui"; +import { Badge, DataTable, EmptyState, ErrorState, TableSkeleton } from "@/components/ui"; import type { CircuitBreaker } from "@/lib/api-types"; import { CIRCUIT_LABEL, CIRCUIT_TONE } from "@/lib/status"; import { formatDuration, formatRelative } from "@/lib/time"; @@ -92,7 +92,7 @@ export function CircuitBreakersTable({ } if (loading && !breakers) { - return ; + return ; } if (!breakers || breakers.length === 0) { diff --git a/dashboard/src/features/dead-letters/components/dead-letter-group-row.tsx b/dashboard/src/features/dead-letters/components/dead-letter-group-row.tsx index 17b1e44..e64161a 100644 --- a/dashboard/src/features/dead-letters/components/dead-letter-group-row.tsx +++ b/dashboard/src/features/dead-letters/components/dead-letter-group-row.tsx @@ -30,7 +30,7 @@ export function DeadLetterGroupRow({ group }: DeadLetterGroupRowProps) { } return ( -
    +
    + ); + })} +
    + + + + ); +} diff --git a/dashboard/src/features/settings/hooks.ts b/dashboard/src/features/settings/hooks.ts index 40e1b09..6206f08 100644 --- a/dashboard/src/features/settings/hooks.ts +++ b/dashboard/src/features/settings/hooks.ts @@ -78,6 +78,7 @@ export function useDeleteSetting() { if (ctx?.prev) qc.setQueryData(KEY, ctx.prev); toast.error("Couldn't delete setting", { description: describeError(error) }); }, + onSuccess: () => toast.success("Setting cleared"), onSettled: () => qc.invalidateQueries({ queryKey: KEY }), }); } diff --git a/dashboard/src/features/settings/index.ts b/dashboard/src/features/settings/index.ts index 0dbe45f..2725c79 100644 --- a/dashboard/src/features/settings/index.ts +++ b/dashboard/src/features/settings/index.ts @@ -1,6 +1,7 @@ export { BrandingSection } from "./components/branding-section"; export { ExternalLinksSection } from "./components/external-links-section"; export { IntegrationsSection } from "./components/integrations-section"; +export { RefreshIntervalSection } from "./components/refresh-interval-section"; export { applyJobContext, parseExternalLinks, diff --git a/dashboard/src/features/system/components/interception-table.tsx b/dashboard/src/features/system/components/interception-table.tsx index 68a071a..a56608d 100644 --- a/dashboard/src/features/system/components/interception-table.tsx +++ b/dashboard/src/features/system/components/interception-table.tsx @@ -1,13 +1,14 @@ import type { ColumnDef } from "@tanstack/react-table"; +import { ListFilter } from "lucide-react"; import { useMemo } from "react"; -import { DataTable, ErrorState, Skeleton } from "@/components/ui"; +import { DataTable, EmptyState, ErrorState, TableSkeleton } from "@/components/ui"; import type { InterceptionStats } from "@/lib/api-types"; import { formatCount } from "@/lib/number"; interface Row { strategy: string; count: number; - avg_ms: number; + share: number; } interface InterceptionTableProps { @@ -17,11 +18,25 @@ interface InterceptionTableProps { onRetry: () => void; } +const STRATEGY_LABEL: Record = { + pass: "Pass", + convert: "Convert", + proxy: "Proxy", + redirect: "Redirect", + reject: "Reject", +}; + export function InterceptionTable({ stats, loading, error, onRetry }: InterceptionTableProps) { const rows = useMemo(() => { - if (!stats) return []; - return Object.entries(stats) - .map(([strategy, v]) => ({ strategy, ...v })) + if (!stats?.strategy_counts) return []; + const total = Object.values(stats.strategy_counts).reduce((sum, n) => sum + n, 0); + return Object.entries(stats.strategy_counts) + .map(([strategy, count]) => ({ + strategy, + count, + share: total > 0 ? count / total : 0, + })) + .filter((r) => r.count > 0) .sort((a, b) => b.count - a.count); }, [stats]); @@ -30,9 +45,10 @@ export function InterceptionTable({ stats, loading, error, onRetry }: Intercepti { accessorKey: "strategy", header: "Strategy", - cell: ({ getValue }) => ( - {getValue()} - ), + cell: ({ getValue }) => { + const key = getValue(); + return {STRATEGY_LABEL[key] ?? key}; + }, }, { accessorKey: "count", @@ -42,11 +58,11 @@ export function InterceptionTable({ stats, loading, error, onRetry }: Intercepti ), }, { - accessorKey: "avg_ms", - header: "Avg", + accessorKey: "share", + header: "Share", cell: ({ getValue }) => ( - {getValue().toFixed(1)}ms + {(getValue() * 100).toFixed(1)}% ), }, @@ -63,15 +79,50 @@ export function InterceptionTable({ stats, loading, error, onRetry }: Intercepti /> ); } - if (loading && rows.length === 0) { - return ; + if (loading && !stats) { + return ; + } + if (!stats || stats.total_intercepts === 0) { + return ( + + ); } return ( - r.strategy} - empty="No interceptions recorded" - /> +
    +
    + + Total intercepts:{" "} + + {formatCount(stats.total_intercepts)} + + + + Avg duration:{" "} + + {stats.avg_duration_ms.toFixed(2)}ms + + + + Max depth:{" "} + {stats.max_depth_reached} + +
    + r.strategy} + empty={ + + } + /> +
    ); } diff --git a/dashboard/src/features/system/components/proxy-table.tsx b/dashboard/src/features/system/components/proxy-table.tsx index b8af8a0..6eaa5c2 100644 --- a/dashboard/src/features/system/components/proxy-table.tsx +++ b/dashboard/src/features/system/components/proxy-table.tsx @@ -1,16 +1,10 @@ import type { ColumnDef } from "@tanstack/react-table"; +import { Shuffle } from "lucide-react"; import { useMemo } from "react"; -import { DataTable, ErrorState, Skeleton } from "@/components/ui"; -import type { ProxyStats } from "@/lib/api-types"; +import { DataTable, EmptyState, ErrorState, TableSkeleton } from "@/components/ui"; +import type { ProxyHandlerStats, ProxyStats } from "@/lib/api-types"; import { formatCount } from "@/lib/number"; -interface Row { - handler: string; - reconstructions: number; - avg_ms: number; - errors: number; -} - interface ProxyTableProps { stats: ProxyStats | undefined; loading: boolean; @@ -19,14 +13,12 @@ interface ProxyTableProps { } export function ProxyTable({ stats, loading, error, onRetry }: ProxyTableProps) { - const rows = useMemo(() => { + const rows = useMemo(() => { if (!stats) return []; - return Object.entries(stats) - .map(([handler, v]) => ({ handler, ...v })) - .sort((a, b) => b.reconstructions - a.reconstructions); + return [...stats].sort((a, b) => b.total_reconstructions - a.total_reconstructions); }, [stats]); - const columns = useMemo[]>( + const columns = useMemo[]>( () => [ { accessorKey: "handler", @@ -36,23 +28,32 @@ export function ProxyTable({ stats, loading, error, onRetry }: ProxyTableProps) ), }, { - accessorKey: "reconstructions", + accessorKey: "total_reconstructions", header: "Reconstructions", cell: ({ getValue }) => ( {formatCount(getValue())} ), }, { - accessorKey: "avg_ms", + accessorKey: "avg_duration_ms", header: "Avg", cell: ({ getValue }) => ( - {getValue().toFixed(1)}ms + {getValue().toFixed(2)}ms + + ), + }, + { + accessorKey: "p95_duration_ms", + header: "p95", + cell: ({ getValue }) => ( + + {getValue().toFixed(2)}ms ), }, { - accessorKey: "errors", + accessorKey: "total_errors", header: "Errors", cell: ({ getValue }) => { const n = getValue(); @@ -73,14 +74,20 @@ export function ProxyTable({ stats, loading, error, onRetry }: ProxyTableProps) ); } if (loading && rows.length === 0) { - return ; + return ; } return ( r.handler} - empty="No proxy reconstructions recorded" + empty={ + + } /> ); } diff --git a/dashboard/src/features/workers/components/workers-table.tsx b/dashboard/src/features/workers/components/workers-table.tsx index d277530..93a8c26 100644 --- a/dashboard/src/features/workers/components/workers-table.tsx +++ b/dashboard/src/features/workers/components/workers-table.tsx @@ -1,8 +1,9 @@ import type { ColumnDef } from "@tanstack/react-table"; import { Server } from "lucide-react"; import { useMemo } from "react"; -import { Badge, DataTable, EmptyState, ErrorState, Skeleton } from "@/components/ui"; +import { Badge, DataTable, EmptyState, ErrorState, TableSkeleton } from "@/components/ui"; import type { Worker } from "@/lib/api-types"; +import { cn } from "@/lib/cn"; import { formatRelative } from "@/lib/time"; const STALE_AFTER_MS = 30_000; @@ -53,7 +54,10 @@ export function WorkersTable({ workers, loading, error, onRetry }: WorkersTableP return (
    {formatRelative(ts)} @@ -90,7 +94,7 @@ export function WorkersTable({ workers, loading, error, onRetry }: WorkersTableP } if (loading && !workers) { - return ; + return ; } if (!workers || workers.length === 0) { diff --git a/dashboard/src/lib/api-types.ts b/dashboard/src/lib/api-types.ts index 0602194..a1904ae 100644 --- a/dashboard/src/lib/api-types.ts +++ b/dashboard/src/lib/api-types.ts @@ -131,6 +131,9 @@ export interface TimeseriesBucket { success: number; failure: number; avg_ms: number; + p50_ms: number; + p95_ms: number; + p99_ms: number; } export interface Worker { @@ -171,19 +174,24 @@ export interface ResourceStatus { pool?: ResourcePoolStats; } -export type ProxyStats = Record< - string, - { - reconstructions: number; - avg_ms: number; - errors: number; - } ->; +export interface ProxyHandlerStats { + handler: string; + total_reconstructions: number; + total_errors: number; + total_cleanup_errors: number; + total_checksum_failures: number; + total_duration_ms: number; + avg_duration_ms: number; + max_duration_ms: number; + p95_duration_ms: number; +} -export type InterceptionStats = Record< - string, - { - count: number; - avg_ms: number; - } ->; +export type ProxyStats = ProxyHandlerStats[]; + +export interface InterceptionStats { + total_intercepts: number; + total_duration_ms: number; + avg_duration_ms: number; + strategy_counts: Record; + max_depth_reached: number; +} diff --git a/dashboard/src/routes/index.tsx b/dashboard/src/routes/index.tsx index 0777a63..54bd368 100644 --- a/dashboard/src/routes/index.tsx +++ b/dashboard/src/routes/index.tsx @@ -1,5 +1,6 @@ import { createFileRoute } from "@tanstack/react-router"; import { PageHeader } from "@/components/layout"; +import { Separator } from "@/components/ui"; import { pausedQueuesQuery, QueueBreakdown, @@ -45,11 +46,32 @@ function OverviewPage() { />
    - +
    + stats.refetch()} + /> +
    + +
    + throughput.refetch()} + /> +
    - + -
    +

    Queues

    @@ -62,7 +84,10 @@ function OverviewPage() { />
    -
    +

    Recent jobs

    diff --git a/dashboard/src/routes/queues.tsx b/dashboard/src/routes/queues.tsx index 3061d30..3ff4f2e 100644 --- a/dashboard/src/routes/queues.tsx +++ b/dashboard/src/routes/queues.tsx @@ -1,5 +1,7 @@ import { createFileRoute } from "@tanstack/react-router"; +import { ListTree, Pause, ShieldCheck } from "lucide-react"; import { PageHeader } from "@/components/layout"; +import { StatCard } from "@/components/ui"; import { pausedQueuesQuery, QueuesTable, @@ -7,6 +9,7 @@ import { usePausedQueues, useQueueStats, } from "@/features/queues"; +import { formatCount } from "@/lib/number"; export const Route = createFileRoute("/queues")({ loader: ({ context: { queryClient } }) => @@ -21,12 +24,39 @@ function QueuesPage() { const stats = useQueueStats(); const paused = usePausedQueues(); + const queueNames = stats.data ? Object.keys(stats.data) : []; + const totalQueues = queueNames.length; + const pausedCount = paused.data?.length ?? 0; + const totalPending = stats.data + ? Object.values(stats.data).reduce((sum, s) => sum + (s.pending ?? 0), 0) + : 0; + return ( <> +
    + } + value={formatCount(totalQueues)} + /> + } + value={formatCount(totalPending)} + /> + } + value={formatCount(pausedCount)} + /> +
    ) : (
    + diff --git a/dashboard/src/routes/workers.tsx b/dashboard/src/routes/workers.tsx index 9812eb4..27a3320 100644 --- a/dashboard/src/routes/workers.tsx +++ b/dashboard/src/routes/workers.tsx @@ -1,8 +1,12 @@ import { createFileRoute } from "@tanstack/react-router"; +import { Activity, AlertCircle, Server } from "lucide-react"; import { PageHeader } from "@/components/layout"; +import { StatCard } from "@/components/ui"; import { useWorkers, WorkersTable, workersQuery } from "@/features/workers"; import { formatCount } from "@/lib/number"; +const STALE_AFTER_MS = 30_000; + export const Route = createFileRoute("/workers")({ loader: ({ context: { queryClient } }) => queryClient.ensureQueryData(workersQuery()), component: WorkersPage, @@ -10,7 +14,11 @@ export const Route = createFileRoute("/workers")({ function WorkersPage() { const workers = useWorkers(); + const all = workers.data ?? []; const count = workers.data?.length; + const now = Date.now(); + const stale = all.filter((w) => now - w.last_heartbeat > STALE_AFTER_MS).length; + const healthy = all.length - stale; return ( <> @@ -22,6 +30,26 @@ function WorkersPage() { : "Active workers, heartbeats, and assignments." } /> +
    + } + value={formatCount(all.length)} + /> + } + value={formatCount(healthy)} + /> + } + value={formatCount(stale)} + /> +
    Callable: """Wrap a task function with hooks, middleware, and job context.""" - from taskito.context import _clear_context, current_job - from taskito.interception.reconstruct import reconstruct_args - hooks = self._hooks queue_ref = self diff --git a/py_src/taskito/async_support/context.py b/py_src/taskito/async_support/context.py index 29d4d70..a0dbc40 100644 --- a/py_src/taskito/async_support/context.py +++ b/py_src/taskito/async_support/context.py @@ -4,7 +4,7 @@ import contextvars -from taskito.context import _ActiveContext +from taskito._active_context import _ActiveContext _context_var: contextvars.ContextVar[_ActiveContext | None] = contextvars.ContextVar( "_taskito_async_context", default=None diff --git a/py_src/taskito/async_support/executor.py b/py_src/taskito/async_support/executor.py index 7af228e..a1ae934 100644 --- a/py_src/taskito/async_support/executor.py +++ b/py_src/taskito/async_support/executor.py @@ -12,6 +12,7 @@ import cloudpickle from taskito.async_support.context import clear_async_context, set_async_context +from taskito.context import current_job from taskito.exceptions import TaskCancelledError from taskito.interception.reconstruct import reconstruct_args from taskito.proxies import cleanup_proxies, reconstruct_proxies @@ -130,8 +131,6 @@ async def _execute( # Middleware before hooks middleware_chain = queue._get_middleware_chain(task_name) - from taskito.context import current_job - for mw in middleware_chain: try: mw.before(current_job) @@ -181,8 +180,6 @@ async def _execute( cleanup_proxies(proxy_cleanup, metrics=self._queue_ref._proxy_metrics) # Middleware after hooks (only those whose before() succeeded) - from taskito.context import current_job - for mw in completed_mw: try: mw.after(current_job, result, error) diff --git a/py_src/taskito/async_support/mixins.py b/py_src/taskito/async_support/mixins.py index 497bca5..3d64a08 100644 --- a/py_src/taskito/async_support/mixins.py +++ b/py_src/taskito/async_support/mixins.py @@ -9,11 +9,12 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any, TypeVar +from taskito.async_support.locks import AsyncDistributedLock + if TYPE_CHECKING: from collections.abc import Sequence from concurrent.futures import Executor - from taskito.async_support.locks import AsyncDistributedLock from taskito.result import JobResult logger = logging.getLogger("taskito") @@ -254,8 +255,6 @@ def alock( timeout: Max seconds to wait for acquisition. None = fail immediately. retry_interval: Seconds between retries when timeout is set. """ - from taskito.async_support.locks import AsyncDistributedLock - return AsyncDistributedLock( inner=self._inner, name=name, diff --git a/py_src/taskito/cli.py b/py_src/taskito/cli.py index a82043b..da90943 100644 --- a/py_src/taskito/cli.py +++ b/py_src/taskito/cli.py @@ -4,12 +4,14 @@ import argparse import importlib +import os +import signal as sig import sys import time -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from taskito.app import Queue +from taskito.app import Queue +from taskito.dashboard import serve_dashboard +from taskito.scaler import serve_scaler def main() -> None: @@ -183,8 +185,6 @@ def _load_queue(app_path: str) -> Queue: ) sys.exit(1) - from taskito.app import Queue - if not isinstance(queue, Queue): print( f"Error: '{app_path}' is not a Queue instance (got {type(queue).__name__})", @@ -207,8 +207,6 @@ def run_worker(args: argparse.Namespace) -> None: def run_dashboard(args: argparse.Namespace) -> None: """Start the web dashboard.""" queue = _load_queue(args.app) - from taskito.dashboard import serve_dashboard - serve_dashboard(queue, host=args.host, port=args.port) @@ -257,8 +255,6 @@ def _watch_stats(queue: Queue) -> None: def run_scaler(args: argparse.Namespace) -> None: """Start the lightweight KEDA metrics server.""" queue = _load_queue(args.app) - from taskito.scaler import serve_scaler - serve_scaler( queue, host=args.host, @@ -301,9 +297,6 @@ def run_resources(args: argparse.Namespace) -> None: def run_reload(args: argparse.Namespace) -> None: """Send SIGHUP to a running worker to reload resources.""" - import os - import signal as sig - if not hasattr(sig, "SIGHUP"): print("Error: SIGHUP is not available on this platform", file=sys.stderr) sys.exit(1) diff --git a/py_src/taskito/context.py b/py_src/taskito/context.py index 0f87a08..af26b18 100644 --- a/py_src/taskito/context.py +++ b/py_src/taskito/context.py @@ -8,6 +8,10 @@ import time from typing import TYPE_CHECKING, Any +from taskito._active_context import _ActiveContext +from taskito.async_support.context import get_async_context +from taskito.exceptions import SoftTimeoutError, TaskCancelledError + logger = logging.getLogger("taskito.context") if TYPE_CHECKING: @@ -112,8 +116,6 @@ def check_cancelled(self) -> None: Raises: TaskCancelledError: If the job has been marked for cancellation. """ - from taskito.exceptions import TaskCancelledError - ctx = self._require_context() if _queue_ref is None: raise RuntimeError("Queue reference not set.") @@ -126,8 +128,6 @@ def check_timeout(self) -> None: Raises: SoftTimeoutError: If the soft timeout has elapsed. """ - from taskito.exceptions import SoftTimeoutError - ctx = self._require_context() if ctx.soft_timeout is not None and ctx.started_mono is not None: elapsed = time.monotonic() - ctx.started_mono @@ -144,8 +144,6 @@ def _set_soft_timeout(self, seconds: float) -> None: @staticmethod def _require_context() -> _ActiveContext: # Try contextvars first (async tasks on native executor) - from taskito.async_support.context import get_async_context - ctx = get_async_context() if ctx is not None: return ctx @@ -158,31 +156,6 @@ def _require_context() -> _ActiveContext: return sync_ctx -class _ActiveContext: - __slots__ = ( - "job_id", - "queue_name", - "retry_count", - "soft_timeout", - "started_mono", - "task_name", - ) - - def __init__( - self, - job_id: str, - task_name: str, - retry_count: int, - queue_name: str, - ): - self.job_id = job_id - self.task_name = task_name - self.retry_count = retry_count - self.queue_name = queue_name - self.started_mono: float | None = time.monotonic() - self.soft_timeout: float | None = None - - def _set_queue_ref(queue: Any) -> None: """Store the Queue instance at module level for use by job context.""" global _queue_ref diff --git a/py_src/taskito/contrib/django/management/commands/taskito_info.py b/py_src/taskito/contrib/django/management/commands/taskito_info.py index d3dfce9..6a2ea3d 100644 --- a/py_src/taskito/contrib/django/management/commands/taskito_info.py +++ b/py_src/taskito/contrib/django/management/commands/taskito_info.py @@ -2,6 +2,8 @@ from __future__ import annotations +import time + try: from django.core.management.base import BaseCommand except ImportError as e: @@ -42,8 +44,6 @@ def _print(self, queue): # type: ignore[no-untyped-def] self.stdout.write(f" {'total':<12} {total}") def _watch(self, queue): # type: ignore[no-untyped-def] - import time - from django.conf import settings interval = getattr(settings, "TASKITO_WATCH_INTERVAL", 2) diff --git a/py_src/taskito/contrib/flask.py b/py_src/taskito/contrib/flask.py index f7ec535..f63a9fa 100644 --- a/py_src/taskito/contrib/flask.py +++ b/py_src/taskito/contrib/flask.py @@ -23,8 +23,11 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING, Any +from taskito.app import Queue + if TYPE_CHECKING: import flask @@ -58,8 +61,6 @@ def __init__(self, app: flask.Flask | None = None, cli_group: str = "taskito"): def init_app(self, app: flask.Flask) -> None: """Initialize the extension with a Flask app.""" - from taskito.app import Queue - self.queue = Queue( db_path=app.config.get("TASKITO_DB_PATH", ".taskito/taskito.db"), workers=app.config.get("TASKITO_WORKERS", 0), @@ -102,8 +103,6 @@ def worker_cmd(queues: str | None) -> None: ) def info_cmd(output_format: str) -> None: """Show queue statistics.""" - import json - stats = self.queue.stats() if output_format == "json": click.echo(json.dumps(stats, indent=2)) diff --git a/py_src/taskito/contrib/otel.py b/py_src/taskito/contrib/otel.py index 7d6f414..e133f0e 100644 --- a/py_src/taskito/contrib/otel.py +++ b/py_src/taskito/contrib/otel.py @@ -13,6 +13,7 @@ from __future__ import annotations +import threading from collections.abc import Callable from typing import TYPE_CHECKING, Any @@ -66,8 +67,6 @@ def __init__( "opentelemetry-api is required for OpenTelemetryMiddleware. " "Install it with: pip install taskito[otel]" ) - import threading - self._tracer = trace.get_tracer(tracer_name) self._span_name_fn = span_name_fn self._attr_prefix = attribute_prefix diff --git a/py_src/taskito/interception/built_in.py b/py_src/taskito/interception/built_in.py index fa56b85..5550aca 100644 --- a/py_src/taskito/interception/built_in.py +++ b/py_src/taskito/interception/built_in.py @@ -5,8 +5,16 @@ import collections import datetime import decimal +import enum +import importlib +import io +import logging as logging_mod import pathlib import re +import socket +import subprocess +import threading +import types as builtin_types import uuid from taskito.interception import converters @@ -34,8 +42,6 @@ def _try_import(module_path: str, class_name: str) -> type | None: """Try to import a type, returning None if not available or not a type.""" try: - import importlib - mod = importlib.import_module(module_path) obj = getattr(mod, class_name, None) # Only return actual types — functions, modules, etc. will break isinstance() @@ -139,8 +145,6 @@ def build_default_registry() -> TypeRegistry: ) # Enum (must be lower priority than specific enum-like types) - import enum - reg.register( enum.Enum, Strategy.CONVERT, @@ -179,9 +183,6 @@ def build_default_registry() -> TypeRegistry: # -- PROXY (priority 20) -- # Types that can be deconstructed into a recipe and reconstructed on the worker. - import io - import logging as logging_mod - reg.register( (io.TextIOWrapper, io.BufferedReader, io.BufferedWriter, io.FileIO), Strategy.PROXY, @@ -239,11 +240,6 @@ def build_default_registry() -> TypeRegistry: # -- REJECT (priority 30, high — catch these before anything else) -- - import socket - import subprocess - import threading - import types as builtin_types - # threading.Lock and threading.RLock are factory functions, not types. # Use the actual instance types for isinstance() checks. _lock_type = type(threading.Lock()) diff --git a/py_src/taskito/interception/converters.py b/py_src/taskito/interception/converters.py index 1ab4fbd..6f328f2 100644 --- a/py_src/taskito/interception/converters.py +++ b/py_src/taskito/interception/converters.py @@ -2,8 +2,14 @@ from __future__ import annotations +import collections import dataclasses +import datetime +import decimal import importlib +import pathlib +import re +import uuid from typing import Any @@ -29,7 +35,6 @@ def convert_uuid(obj: Any) -> dict[str, Any]: def reconstruct_uuid(data: dict[str, Any]) -> Any: - import uuid return uuid.UUID(data["value"]) @@ -42,7 +47,6 @@ def convert_datetime(obj: Any) -> dict[str, Any]: def reconstruct_datetime(data: dict[str, Any]) -> Any: - import datetime return datetime.datetime.fromisoformat(data["value"]) @@ -55,7 +59,6 @@ def convert_date(obj: Any) -> dict[str, Any]: def reconstruct_date(data: dict[str, Any]) -> Any: - import datetime return datetime.date.fromisoformat(data["value"]) @@ -68,7 +71,6 @@ def convert_time(obj: Any) -> dict[str, Any]: def reconstruct_time(data: dict[str, Any]) -> Any: - import datetime return datetime.time.fromisoformat(data["value"]) @@ -81,7 +83,6 @@ def convert_timedelta(obj: Any) -> dict[str, Any]: def reconstruct_timedelta(data: dict[str, Any]) -> Any: - import datetime return datetime.timedelta(seconds=data["value"]) @@ -94,7 +95,6 @@ def convert_decimal(obj: Any) -> dict[str, Any]: def reconstruct_decimal(data: dict[str, Any]) -> Any: - import decimal return decimal.Decimal(data["value"]) @@ -107,7 +107,6 @@ def convert_path(obj: Any) -> dict[str, Any]: def reconstruct_path(data: dict[str, Any]) -> Any: - import pathlib return pathlib.Path(data["value"]) @@ -176,7 +175,6 @@ def convert_pattern(obj: Any) -> dict[str, Any]: def reconstruct_pattern(data: dict[str, Any]) -> Any: - import re return re.compile(data["value"], data["flags"]) @@ -210,7 +208,6 @@ def convert_ordered_dict(obj: Any) -> dict[str, Any]: def reconstruct_ordered_dict(data: dict[str, Any]) -> Any: - import collections return collections.OrderedDict(data["pairs"]) diff --git a/py_src/taskito/interception/interceptor.py b/py_src/taskito/interception/interceptor.py index 198b3f5..96a348e 100644 --- a/py_src/taskito/interception/interceptor.py +++ b/py_src/taskito/interception/interceptor.py @@ -2,6 +2,7 @@ from __future__ import annotations +import dataclasses as dc import logging import time from dataclasses import dataclass, field @@ -130,8 +131,6 @@ def _analyze_values(self, args: tuple, kwargs: dict, report: InterceptionReport) def _analyze_single(self, obj: Any, path: str, report: InterceptionReport) -> None: """Analyze a single value for the report.""" - import dataclasses as dc - if obj is None: report.entries.append( ReportEntry(path=path, type_name="NoneType", strategy=Strategy.PASS) diff --git a/py_src/taskito/interception/walker.py b/py_src/taskito/interception/walker.py index 553a4c9..11261e3 100644 --- a/py_src/taskito/interception/walker.py +++ b/py_src/taskito/interception/walker.py @@ -7,6 +7,7 @@ import uuid from typing import TYPE_CHECKING, Any +from taskito.interception.converters import convert_dataclass, convert_named_tuple from taskito.interception.errors import ArgumentFailure from taskito.interception.registry import RegistryEntry, TypeRegistry from taskito.interception.strategy import Strategy @@ -151,8 +152,6 @@ def _process( # NamedTuple detection — must check before registry (tuples are PASS) if isinstance(obj, tuple) and hasattr(obj, "_fields") and hasattr(obj, "_asdict"): - from taskito.interception.converters import convert_named_tuple - return convert_named_tuple(obj) # Check dataclass (can't use isinstance, need is_dataclass) @@ -164,8 +163,6 @@ def _process( if entry is not None: return self._apply_strategy(obj, path, entry, result, proxy_identity) # Fallback: auto-convert as dataclass - from taskito.interception.converters import convert_dataclass - return convert_dataclass(obj) # Look up in registry diff --git a/py_src/taskito/locks.py b/py_src/taskito/locks.py index 6a5f666..f6a2e9d 100644 --- a/py_src/taskito/locks.py +++ b/py_src/taskito/locks.py @@ -3,6 +3,7 @@ from __future__ import annotations import threading +import time import uuid from typing import TYPE_CHECKING, Any @@ -115,8 +116,6 @@ def _stop_extend(self) -> None: def __enter__(self) -> DistributedLock: if self._timeout is not None: - import time - deadline = time.monotonic() + self._timeout while True: if self.acquire(): diff --git a/py_src/taskito/mixins/inspection.py b/py_src/taskito/mixins/inspection.py index c4fe2f7..c9ad568 100644 --- a/py_src/taskito/mixins/inspection.py +++ b/py_src/taskito/mixins/inspection.py @@ -2,6 +2,7 @@ from __future__ import annotations +import time from collections import defaultdict from typing import Any @@ -135,10 +136,8 @@ def metrics_timeseries( Returns: List of dicts with ``timestamp``, ``count``, ``success``, - ``failure``, ``avg_ms`` keys. + ``failure``, ``avg_ms``, ``p50_ms``, ``p95_ms``, ``p99_ms`` keys. """ - import time - raw = self._inner.get_metrics(task_name=task_name, since_seconds=since) now_ms = int(time.time() * 1000) bucket_ms = bucket * 1000 @@ -156,7 +155,7 @@ def metrics_timeseries( records = buckets[ts] n = len(records) success = sum(1 for r in records if r.get("succeeded")) - times = [r["wall_time_ns"] / 1_000_000 for r in records] + times = sorted(r["wall_time_ns"] / 1_000_000 for r in records) result.append( { "timestamp": ts, @@ -164,6 +163,9 @@ def metrics_timeseries( "success": success, "failure": n - success, "avg_ms": round(sum(times) / n, 2) if n else 0, + "p50_ms": _percentile(times, 0.50), + "p95_ms": _percentile(times, 0.95), + "p99_ms": _percentile(times, 0.99), } ) @@ -220,11 +222,20 @@ def _aggregate_metrics(raw: list[dict]) -> dict[str, Any]: "success_count": success, "failure_count": n - success, "avg_ms": round(sum(times) / n, 2) if n else 0, - "p50_ms": round(times[n // 2], 2) if n else 0, - "p95_ms": round(times[min(int(n * 0.95), n - 1)], 2) if n else 0, - "p99_ms": round(times[min(int(n * 0.99), n - 1)], 2) if n else 0, + "p50_ms": _percentile(times, 0.50), + "p95_ms": _percentile(times, 0.95), + "p99_ms": _percentile(times, 0.99), "min_ms": round(times[0], 2) if n else 0, "max_ms": round(times[-1], 2) if n else 0, } return result + + +def _percentile(sorted_values: list[float], q: float) -> float: + """Nearest-rank percentile from a sorted list, rounded to 2 decimals.""" + n = len(sorted_values) + if n == 0: + return 0 + idx = min(int(n * q), n - 1) + return round(sorted_values[idx], 2) diff --git a/py_src/taskito/prefork/child.py b/py_src/taskito/prefork/child.py index 4f85e06..b7486ad 100644 --- a/py_src/taskito/prefork/child.py +++ b/py_src/taskito/prefork/child.py @@ -21,6 +21,7 @@ from typing import Any from taskito.async_support.helpers import run_maybe_async +from taskito.context import _clear_context, _set_context, _set_queue_ref from taskito.exceptions import TaskCancelledError logger = logging.getLogger("taskito.prefork.child") @@ -69,8 +70,6 @@ def _execute_job( } # Set job context - from taskito.context import _clear_context, _set_context - _set_context(job_id, task_name, retry_count, job.get("queue", "default")) start_ns = time.monotonic_ns() @@ -154,8 +153,6 @@ def main() -> None: # Import the queue and set up context queue = _import_queue(app_path) - from taskito.context import _set_queue_ref - _set_queue_ref(queue) # Initialize resources if any are defined diff --git a/py_src/taskito/proxies/reconstruct.py b/py_src/taskito/proxies/reconstruct.py index 4640946..31a3d79 100644 --- a/py_src/taskito/proxies/reconstruct.py +++ b/py_src/taskito/proxies/reconstruct.py @@ -15,6 +15,8 @@ from taskito.exceptions import ProxyReconstructionError from taskito.proxies.handler import ProxyHandler from taskito.proxies.registry import ProxyRegistry +from taskito.proxies.schema import validate_recipe +from taskito.proxies.signing import verify_recipe if TYPE_CHECKING: from taskito.proxies.metrics import ProxyMetrics @@ -147,8 +149,6 @@ def _reconstruct_one( raise ProxyReconstructionError( f"Recipe for '{handler_name}' is missing checksum (signing is enabled)" ) - from taskito.proxies.signing import verify_recipe - try: verify_recipe(handler_name, version, recipe, checksum, signing_secret) except ProxyReconstructionError: @@ -159,8 +159,6 @@ def _reconstruct_one( # Schema validation handler_schema = getattr(handler, "schema", None) if handler_schema is not None: - from taskito.proxies.schema import validate_recipe - validate_recipe(handler_name, recipe, handler_schema) # Version migration diff --git a/py_src/taskito/resources/runtime.py b/py_src/taskito/resources/runtime.py index 42473f8..363db7a 100644 --- a/py_src/taskito/resources/runtime.py +++ b/py_src/taskito/resources/runtime.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time from collections.abc import Callable from typing import Any @@ -13,7 +14,10 @@ ResourceUnavailableError, ) from taskito.resources.definition import ResourceDefinition, ResourceScope +from taskito.resources.frozen import FrozenResource from taskito.resources.graph import topological_sort +from taskito.resources.pool import PoolConfig, ResourcePool +from taskito.resources.thread_local import ThreadLocalStore logger = logging.getLogger("taskito.resources") @@ -38,12 +42,6 @@ def __init__(self, definitions: dict[str, ResourceDefinition]) -> None: def initialize(self) -> None: """Create all resources in topological (dependency-first) order.""" - import time - - from taskito.resources.frozen import FrozenResource - from taskito.resources.pool import PoolConfig, ResourcePool - from taskito.resources.thread_local import ThreadLocalStore - self._init_order = topological_sort(self._definitions) for name in self._init_order: defn = self._definitions[name] diff --git a/py_src/taskito/task.py b/py_src/taskito/task.py index 4f9f496..224f992 100644 --- a/py_src/taskito/task.py +++ b/py_src/taskito/task.py @@ -5,10 +5,11 @@ from collections.abc import Callable from typing import TYPE_CHECKING, Any +from taskito.canvas import Signature +from taskito.interception import InterceptionReport + if TYPE_CHECKING: from taskito.app import Queue - from taskito.canvas import Signature - from taskito.interception import InterceptionReport from taskito.result import JobResult @@ -130,14 +131,10 @@ def map(self, iterable: list[tuple]) -> list[JobResult]: def s(self, *args: Any, **kwargs: Any) -> Signature: """Create a mutable :class:`~taskito.canvas.Signature`.""" - from taskito.canvas import Signature - return Signature(task=self, args=args, kwargs=kwargs) def si(self, *args: Any, **kwargs: Any) -> Signature: """Create an immutable :class:`~taskito.canvas.Signature`.""" - from taskito.canvas import Signature - return Signature(task=self, args=args, kwargs=kwargs, immutable=True) def analyze(self, *args: Any, **kwargs: Any) -> InterceptionReport: @@ -146,8 +143,6 @@ def analyze(self, *args: Any, **kwargs: Any) -> InterceptionReport: Returns an :class:`~taskito.interception.InterceptionReport` describing the strategy for each argument. """ - from taskito.interception import InterceptionReport - interceptor = self._queue._interceptor if interceptor is None: return InterceptionReport() diff --git a/py_src/taskito/testing.py b/py_src/taskito/testing.py index 9316559..4b96671 100644 --- a/py_src/taskito/testing.py +++ b/py_src/taskito/testing.py @@ -22,6 +22,9 @@ from typing import TYPE_CHECKING, Any from unittest.mock import patch +from taskito.context import _clear_context, _set_context +from taskito.resources.runtime import ResourceRuntime + if TYPE_CHECKING: from taskito.app import Queue @@ -141,8 +144,6 @@ def __enter__(self) -> TestResults: # Set up test resource runtime if resources provided if self._resources is not None: - from taskito.resources.runtime import ResourceRuntime - self._prev_runtime = self._queue._resource_runtime resolved: dict[str, Any] = {} for name, value in self._resources.items(): @@ -195,8 +196,6 @@ def _execute_task( raise KeyError(f"Task '{task_name}' not found in registry") # Set up context so current_job works inside tasks - from taskito.context import _clear_context, _set_context - queue_name = enqueue_kwargs.get("queue", "default") _set_context( job_id=job_id, diff --git a/py_src/taskito/workflows/builder.py b/py_src/taskito/workflows/builder.py index e600b21..b825d87 100644 --- a/py_src/taskito/workflows/builder.py +++ b/py_src/taskito/workflows/builder.py @@ -10,6 +10,11 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable +from taskito._taskito import PyWorkflowBuilder + +from . import analysis as _analysis +from .visualization import nodes_and_edges_from_steps, render_dot, render_mermaid + if TYPE_CHECKING: from collections.abc import Callable @@ -277,27 +282,19 @@ def step_names(self) -> list[str]: def ancestors(self, node: str) -> list[str]: """Return all transitive predecessors of *node*.""" - from .analysis import ancestors - - return ancestors(self._steps, node) + return _analysis.ancestors(self._steps, node) def descendants(self, node: str) -> list[str]: """Return all transitive successors of *node*.""" - from .analysis import descendants - - return descendants(self._steps, node) + return _analysis.descendants(self._steps, node) def topological_levels(self) -> list[list[str]]: """Group nodes by topological depth.""" - from .analysis import topological_levels - - return topological_levels(self._steps) + return _analysis.topological_levels(self._steps) def stats(self) -> dict[str, int | float]: """Compute basic DAG statistics (nodes, edges, depth, width, density).""" - from .analysis import stats - - return stats(self._steps) + return _analysis.stats(self._steps) def critical_path(self, costs: dict[str, float]) -> tuple[list[str], float]: """Find the longest-weighted path through the DAG. @@ -308,21 +305,15 @@ def critical_path(self, costs: dict[str, float]) -> tuple[list[str], float]: Returns: ``(path, total_cost)`` """ - from .analysis import critical_path - - return critical_path(self._steps, costs) + return _analysis.critical_path(self._steps, costs) def execution_plan(self, max_workers: int = 1) -> list[list[str]]: """Generate a step-by-step execution plan respecting worker limits.""" - from .analysis import execution_plan - - return execution_plan(self._steps, max_workers) + return _analysis.execution_plan(self._steps, max_workers) def bottleneck_analysis(self, costs: dict[str, float]) -> dict[str, Any]: """Identify the bottleneck node on the critical path.""" - from .analysis import bottleneck_analysis - - return bottleneck_analysis(self._steps, costs) + return _analysis.bottleneck_analysis(self._steps, costs) def visualize(self, fmt: str = "mermaid") -> str: """Render the workflow DAG as a diagram string. @@ -333,12 +324,6 @@ def visualize(self, fmt: str = "mermaid") -> str: Returns: The diagram string (no statuses — pre-execution view). """ - from .visualization import ( - nodes_and_edges_from_steps, - render_dot, - render_mermaid, - ) - nodes, edges = nodes_and_edges_from_steps(self._steps) if fmt == "dot": return render_dot(nodes, edges) @@ -362,8 +347,6 @@ def _compile( ``(dag_bytes, step_metadata_json, node_payloads, deferred_nodes, callable_conditions, on_failure, gate_configs, sub_workflow_refs)`` """ - from taskito._taskito import PyWorkflowBuilder - builder = PyWorkflowBuilder() node_payloads: dict[str, bytes] = {} callable_conditions: dict[str, Any] = {} diff --git a/py_src/taskito/workflows/run.py b/py_src/taskito/workflows/run.py index 5e92a95..2269938 100644 --- a/py_src/taskito/workflows/run.py +++ b/py_src/taskito/workflows/run.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING from .types import NodeSnapshot, NodeStatus, WorkflowState, WorkflowStatus +from .visualization import nodes_and_edges_from_dag_bytes, render_dot, render_mermaid if TYPE_CHECKING: from taskito.app import Queue @@ -99,12 +100,6 @@ def visualize(self, fmt: str = "mermaid") -> str: Args: fmt: Output format — ``"mermaid"`` or ``"dot"``. """ - from .visualization import ( - nodes_and_edges_from_dag_bytes, - render_dot, - render_mermaid, - ) - dag_bytes = self._queue._inner.get_workflow_definition_dag(self.id) nodes, edges = nodes_and_edges_from_dag_bytes(dag_bytes) diff --git a/py_src/taskito/workflows/visualization.py b/py_src/taskito/workflows/visualization.py index 8118f35..34a5f3c 100644 --- a/py_src/taskito/workflows/visualization.py +++ b/py_src/taskito/workflows/visualization.py @@ -2,6 +2,7 @@ from __future__ import annotations +import json from typing import Any _STATUS_COLORS_MERMAID = { @@ -124,8 +125,6 @@ def nodes_and_edges_from_dag_bytes( dag_bytes: bytes | list[int], ) -> tuple[list[str], list[tuple[str, str]]]: """Extract node list and edge list from serialized DAG JSON.""" - import json - raw = bytes(dag_bytes) if isinstance(dag_bytes, list) else dag_bytes dag = json.loads(raw) nodes = [n["name"] for n in dag.get("nodes", [])]