-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdurable_types.go
More file actions
78 lines (66 loc) · 2.13 KB
/
durable_types.go
File metadata and controls
78 lines (66 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package worker
import (
"context"
"time"
"github.com/google/uuid"
"github.com/hyp3rd/ewrap"
"google.golang.org/protobuf/proto"
)
// DurableTask represents a task that can be persisted and rehydrated.
//
// NOTE(review): this file only declares the fields. The per-field notes below
// are inferred from names and types and should be confirmed against the
// backend and worker code that reads them.
type DurableTask struct {
	// ID is the task's UUID.
	ID uuid.UUID
	// Handler is presumably the name of the registered handler that runs the task — confirm.
	Handler string
	// Message is the task payload as a protobuf message.
	Message proto.Message
	// Payload is a raw byte payload; presumably the encoded form of Message — confirm.
	Payload []byte
	// Priority is an integer priority; the ordering direction is not visible here — TODO confirm.
	Priority int
	// RunAt is presumably the earliest time the task should run — confirm.
	RunAt time.Time
	// Queue is presumably the name of the queue the task belongs to — confirm.
	Queue string
	// Weight is an integer weight; how it is used is not visible here — TODO confirm.
	Weight int
	// Retries is presumably the retry budget for the task — confirm.
	Retries int
	// RetryDelay is presumably the wait between retry attempts — confirm.
	RetryDelay time.Duration
	// Metadata holds string key/value pairs attached to the task.
	Metadata map[string]string
}
// DurableTaskLease represents a leased durable task.
//
// It pairs a DurableTask value with lease bookkeeping fields. Values of this
// type are returned by DurableBackend.Dequeue and passed back to Ack, Nack,
// Fail and Extend.
type DurableTaskLease struct {
	// Task is the leased task, stored by value.
	Task DurableTask
	// LeaseID is the string identifier of this lease.
	LeaseID string
	// Attempts is presumably the number of delivery attempts so far — confirm.
	Attempts int
	// MaxRetries is presumably the retry limit applied to the task — confirm.
	MaxRetries int
}
// DurableHandlerSpec describes a durable task handler.
//
// It bundles a message constructor with the function that processes a message.
type DurableHandlerSpec struct {
	// Make returns a proto.Message; presumably a fresh, empty message of the
	// handler's payload type used as the decode target — confirm against caller.
	Make func() proto.Message
	// Fn processes payload under ctx and returns a result value and an error.
	Fn func(ctx context.Context, payload proto.Message) (any, error)
}
// DurableCodec marshals and unmarshals durable task payloads.
//
// ProtoDurableCodec is the implementation declared in this file.
type DurableCodec interface {
	// Marshal encodes msg and returns the resulting bytes.
	Marshal(msg proto.Message) ([]byte, error)
	// Unmarshal decodes data into msg, which the caller supplies as the destination.
	Unmarshal(data []byte, msg proto.Message) error
}
// ProtoDurableCodec uses protobuf for serialization.
//
// It is an empty struct with no fields; its Marshal and Unmarshal methods
// delegate to proto.Marshal and proto.Unmarshal.
type ProtoDurableCodec struct{}
// Marshal encodes msg with proto.Marshal and returns the encoded bytes.
//
// If encoding fails, the returned slice is nil and the underlying error is
// wrapped with the context "marshal protobuf message".
func (ProtoDurableCodec) Marshal(msg proto.Message) ([]byte, error) {
	encoded, marshalErr := proto.Marshal(msg)
	if marshalErr == nil {
		return encoded, nil
	}

	return nil, ewrap.Wrap(marshalErr, "marshal protobuf message")
}
// Unmarshal decodes data into msg using proto.Unmarshal.
//
// msg is the caller-supplied destination passed straight to proto.Unmarshal.
// On failure the underlying error is wrapped exactly once with the context
// "unmarshal protobuf message", matching how Marshal reports errors. On
// success Unmarshal returns nil.
func (ProtoDurableCodec) Unmarshal(data []byte, msg proto.Message) error {
	// Wrap only on the failure path so the context string appears once in the
	// error chain instead of being duplicated.
	if err := proto.Unmarshal(data, msg); err != nil {
		return ewrap.Wrap(err, "unmarshal protobuf message")
	}

	return nil
}
// DurableBackend provides persistence and leasing for durable tasks.
//
// Every method takes a context.Context as its first argument. Leases returned
// by Dequeue are the values later handed to Ack, Nack, Fail and Extend.
//
// NOTE(review): the per-method notes below are inferred from names and
// signatures; no implementation is visible in this file — confirm against a
// concrete backend.
type DurableBackend interface {
	// Enqueue submits task to the backend; presumably this persists it for later delivery.
	Enqueue(ctx context.Context, task DurableTask) error
	// Dequeue returns a slice of leased tasks; presumably at most limit of them,
	// each leased for the lease duration — confirm.
	Dequeue(ctx context.Context, limit int, lease time.Duration) ([]DurableTaskLease, error)
	// Ack presumably marks the leased task as completed successfully — confirm.
	Ack(ctx context.Context, lease DurableTaskLease) error
	// Nack presumably returns the leased task for redelivery after delay — confirm.
	Nack(ctx context.Context, lease DurableTaskLease, delay time.Duration) error
	// Fail presumably records the leased task as failed with err as the cause — confirm.
	Fail(ctx context.Context, lease DurableTaskLease, err error) error
	// Extend presumably prolongs the lease by leaseDuration — confirm.
	Extend(ctx context.Context, lease DurableTaskLease, leaseDuration time.Duration) error
}