-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhandler.go
More file actions
145 lines (122 loc) · 4.43 KB
/
handler.go
File metadata and controls
145 lines (122 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package orbital
import (
"context"
"time"
"github.com/google/uuid"
slogctx "github.com/veqryn/slog-context"
"github.com/openkcm/orbital/internal/clock"
)
// Result values a handler can report through HandlerResponse.
// ExecuteHandler converts the selected value to a string and uses it as the
// Status of the task response.
const (
// handlerResultProcessing is the initial result set by ExecuteHandler and the
// result set by ContinueAndWaitFor.
handlerResultProcessing handlerResultType = "PROCESSING"
// handlerResultDone is set by Complete.
handlerResultDone handlerResultType = "DONE"
// handlerResultFailed is set by Fail.
handlerResultFailed handlerResultType = "FAILED"
)
type (
// HandlerFunc processes a handler request and populates the handler response.
// By default (if the handler does not change the response), the result stays
// "processing" and the raw working state of the request is preserved.
HandlerFunc func(ctx context.Context, request HandlerRequest, resp *HandlerResponse)
// HandlerRequest contains the information extracted from orbital.TaskRequest
// that is relevant for the operator's processing.
HandlerRequest struct {
// TaskID is the ID of the task, taken from the task request.
TaskID uuid.UUID
// TaskType is the Type field of the task request.
TaskType string
// TaskData is the Data field of the task request.
TaskData []byte
// TaskRawWorkingState is the still-encoded WorkingState field of the task request.
TaskRawWorkingState []byte
// TaskCreatedAt is the request's TaskCreatedAt converted with clock.TimeFromUnixNano.
TaskCreatedAt time.Time
// TaskLastReconciledAt is the request's TaskLastReconciledAt converted with clock.TimeFromUnixNano.
TaskLastReconciledAt time.Time
}
// HandlerResponse is used by the handler to indicate the result of processing.
// Its fields are unexported; handlers change it through its methods.
HandlerResponse struct {
// reconcileAfterSec is the wait time in seconds set by ContinueAndWaitFor.
reconcileAfterSec uint64
// rawWorkingState is the encoded working state; it starts as the request's
// working state and can be replaced with UseRawWorkingState.
rawWorkingState []byte
// result is the outcome reported in the task response status.
result handlerResultType
// errorMessage is the failure reason set by Fail.
errorMessage string
// workingState caches the decoded working state once WorkingState has been called.
workingState *WorkingState
}
// handlerResultType is the string type of the result values a handler can report.
handlerResultType string
)
// WorkingState returns the decoded WorkingState of the HandlerResponse.
//
// The state is decoded from the rawWorkingState on first use and cached, so
// subsequent calls return the same instance. An empty rawWorkingState yields
// a new, empty WorkingState. An error is returned if decoding fails.
//
// The WorkingState is automatically encoded back into the orbital.TaskResponse.
// The rawWorkingState field takes precedence
// if the working state was never decoded,
// if its changes are discarded,
// or if there is an error during encoding.
func (r *HandlerResponse) WorkingState() (*WorkingState, error) {
	switch {
	case r.workingState != nil:
		// Already decoded by an earlier call; reuse the cached instance.
	case len(r.rawWorkingState) == 0:
		// Nothing to decode: start from an empty state.
		r.workingState = &WorkingState{s: make(map[string]any)}
	default:
		decoded, err := decodeWorkingState(r.rawWorkingState)
		if err != nil {
			return nil, err
		}
		r.workingState = decoded
	}
	return r.workingState, nil
}
// UseRawWorkingState allows the handler to set the rawWorkingState directly.
// The given slice is stored as-is; it is not copied.
//
// Note: If the WorkingState is used in the handler,
// the changes in the WorkingState take precedence over the rawWorkingState.
// A WorkingState that has already been decoded is not decoded again from the new raw value.
func (r *HandlerResponse) UseRawWorkingState(raw []byte) {
r.rawWorkingState = raw
}
// ContinueAndWaitFor indicates that the handler has processed the request and
// wants to continue processing after the given duration.
//
// Note: The duration is truncated to whole seconds (rounded down).
// A zero or negative duration results in a wait of 0 seconds.
func (r *HandlerResponse) ContinueAndWaitFor(duration time.Duration) {
	// Integer division keeps the round-down exact for every Duration: going
	// through float64 seconds can round up for very long durations, and
	// converting a negative float to uint64 is implementation-dependent.
	var seconds uint64
	if duration > 0 {
		seconds = uint64(duration / time.Second)
	}
	r.reconcileAfterSec = seconds
	r.result = handlerResultProcessing
}
// Fail marks the task as failed and records reason as the error message of
// the response.
//
// Note: This will terminate the processing of the task.
func (r *HandlerResponse) Fail(reason string) {
	r.errorMessage, r.result = reason, handlerResultFailed
}
// Complete indicates that the handler has processed the request and wants to mark the task as completed.
// It only sets the result; the other fields of the response are left untouched.
//
// Note: This will terminate the processing of the task.
func (r *HandlerResponse) Complete() {
r.result = handlerResultDone
}
// ExecuteHandler runs the handler h for a single TaskRequest and returns the
// resulting TaskResponse.
//
// The request is mapped to a HandlerRequest and the handler is called with a
// HandlerResponse that starts out as "processing" and carries the raw working
// state of the request. The result, error message, reconcile delay and working
// state of the HandlerResponse are then copied into the response prepared from
// the request.
//
// If the handler decoded the WorkingState, its encoded form replaces the raw
// working state in the response. If encoding fails, a warning is logged and
// the raw working state is kept.
func ExecuteHandler(ctx context.Context, h HandlerFunc, req TaskRequest) TaskResponse {
	taskResp := req.prepareResponse()

	handlerResp := HandlerResponse{
		rawWorkingState: req.WorkingState,
		result:          handlerResultProcessing,
	}
	handlerReq := HandlerRequest{
		TaskID:               req.TaskID,
		TaskType:             req.Type,
		TaskData:             req.Data,
		TaskRawWorkingState:  req.WorkingState,
		TaskCreatedAt:        clock.TimeFromUnixNano(req.TaskCreatedAt),
		TaskLastReconciledAt: clock.TimeFromUnixNano(req.TaskLastReconciledAt),
	}

	// Measure only the time spent inside the handler itself.
	startedAt := time.Now()
	h(ctx, handlerReq, &handlerResp)
	slogctx.Debug(ctx, "task handler finished", "processingTime", time.Since(startedAt))

	taskResp.Status = string(handlerResp.result)
	taskResp.ErrorMessage = handlerResp.errorMessage
	taskResp.ReconcileAfterSec = handlerResp.reconcileAfterSec
	taskResp.WorkingState = handlerResp.rawWorkingState

	// Without a decoded working state (or with a nil state map) the raw
	// working state set above is the final one.
	decoded := handlerResp.workingState
	if decoded == nil || decoded.s == nil {
		return taskResp
	}

	encoded, err := decoded.encode()
	if err != nil {
		// Best effort: keep the raw working state rather than failing the task.
		slogctx.Warn(ctx, "failed to encode working state", "error", err)
		return taskResp
	}
	taskResp.WorkingState = encoded
	return taskResp
}