-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaction_cdc.go
More file actions
251 lines (223 loc) · 6.58 KB
/
action_cdc.go
File metadata and controls
251 lines (223 loc) · 6.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package queue
import (
"context"
"fmt"
"log"
"github.com/gofrs/uuid"
"github.com/jackc/pgx/v4/pgxpool"
)
// CDCAction names the operation that should be carried out for an object
// whose tracked state has changed.
type CDCAction string

// The constants below are the possible outcomes of a CDC (Change Data
// Capture) task. Such a task monitors a chosen set of records and calls a
// function when a record is new (CREATE), has been removed (DELETE), or has
// changed in one of the selected fields (UPDATE).
//
// It works by keeping a list of the records that have been created, updated
// or deleted, together with a hash taken at that time. The hash is computed
// inside the database from only the fields worth tracking, so a field that
// is of no interest can be left out of the hash and will never cause a
// record to be reported as changed.
const (
	// CDCActionCreate means a new record should be created.
	CDCActionCreate = CDCAction("CREATE")
	// CDCActionDelete means an existing record should be removed.
	CDCActionDelete = CDCAction("DELETE")
	// CDCActionUpdate means an existing record should be updated.
	CDCActionUpdate = CDCAction("UPDATE")
)
// CDCRunnerAction holds everything one CDC controller needs to detect changes
// and react to them: the query that produces the current state, the schema
// that holds the cdc_hash tracking table, a database connection, and one
// optional callback per kind of change.
//
// The source query is expected to return an object_id and a hash for every
// tracked object; the hash is what decides whether an object counts as
// changed.
// NOTE(review): presumably the queue runs this action at regular intervals;
// the scheduling is not visible in this file.
type CDCRunnerAction struct {
// controllerID identifies this controller. It is bound as $1 in the change
// query and its string form is what Stream returns.
controllerID uuid.UUID
// sourceQuery is embedded verbatim as the "current" CTE of the change query.
sourceQuery string
// schema is the database schema that contains the cdc_hash table. It is
// spliced into SQL text directly, so it must not come from untrusted input.
schema string
// db is the connection used for the change query and for the cdc_hash
// writes performed by MarkDone.
db *pgxpool.Conn
// createFunc, updateFunc and deleteFunc are called by MarkDone for the
// matching action. A nil callback is skipped.
createFunc func(context.Context, CDCObjectAction) error
updateFunc func(context.Context, CDCObjectAction) error
deleteFunc func(context.Context, CDCObjectAction) error
// limit is the batch size Do passes to GetChanges; Do treats 0 as 1.
limit int // How many to grab from database at one time
}
// CDCObjectAction pairs one object ID with the action that should be taken
// for it. Values are produced by CDCRunnerAction.GetChanges and
// CDCRunnerAction.GetObject, and are completed by calling MarkDone.
type CDCObjectAction struct {
// ObjectID is the object's identifier as returned by the source query or
// as stored in cdc_hash.
ObjectID string
// Hash is the hash to record for CREATE and UPDATE, or the stored hash that
// must match for DELETE.
Hash string
// Action is the operation MarkDone will dispatch on.
Action CDCAction
// ControllerID is the string form of the owning controller's ID; it is used
// as the cdc_controller_id value in the cdc_hash statements.
ControllerID string
// cdcAction is the runner this object came from. MarkDone takes the
// database connection, schema and callbacks from it.
cdcAction CDCRunnerAction
// stream is not assigned or read anywhere in the code visible in this file.
stream string
}
// NewCDCRunnerAction builds a CDCRunnerAction from its parts.
//
// Parameters:
//   - controllerID: ID of the controller whose rows in cdc_hash are tracked.
//   - sourceQuery: a query that must return these columns:
//     object_id, a unique field identifying the object (likely a primary
//     key), and hash, an md5 hash (fast) of the object which should change
//     when, and only when, a change is to be noted.
//   - schema: database schema that has the cdc_hash table.
//   - db: a pgxpool connection with a live connection to the database
//     (only Postgres is supported).
//   - createFunc, updateFunc, deleteFunc: callbacks for the matching action;
//     any of them may be nil.
//   - limit: how many changes to fetch from the database at one time.
//
// The populated runner is always returned. When db is nil it is returned
// together with a non-nil error.
func NewCDCRunnerAction(
	controllerID uuid.UUID,
	sourceQuery string,
	schema string,
	db *pgxpool.Conn,
	createFunc func(context.Context, CDCObjectAction) error,
	updateFunc func(context.Context, CDCObjectAction) error,
	deleteFunc func(context.Context, CDCObjectAction) error,
	limit int,
) (CDCRunnerAction, error) {
	runner := CDCRunnerAction{
		db:           db,
		schema:       schema,
		sourceQuery:  sourceQuery,
		controllerID: controllerID,
		limit:        limit,
		createFunc:   createFunc,
		updateFunc:   updateFunc,
		deleteFunc:   deleteFunc,
	}
	if db == nil {
		return runner, fmt.Errorf("DB cannot be nil")
	}
	return runner, nil
}
// Do performs a single sync pass: it fetches up to limit pending changes
// (a limit of 0 is treated as 1) and marks each one done in turn.
//
// The pass stops at the first failure. An error from fetching is returned
// as is; an error from marking an object done is logged and then returned.
func (c CDCRunnerAction) Do() error {
	ctx := context.Background()

	// The receiver is a copy, so this default never leaks back to the caller.
	if c.limit == 0 {
		c.limit = 1
	}

	pending, err := c.GetChanges(ctx, c.limit)
	if err != nil {
		return err
	}

	for idx := range pending {
		if doneErr := pending[idx].MarkDone(ctx); doneErr != nil {
			log.Printf("Error marking as done: %s", doneErr)
			return doneErr
		}
	}
	return nil
}
// Stream returns the controller ID in string form.
// NOTE(review): presumably the queue uses this value to stop two actions for
// the same controller from running simultaneously; that logic is not visible
// in this file.
func (c CDCRunnerAction) Stream() string {
	id := c.controllerID
	return id.String()
}
// MarkDone runs the callback registered for this object's action and then
// records the result in the <schema>.cdc_hash table.
//
// For CREATE and UPDATE the object's hash is upserted; for DELETE the
// tracking row with the same controller, object and hash is removed. A nil
// callback is skipped, but the table is still updated. If the callback
// returns an error, that error is returned and the table is left untouched.
//
// Any action other than CREATE, UPDATE or DELETE is rejected with an error
// before the callback or the database is touched, so an unrecognised action
// is never reported as done.
func (c CDCObjectAction) MarkDone(ctx context.Context) error {
	var callback func(context.Context, CDCObjectAction) error
	switch c.Action {
	case CDCActionCreate:
		callback = c.cdcAction.createFunc
	case CDCActionUpdate:
		callback = c.cdcAction.updateFunc
	case CDCActionDelete:
		callback = c.cdcAction.deleteFunc
	default:
		return fmt.Errorf("unknown action type %q", c.Action)
	}

	if callback != nil {
		if err := callback(ctx, c); err != nil {
			return err
		}
	}

	// The schema name cannot be passed as a bind parameter, so it is
	// concatenated into the statement; it must come from trusted config.
	if c.Action == CDCActionDelete {
		_, err := c.cdcAction.db.Exec(ctx, `
			DELETE FROM `+c.cdcAction.schema+`.cdc_hash
			WHERE cdc_controller_id = $1
			AND object_id = $2
			AND hash = $3::uuid`, c.ControllerID, c.ObjectID, c.Hash)
		return err
	}

	// CREATE and UPDATE share one upsert keyed on (controller, object).
	_, err := c.cdcAction.db.Exec(ctx, `
		INSERT INTO `+c.cdcAction.schema+`.cdc_hash (cdc_controller_id, object_id, hash)
		VALUES($1, $2, $3)
		ON CONFLICT ON CONSTRAINT cdc_hash_controller_object_uq DO
		UPDATE SET hash = EXCLUDED.hash, updated_at = Now()`, c.ControllerID, c.ObjectID, c.Hash)
	return err
}
// GetChanges returns up to n objects, picked at random, whose state in the
// source query differs from what <schema>.cdc_hash records for this
// controller.
//
// Each returned object carries the action needed to reconcile it:
//   - CREATE: the object is in the source query but has no stored hash;
//   - DELETE: a stored hash exists but the object is no longer in the source;
//   - UPDATE: both exist and their hashes differ.
//
// The hash returned is the current one for CREATE and UPDATE, and the stored
// one for DELETE.
//
// Stored hashes are narrowed to this controller before the FULL OUTER JOIN.
// Filtering in the join condition instead would emit every row owned by
// another controller as an unmatched row, and so report it as a DELETE here.
//
// c.sourceQuery and c.schema are spliced into the SQL text verbatim, so both
// must come from trusted configuration. On any error, including one raised
// while iterating the result set, a nil slice and the error are returned.
func (c CDCRunnerAction) GetChanges(
	ctx context.Context, n int,
) ([]CDCObjectAction, error) {
	// Find all the changes
	qry := fmt.Sprintf(`
		WITH current AS (
		%s
		), stored AS (
			SELECT object_id, hash
			FROM %s.cdc_hash
			WHERE cdc_controller_id = $1
		)
		SELECT
			COALESCE(s.object_id, c.object_id),
			CASE
				WHEN (s.object_id IS NULL) THEN 'CREATE'
				WHEN (c.object_id IS NULL) THEN 'DELETE'
				WHEN (COALESCE((c.hash != s.hash))) THEN 'UPDATE'
			END AS action,
			CASE
				WHEN (s.object_id IS NULL) THEN c.hash::varchar
				WHEN (c.object_id IS NULL) THEN s.hash::varchar
				WHEN (COALESCE((c.hash != s.hash))) THEN c.hash::varchar
			END AS hash,
			$1 AS controller_id
		FROM stored s
		FULL OUTER JOIN current c
			ON c.object_id = s.object_id
		WHERE (
			s.hash != c.hash
			OR s IS NULL
			OR c IS NULL
		)
		ORDER BY RANDOM()
		LIMIT %d
	`, c.sourceQuery, c.schema, n)

	rows, err := c.db.Query(ctx, qry, c.controllerID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var actions []CDCObjectAction
	for rows.Next() {
		action := CDCObjectAction{}
		if err := rows.Scan(
			&action.ObjectID,
			&action.Action,
			&action.Hash,
			&action.ControllerID,
		); err != nil {
			return nil, err
		}
		// Attach the runner so MarkDone can reach the db and callbacks.
		action.cdcAction = c
		actions = append(actions, action)
	}
	// Next returns false both at the end of the data and on failure; only
	// Err tells the two apart, so a truncated result is never returned as
	// if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return actions, nil
}
// GetObject builds a CDCObjectAction bound to this runner without querying
// the database, so that a caller can mark an object done manually.
//
// objectID, hash and the runner's controller ID are stored in string form;
// action is stored unchanged.
func (c CDCRunnerAction) GetObject(
	objectID uuid.UUID,
	action CDCAction,
	hash uuid.UUID,
) CDCObjectAction {
	return CDCObjectAction{
		cdcAction:    c,
		ControllerID: c.controllerID.String(),
		ObjectID:     objectID.String(),
		Hash:         hash.String(),
		Action:       action,
	}
}