Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func main() {
}
done.Succeed(sortTasks...)

executor := gtf.NewExecutor(1000)
executor := gtf.NewExecutor(1000, gtf.WithProfiler())

executor.Run(tf).Wait()

Expand Down Expand Up @@ -156,6 +156,30 @@ ok github.com/noneback/go-taskflow/benchmark 5.606s

Conditional nodes in go-taskflow behave similarly to those in [taskflow-cpp](https://github.com/taskflow/taskflow). They participate in both conditional control and looping. To avoid common pitfalls, refer to the [Conditional Tasking documentation](https://taskflow.github.io/taskflow/ConditionalTasking.html).

## Executor Options

`NewExecutor` accepts functional options to configure behavior:

```go
import "runtime"

executor := gtf.NewExecutor(0,
gtf.WithConcurrency(uint(runtime.NumCPU()*4)), // override concurrency
gtf.WithProfiler(), // enable flamegraph profiling
gtf.WithTracer(), // enable Chrome Trace recording
gtf.WithPanicHandler(func(task string, r interface{}) {
log.Printf("task %s panicked: %v", task, r)
}),
)
```

| Option | Description |
|:---|:---|
| `WithConcurrency(n uint)` | Set max goroutine concurrency (overrides positional arg). Enables `NewExecutor(0, WithConcurrency(n))` style. |
| `WithProfiler()` | Enable flamegraph profiling. Without it, `executor.Profile()` writes nothing and returns nil. |
| `WithTracer()` | Enable Chrome Trace recording. Without it, `executor.Trace()` writes nothing and returns nil. |
| `WithPanicHandler(fn)` | Custom panic handler invoked on task panic. Replaces default log output. Graph is still canceled. |

## Error Handling in go-taskflow

In Go, `errors` are values, and it is the user's responsibility to handle them appropriately. Only unrecovered `panic` events are managed by the framework. If a `panic` occurs, the entire parent graph is canceled, leaving the remaining tasks incomplete. This behavior may evolve in the future. If you have suggestions, feel free to share them.
Expand All @@ -173,6 +197,14 @@ tf.NewTask("not interrupt", func() {
})
```

Alternatively, use `WithPanicHandler` to centralize panic handling across all tasks:

```go
executor := gtf.NewExecutor(1000, gtf.WithPanicHandler(func(task string, r interface{}) {
log.Printf("task %s panicked: %v", task, r)
}))
```

## Visualizing Taskflows

To generate a visual representation of a taskflow, use the `Dump` method:
Expand All @@ -189,9 +221,12 @@ The `Dump` method generates raw strings in DOT format. Use the `dot` tool to cre

## Profiling Taskflows

To profile a taskflow, use the `Profile` method:
To profile a taskflow, first enable the profiler with `WithProfiler()`, then call `Profile`:

```go
executor := gtf.NewExecutor(1000, gtf.WithProfiler())
executor.Run(tf).Wait()

if err := executor.Profile(os.Stdout); err != nil {
log.Fatal(err)
}
Expand All @@ -201,6 +236,21 @@ The `Profile` method generates raw strings in flamegraph format. Use the `flameg

![flg](image/fl.svg)

## Tracing Taskflows

To capture Chrome Trace events, enable the tracer with `WithTracer()`, then call `Trace`:

```go
executor := gtf.NewExecutor(1000, gtf.WithTracer())
executor.Run(tf).Wait()

if err := executor.Trace(os.Stdout); err != nil {
log.Fatal(err)
}
```

The output is in [Chrome Trace Event format](https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU). Open it in `chrome://tracing` or [Perfetto UI](https://ui.perfetto.dev/) for visualization.

## Stargazer

[![Star History Chart](https://api.star-history.com/svg?repos=noneback/go-taskflow&type=Date)](https://star-history.com/#noneback/go-taskflow&Date)
Expand Down
120 changes: 84 additions & 36 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,39 @@ import (
// Executor schedules and executes a TaskFlow, optionally recording
// profiling (flame graph) and tracing (Chrome Trace Event) data while it runs.
type Executor interface {
	Wait()                     // Wait blocks until all scheduled tasks have finished.
	Profile(w io.Writer) error // Profile writes flame-graph raw text into w; no-op (nil) unless WithProfiler() was set.
	Trace(w io.Writer) error   // Trace writes Chrome Trace Event data into w; no-op (nil) unless WithTracer() was set.
	Run(tf *TaskFlow) Executor // Run starts scheduling and executing tf; returns the executor so calls can be chained (e.g. Run(tf).Wait()).
}

type innerExecutorImpl struct {
concurrency uint
pool *utils.Copool
wq *utils.Queue[*innerNode]
wg *sync.WaitGroup
profiler *profiler
mu *sync.Mutex
concurrency uint
pool *utils.Copool
wq *utils.Queue[*innerNode]
wg *sync.WaitGroup
profiler *profiler
tracer *tracer
panicHandler func(task string, r interface{})
mu *sync.Mutex
}

// NewExecutor return a Executor with a specified max goroutine concurrency(recommend a value bigger than Runtime.NumCPU, **MUST** bigger than num(subflows). )
func NewExecutor(concurrency uint) Executor {
if concurrency == 0 {
panic("executor concurrency cannot be zero")
}
t := newProfiler()
return &innerExecutorImpl{
// NewExecutor returns an Executor with the specified concurrency and options.
// concurrency can be 0 when WithConcurrency option is provided.
// Recommend concurrency > runtime.NumCPU and MUST > num(subflows).
func NewExecutor(concurrency uint, opts ...Option) Executor {
e := &innerExecutorImpl{
concurrency: concurrency,
pool: utils.NewCopool(concurrency),
wq: utils.NewQueue[*innerNode](false),
wg: &sync.WaitGroup{},
profiler: t,
mu: &sync.Mutex{},
}
for _, opt := range opts {
opt(e)
}
if e.concurrency == 0 {
panic("executor concurrency cannot be zero")
}
e.pool = utils.NewCopool(e.concurrency)
return e
}

// Run start to schedule and execute taskflow
Expand Down Expand Up @@ -97,22 +104,48 @@ func (e *innerExecutorImpl) sche_successors(node *innerNode) {
e.schedule(candidate...)
}

// record forwards a finished span to whichever observers are enabled.
// successful=false marks a node that panicked: only the tracer keeps those
// events, while the profiler records successful spans exclusively.
func (e *innerExecutorImpl) record(sp *span, successful bool) {
	if p := e.profiler; successful && p != nil {
		p.AddSpan(sp)
	}
	if tr := e.tracer; tr != nil {
		tr.AddEvent(sp)
	}
}

// getDependentNames extracts predecessor task names from a node.
// It returns nil (not an empty slice) when the node has no dependents.
func getDependentNames(node *innerNode) []string {
	deps := node.dependents
	if len(deps) == 0 {
		return nil
	}
	out := make([]string, 0, len(deps))
	for _, dep := range deps {
		out = append(out, dep.name)
	}
	return out
}

func (e *innerExecutorImpl) invokeStatic(node *innerNode, parentSpan *span, p *Static) func() {
return func() {
span := span{extra: attr{
typ: nodeStatic,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
r := recover()
if r != nil {
node.g.canceled.Store(true)
log.Printf("graph %v is canceled, since static node %v panics", node.g.name, node.name)
log.Printf("[recovered] static node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
} else {
e.profiler.AddSpan(&span) // remove canceled node span
if e.panicHandler != nil {
e.panicHandler(node.name, r)
} else {
log.Printf("[go-taskflow] graph %q canceled: static task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
}
}
e.record(&span, r == nil)

node.drop()
e.sche_successors(node)
Expand All @@ -132,18 +165,21 @@ func (e *innerExecutorImpl) invokeSubflow(node *innerNode, parentSpan *span, p *
span := span{extra: attr{
typ: nodeSubflow,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
log.Printf("graph %v is canceled, since subflow %v panics", node.g.name, node.name)
log.Printf("[recovered] subflow %s, panic: %v, stack: %s", node.name, r, debug.Stack())
r := recover()
if r != nil {
if e.panicHandler != nil {
e.panicHandler(node.name, r)
} else {
log.Printf("[go-taskflow] graph %q canceled: subflow %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
}
node.g.canceled.Store(true)
p.g.canceled.Store(true)
} else {
e.profiler.AddSpan(&span) // remove canceled node span
}
e.record(&span, r == nil)

e.scheduleGraph(node.g, p.g, &span)
node.drop()
Expand All @@ -168,17 +204,20 @@ func (e *innerExecutorImpl) invokeCondition(node *innerNode, parentSpan *span, p
span := span{extra: attr{
typ: nodeCondition,
name: node.name,
}, begin: time.Now(), parent: parentSpan}
}, begin: time.Now(), parent: parentSpan, dependents: getDependentNames(node)}

defer func() {
span.cost = time.Since(span.begin)
if r := recover(); r != nil {
r := recover()
if r != nil {
node.g.canceled.Store(true)
log.Printf("graph %v is canceled, since condition node %v panics", node.g.name, node.name)
log.Printf("[recovered] condition node %s, panic: %v, stack: %s", node.name, r, debug.Stack())
} else {
e.profiler.AddSpan(&span) // remove canceled node span
if e.panicHandler != nil {
e.panicHandler(node.name, r)
} else {
log.Printf("[go-taskflow] graph %q canceled: condition task %q panicked: %v\n%s", node.g.name, node.name, r, debug.Stack())
}
}
e.record(&span, r == nil)
node.drop()
// e.sche_successors(node)
node.g.deref()
Expand Down Expand Up @@ -222,11 +261,10 @@ func (e *innerExecutorImpl) pushIntoQueue(node *innerNode) {
func (e *innerExecutorImpl) schedule(nodes ...*innerNode) {
for _, node := range nodes {
if node.g.canceled.Load() {
// no need
// graph already canceled, skip scheduling
node.g.scheCond.L.Lock()
node.g.scheCond.Signal()
node.g.scheCond.L.Unlock()
log.Printf("node %v is not scheduled, since graph %v is canceled\n", node.name, node.g.name)
return
}
e.wg.Add(1)
Expand All @@ -248,7 +286,6 @@ func (e *innerExecutorImpl) scheduleGraph(parentg, g *eGraph, parentSpan *span)
e.schedule(g.entries...)
if !e.invokeGraph(g, parentSpan) && parentg != nil {
parentg.canceled.Store(true)
log.Printf("graph %s canceled, since subgraph %s is canceled\n", parentg.name, g.name)
}

g.scheCond.Signal()
Expand All @@ -261,5 +298,16 @@ func (e *innerExecutorImpl) Wait() {

// Profile writes flame-graph raw text into w.
// When the executor was built without WithProfiler(), there is nothing to
// report and the method returns nil without writing.
func (e *innerExecutorImpl) Profile(w io.Writer) error {
	if p := e.profiler; p != nil {
		return p.draw(w)
	}
	return nil
}

// Trace writes Chrome Trace Event data into w.
// When the executor was built without WithTracer(), there is nothing to
// report and the method returns nil without writing.
func (e *innerExecutorImpl) Trace(w io.Writer) error {
	if tr := e.tracer; tr != nil {
		return tr.draw(w)
	}
	return nil
}
52 changes: 51 additions & 1 deletion executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"fmt"
"os"
"runtime"
"sync/atomic"
"testing"
"time"

gotaskflow "github.com/noneback/go-taskflow"
"github.com/noneback/go-taskflow/utils"
)

func TestExecutor(t *testing.T) {
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()))
executor := gotaskflow.NewExecutor(uint(runtime.NumCPU()), gotaskflow.WithProfiler())
tf := gotaskflow.NewTaskFlow("G")
A, B, C :=
tf.NewTask("A", func() {
Expand Down Expand Up @@ -91,3 +93,51 @@ func TestPanicInSubflow(t *testing.T) {
make_install.Precede(relink)
executor.Run(tf).Wait()
}

// TestWithConcurrencyOption verifies that NewExecutor(0, WithConcurrency(n)) works correctly:
// the positional 0 is overridden by the option and a small diamond graph runs all tasks.
func TestWithConcurrencyOption(t *testing.T) {
	exec := gotaskflow.NewExecutor(0, gotaskflow.WithConcurrency(uint(runtime.NumCPU())))
	flow := gotaskflow.NewTaskFlow("G")

	var done atomic.Int32
	bump := func() { done.Add(1) }
	a := flow.NewTask("A", bump)
	b := flow.NewTask("B", bump)
	c := flow.NewTask("C", bump)
	a.Precede(b)
	c.Precede(b)

	exec.Run(flow).Wait()

	if got := done.Load(); got != 3 {
		t.Errorf("expected count=3, got %d", got)
	}
}

// TestWithPanicHandlerOption verifies that a custom panic handler is invoked on task panic,
// receiving both the panicking task's name and the recovered panic value.
func TestWithPanicHandlerOption(t *testing.T) {
	var gotTask string
	var gotVal interface{}

	handler := func(task string, r interface{}) {
		gotTask = task
		gotVal = r
	}
	exec := gotaskflow.NewExecutor(10, gotaskflow.WithPanicHandler(handler))

	flow := gotaskflow.NewTaskFlow("G")
	flow.NewTask("boom", func() { panic("test panic") })

	exec.Run(flow).Wait()

	if gotTask != "boom" {
		t.Errorf("expected panickedTask=%q, got %q", "boom", gotTask)
	}
	if gotVal != "test panic" {
		t.Errorf("expected panicVal=%q, got %v", "test panic", gotVal)
	}
}

// TestWithConcurrencyPanic verifies that NewExecutor(0) without WithConcurrency still panics.
func TestWithConcurrencyPanic(t *testing.T) {
utils.AssertPanics(t, "concurrency zero", func() {
gotaskflow.NewExecutor(0)
})
}
Loading
Loading