-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathinterface.go
More file actions
99 lines (86 loc) · 3.03 KB
/
interface.go
File metadata and controls
99 lines (86 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package worker
import (
"context"
"time"
"github.com/google/uuid"
"go.opentelemetry.io/otel/metric"
)
// Service is the complete task-manager API. It is assembled by embedding
// the narrower, operation-specific interfaces declared in this file.
type Service interface {
	// Worker lifecycle and task registration.
	workerOperations
	// Task lookup, cancellation, execution and result delivery.
	taskOperations
	// Durable task registration.
	durableOperations

	// Metrics, tracing, lifecycle hooks and retention configuration.
	metricsOperations
	tracerOperations
	hooksOperations
	retentionOperations
}
// workerOperations groups worker lifecycle control and task registration.
type workerOperations interface {
	// StartWorkers launches the task manager's workers; the call is idempotent.
	StartWorkers(ctx context.Context)
	// SetMaxWorkers changes the size of the worker pool to n.
	SetMaxWorkers(n int)

	// RegisterTask adds a single task to the worker.
	RegisterTask(ctx context.Context, task *Task) error
	// RegisterTasks adds several tasks to the worker in one call.
	RegisterTasks(ctx context.Context, tasks ...*Task) error

	// Wait blocks until all tasks have finished or ctx is cancelled.
	Wait(ctx context.Context) error
	// StopGraceful stops accepting new tasks, then waits for completion.
	StopGraceful(ctx context.Context) error
	// StopNow cancels running tasks and stops the workers immediately.
	StopNow()
}
// taskOperations groups task lookup, execution, cancellation and result
// delivery.
type taskOperations interface {
	// GetTask looks up a single task by its ID.
	GetTask(id uuid.UUID) (task *Task, err error)
	// GetTasks returns every task.
	GetTasks() []*Task
	// GetActiveTasks reports how many tasks are currently running.
	GetActiveTasks() int

	// ExecuteTask runs the task identified by id and returns its result.
	ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)

	// CancelTask cancels the task identified by id.
	CancelTask(id uuid.UUID) error
	// CancelAll cancels all tasks.
	CancelAll()

	// GetResults returns a results channel. It is a compatibility shim for
	// the legacy API.
	GetResults() <-chan Result
	// SubscribeResults returns a results channel together with a function
	// that unsubscribes from it.
	SubscribeResults(buffer int) (<-chan Result, func())
	// SetResultsDropPolicy selects how full subscriber buffers are handled.
	SetResultsDropPolicy(policy ResultDropPolicy)
}
// durableOperations groups registration of durable tasks.
type durableOperations interface {
	// RegisterDurableTask stores one durable task in the configured backend.
	RegisterDurableTask(ctx context.Context, task DurableTask) error

	// RegisterDurableTasks stores several durable tasks in one call.
	RegisterDurableTasks(ctx context.Context, tasks ...DurableTask) error
}
type metricsOperations interface {
// GetMetrics returns a snapshot of task metrics.
GetMetrics() MetricsSnapshot
// SetMeterProvider enables OpenTelemetry metrics collection.
SetMeterProvider(provider metric.MeterProvider, opts ...OTelMetricsOption) error
}
// retentionOperations groups retention configuration.
type retentionOperations interface {
	// SetRetentionPolicy sets the retention policy of the task registry.
	SetRetentionPolicy(policy RetentionPolicy)
}
// hooksOperations groups lifecycle-hook configuration.
type hooksOperations interface {
	// SetHooks installs the task lifecycle hooks.
	SetHooks(hooks TaskHooks)
}
// tracerOperations groups tracing configuration.
type tracerOperations interface {
	// SetTracer installs the tracer used for task tracing.
	SetTracer(tracer TaskTracer)
}
// Middleware describes a generic middleware: a function that receives a
// value of type T and returns the value to use in its place.
type Middleware[T any] func(T) T

// RegisterMiddleware applies the given middlewares to svc and returns the
// result.
//
// Middlewares are applied in argument order: the first one receives svc
// itself, and each later one receives the previous middleware's output, so
// the last middleware ends up outermost. With no middlewares, svc is returned
// unchanged.
//
// Nil middlewares are skipped, because calling a nil function value would
// panic.
func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T {
	for _, m := range mw {
		if m == nil {
			continue
		}
		svc = m(svc)
	}
	return svc
}