From 923fd65152d3df24a588194527ff5f42a53577b4 Mon Sep 17 00:00:00 2001 From: NoneBack Date: Thu, 19 Mar 2026 15:31:01 +0800 Subject: [PATCH 1/3] feat(executor): add Options pattern, fix data races, update docs - extract Option type and With* funcs into options.go - add WithConcurrency, WithProfiler, WithTracer, WithPanicHandler options - internalize validator (unexported types, package-level func) - fix data race in TestTaskflowNotInFlow (atomic.Int32) - fix data race in TestPoolSequentialExec (sync.WaitGroup) - update README and llms.txt with Options and Tracing sections --- README.md | 54 ++- executor.go | 119 +++++-- executor_test.go | 52 ++- executor_tracer_test.go | 198 +++++++++++ graph.go | 10 + llms.txt | 67 +++- node.go | 10 + options.go | 39 +++ profiler.go | 9 +- taskflow_test.go | 760 ++++++++++++++-------------------------- tracer.go | 84 +++++ tracer_test.go | 112 ++++++ utils/copool_test.go | 6 +- validator.go | 179 ++++++++++ validator_test.go | 317 +++++++++++++++++ 15 files changed, 1477 insertions(+), 539 deletions(-) create mode 100644 executor_tracer_test.go create mode 100644 options.go create mode 100644 tracer.go create mode 100644 tracer_test.go create mode 100644 validator.go create mode 100644 validator_test.go diff --git a/README.md b/README.md index 0937307..5d9a6eb 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ func main() { } done.Succeed(sortTasks...) - executor := gtf.NewExecutor(1000) + executor := gtf.NewExecutor(1000, gtf.WithProfiler()) executor.Run(tf).Wait() @@ -156,6 +156,30 @@ ok github.com/noneback/go-taskflow/benchmark 5.606s Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html). 
+## Executor Options + +`NewExecutor` accepts functional options to configure behavior: + +```go +import "runtime" + +executor := gtf.NewExecutor(0, + gtf.WithConcurrency(uint(runtime.NumCPU()*4)), // override concurrency + gtf.WithProfiler(), // enable flamegraph profiling + gtf.WithTracer(), // enable Chrome Trace recording + gtf.WithPanicHandler(func(task string, r interface{}) { + log.Printf("task %s panicked: %v", task, r) + }), +) +``` + +| Option | Description | +|:---|:---| +| `WithConcurrency(n uint)` | Set max goroutine concurrency (overrides positional arg). Enables `NewExecutor(0, WithConcurrency(n))` style. | +| `WithProfiler()` | Enable flamegraph profiling. Required before calling `executor.Profile()`. | +| `WithTracer()` | Enable Chrome Trace recording. Required before calling `executor.Trace()`. | +| `WithPanicHandler(fn)` | Custom panic handler invoked on task panic. Replaces default log output. Graph is still canceled. | + ## Error Handling in go-taskflow In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them. @@ -173,6 +197,14 @@ tf.NewTask("not interrupt", func() { }) ``` +Alternatively, use `WithPanicHandler` to centralize panic handling across all tasks: + +```go +executor := gtf.NewExecutor(1000, gtf.WithPanicHandler(func(task string, r interface{}) { + log.Printf("task %s panicked: %v", task, r) +})) +``` + ## Visualizing Taskflows To generate a visual representation of a taskflow, use the `Dump` method: @@ -189,9 +221,12 @@ The `Dump` method generates raw strings in DOT format. 
Use the `dot` tool to cre ## Profiling Taskflows -To profile a taskflow, use the `Profile` method: +To profile a taskflow, first enable the profiler with `WithProfiler()`, then call `Profile`: ```go +executor := gtf.NewExecutor(1000, gtf.WithProfiler()) +executor.Run(tf).Wait() + if err := executor.Profile(os.Stdout); err != nil { log.Fatal(err) } @@ -201,6 +236,21 @@ The `Profile` method generates raw strings in flamegraph format. Use the `flameg ![flg](image/fl.svg) +## Tracing Taskflows + +To capture Chrome Trace events, enable the tracer with `WithTracer()`, then call `Trace`: + +```go +executor := gtf.NewExecutor(1000, gtf.WithTracer()) +executor.Run(tf).Wait() + +if err := executor.Trace(os.Stdout); err != nil { + log.Fatal(err) +} +``` + +The output is in [Chrome Trace Event format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU). Open it in `chrome://tracing` or [Perfetto UI](https://ui.perfetto.dev/) for visualization. + ## Stargazer [![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date) diff --git a/executor.go b/executor.go index 5c35ea6..a86a2ff 100644 --- a/executor.go +++ b/executor.go @@ -17,32 +17,39 @@ import ( type Executor interface { Wait() // Wait block until all tasks finished Profile(w io.Writer) error // Profile write flame graph raw text into w + Trace(w io.Writer) error // Trace write Chrome Trace Event data into w Run(tf *TaskFlow) Executor // Run start to schedule and execute taskflow } type innerExecutorImpl struct { - concurrency uint - pool *utils.Copool - wq *utils.Queue[*innerNode] - wg *sync.WaitGroup - profiler *profiler - mu *sync.Mutex + concurrency uint + pool *utils.Copool + wq *utils.Queue[*innerNode] + wg *sync.WaitGroup + profiler *profiler + tracer *tracer + panicHandler func(task string, r interface{}) + mu *sync.Mutex } -// NewExecutor return a Executor with a specified max goroutine 
concurrency(recommend a value bigger than Runtime.NumCPU, **MUST** bigger than num(subflows). ) -func NewExecutor(concurrency uint) Executor { - if concurrency == 0 { - panic("executor concurrency cannot be zero") - } - t := newProfiler() - return &innerExecutorImpl{ +// NewExecutor returns an Executor with the specified concurrency and options. +// concurrency can be 0 when WithConcurrency option is provided. +// Recommend concurrency > runtime.NumCPU and MUST > num(subflows). +func NewExecutor(concurrency uint, opts ...Option) Executor { + e := &innerExecutorImpl{ concurrency: concurrency, - pool: utils.NewCopool(concurrency), wq: utils.NewQueue[*innerNode](false), wg: &sync.WaitGroup{}, - profiler: t, mu: &sync.Mutex{}, } + for _, opt := range opts { + opt(e) + } + if e.concurrency == 0 { + panic("executor concurrency cannot be zero") + } + e.pool = utils.NewCopool(e.concurrency) + return e } // Run start to schedule and execute taskflow @@ -97,22 +104,49 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) { e.schedule(candidate...) } +// record submits the span to active observers. +// ok=true means the node completed without panic; profiler only records successful spans. +func (e *innerExecutorImpl) record(s *span, ok bool) { + if ok && e.profiler != nil { + e.profiler.AddSpan(s) + } + if e.tracer != nil { + e.tracer.AddEvent(s) + } +} + +// getDependentNames extracts predecessor task names from a node. 
+func getDependentNames(node *innerNode) []string { + if len(node.dependents) == 0 { + return nil + } + names := make([]string, len(node.dependents)) + for i, dep := range node.dependents { + names[i] = dep.name + } + return names +} + func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() { return func() { span := span{extra: attr{ typ: nodeStatic, name: node.name, - }, begin: time.Now(), parent: parentSpan} + }, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)} defer func() { span.cost = time.Since(span.begin) - if r := recover(); r != nil { + r := recover() + if r != nil { node.g.canceled.Store(true) - log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name) - log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) - } else { - e.profiler.AddSpan(&span) // remove canceled node span + if e.panicHandler != nil { + e.panicHandler(node.name, r) + } else { + log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name) + log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + } } + e.record(&span, r == nil) node.drop() e.sche_successors(node) @@ -132,18 +166,22 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p * span := span{extra: attr{ typ: nodeSubflow, name: node.name, - }, begin: time.Now(), parent: parentSpan} + }, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)} defer func() { span.cost = time.Since(span.begin) - if r := recover(); r != nil { - log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name) - log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + r := recover() + if r != nil { + if e.panicHandler != nil { + e.panicHandler(node.name, r) + } else { + log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name) + 
log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + } node.g.canceled.Store(true) p.g.canceled.Store(true) - } else { - e.profiler.AddSpan(&span) // remove canceled node span } + e.record(&span, r == nil) e.scheduleGraph(node.g, p.g, &span) node.drop() @@ -168,17 +206,21 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p span := span{extra: attr{ typ: nodeCondition, name: node.name, - }, begin: time.Now(), parent: parentSpan} + }, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)} defer func() { span.cost = time.Since(span.begin) - if r := recover(); r != nil { + r := recover() + if r != nil { node.g.canceled.Store(true) - log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name) - log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) - } else { - e.profiler.AddSpan(&span) // remove canceled node span + if e.panicHandler != nil { + e.panicHandler(node.name, r) + } else { + log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name) + log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + } } + e.record(&span, r == nil) node.drop() // e.sche_successors(node) node.g.deref() @@ -261,5 +303,16 @@ func (e *innerExecutorImpl) Wait() { // Profile write flame graph raw text into w func (e *innerExecutorImpl) Profile(w io.Writer) error { + if e.profiler == nil { + return nil + } return e.profiler.draw(w) } + +// Trace write Chrome Trace Event data into w +func (e *innerExecutorImpl) Trace(w io.Writer) error { + if e.tracer == nil { + return nil + } + return e.tracer.draw(w) +} diff --git a/executor_test.go b/executor_test.go index 699c5ed..ecb968c 100644 --- a/executor_test.go +++ b/executor_test.go @@ -4,14 +4,16 @@ import ( "fmt" "os" "runtime" + "sync/atomic" "testing" "time" gotaskflow "github.com/noneback/go-taskflow" + 
"github.com/noneback/go-taskflow/utils" ) func TestExecutor(t *testing.T) { - executor := gotaskflow.NewExecutor(uint(runtime.NumCPU())) + executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()), gotaskflow.WithProfiler()) tf := gotaskflow.NewTaskFlow("G") A, B, C := tf.NewTask("A", func() { @@ -91,3 +93,51 @@ func TestPanicInSubflow(t *testing.T) { make_install.Precede(relink) executor.Run(tf).Wait() } + +// TestWithConcurrencyOption verifies that NewExecutor(0, WithConcurrency(n)) works correctly. +func TestWithConcurrencyOption(t *testing.T) { + executor := gotaskflow.NewExecutor(0, gotaskflow.WithConcurrency(uint(runtime.NumCPU()))) + tf := gotaskflow.NewTaskFlow("G") + + var count atomic.Int32 + A := tf.NewTask("A", func() { count.Add(1) }) + B := tf.NewTask("B", func() { count.Add(1) }) + C := tf.NewTask("C", func() { count.Add(1) }) + A.Precede(B) + C.Precede(B) + + executor.Run(tf).Wait() + + if count.Load() != 3 { + t.Errorf("expected count=3, got %d", count.Load()) + } +} + +// TestWithPanicHandlerOption verifies that a custom panic handler is invoked on task panic. +func TestWithPanicHandlerOption(t *testing.T) { + var panickedTask string + var panicVal interface{} + + executor := gotaskflow.NewExecutor(10, gotaskflow.WithPanicHandler(func(task string, r interface{}) { + panickedTask = task + panicVal = r + })) + tf := gotaskflow.NewTaskFlow("G") + tf.NewTask("boom", func() { panic("test panic") }) + + executor.Run(tf).Wait() + + if panickedTask != "boom" { + t.Errorf("expected panickedTask=%q, got %q", "boom", panickedTask) + } + if panicVal != "test panic" { + t.Errorf("expected panicVal=%q, got %v", "test panic", panicVal) + } +} + +// TestWithConcurrencyPanic verifies that NewExecutor(0) without WithConcurrency still panics. 
+func TestWithConcurrencyPanic(t *testing.T) { + utils.AssertPanics(t, "concurrency zero", func() { + gotaskflow.NewExecutor(0) + }) +} diff --git a/executor_tracer_test.go b/executor_tracer_test.go new file mode 100644 index 0000000..e8ee955 --- /dev/null +++ b/executor_tracer_test.go @@ -0,0 +1,198 @@ +package gotaskflow_test + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "testing" + "time" + + gotaskflow "github.com/noneback/go-taskflow" +) + +func TestExecutorWithTracer(t *testing.T) { + executor := gotaskflow.NewExecutor(4, gotaskflow.WithTracer()) + tf := gotaskflow.NewTaskFlow("G") + A, B, C := + tf.NewTask("A", func() { fmt.Println("A") }), + tf.NewTask("B", func() { fmt.Println("B") }), + tf.NewTask("C", func() { fmt.Println("C") }) + A.Precede(B) + C.Precede(B) + executor.Run(tf).Wait() + + var buf bytes.Buffer + if err := executor.Trace(&buf); err != nil { + t.Fatalf("Trace error: %v", err) + } + + var events []map[string]interface{} + if err := json.Unmarshal(buf.Bytes(), &events); err != nil { + t.Fatalf("Trace output is not valid JSON: %v", err) + } + if len(events) != 3 { + t.Errorf("expected 3 trace events, got %d", len(events)) + } + t.Log(buf.String()) +} + +func TestExecutorWithoutTracer(t *testing.T) { + executor := gotaskflow.NewExecutor(4) + tf := gotaskflow.NewTaskFlow("G") + tf.NewTask("A", func() { fmt.Println("A") }) + executor.Run(tf).Wait() + + // Trace should return nil when tracer is not enabled + if err := executor.Trace(os.Stdout); err != nil { + t.Fatalf("Trace should return nil when disabled, got: %v", err) + } +} + +func TestExecutorWithProfiler(t *testing.T) { + executor := gotaskflow.NewExecutor(4, gotaskflow.WithProfiler()) + tf := gotaskflow.NewTaskFlow("G") + tf.NewTask("A", func() { fmt.Println("A") }) + tf.NewTask("B", func() { fmt.Println("B") }) + executor.Run(tf).Wait() + + var buf bytes.Buffer + if err := executor.Profile(&buf); err != nil { + t.Fatalf("Profile error: %v", err) + } + if buf.Len() == 0 { +
t.Error("expected profiler output, got empty") + } +} + +func TestExecutorWithoutProfiler(t *testing.T) { + executor := gotaskflow.NewExecutor(4) + tf := gotaskflow.NewTaskFlow("G") + tf.NewTask("A", func() { fmt.Println("A") }) + executor.Run(tf).Wait() + + // Profile should return nil when profiler is not enabled + if err := executor.Profile(os.Stdout); err != nil { + t.Fatalf("Profile should return nil when disabled, got: %v", err) + } +} + +func TestExecutorTracePrint(t *testing.T) { + executor := gotaskflow.NewExecutor(4, gotaskflow.WithTracer()) + tf := gotaskflow.NewTaskFlow("pipeline") + + A, B, C, D := + tf.NewTask("fetch", func() { time.Sleep(10 * time.Millisecond) }), + tf.NewTask("parse", func() { time.Sleep(5 * time.Millisecond) }), + tf.NewTask("process", func() { time.Sleep(8 * time.Millisecond) }), + tf.NewTask("output", func() { time.Sleep(3 * time.Millisecond) }) + A.Precede(B) + B.Precede(C) + C.Precede(D) + + executor.Run(tf).Wait() + + t.Log("=== Chrome Trace JSON (paste into chrome://tracing or https://ui.perfetto.dev) ===") + if err := executor.Trace(os.Stdout); err != nil { + t.Fatalf("Trace error: %v", err) + } + t.Log("=== End of Trace ===") +} + +// TestExecutorTraceComplex simulates a data-processing pipeline: +// +// prepare +// ├─ read_config ─┐ +// └─ load_data ─┴─ validate ─ check(cond) +// └─0─ sub_process (subflow) +// ├─ transform ─┐ +// └─ enrich ─┴─ aggregate ─ report +func TestExecutorTraceComplex(t *testing.T) { + executor := gotaskflow.NewExecutor(8, gotaskflow.WithTracer()) + tf := gotaskflow.NewTaskFlow("data-pipeline") + + prepare := tf.NewTask("prepare", func() { time.Sleep(5 * time.Millisecond) }) + + // Parallel stage: read the config and load the data concurrently + readConfig := tf.NewTask("read_config", func() { time.Sleep(8 * time.Millisecond) }) + loadData := tf.NewTask("load_data", func() { time.Sleep(12 * time.Millisecond) }) + + // Join: validation + validate := tf.NewTask("validate", func() { time.Sleep(4 * time.Millisecond) }) + + // Conditional branch: always takes the normal processing path (index 0) + check :=
tf.NewCondition("check_quality", func() uint { return 0 }) + + // Normal processing: inside the subflow, transform and enrich run in parallel, then join at aggregate + subProcess := tf.NewSubflow("sub_process", func(sf *gotaskflow.Subflow) { + transform := sf.NewTask("transform", func() { time.Sleep(10 * time.Millisecond) }) + enrich := sf.NewTask("enrich", func() { time.Sleep(7 * time.Millisecond) }) + aggregate := sf.NewTask("aggregate", func() { time.Sleep(5 * time.Millisecond) }) + transform.Precede(aggregate) + enrich.Precede(aggregate) + }) + + // Fallback path (not executed in this run) + fallback := tf.NewTask("fallback", func() { time.Sleep(2 * time.Millisecond) }) + + report := tf.NewTask("report", func() { time.Sleep(3 * time.Millisecond) }) + + // Wire up the dependencies + prepare.Precede(readConfig, loadData) + readConfig.Precede(validate) + loadData.Precede(validate) + validate.Precede(check) + check.Precede(subProcess, fallback) // 0 → subProcess, 1 → fallback + subProcess.Precede(report) + + executor.Run(tf).Wait() + + var buf bytes.Buffer + if err := executor.Trace(&buf); err != nil { + t.Fatalf("Trace error: %v", err) + } + + var events []map[string]interface{} + if err := json.Unmarshal(buf.Bytes(), &events); err != nil { + t.Fatalf("Trace output is not valid JSON: %v", err) + } + + // prepare + read_config + load_data + validate + check + sub_process + transform + enrich + aggregate + report = 10 + if len(events) != 10 { + t.Errorf("expected 10 trace events, got %d", len(events)) + } + + t.Log("=== Complex Pipeline Chrome Trace JSON ===") + t.Log(buf.String()) + t.Log("=== Paste into https://ui.perfetto.dev to visualize ===") +} + +func TestExecutorWithBothProfilerAndTracer(t *testing.T) { + executor := gotaskflow.NewExecutor(4, gotaskflow.WithProfiler(), gotaskflow.WithTracer()) + tf := gotaskflow.NewTaskFlow("G") + A, B := + tf.NewTask("A", func() { fmt.Println("A") }), + tf.NewTask("B", func() { fmt.Println("B") }) + A.Precede(B) + executor.Run(tf).Wait() + + var profBuf bytes.Buffer + if err := executor.Profile(&profBuf); err
!= nil { + t.Fatalf("Profile error: %v", err) + } + if profBuf.Len() == 0 { + t.Error("expected profiler output, got empty") + } + + var traceBuf bytes.Buffer + if err := executor.Trace(&traceBuf); err != nil { + t.Fatalf("Trace error: %v", err) + } + var events []map[string]interface{} + if err := json.Unmarshal(traceBuf.Bytes(), &events); err != nil { + t.Fatalf("Trace output is not valid JSON: %v", err) + } + if len(events) != 2 { + t.Errorf("expected 2 trace events, got %d", len(events)) + } +} diff --git a/graph.go b/graph.go index 3be72a5..d59f895 100644 --- a/graph.go +++ b/graph.go @@ -66,3 +66,13 @@ func (g *eGraph) setup() { func (g *eGraph) recyclable() bool { return g.joinCounter.Load() == 0 } + +// walk visits every node in the graph, recursing into instantiated subflows. +func (g *eGraph) walk(fn func(*innerNode)) { + for _, n := range g.nodes { + fn(n) + if sf, ok := n.ptr.(*Subflow); ok && sf.g != nil && sf.g.instantiated { + sf.g.walk(fn) + } + } +} diff --git a/llms.txt b/llms.txt index e920f31..1ba66b5 100644 --- a/llms.txt +++ b/llms.txt @@ -96,10 +96,37 @@ executor.Wait() // Run and wait (common pattern) executor.Run(tf).Wait() -// Export profiling data in flamegraph format +// Export profiling data in flamegraph format (requires WithProfiler option) err := executor.Profile(os.Stdout) + +// Export Chrome Trace Event data (requires WithTracer option) +err := executor.Trace(os.Stdout) +``` + +#### Executor Options + +`NewExecutor` accepts functional options as variadic arguments. The positional `concurrency` argument can be 0 when `WithConcurrency` is provided. 
+ +```go +import "runtime" + +executor := gtf.NewExecutor(0, + gtf.WithConcurrency(uint(runtime.NumCPU()*4)), // set concurrency via option + gtf.WithProfiler(), // enable flamegraph profiling + gtf.WithTracer(), // enable Chrome Trace recording + gtf.WithPanicHandler(func(task string, r interface{}) { + log.Printf("task %s panicked: %v", task, r) + }), +) ``` +| Option | Description | +|---|---| +| `WithConcurrency(n uint)` | Override concurrency. Enables `NewExecutor(0, WithConcurrency(n))` style. | +| `WithProfiler()` | Enable flamegraph profiling. **Must** be set before calling `executor.Profile()`. | +| `WithTracer()` | Enable Chrome Trace recording. **Must** be set before calling `executor.Trace()`. | +| `WithPanicHandler(fn)` | Custom handler called on task panic. Replaces default `log.Printf` behavior. Graph is still canceled. | + --- ### 4. Task Types @@ -251,7 +278,7 @@ subflow.Precede(teardown) ### Panic Behavior - Unrecovered panics cancel the entire parent graph - Remaining tasks are left incomplete -- Framework logs panic with stack trace +- Framework logs panic with stack trace (default) ### Manual Panic Handling @@ -267,6 +294,16 @@ tf.NewTask("safe-task", func() { }) ``` +### Centralized Panic Handling via WithPanicHandler + +```go +executor := gtf.NewExecutor(1000, gtf.WithPanicHandler(func(task string, r interface{}) { + log.Printf("task %s panicked: %v", task, r) + // send alert, record metrics, etc. +})) +// The graph is still canceled after the handler returns. +``` + --- ## Visualization & Profiling @@ -285,7 +322,11 @@ if err := tf.Dump(os.Stdout); err != nil { ### Generate Flamegraph Profile +Requires `WithProfiler()` option when creating the executor. 
+ ```go +executor := gtf.NewExecutor(1000, gtf.WithProfiler()) +executor.Run(tf).Wait() // Write flamegraph data if err := executor.Profile(os.Stdout); err != nil { @@ -296,6 +337,22 @@ if err := executor.Profile(os.Stdout); err != nil { // cat profile.txt | flamegraph.pl > profile.svg ``` +### Generate Chrome Trace + +Requires `WithTracer()` option when creating the executor. + +```go +executor := gtf.NewExecutor(1000, gtf.WithTracer()) +executor.Run(tf).Wait() + +// Write Chrome Trace Event data +if err := executor.Trace(os.Stdout); err != nil { + log.Fatal(err) +} + +// Visualize in chrome://tracing or https://ui.perfetto.dev/ +``` + --- ## Important Notes for Code Agents @@ -324,9 +381,9 @@ tf.NewTask("writer", func() { ### TaskFlow Lifecycle 1. Create TaskFlow: `NewTaskFlow(name)` 2. Create tasks and define dependencies -3. Create Executor: `NewExecutor(concurrency)` +3. Create Executor: `NewExecutor(concurrency, opts...)` — pass options like `WithProfiler()`, `WithTracer()`, `WithPanicHandler()` as needed 4. Run: `executor.Run(tf).Wait()` -5. Optionally: `Dump()` for visualization, `Profile()` for profiling +5. Optionally: `Dump()` for visualization, `Profile()` for flamegraph, `Trace()` for Chrome Trace 6. Optionally: `Reset()` to reuse TaskFlow ### Condition Task Gotchas @@ -388,7 +445,7 @@ func main() { final.Succeed(workers...) // Execute - executor := gtf.NewExecutor(10) + executor := gtf.NewExecutor(10, gtf.WithProfiler()) executor.Run(tf).Wait() // Export artifacts diff --git a/node.go b/node.go index 78bba47..a8e4211 100644 --- a/node.go +++ b/node.go @@ -74,6 +74,16 @@ func (n *innerNode) precede(v *innerNode) { v.dependents = append(v.dependents, n) } +// hasCondPredecessor reports whether any predecessor of this node is a condition node. 
+func (n *innerNode) hasCondPredecessor() bool { + for _, dep := range n.dependents { + if dep.Typ == nodeCondition { + return true + } + } + return false +} + func newNode(name string) *innerNode { if len(name) == 0 { name = "N_" + strconv.Itoa(time.Now().Nanosecond()) diff --git a/options.go b/options.go new file mode 100644 index 0000000..35c6b40 --- /dev/null +++ b/options.go @@ -0,0 +1,38 @@ +package gotaskflow + +// Option configures executor behavior. +type Option func(*innerExecutorImpl) + +// WithProfiler enables flame graph profiling for task execution analysis. +func WithProfiler() Option { + return func(e *innerExecutorImpl) { + e.profiler = newProfiler() + } +} + +// WithTracer enables Chrome Trace Event recording for task execution analysis. +// The trace output can be visualized in chrome://tracing or Perfetto UI. +func WithTracer() Option { + return func(e *innerExecutorImpl) { + e.tracer = newTracer() + } +} + +// WithConcurrency sets the max goroutine concurrency, overriding the positional argument. +// This enables NewExecutor(0, WithConcurrency(uint(runtime.NumCPU()))) usage. +// Recommend a value bigger than runtime.NumCPU; it MUST be bigger than num(subflows). +func WithConcurrency(n uint) Option { + return func(e *innerExecutorImpl) { + e.concurrency = n + } +} + +// WithPanicHandler sets a custom handler invoked when a task panics. +// The handler receives the task name and the recovered panic value. +// The graph is still canceled after a panic regardless of the handler. +// Without this option, panics are logged via log.Printf (default behavior).
+func WithPanicHandler(fn func(task string, r interface{})) Option { + return func(e *innerExecutorImpl) { + e.panicHandler = fn + } +} diff --git a/profiler.go b/profiler.go index 5456643..3e44a6a 100644 --- a/profiler.go +++ b/profiler.go @@ -38,10 +38,11 @@ type attr struct { } type span struct { - extra attr - begin time.Time - cost time.Duration - parent *span + extra attr + begin time.Time + cost time.Duration + parent *span + dependents []string // names of predecessor tasks } func (s *span) String() string { diff --git a/taskflow_test.go b/taskflow_test.go index 70b9384..5380783 100644 --- a/taskflow_test.go +++ b/taskflow_test.go @@ -2,7 +2,6 @@ package gotaskflow_test import ( "fmt" - "log" _ "net/http/pprof" "os" "sync/atomic" @@ -13,99 +12,33 @@ import ( "github.com/noneback/go-taskflow/utils" ) -type rgChain[R comparable] struct { - rgs []*rgroup[R] -} - -func newRgChain[R comparable]() *rgChain[R] { - return &rgChain[R]{ - rgs: make([]*rgroup[R], 0), - } -} - -func (c *rgChain[R]) grouping(rs ...R) { - g := newRg[R]() - g.push(rs...) - c.rgs = append(c.rgs, g) -} - -// result group -type rgroup[R comparable] struct { - pre, next *rgroup[R] - elems map[R]struct{} -} - -func newRg[R comparable]() *rgroup[R] { - return &rgroup[R]{ - elems: make(map[R]struct{}), - } -} +// ============================================================================= +// Test Helper Functions +// ============================================================================= -func (g *rgroup[R]) push(rs ...R) { - for _, r := range rs { - g.elems[r] = struct{}{} +// testTaskSimple creates a simple task function that logs execution. 
+func testTaskSimple(name string, t *testing.T) func() { + return func() { + t.Logf("Executing task: %s", name) } } -func (g *rgroup[R]) chain(successor *rgroup[R]) { - g.next = successor - successor.pre = g.next -} - -func (g *rgroup[R]) contains(r R) bool { - _, ok := g.elems[r] - return ok -} - -func checkTopology[R comparable](t *testing.T, q *utils.Queue[R], chain *rgChain[R]) { - for _, g := range chain.rgs { - for len(g.elems) != 0 { - node := q.Pop() - if g.contains(node) { - delete(g.elems, node) - } else { - fmt.Println("failed in", node) - t.Fail() - } - } - } -} - -var executor = gotaskflow.NewExecutor(10) +// ============================================================================= +// Basic TaskFlow Tests +// ============================================================================= func TestTaskFlow(t *testing.T) { - q := utils.NewQueue[string](true) + // A -> B, C -> B, A1 -> B; A1 -> C, B1 -> C; C1 is independent + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - q.Put("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - q.Put("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - q.Put("C") - }) - A1, B1, _ := - tf.NewTask("A1", func() { - fmt.Println("A1") - q.Put("A1") - }), - tf.NewTask("B1", func() { - fmt.Println("B1") - q.Put("B1") - }), - tf.NewTask("C1", func() { - fmt.Println("C1") - q.Put("C1") - }) - chains := newRgChain[string]() - chains.grouping("C1", "A1", "B1", "A", "C") - chains.grouping("B") + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) + A1 := tf.NewTask("A1", testTaskSimple("A1", t)) + B1 := tf.NewTask("B1", testTaskSimple("B1", t)) + _ = tf.NewTask("C1", testTaskSimple("C1", t)) + A.Precede(B) C.Precede(B) A1.Precede(B) @@ -114,373 +47,209 @@ func TestTaskFlow(t *testing.T) { t.Run("TestViz", func(t *testing.T) { if err := 
tf.Dump(os.Stdout); err != nil { - panic(err) + t.Fatalf("Failed to dump taskflow: %v", err) } }) executor.Run(tf).Wait() - if q.Len() != 6 { - t.Fail() - } - - // checkTopology(t, q, chains) } func TestSubflow(t *testing.T) { - q := utils.NewQueue[string](true) - // chains := newRgChain[string]() + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - q.Put("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - q.Put("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - q.Put("C") - }) - A1, B1, C1 := - tf.NewTask("A1", func() { - fmt.Println("A1") - q.Put("A1") - }), - tf.NewTask("B1", func() { - fmt.Println("B1") - q.Put("B1") - }), - tf.NewTask("C1", func() { - fmt.Println("C1") - q.Put("C1") - }) + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) + A1 := tf.NewTask("A1", testTaskSimple("A1", t)) + B1 := tf.NewTask("B1", testTaskSimple("B1", t)) + C1 := tf.NewTask("C1", testTaskSimple("C1", t)) + A.Precede(B) C.Precede(B) C.Succeed(A1) C.Succeed(B1) subflow := tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) { - A2, B2, C2 := - sf.NewTask("A2", func() { - fmt.Println("A2") - q.Put("A2") - }), - sf.NewTask("B2", func() { - fmt.Println("B2") - q.Put("B2") - }), - sf.NewTask("C2", func() { - fmt.Println("C2") - q.Put("C2") - }) + A2 := sf.NewTask("A2", testTaskSimple("A2", t)) + B2 := sf.NewTask("B2", testTaskSimple("B2", t)) + C2 := sf.NewTask("C2", testTaskSimple("C2", t)) A2.Precede(B2) C2.Precede(B2) + cond := sf.NewCondition("cond", func() uint { return 0 }) ssub := sf.NewSubflow("sub in sub", func(sf *gotaskflow.Subflow) { sf.NewTask("done", func() { - fmt.Println("done") + t.Log("done in nested subflow") }) }) cond.Precede(ssub, cond) - }) subflow2 := tf.NewSubflow("sub2", func(sf *gotaskflow.Subflow) { - A3, B3, C3 := - sf.NewTask("A3", func() { - fmt.Println("A3") - q.Put("A3") 
- }), - sf.NewTask("B3", func() { - fmt.Println("B3") - q.Put("B3") - }), - sf.NewTask("C3", func() { - fmt.Println("C3") - q.Put("C3") - // time.Sleep(10 * time.Second) - }) + A3 := sf.NewTask("A3", testTaskSimple("A3", t)) + B3 := sf.NewTask("B3", testTaskSimple("B3", t)) + C3 := sf.NewTask("C3", testTaskSimple("C3", t)) A3.Precede(B3) C3.Precede(B3) - }) subflow.Precede(subflow2) C1.Precede(subflow) C1.Succeed(C) - executor.Run(tf) - executor.Wait() - if err := tf.Dump(os.Stdout); err != nil { - log.Fatal(err) - } - executor.Profile(os.Stdout) - - chain := newRgChain[string]() - - // Group 1 - Top-level nodes - chain.grouping("A1", "B1", "A") - chain.grouping("C") - chain.grouping("B", "C1") - chain.grouping("A2", "C2") - chain.grouping("B2") - - // Group 2 - Connections under A, B, C - chain.grouping("A3", "C3") - chain.grouping("B3") + executor.Run(tf).Wait() - // validate - if q.Len() != 12 { - t.Fail() + if err := tf.Dump(os.Stdout); err != nil { + t.Errorf("Failed to dump: %v", err) } - // checkTopology(t, q, chain) } -// ERROR robust testing +// ============================================================================= +// Error Robustness Tests +// ============================================================================= + func TestTaskflowPanic(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - panic("panic C") - }) + + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", func() { + t.Log("C") + panic("panic C") + }) A.Precede(B) C.Precede(B) executor.Run(tf).Wait() + // Test should not hang or crash } func TestSubflowPanic(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - }), - 
tf.NewTask("B", func() { - fmt.Println("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - }) + + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) A.Precede(B) C.Precede(B) subflow := tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) { - A2, B2, C2 := - tf.NewTask("A2", func() { - fmt.Println("A2") - time.Sleep(1 * time.Second) - }), - tf.NewTask("B2", func() { - fmt.Println("B2") - }), - tf.NewTask("C2", func() { - fmt.Println("C2") - panic("C2 panicked") - }) + A2 := sf.NewTask("A2", func() { + t.Log("A2") + time.Sleep(1 * time.Second) + }) + B2 := sf.NewTask("B2", testTaskSimple("B2", t)) + C2 := sf.NewTask("C2", func() { + t.Log("C2") + panic("C2 panicked") + }) A2.Precede(B2) panic("subflow panic") C2.Precede(B2) }) subflow.Precede(B) - executor.Run(tf) - executor.Wait() + executor.Run(tf).Wait() + if err := tf.Dump(os.Stdout); err != nil { - fmt.Errorf("%v", err) + t.Logf("Dump error: %v", err) } - executor.Profile(os.Stdout) } +// ============================================================================= +// Condition Tests +// ============================================================================= + func TestTaskflowCondition(t *testing.T) { - q := utils.NewQueue[string](true) - chain := newRgChain[string]() - tf := gotaskflow.NewTaskFlow("G") t.Run("normal", func(t *testing.T) { - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - q.Put("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - q.Put("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - q.Put("C") - }) + executor := gotaskflow.NewExecutor(10) + tf := gotaskflow.NewTaskFlow("G") + + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) A.Precede(B) C.Precede(B) - fail, success := tf.NewTask("failed", func() { - fmt.Println("Failed") - q.Put("failed") + fail := tf.NewTask("failed", func() { 
+ t.Log("Failed - should not execute") t.Fail() - }), tf.NewTask("success", func() { - fmt.Println("success") - q.Put("success") }) + success := tf.NewTask("success", testTaskSimple("success", t)) - cond := tf.NewCondition("cond", func() uint { - q.Put("cond") - return 0 - }) + cond := tf.NewCondition("cond", func() uint { return 0 }) B.Precede(cond) cond.Precede(success, fail) suc := tf.NewSubflow("sub1", func(sf *gotaskflow.Subflow) { - A2, B2, C2 := - sf.NewTask("A2", func() { - fmt.Println("A2") - q.Put("A2") - }), - sf.NewTask("B2", func() { - fmt.Println("B2") - q.Put("B2") - }), - sf.NewTask("C2", func() { - fmt.Println("C2") - q.Put("C2") - }) + A2 := sf.NewTask("A2", testTaskSimple("A2", t)) + B2 := sf.NewTask("B2", testTaskSimple("B2", t)) + C2 := sf.NewTask("C2", testTaskSimple("C2", t)) A2.Precede(B2) C2.Precede(B2) }).Priority(gotaskflow.HIGH) - fs := tf.NewTask("fail_single", func() { - fmt.Println("it should be canceled") - q.Put("fail_single") - }) + fs := tf.NewTask("fail_single", func() { t.Log("it should be canceled") }) fail.Precede(fs, suc) - // success.Precede(suc) + if err := tf.Dump(os.Stdout); err != nil { - fmt.Errorf("%v", err) + t.Logf("Dump error: %v", err) } executor.Run(tf).Wait() - - executor.Profile(os.Stdout) - chain.grouping("A", "C") - chain.grouping("B") - chain.grouping("cond") - chain.grouping("success") - - checkTopology(t, q, chain) - }) t.Run("normal-1", func(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - q.Put("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - q.Put("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - q.Put("C") - }) + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) A.Precede(B) C.Precede(B) - fail, success := tf.NewTask("failed", func() { - fmt.Println("Failed") - q.Put("failed") + fail := 
tf.NewTask("failed", func() { + t.Log("Failed - should not execute") t.Fail() - }), tf.NewTask("success", func() { - fmt.Println("success") - q.Put("success") }) + success := tf.NewTask("success", testTaskSimple("success", t)) - cond := tf.NewCondition("cond", func() uint { - q.Put("cond") - return 0 - }) + cond := tf.NewCondition("cond", func() uint { return 0 }) B.Precede(cond) cond.Precede(success) cond.Precede(fail) - // success.Precede(suc) - if err := tf.Dump(os.Stdout); err != nil { - fmt.Errorf("%v", err) - } executor.Run(tf).Wait() - - executor.Profile(os.Stdout) - chain.grouping("A", "C") - chain.grouping("B") - chain.grouping("cond") - chain.grouping("success") - - checkTopology(t, q, chain) - }) t.Run("multiple tasks preceding condition", func(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - q.Put("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - q.Put("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - q.Put("C") - }) + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) - fail, success := tf.NewTask("failed", func() { - fmt.Println("Failed") - q.Put("failed") + fail := tf.NewTask("failed", func() { + t.Log("Failed - should not execute") t.Fail() - }), tf.NewTask("success", func() { - fmt.Println("success") - q.Put("success") }) + success := tf.NewTask("success", testTaskSimple("success", t)) - cond := tf.NewCondition("cond", func() uint { - q.Put("cond") - return 0 - }) + cond := tf.NewCondition("cond", func() uint { return 0 }) A.Precede(cond) B.Precede(cond) C.Precede(cond) cond.Precede(success) cond.Precede(fail) - // success.Precede(suc) - if err := tf.Dump(os.Stdout); err != nil { - fmt.Errorf("%v", err) - } executor.Run(tf).Wait() - - executor.Profile(os.Stdout) - chain.grouping("A", "B", "C") - chain.grouping("cond") - 
chain.grouping("success") - - checkTopology(t, q, chain) - }) t.Run("start with condition node", func(t *testing.T) { @@ -495,76 +264,75 @@ func TestTaskflowCondition(t *testing.T) { } }) - zero, one := tf.NewTask("zero", func() { - fmt.Println("zero") - }), tf.NewTask("one", func() { - fmt.Println("one") - }) + zero := tf.NewTask("zero", testTaskSimple("zero", t)) + one := tf.NewTask("one", testTaskSimple("one", t)) cond.Precede(zero, one) + executor := gotaskflow.NewExecutor(10) executor.Run(tf).Wait() if err := tf.Dump(os.Stdout); err != nil { - log.Fatal(err) + t.Errorf("Dump error: %v", err) } - executor.Profile(os.Stdout) - }) - } +// ============================================================================= +// Loop Tests +// ============================================================================= + func TestTaskflowLoop(t *testing.T) { + executor := gotaskflow.NewExecutor(10) + t.Run("normal", func(t *testing.T) { i := 0 tf := gotaskflow.NewTaskFlow("G") - init, cond, body, back, done := - tf.NewTask("init", func() { - i = 0 - fmt.Println("i=0") - }), - tf.NewCondition("while i < 5", func() uint { - if i < 5 { - return 0 - } else { - return 1 - } - }), - tf.NewTask("body", func() { - i += 1 - fmt.Println("i++ =", i) - }), - tf.NewCondition("back", func() uint { - fmt.Println("back") + + init := tf.NewTask("init", func() { + i = 0 + t.Log("i=0") + }) + cond := tf.NewCondition("while i < 5", func() uint { + if i < 5 { return 0 - }), - tf.NewTask("done", func() { - fmt.Println("done") - }) + } else { + return 1 + } + }) + body := tf.NewTask("body", func() { + i += 1 + t.Logf("i++ = %d", i) + }) + back := tf.NewCondition("back", func() uint { + t.Log("back") + return 0 + }) + done := tf.NewTask("done", func() { + t.Log("done") + }) init.Precede(cond) cond.Precede(body, done) body.Precede(back) back.Precede(cond) - if err := tf.Dump(os.Stdout); err != nil { - // log.Fatal(err) - } + executor.Run(tf).Wait() + if i < 5 { - t.Fail() + t.Errorf("Expected i 
>= 5, got %d", i) } - - executor.Profile(os.Stdout) }) t.Run("simple loop", func(t *testing.T) { i := 0 tf := gotaskflow.NewTaskFlow("G") + init := tf.NewTask("init", func() { i = 0 }) cond := tf.NewCondition("cond", func() uint { i++ - fmt.Println("i++ =", i) + t.Logf("i++ = %d", i) if i > 2 { return 0 } else { @@ -573,161 +341,165 @@ func TestTaskflowLoop(t *testing.T) { }) done := tf.NewTask("done", func() { - fmt.Println("done") + t.Log("done") }) init.Precede(cond) cond.Precede(done, cond) executor.Run(tf).Wait() - if i <= 2 { - t.Fail() - } - if err := tf.Dump(os.Stdout); err != nil { - // log.Fatal(err) + if i <= 2 { + t.Errorf("Expected i > 2, got %d", i) } - executor.Profile(os.Stdout) }) } +// ============================================================================= +// Priority Tests +// ============================================================================= + func TestTaskflowPriority(t *testing.T) { - executor := gotaskflow.NewExecutor(uint(1)) + executor := gotaskflow.NewExecutor(1) q := utils.NewQueue[byte](true) tf := gotaskflow.NewTaskFlow("G") + tf.NewTask("B", func() { - fmt.Println("B") + t.Log("B") q.Put('B') }).Priority(gotaskflow.NORMAL) tf.NewTask("C", func() { - fmt.Println("C") + t.Log("C") q.Put('C') }).Priority(gotaskflow.HIGH) + A := tf.NewTask("A", func() { - fmt.Println("A") + t.Log("A") q.Put('A') }).Priority(gotaskflow.LOW) - A.Precede(tf.NewTask("A2", func() { - fmt.Println("A2") - q.Put('a') - }).Priority(gotaskflow.LOW), + A.Precede( + tf.NewTask("A2", func() { + t.Log("A2") + q.Put('a') + }).Priority(gotaskflow.LOW), tf.NewTask("B2", func() { - fmt.Println("B2") + t.Log("B2") q.Put('b') }).Priority(gotaskflow.HIGH), tf.NewTask("C2", func() { - fmt.Println("C2") + t.Log("C2") q.Put('c') - }).Priority(gotaskflow.NORMAL)) + }).Priority(gotaskflow.NORMAL), + ) executor.Run(tf).Wait() - tf.Dump(os.Stdout) - fmt.Println("validate") - for _, val := range []byte{'C', 'B', 'A', 'b', 'c', 'a'} { + + expected := []byte{'C', 'B', 
'A', 'b', 'c', 'a'} + for _, val := range expected { real := q.Pop() - fmt.Printf("%c, ", real) + t.Logf("Expected: %c, Got: %c", val, real) if val != real { - t.Fatal("[FAILED]", string(val), string(real)) - t.FailNow() + t.Fatalf("Priority order mismatch: expected %c, got %c", val, real) } } } +// ============================================================================= +// Edge Case Tests +// ============================================================================= + func TestTaskflowNotInFlow(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("tf") - task := tf.NewTask("init", func() { - fmt.Println("task init") - }) - cnt := 0 + + task := tf.NewTask("init", testTaskSimple("init", t)) + var cnt atomic.Int32 for i := 0; i < 10; i++ { task.Precede(tf.NewTask("test", func() { - fmt.Println(cnt) - cnt++ + cnt.Add(1) })) } executor.Run(tf).Wait() + + if cnt.Load() != 10 { + t.Errorf("Expected cnt = 10, got %d", cnt.Load()) + } } func TestTaskflowFrozen(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") - A, B, C := - tf.NewTask("A", func() { - fmt.Println("A") - }), - tf.NewTask("B", func() { - fmt.Println("B") - }), - tf.NewTask("C", func() { - fmt.Println("C") - }) + + A := tf.NewTask("A", testTaskSimple("A", t)) + B := tf.NewTask("B", testTaskSimple("B", t)) + C := tf.NewTask("C", testTaskSimple("C", t)) A.Precede(B) C.Precede(B) executor.Run(tf).Wait() + utils.AssertPanics(t, "frozen", func() { - tf.NewTask("tt", func() { - fmt.Println("should not") - }) + tf.NewTask("tt", testTaskSimple("tt", t)) }) } +// ============================================================================= +// Stress Tests +// ============================================================================= + func TestLoopRunManyTimes(t *testing.T) { + executor := gotaskflow.NewExecutor(10) tf := gotaskflow.NewTaskFlow("G") var count atomic.Int32 - // add := func() { - // count.Add(1) - // } + add := 
func(name string) func() { return func() { - fmt.Println(name) + t.Logf("%s", name) count.Add(1) } } - A, B, C := - tf.NewTask("A", add("A")), - tf.NewTask("B", add("B")), - tf.NewTask("C", add("C")) + + A := tf.NewTask("A", add("A")) + B := tf.NewTask("B", add("B")) + C := tf.NewTask("C", add("C")) A.Precede(B) C.Precede(B) + + // Reduced iterations for faster testing + iterations := 100 + t.Run("static", func(t *testing.T) { - for i := 0; i < 10000; i++ { - log.Println("static iter ---> ", i) + for i := 0; i < iterations; i++ { if cnt := count.Load(); cnt%3 != 0 { - t.Error("static unexpected count", cnt) + t.Errorf("static unexpected count %d at iteration %d", cnt, i) return } executor.Run(tf).Wait() } }) - // tf.Dump(os.Stdout) tf.Reset() count.Store(0) sf := tf.NewSubflow("sub", func(sf *gotaskflow.Subflow) { - fmt.Println("sub") - A1, B1, C1 := - sf.NewTask("A1", add("A1")), - sf.NewTask("B1", add("B1")), - sf.NewTask("C1", add("C1")) + t.Log("sub") + A1 := sf.NewTask("A1", add("A1")) + B1 := sf.NewTask("B1", add("B1")) + C1 := sf.NewTask("C1", add("C1")) A1.Precede(B1) C1.Precede(B1) }) additional := tf.NewTask("additional", add("Additional")) B.Precede(sf) additional.Precede(sf) - executor.Run(tf).Wait() - // dot, _ := os.OpenFile("./dot.data", os.O_RDWR, os.ModeAppend) - tf.Dump(os.Stdout) t.Run("subflow", func(t *testing.T) { - for i := 0; i < 10000; i++ { - log.Println("subflow iter ---> ", i) + for i := 0; i < iterations; i++ { if cnt := count.Load(); cnt%7 != 0 { - t.Error("subflow unexpected count", cnt) + t.Errorf("subflow unexpected count %d at iteration %d", cnt, i) return } executor.Run(tf).Wait() @@ -748,16 +520,15 @@ func TestLoopRunManyTimes(t *testing.T) { count.Add(7) }) cond.Precede(plus7, tf.NewTask("7 minus 3", func() { - log.Println(count.Load()) - log.Println("should not minus 3") + t.Logf("%d", count.Load()) + t.Log("should not minus 3") })) sf.Precede(cond) t.Run("condition", func(t *testing.T) { - for i := 0; i < 10000; i++ { - 
log.Println("condition iter ---> ", i) + for i := 0; i < iterations; i++ { if cnt := count.Load(); cnt%7 != 0 { - t.Error("condition unexpect count", cnt) + t.Errorf("condition unexpected count %d at iteration %d", cnt, i) return } executor.Run(tf).Wait() @@ -781,19 +552,16 @@ func TestLoopRunManyTimes(t *testing.T) { } }) - new_plus7 := tf.NewTask("new plus 7", func() { + newPlus7 := tf.NewTask("new plus 7", func() { count.Add(7) }) - cond2.Precede(new_plus7, done) - new_plus7.Precede(cond2) + cond2.Precede(newPlus7, done) + newPlus7.Precede(cond2) - tf.Dump(os.Stdout) t.Run("loop", func(t *testing.T) { - for i := 0; i < 10000; i++ { - log.Println("loop iter ---> ", i) + for i := 0; i < iterations; i++ { if cnt := count.Load(); cnt%7 != 0 { - log.Println(cnt) - t.Error("loop unexpect count", cnt) + t.Errorf("loop unexpected count %d at iteration %d", cnt, i) return } executor.Run(tf).Wait() @@ -802,66 +570,74 @@ func TestLoopRunManyTimes(t *testing.T) { } func TestSequencialTaskingPanic(t *testing.T) { - exe := gotaskflow.NewExecutor(1) - tfl := gotaskflow.NewTaskFlow("test") + executor := gotaskflow.NewExecutor(1) + tf := gotaskflow.NewTaskFlow("test") q := utils.NewQueue[string](true) - tfl.NewTask("task1", func() { + + tf.NewTask("task1", func() { q.Put("panic") - fmt.Println("task1") + t.Log("task1") panic(1) }) - tfl.NewTask("task2", func() { + tf.NewTask("task2", func() { q.Put("2") - fmt.Println("task2") + t.Log("task2") }) - tfl.NewTask("task3", func() { + tf.NewTask("task3", func() { q.Put("3") - fmt.Println("task3") + t.Log("task3") }) - exe.Run(tfl).Wait() + + executor.Run(tf).Wait() + if q.Top() != "panic" { - t.Fail() + t.Error("Expected panic task to execute first") } } + func TestDeadlock(t *testing.T) { // BUG: https://github.com/noneback/go-taskflow/issues/99 - tf := gotaskflow.NewTaskFlow("G1") - exe := gotaskflow.NewExecutor(1) - N := 100 - prev := tf.NewTask("N0", func() {}) - for i := 1; i < 32; i++ { - next := tf.NewTask(fmt.Sprintf("N%d", 
i), func() {}) - prev.Precede(next) - prev = next - } + executor := gotaskflow.NewExecutor(1) + N := 10 // Reduced for faster testing + + t.Run("linear chain", func(t *testing.T) { + tf := gotaskflow.NewTaskFlow("G1") + prev := tf.NewTask("N0", func() {}) + for i := 1; i < 32; i++ { + next := tf.NewTask(fmt.Sprintf("N%d", i), func() {}) + prev.Precede(next) + prev = next + } - for i := 0; i < N; i++ { - exe.Run(tf).Wait() - } + for i := 0; i < N; i++ { + executor.Run(tf).Wait() + } + }) - tf = gotaskflow.NewTaskFlow("G2") + t.Run("layered graph", func(t *testing.T) { + tf := gotaskflow.NewTaskFlow("G2") + layersCount := 8 + layerNodesCount := 8 - layersCount := 8 - layerNodesCount := 8 + var curLayer, upperLayer []*gotaskflow.Task - var curLayer, upperLayer []*gotaskflow.Task + for i := 0; i < layersCount; i++ { + for j := 0; j < layerNodesCount; j++ { + task := tf.NewTask(fmt.Sprintf("N%d", i*layersCount+j), func() {}) - for i := 0; i < layersCount; i++ { - for j := 0; j < layerNodesCount; j++ { - task := tf.NewTask(fmt.Sprintf("N%d", i*layersCount+j), func() {}) + for _, t := range upperLayer { + t.Precede(task) + } - for i := range upperLayer { - upperLayer[i].Precede(task) + curLayer = append(curLayer, task) } - curLayer = append(curLayer, task) + upperLayer = curLayer + curLayer = []*gotaskflow.Task{} } - upperLayer = curLayer - curLayer = []*gotaskflow.Task{} - } - - for i := 0; i < N; i++ { - exe.Run(tf).Wait() - } + for i := 0; i < N; i++ { + executor.Run(tf).Wait() + } + }) } diff --git a/tracer.go b/tracer.go new file mode 100644 index 0000000..da83681 --- /dev/null +++ b/tracer.go @@ -0,0 +1,84 @@ +package gotaskflow + +import ( + "encoding/json" + "io" + "sync" + "sync/atomic" + "time" +) + +// tracer records task execution events and exports them in Chrome Trace Event Format. +// The output can be visualized in Chrome's chrome://tracing or Perfetto UI (https://ui.perfetto.dev). 
+type tracer struct { + events []chromeTraceEvent + mu sync.Mutex + start time.Time + tidGen atomic.Int64 +} + +// chromeTraceEvent represents a single trace event following Chrome Trace Event Format. +type chromeTraceEvent struct { + Name string `json:"name"` + Cat string `json:"cat"` + Ph string `json:"ph"` + Ts int64 `json:"ts"` + Dur int64 `json:"dur"` + Pid int `json:"pid"` + Tid int64 `json:"tid"` + Args map[string]string `json:"args,omitempty"` +} + +func newTracer() *tracer { + return &tracer{ + events: make([]chromeTraceEvent, 0, 64), + start: time.Now(), + } +} + +// AddEvent records a task execution event from the given span. +func (t *tracer) AddEvent(s *span) { + t.mu.Lock() + defer t.mu.Unlock() + + ev := chromeTraceEvent{ + Name: s.extra.name, + Cat: string(s.extra.typ), + Ph: "X", + Ts: s.begin.Sub(t.start).Microseconds(), + Dur: s.cost.Microseconds(), + Pid: 0, + Tid: t.tidGen.Add(1), + } + + // Build args with optional parent and dependents + args := make(map[string]string) + if s.parent != nil { + args["parent"] = s.parent.extra.name + } + if len(s.dependents) > 0 { + // Store as comma-separated string for simplicity + deps := "" + for i, d := range s.dependents { + if i > 0 { + deps += "," + } + deps += d + } + args["dependents"] = deps + } + if len(args) > 0 { + ev.Args = args + } + + t.events = append(t.events, ev) +} + +func (t *tracer) draw(w io.Writer) error { + t.mu.Lock() + defer t.mu.Unlock() + + encoder := json.NewEncoder(w) + encoder.SetIndent("", " ") + return encoder.Encode(t.events) +} diff --git a/tracer_test.go b/tracer_test.go new file mode 100644 index 0000000..f8267bf --- /dev/null +++ b/tracer_test.go @@ -0,0 +1,112 @@ +package gotaskflow + +import ( + "bytes" + "encoding/json" + "sync" + "testing" + "time" +) + +func TestTracerAddEvent(t *testing.T) { + tr := newTracer() + s := &span{ + extra: attr{typ: nodeStatic, name: "task-a"}, + begin: tr.start.Add(10 * time.Millisecond), + cost: 5 * time.Millisecond, + } + 
tr.AddEvent(s) + + if len(tr.events) != 1 { + t.Fatalf("expected 1 event, got %d", len(tr.events)) + } + ev := tr.events[0] + if ev.Name != "task-a" { + t.Errorf("expected name 'task-a', got %q", ev.Name) + } + if ev.Cat != string(nodeStatic) { + t.Errorf("expected cat %q, got %q", string(nodeStatic), ev.Cat) + } + if ev.Ph != "X" { + t.Errorf("expected ph 'X', got %q", ev.Ph) + } + if ev.Dur != 5000 { + t.Errorf("expected dur 5000, got %d", ev.Dur) + } +} + +func TestTracerWithParent(t *testing.T) { + tr := newTracer() + parent := &span{ + extra: attr{typ: nodeSubflow, name: "parent-flow"}, + begin: tr.start, + cost: 20 * time.Millisecond, + } + child := &span{ + extra: attr{typ: nodeStatic, name: "child-task"}, + begin: tr.start.Add(5 * time.Millisecond), + cost: 10 * time.Millisecond, + parent: parent, + } + tr.AddEvent(child) + + if tr.events[0].Args == nil { + t.Fatal("expected args with parent info") + } + if tr.events[0].Args["parent"] != "parent-flow" { + t.Errorf("expected parent 'parent-flow', got %q", tr.events[0].Args["parent"]) + } +} + +func TestTracerDraw(t *testing.T) { + tr := newTracer() + tr.AddEvent(&span{ + extra: attr{typ: nodeStatic, name: "a"}, + begin: tr.start, + cost: 1 * time.Millisecond, + }) + + var buf bytes.Buffer + if err := tr.draw(&buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var events []chromeTraceEvent + if err := json.Unmarshal(buf.Bytes(), &events); err != nil { + t.Fatalf("output is not valid JSON: %v", err) + } + if len(events) != 1 { + t.Fatalf("expected 1 event in output, got %d", len(events)) + } +} + +func TestTracerConcurrentAddEvent(t *testing.T) { + tr := newTracer() + var wg sync.WaitGroup + n := 100 + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + defer wg.Done() + tr.AddEvent(&span{ + extra: attr{typ: nodeStatic, name: "task"}, + begin: tr.start.Add(time.Duration(i) * time.Millisecond), + cost: 1 * time.Millisecond, + }) + }(i) + } + wg.Wait() + + if len(tr.events) != n { + 
t.Fatalf("expected %d events, got %d", n, len(tr.events)) + } + + // verify all tids are unique + tids := make(map[int64]bool) + for _, ev := range tr.events { + if tids[ev.Tid] { + t.Fatalf("duplicate tid: %d", ev.Tid) + } + tids[ev.Tid] = true + } +} diff --git a/utils/copool_test.go b/utils/copool_test.go index ead12cd..2bb8403 100644 --- a/utils/copool_test.go +++ b/utils/copool_test.go @@ -59,16 +59,18 @@ func TestPoolPanic(t *testing.T) { func TestPoolSequentialExec(t *testing.T) { p := NewCopool(1) q := make([]int, 0, 10000) - // mutex := &sync.Mutex{} idx := 0 + var wg sync.WaitGroup for i := 0; i < 10000; i++ { + wg.Add(1) p.Go(func() { + defer wg.Done() q = append(q, idx) idx++ }) } - time.Sleep(1 * time.Second) + wg.Wait() fmt.Println("len", len(q)) diff --git a/validator.go b/validator.go new file mode 100644 index 0000000..1c594c1 --- /dev/null +++ b/validator.go @@ -0,0 +1,179 @@ +package gotaskflow + +import ( + "fmt" + "strings" +) + +// validate checks trace events against the expected TaskFlow DAG. +// e must have been created with WithTracer(); otherwise the result is always valid. +// Internal-only: used in tests to verify execution correctness. +func validate(e Executor, tf *TaskFlow) *validationResult { + impl, ok := e.(*innerExecutorImpl) + if !ok || impl.tracer == nil { + return &validationResult{valid: true} + } + return newValidator(impl.tracer).run(tf) +} + +// validationResult contains the result of validating trace events against a TaskFlow. +type validationResult struct { + valid bool + missingTasks []string // Tasks defined but not executed + unexpectedTasks []string // Tasks executed but not defined + dependencyErrors []dependencyError // Dependency mismatches + skippedBranches []string // Condition branches that were skipped (not errors) +} + +// dependencyError represents a mismatch in task dependencies. 
+type dependencyError struct { + task string + expected []string + actual []string +} + +func (e dependencyError) String() string { + return fmt.Sprintf("task %q: expected deps %v, actual %v", e.task, e.expected, e.actual) +} + +func (r *validationResult) String() string { + if r.valid { + return "validation passed" + } + var sb strings.Builder + sb.WriteString("validation failed:\n") + if len(r.missingTasks) > 0 { + sb.WriteString(fmt.Sprintf(" missing tasks: %v\n", r.missingTasks)) + } + if len(r.unexpectedTasks) > 0 { + sb.WriteString(fmt.Sprintf(" unexpected tasks: %v\n", r.unexpectedTasks)) + } + for _, e := range r.dependencyErrors { + sb.WriteString(fmt.Sprintf(" %s\n", e.String())) + } + if len(r.skippedBranches) > 0 { + sb.WriteString(fmt.Sprintf(" skipped branches (OK): %v\n", r.skippedBranches)) + } + return sb.String() +} + +// validator validates trace events against TaskFlow definitions. +type validator struct { + tracer *tracer +} + +func newValidator(t *tracer) *validator { + return &validator{tracer: t} +} + +// run compares executed trace events against the expected TaskFlow DAG. +func (v *validator) run(tf *TaskFlow) *validationResult { + if v.tracer == nil { + return &validationResult{valid: true} + } + + result := &validationResult{valid: true} + + // --- Step 1: collect expected nodes via eGraph.walk --- + expected := make(map[string]*innerNode) + tf.graph.walk(func(n *innerNode) { expected[n.name] = n }) + + // --- Step 2: collect executed events from tracer --- + v.tracer.mu.Lock() + executed := make(map[string]chromeTraceEvent, len(v.tracer.events)) + for _, ev := range v.tracer.events { + executed[ev.Name] = ev + } + v.tracer.mu.Unlock() + + // --- Step 3: missing / skipped check --- + // A task is a skipped branch if: + // (a) it is a direct successor of a condition node that chose a different branch, OR + // (b) any of its non-condition predecessors is also skipped (transitive skip). 
+ skipped := make(map[string]bool) + for name, node := range expected { + if _, ran := executed[name]; !ran && node.hasCondPredecessor() { + skipped[name] = true + } + } + for changed := true; changed; { + changed = false + for name, node := range expected { + if _, ran := executed[name]; ran || skipped[name] { + continue + } + for _, dep := range node.dependents { + if dep.Typ != nodeCondition && skipped[dep.name] { + skipped[name] = true + changed = true + break + } + } + } + } + for name := range expected { + if _, ran := executed[name]; !ran { + if skipped[name] { + result.skippedBranches = append(result.skippedBranches, name) + } else { + result.missingTasks = append(result.missingTasks, name) + result.valid = false + } + } + } + + // --- Step 4: unexpected check --- + for name := range executed { + if _, defined := expected[name]; !defined { + result.unexpectedTasks = append(result.unexpectedTasks, name) + result.valid = false + } + } + + // --- Step 5: dependency check --- + for name, ev := range executed { + node, ok := expected[name] + if !ok { + continue + } + var expDeps []string + for _, dep := range node.dependents { + if _, ran := executed[dep.name]; ran { + expDeps = append(expDeps, dep.name) + } + } + var actDeps []string + if raw := ev.Args["dependents"]; raw != "" { + for _, d := range strings.Split(raw, ",") { + if _, ran := executed[d]; ran { + actDeps = append(actDeps, d) + } + } + } + if !stringSliceEqual(expDeps, actDeps) { + result.dependencyErrors = append(result.dependencyErrors, dependencyError{ + task: name, expected: expDeps, actual: actDeps, + }) + result.valid = false + } + } + + return result +} + +// stringSliceEqual checks if two string slices contain the same elements (order-independent). 
+func stringSliceEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + count := make(map[string]int, len(a)) + for _, s := range a { + count[s]++ + } + for _, s := range b { + if count[s]--; count[s] < 0 { + return false + } + } + return true +} diff --git a/validator_test.go b/validator_test.go new file mode 100644 index 0000000..d7abdf5 --- /dev/null +++ b/validator_test.go @@ -0,0 +1,317 @@ +package gotaskflow + +import ( + "strings" + "testing" +) + +func TestValidatorSimpleSerial(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("serial") + + A := tf.NewTask("A", func() {}) + B := tf.NewTask("B", func() {}) + C := tf.NewTask("C", func() {}) + + A.Precede(B) + B.Precede(C) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } +} + +func TestValidatorParallel(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("parallel") + + // Fan-out: A -> B, C in parallel -> D (fan-in) + A := tf.NewTask("A", func() {}) + B := tf.NewTask("B", func() {}) + C := tf.NewTask("C", func() {}) + D := tf.NewTask("D", func() {}) + + A.Precede(B, C) + B.Precede(D) + C.Precede(D) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } +} + +func TestValidatorSubflow(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("with_subflow") + + A := tf.NewTask("A", func() {}) + sub := tf.NewSubflow("sub", func(sf *Subflow) { + S1 := sf.NewTask("S1", func() {}) + S2 := sf.NewTask("S2", func() {}) + S1.Precede(S2) + }) + B := tf.NewTask("B", func() {}) + + A.Precede(sub) + sub.Precede(B) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } +} + +func TestValidatorConditionBranch(t *testing.T) { + executor := NewExecutor(4, 
WithTracer()) + tf := NewTaskFlow("condition") + + A := tf.NewTask("A", func() {}) + cond := tf.NewCondition("cond", func() uint { return 0 }) // always choose branch 0 + B := tf.NewTask("B", func() {}) // branch 0 - will execute + C := tf.NewTask("C", func() {}) // branch 1 - will skip + D := tf.NewTask("D", func() {}) + + A.Precede(cond) + cond.Precede(B, C) // 0 -> B, 1 -> C + B.Precede(D) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid (C should be skipped branch), got: %s", result.String()) + } + + // C should be in skipped branches + if !containsStr(result.skippedBranches, "C") { + t.Errorf("expected C in skipped branches, got: %v", result.skippedBranches) + } +} + +func TestValidatorWithoutTracer(t *testing.T) { + executor := NewExecutor(4) // no tracer + tf := NewTaskFlow("no_tracer") + + A := tf.NewTask("A", func() {}) + B := tf.NewTask("B", func() {}) + A.Precede(B) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid (tracer disabled should pass), got: %s", result.String()) + } +} + +func TestValidatorComplexPipeline(t *testing.T) { + executor := NewExecutor(8, WithTracer()) + tf := NewTaskFlow("complex") + + // prepare -> (read_config || load_data) -> validate -> check(cond) -> sub_process -> report + prepare := tf.NewTask("prepare", func() {}) + readConfig := tf.NewTask("read_config", func() {}) + loadData := tf.NewTask("load_data", func() {}) + validateTask := tf.NewTask("validate", func() {}) + check := tf.NewCondition("check", func() uint { return 0 }) + subProcess := tf.NewSubflow("sub_process", func(sf *Subflow) { + transform := sf.NewTask("transform", func() {}) + enrich := sf.NewTask("enrich", func() {}) + aggregate := sf.NewTask("aggregate", func() {}) + transform.Precede(aggregate) + enrich.Precede(aggregate) + }) + fallback := tf.NewTask("fallback", func() {}) // skipped + report := tf.NewTask("report", func() {}) + + 
prepare.Precede(readConfig, loadData) + readConfig.Precede(validateTask) + loadData.Precede(validateTask) + validateTask.Precede(check) + check.Precede(subProcess, fallback) + subProcess.Precede(report) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } + + // fallback should be skipped + if !containsStr(result.skippedBranches, "fallback") { + t.Errorf("expected fallback in skipped branches, got: %v", result.skippedBranches) + } +} + +// TestValidatorConditionBranch1 verifies that branch 1 executes and branch 0 is skipped. +func TestValidatorConditionBranch1(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("cond_branch1") + + cond := tf.NewCondition("cond", func() uint { return 1 }) // always branch 1 + branch0 := tf.NewTask("branch0", func() {}) // skipped + branch1 := tf.NewTask("branch1", func() {}) // executed + cond.Precede(branch0, branch1) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } + + if !containsStr(result.skippedBranches, "branch0") { + t.Errorf("expected branch0 in skipped branches, got: %v", result.skippedBranches) + } + if containsStr(result.skippedBranches, "branch1") { + t.Errorf("branch1 should have executed, not skipped") + } +} + +// TestValidatorNestedSubflow verifies validation with a subflow nested inside another subflow. 
+func TestValidatorNestedSubflow(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("nested_subflow") + + outer := tf.NewSubflow("outer", func(sf *Subflow) { + inner := sf.NewSubflow("inner", func(sf2 *Subflow) { + x := sf2.NewTask("X", func() {}) + y := sf2.NewTask("Y", func() {}) + x.Precede(y) + }) + z := sf.NewTask("Z", func() {}) + inner.Precede(z) + }) + end := tf.NewTask("end", func() {}) + outer.Precede(end) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid for nested subflow, got: %s", result.String()) + } +} + +// TestValidatorIndependentTasks verifies that fully independent tasks (no edges) are all validated. +func TestValidatorIndependentTasks(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + tf := NewTaskFlow("independent") + + tf.NewTask("T1", func() {}) + tf.NewTask("T2", func() {}) + tf.NewTask("T3", func() {}) + + executor.Run(tf).Wait() + + result := validate(executor, tf) + if !result.valid { + t.Errorf("expected valid, got: %s", result.String()) + } + if len(result.missingTasks) > 0 { + t.Errorf("expected no missing tasks, got: %v", result.missingTasks) + } +} + +// TestValidatorMissingTask verifies that tasks defined but never executed appear in missingTasks. +// We run tf1 (A->B), then validate against tf2 (A->B->C). C is never executed → missing. 
+func TestValidatorMissingTask(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + + tf1 := NewTaskFlow("run") + a1 := tf1.NewTask("A", func() {}) + b1 := tf1.NewTask("B", func() {}) + a1.Precede(b1) + executor.Run(tf1).Wait() + + // Validate against a larger DAG that expects C (never ran) + tf2 := NewTaskFlow("expected") + a2 := tf2.NewTask("A", func() {}) + b2 := tf2.NewTask("B", func() {}) + c2 := tf2.NewTask("C", func() {}) + a2.Precede(b2) + b2.Precede(c2) + + result := validate(executor, tf2) + if result.valid { + t.Error("expected invalid: C was defined but not executed") + } + if !containsStr(result.missingTasks, "C") { + t.Errorf("expected C in missing tasks, got: %v", result.missingTasks) + } +} + +// TestValidatorUnexpectedTask verifies that tasks executed but not defined appear in unexpectedTasks. +// We run tf1 (A->B->C), then validate against tf2 (A->B only). C is unexpected. +func TestValidatorUnexpectedTask(t *testing.T) { + executor := NewExecutor(4, WithTracer()) + + tf1 := NewTaskFlow("run") + a1 := tf1.NewTask("A", func() {}) + b1 := tf1.NewTask("B", func() {}) + c1 := tf1.NewTask("C", func() {}) + a1.Precede(b1) + b1.Precede(c1) + executor.Run(tf1).Wait() + + // Validate against a smaller DAG that doesn't know about C + tf2 := NewTaskFlow("expected") + a2 := tf2.NewTask("A", func() {}) + b2 := tf2.NewTask("B", func() {}) + a2.Precede(b2) + + result := validate(executor, tf2) + if result.valid { + t.Error("expected invalid: C was executed but not defined") + } + if !containsStr(result.unexpectedTasks, "C") { + t.Errorf("expected C in unexpected tasks, got: %v", result.unexpectedTasks) + } +} + +// TestValidatorResultString verifies the String() output format. 
+func TestValidatorResultString(t *testing.T) { + r := &validationResult{ + valid: false, + missingTasks: []string{"X"}, + unexpectedTasks: []string{"Y"}, + dependencyErrors: []dependencyError{{task: "Z", expected: []string{"A"}, actual: []string{"B"}}}, + skippedBranches: []string{"W"}, + } + s := r.String() + for _, want := range []string{"X", "Y", "Z", "W", "validation failed"} { + if !containsSubstr(s, want) { + t.Errorf("expected %q in String() output, got:\n%s", want, s) + } + } + + // Valid result should return short string + valid := &validationResult{valid: true} + if valid.String() != "validation passed" { + t.Errorf("unexpected valid string: %q", valid.String()) + } +} + +// ---- helpers ---- + +func containsStr(slice []string, s string) bool { + for _, v := range slice { + if v == s { + return true + } + } + return false +} + +func containsSubstr(s, sub string) bool { + return strings.Contains(s, sub) +} From d2f753fbdbe4db1f235cd50839e6c48b38004b58 Mon Sep 17 00:00:00 2001 From: NoneBack Date: Thu, 19 Mar 2026 15:45:17 +0800 Subject: [PATCH 2/3] style(executor): normalize log output - merge double log.Printf per panic into single structured line - use %q for graph/task names, add [go-taskflow] prefix - remove internal scheduler noise logs (node-skip, subgraph-cancel propagation) --- executor.go | 13 ++++--------- options.go | 1 - 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/executor.go b/executor.go index a86a2ff..2cbedef 100644 --- a/executor.go +++ b/executor.go @@ -142,8 +142,7 @@ func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *S if e.panicHandler != nil { e.panicHandler(node.name, r) } else { - log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name) - log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + log.Printf("[go-taskflow] graph %q canceled: static task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack()) } } 
e.record(&span, r == nil) @@ -175,8 +174,7 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p * if e.panicHandler != nil { e.panicHandler(node.name, r) } else { - log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name) - log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + log.Printf("[go-taskflow] graph %q canceled: subflow %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack()) } node.g.canceled.Store(true) p.g.canceled.Store(true) @@ -216,8 +214,7 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p if e.panicHandler != nil { e.panicHandler(node.name, r) } else { - log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name) - log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack()) + log.Printf("[go-taskflow] graph %q canceled: condition task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack()) } } e.record(&span, r == nil) @@ -264,11 +261,10 @@ func (e *innerExecutorImpl) pushIntoQueue(node *innerNode) { func (e *innerExecutorImpl) schedule(nodes ...*innerNode) { for _, node := range nodes { if node.g.canceled.Load() { - // no need + // graph already canceled, skip scheduling node.g.scheCond.L.Lock() node.g.scheCond.Signal() node.g.scheCond.L.Unlock() - log.Printf("node %v is not scheduled, since graph %v is canceled\n", node.name, node.g.name) return } e.wg.Add(1) @@ -290,7 +286,6 @@ func (e *innerExecutorImpl) scheduleGraph(parentg, g *eGraph, parentSpan *span) e.schedule(g.entries...) 
if !e.invokeGraph(g, parentSpan) && parentg != nil { parentg.canceled.Store(true) - log.Printf("graph %s canceled, since subgraph %s is canceled\n", parentg.name, g.name) } g.scheCond.Signal() diff --git a/options.go b/options.go index 35c6b40..3b9ab1f 100644 --- a/options.go +++ b/options.go @@ -1,5 +1,4 @@ package gotaskflow -package gotaskflow // Option configures executor behavior. type Option func(*innerExecutorImpl) From 63c17447f36e211ad76afae98ac58f25122cd592 Mon Sep 17 00:00:00 2001 From: NoneBack Date: Thu, 19 Mar 2026 15:56:10 +0800 Subject: [PATCH 3/3] refactor(validator): accept traceRecord instead of Executor - add traceRecord type and tracer.snapshot() to tracer.go - validate() now takes traceRecord (immutable snapshot) instead of Executor - validator struct holds traceRecord, no longer locks tracer directly - add mustSnapshot() test helper to extract snapshot from executor - nil traceRecord treated as 'no trace data', always returns valid --- tracer.go | 13 +++++++++++++ validator.go | 31 ++++++++++++------------------- validator_test.go | 35 +++++++++++++++++++++++------------ 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/tracer.go b/tracer.go index da83681..c8f9a30 100644 --- a/tracer.go +++ b/tracer.go @@ -82,3 +82,16 @@ func (t *tracer) draw(w io.Writer) error { encoder.SetIndent("", " ") return encoder.Encode(t.events) } + +// traceRecord is an immutable snapshot of task execution events produced by a tracer. +// It represents the observed execution result of a TaskFlow run. +type traceRecord []chromeTraceEvent + +// snapshot returns an immutable copy of all recorded trace events. 
+func (t *tracer) snapshot() traceRecord { + t.mu.Lock() + defer t.mu.Unlock() + cp := make(traceRecord, len(t.events)) + copy(cp, t.events) + return cp +} diff --git a/validator.go b/validator.go index 1c594c1..507e9ce 100644 --- a/validator.go +++ b/validator.go @@ -5,15 +5,14 @@ import ( "strings" ) -// validate checks trace events against the expected TaskFlow DAG. -// e must have been created with WithTracer(); otherwise the result is always valid. +// validate checks a traceRecord against the expected TaskFlow DAG. +// A nil rec (e.g. no tracer configured) is treated as "no trace data" and always returns valid. // Internal-only: used in tests to verify execution correctness. -func validate(e Executor, tf *TaskFlow) *validationResult { - impl, ok := e.(*innerExecutorImpl) - if !ok || impl.tracer == nil { +func validate(rec traceRecord, tf *TaskFlow) *validationResult { + if rec == nil { return &validationResult{valid: true} } - return newValidator(impl.tracer).run(tf) + return newValidator(rec).run(tf) } // validationResult contains the result of validating trace events against a TaskFlow. @@ -57,34 +56,28 @@ func (r *validationResult) String() string { return sb.String() } -// validator validates trace events against TaskFlow definitions. +// validator validates a traceRecord against TaskFlow definitions. type validator struct { - tracer *tracer + rec traceRecord } -func newValidator(t *tracer) *validator { - return &validator{tracer: t} +func newValidator(rec traceRecord) *validator { + return &validator{rec: rec} } // run compares executed trace events against the expected TaskFlow DAG. 
func (v *validator) run(tf *TaskFlow) *validationResult { - if v.tracer == nil { - return &validationResult{valid: true} - } - result := &validationResult{valid: true} // --- Step 1: collect expected nodes via eGraph.walk --- expected := make(map[string]*innerNode) tf.graph.walk(func(n *innerNode) { expected[n.name] = n }) - // --- Step 2: collect executed events from tracer --- - v.tracer.mu.Lock() - executed := make(map[string]chromeTraceEvent, len(v.tracer.events)) - for _, ev := range v.tracer.events { + // --- Step 2: build executed map from the immutable record --- + executed := make(map[string]chromeTraceEvent, len(v.rec)) + for _, ev := range v.rec { executed[ev.Name] = ev } - v.tracer.mu.Unlock() // --- Step 3: missing / skipped check --- // A task is a skipped branch if: diff --git a/validator_test.go b/validator_test.go index d7abdf5..d6d946d 100644 --- a/validator_test.go +++ b/validator_test.go @@ -18,7 +18,7 @@ func TestValidatorSimpleSerial(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -40,7 +40,7 @@ func TestValidatorParallel(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -63,7 +63,7 @@ func TestValidatorSubflow(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -85,7 +85,7 @@ func TestValidatorConditionBranch(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid (C should be skipped branch), got: %s", result.String()) } @@ -106,9 +106,10 @@ func TestValidatorWithoutTracer(t *testing.T) 
{ executor.Run(tf).Wait() - result := validate(executor, tf) + // nil record (no tracer) should be treated as always valid + result := validate(mustSnapshot(executor), tf) if !result.valid { - t.Errorf("expected valid (tracer disabled should pass), got: %s", result.String()) + t.Errorf("expected valid (no tracer = nil record), got: %s", result.String()) } } @@ -141,7 +142,7 @@ func TestValidatorComplexPipeline(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -164,7 +165,7 @@ func TestValidatorConditionBranch1(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -196,7 +197,7 @@ func TestValidatorNestedSubflow(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid for nested subflow, got: %s", result.String()) } @@ -213,7 +214,7 @@ func TestValidatorIndependentTasks(t *testing.T) { executor.Run(tf).Wait() - result := validate(executor, tf) + result := validate(mustSnapshot(executor), tf) if !result.valid { t.Errorf("expected valid, got: %s", result.String()) } @@ -241,7 +242,7 @@ func TestValidatorMissingTask(t *testing.T) { a2.Precede(b2) b2.Precede(c2) - result := validate(executor, tf2) + result := validate(mustSnapshot(executor), tf2) if result.valid { t.Error("expected invalid: C was defined but not executed") } @@ -269,7 +270,7 @@ func TestValidatorUnexpectedTask(t *testing.T) { b2 := tf2.NewTask("B", func() {}) a2.Precede(b2) - result := validate(executor, tf2) + result := validate(mustSnapshot(executor), tf2) if result.valid { t.Error("expected invalid: C was executed but not defined") } @@ -303,6 +304,16 @@ func TestValidatorResultString(t *testing.T) 
{ // ---- helpers ---- +// mustSnapshot extracts a traceRecord from an executor. +// Returns nil if the executor was not created with WithTracer(). +func mustSnapshot(e Executor) traceRecord { + impl := e.(*innerExecutorImpl) + if impl.tracer == nil { + return nil + } + return impl.tracer.snapshot() +} + func containsStr(slice []string, s string) bool { for _, v := range slice { if v == s {