-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathJobMetadataDb.java
More file actions
655 lines (594 loc) · 39.3 KB
/
JobMetadataDb.java
File metadata and controls
655 lines (594 loc) · 39.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.diff;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.datastax.driver.core.utils.UUIDs;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
public class JobMetadataDb {
private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class);
@FunctionalInterface
private interface QueryExecution {
    // Abstraction over a single driver query so callers can wrap execution
    // (e.g. ignoreQueryException) without committing to a concrete statement type.
    ResultSet execute() throws DriverException;
}
/**
 * Execute the query, swallowing any {@link QueryExecutionException}
 * (e.g. read/write timeout or failure) rather than propagating it.
 * Used for best-effort metadata writes where losing a progress update
 * is preferable to failing the task.
 *
 * @param queryExecution the query to run
 * @return the query's ResultSet, or null if a QueryExecutionException was thrown
 */
private static ResultSet ignoreQueryException(QueryExecution queryExecution) {
    try {
        return queryExecution.execute();
    }
    catch (QueryExecutionException queryException) {
        // Deliberate best-effort: log and continue, returning null to the caller.
        logger.warn("Ignoring a failed query.", queryException);
        return null;
    }
}
/**
 * Tracks per-task diff progress for a single token range of a job, persisting
 * status, mismatches and errors to the metadata keyspace. One instance exists
 * per (job, bucket, token range); the prepared statements are static so they
 * are prepared once per executor JVM and shared by all instances.
 */
static class ProgressTracker {
    private final UUID jobId;
    private final int bucket;
    // Tokens are persisted as strings because the metadata tables use varchar columns.
    private final String startToken;
    private final String endToken;
    private final String metadataKeyspace;
    private final Session session;

    // Prepared once via initializeStatements() and shared across instances.
    private static PreparedStatement updateStmt;
    private static PreparedStatement mismatchStmt;
    private static PreparedStatement errorSummaryStmt;
    private static PreparedStatement errorDetailStmt;
    private static PreparedStatement updateCompleteStmt;

    public ProgressTracker(UUID jobId,
                           int bucket,
                           BigInteger startToken,
                           BigInteger endToken,
                           String metadataKeyspace,
                           Session session) {
        this.jobId = jobId;
        this.bucket = bucket;
        this.startToken = startToken.toString();
        this.endToken = endToken.toString();
        this.metadataKeyspace = metadataKeyspace;
        this.session = session;
    }

    /**
     * Runs on each executor to prepare statements shared across all instances.
     * Each statement is only prepared if it has not been prepared already.
     */
    public static void initializeStatements(Session session, String metadataKeyspace) {
        if (updateStmt == null) {
            updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                       " job_id," +
                                                       " bucket," +
                                                       " qualified_table_name," +
                                                       " start_token," +
                                                       " end_token," +
                                                       " matched_partitions," +
                                                       " mismatched_partitions," +
                                                       " partitions_only_in_source," +
                                                       " partitions_only_in_target," +
                                                       " matched_rows," +
                                                       " matched_values," +
                                                       " mismatched_values," +
                                                       " skipped_partitions," +
                                                       " last_token )" +
                                                       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                                       metadataKeyspace, Schema.TASK_STATUS));
        }
        if (mismatchStmt == null) {
            mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                         " job_id," +
                                                         " bucket," +
                                                         " qualified_table_name," +
                                                         " mismatching_token," +
                                                         " mismatch_type )" +
                                                         "VALUES (?, ?, ?, ?, ?)",
                                                         metadataKeyspace, Schema.MISMATCHES));
        }
        if (updateCompleteStmt == null) {
            // Counter update; LOCAL_QUORUM so completed-task counts are reliably visible.
            updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
                                                               " SET completed = completed + 1" +
                                                               " WHERE job_id = ? " +
                                                               " AND bucket = ? " +
                                                               " AND qualified_table_name = ? ",
                                                               metadataKeyspace, Schema.JOB_STATUS))
                                        .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        }
        if (errorSummaryStmt == null) {
            errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                             " job_id," +
                                                             " bucket," +
                                                             " qualified_table_name," +
                                                             " start_token," +
                                                             " end_token)" +
                                                             " VALUES (?, ?, ?, ?, ?)",
                                                             metadataKeyspace, Schema.ERROR_SUMMARY));
        }
        if (errorDetailStmt == null) {
            errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                            " job_id," +
                                                            " bucket," +
                                                            " qualified_table_name," +
                                                            " start_token," +
                                                            " end_token," +
                                                            " error_token," +
                                                            " error_source)" +
                                                            " VALUES (?, ?, ?, ?, ?, ?, ?)",
                                                            metadataKeyspace, Schema.ERROR_DETAIL));
        }
    }

    /**
     * Clears all cached prepared statements so they will be re-prepared on the
     * next call to initializeStatements (e.g. against a new session).
     */
    public static void resetStatements()
    {
        updateStmt = null;
        mismatchStmt = null;
        errorSummaryStmt = null;
        errorDetailStmt = null;
        updateCompleteStmt = null;
    }

    /**
     * Reads the most recently persisted status for this task's token range.
     *
     * @param keyspaceTablePair the table being diffed
     * @return the persisted task status, or {@link DiffJob.TaskStatus#EMPTY} if
     *         no row exists or the read failed with a query execution error
     */
    public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) {
        ResultSet rs = ignoreQueryException(
            () -> session.execute(String.format("SELECT last_token, " +
                                                " matched_partitions, " +
                                                " mismatched_partitions, " +
                                                " partitions_only_in_source, " +
                                                " partitions_only_in_target, " +
                                                " matched_rows," +
                                                " matched_values," +
                                                " mismatched_values," +
                                                " skipped_partitions " +
                                                " FROM %s.%s " +
                                                " WHERE job_id = ? " +
                                                " AND bucket = ? " +
                                                " AND qualified_table_name = ? " +
                                                " AND start_token = ? " +
                                                " AND end_token = ?",
                                                metadataKeyspace, Schema.TASK_STATUS),
                                  jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken));
        Row row = (rs == null) ? null : rs.one();
        if (null == row)
            return DiffJob.TaskStatus.EMPTY;

        RangeStats stats = RangeStats.withValues(getOrDefaultLong(row, "matched_partitions"),
                                                 getOrDefaultLong(row, "mismatched_partitions"),
                                                 0L, // error counts are per-run and not persisted in the metadata db
                                                 getOrDefaultLong(row, "skipped_partitions"),
                                                 getOrDefaultLong(row, "partitions_only_in_source"),
                                                 getOrDefaultLong(row, "partitions_only_in_target"),
                                                 getOrDefaultLong(row, "matched_rows"),
                                                 getOrDefaultLong(row, "matched_values"),
                                                 getOrDefaultLong(row, "mismatched_values"));
        BigInteger lastToken = row.isNull("last_token") ? null : new BigInteger(row.getString("last_token"));
        return new DiffJob.TaskStatus(lastToken, stats);
    }

    /**
     * Best-effort persist of the current diff stats and the latest processed token.
     *
     * @param table       the table being diffed
     * @param diffStats   accumulated stats for this range
     * @param latestToken highest token processed so far
     */
    public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) {
        ignoreQueryException(() -> session.execute(bindUpdateStatement(table, diffStats, latestToken)));
    }

    /**
     * Logs and best-effort persists a detected partition mismatch.
     */
    public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) {
        logger.info("Detected mismatch in table {}; partition with token {} is {}",
                    table, token, type == MismatchType.PARTITION_MISMATCH
                                  ? " different in source and target clusters"
                                  : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster"
                                                                        : "only present in target cluster");
        ignoreQueryException(() -> session.execute(bindMismatchesStatement(table, token, type.name())));
    }

    /**
     * Logs and best-effort persists an error encountered while comparing a partition,
     * recording which cluster sourced the error when a {@code ClusterSourcedException}
     * is present in the cause chain.
     *
     * @param table the table being diffed
     * @param token token of the partition that failed
     * @param error the error encountered
     */
    public void recordError(KeyspaceTablePair table, BigInteger token, Throwable error) {
        logger.error(String.format("Encountered error during partition comparison in table %s; " +
                                   "error for partition with token %s", table, token), error);
        BatchStatement batch = new BatchStatement();
        batch.add(bindErrorSummaryStatement(table));
        DiffCluster.Type exceptionSource = null;
        int maxRetrace = 10; // In case there is a loop, we do not want to loop forever or throw. So just limit the number of retracing.
        // Walk the full cause chain including the final (leaf) throwable. The previous
        // condition (t.getCause() != null) skipped the last element of the chain, so a
        // ClusterSourcedException with no cause was never detected.
        for (Throwable t = error; t != null && maxRetrace > 0; t = t.getCause(), maxRetrace--) {
            if (t instanceof ClusterSourcedException) {
                exceptionSource = ((ClusterSourcedException) t).exceptionSource;
                break;
            }
        }
        batch.add(bindErrorDetailStatement(table, token, exceptionSource));
        batch.setIdempotent(true);
        ignoreQueryException(() -> session.execute(batch));
    }

    /**
     * Marks this token range complete for the table: flushes the final stats and,
     * optionally, increments the completed-task counter.
     */
    public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) {
        logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
        // first flush out the last status.
        ignoreQueryException(() -> session.execute(bindUpdateStatement(table, stats, endToken)));
        // then update the count of completed tasks
        if (updateCompletedCount)
            ignoreQueryException(() -> session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString())));
    }

    private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) {
        return mismatchStmt.bind(jobId, bucket, table.toCqlValueString(), token.toString(), type)
                           .setIdempotent(true);
    }

    private Statement bindErrorSummaryStatement(KeyspaceTablePair table) {
        return errorSummaryStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken)
                               .setIdempotent(true);
    }

    private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken, DiffCluster.Type exceptionSource) {
        String errorSource = exceptionSource == null ? "" : exceptionSource.name();
        return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString(), errorSource)
                              .setIdempotent(true);
    }

    private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, BigInteger token) {
        return bindUpdateStatement(table, stats, token.toString());
    }

    private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, String token) {
        // We don't persist the partition error count from RangeStats as errors
        // are likely to be transient and not data related, so we don't want to
        // accumulate them across runs.
        return updateStmt.bind(jobId,
                               bucket,
                               table.toCqlValueString(),
                               startToken,
                               endToken,
                               stats.getMatchedPartitions(),
                               stats.getMismatchedPartitions(),
                               stats.getOnlyInSource(),
                               stats.getOnlyInTarget(),
                               stats.getMatchedRows(),
                               stats.getMatchedValues(),
                               stats.getMismatchedValues(),
                               stats.getSkippedPartitions(),
                               token)
                         .setIdempotent(true);
    }

    // Null-safe column read: treat a missing row or null column as zero.
    private static long getOrDefaultLong(Row row, String column) {
        return (null == row || row.isNull(column)) ? 0L : row.getLong(column);
    }
}
/**
 * Driver-side lifecycle operations for a diff job: reading job parameters,
 * registering a job as running, and finalizing/unregistering it. Writes that
 * must not be lost are retried via the injected {@link RetryStrategyProvider}.
 */
static class JobLifeCycle {
    final Session session;
    final String metadataKeyspace;
    final RetryStrategyProvider retryStrategyProvider;

    public JobLifeCycle(Session session, String metadataKeyspace, RetryStrategyProvider retryStrategyProvider) {
        this.session = session;
        this.metadataKeyspace = metadataKeyspace;
        this.retryStrategyProvider = retryStrategyProvider;
    }

    /**
     * Fetches the stored parameters for a previously-initialized job.
     *
     * @param jobId id of the job
     * @return the job params, or null if no summary row exists or the read failed
     */
    public DiffJob.Params getJobParams(UUID jobId) {
        ResultSet rs = ignoreQueryException(
            () -> session.execute(String.format("SELECT qualified_table_names," +
                                                " buckets," +
                                                " total_tasks " +
                                                "FROM %s.%s " +
                                                "WHERE job_id = ?",
                                                metadataKeyspace, Schema.JOB_SUMMARY),
                                  jobId));
        Row row = (rs == null) ? null : rs.one();
        if (null == row)
            return null;

        // qualified_table_names is encoded as a List<String>. Decode it back to List<KeyspaceTablePair>.
        List<KeyspaceTablePair> keyspaceTables = row.getList("qualified_table_names", String.class)
                                                    .stream()
                                                    .map(KeyspaceTablePair::new)
                                                    .collect(Collectors.toList());
        return new DiffJob.Params(jobId,
                                  keyspaceTables,
                                  row.getInt("buckets"),
                                  row.getInt("total_tasks"));
    }

    /**
     * Runs on the driver to insert top level job info: marks the job as running
     * (failing if it is already marked), writes the job summary and, for a brand
     * new job, the lookup-index rows.
     *
     * @throws RuntimeException if the job could not be marked as running
     */
    public void initializeJob(DiffJob.Params params,
                              String sourceClusterName,
                              String sourceClusterDesc,
                              String targetClusterName,
                              String targetClusterDesc) throws Exception {
        logger.info("Initializing job status");
        // LWT insert: if it isn't applied, the job is already marked running —
        // possibly left over from a previous run that failed non-gracefully.
        ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
                                                     metadataKeyspace, Schema.RUNNING_JOBS),
                                       params.jobId);
        if (!rs.one().getBool("[applied]")) {
            logger.info("Could not mark job as running. " +
                        "Did a previous run of job id {} fail non-gracefully?",
                        params.jobId);
            throw new RuntimeException("Unable to mark job running, aborting");
        }

        UUID timeUUID = UUIDs.timeBased();
        DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC);
        Statement initJobStatusStatement =
            new SimpleStatement(String.format("INSERT INTO %s.%s (" +
                                              " job_id," +
                                              " job_start_time," +
                                              " buckets," +
                                              " qualified_table_names," +
                                              " source_cluster_name," +
                                              " source_cluster_desc," +
                                              " target_cluster_name," +
                                              " target_cluster_desc," +
                                              " total_tasks)" +
                                              " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
                                              " IF NOT EXISTS",
                                              metadataKeyspace, Schema.JOB_SUMMARY),
                                params.jobId,
                                timeUUID,
                                params.buckets,
                                params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()),
                                sourceClusterName,
                                sourceClusterDesc,
                                targetClusterName,
                                targetClusterDesc,
                                params.tasks);
        initJobStatusStatement.setIdempotent(true);
        rs = retryStrategyProvider.get().retryIfNot(() -> session.execute(initJobStatusStatement),
                                                    NoHostAvailableException.class,
                                                    QueryValidationException.class);

        // This is a brand new job (the LWT applied), index its details including start time
        if (rs.one().getBool("[applied]")) {
            BatchStatement batch = new BatchStatement();
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
                                                        metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
                                          sourceClusterName, params.jobId));
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
                                                        metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
                                          targetClusterName, params.jobId));
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
                                                        metadataKeyspace, Schema.KEYSPACE_INDEX),
                                          metadataKeyspace, params.jobId));
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
                                                        "VALUES ('%s', ?, ?, ?)",
                                                        metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
                                          startDateTime.getHourOfDay(), timeUUID, params.jobId));
            batch.setIdempotent(true);
            retryStrategyProvider.get().retryIfNot(() -> session.execute(batch),
                                                   NoHostAvailableException.class,
                                                   QueryValidationException.class);
        }
    }

    /**
     * Unregisters the job from the running set and persists the final per-table
     * results, retrying each result write on transient failures.
     */
    public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) throws Exception {
        logger.info("Finalizing job status");
        markNotRunning(jobId);
        for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) {
            KeyspaceTablePair table = result.getKey();
            RangeStats stats = result.getValue();
            Statement jobResultUpdateStatement =
                new SimpleStatement(String.format("INSERT INTO %s.%s (" +
                                                  " job_id," +
                                                  " qualified_table_name," +
                                                  " matched_partitions," +
                                                  " mismatched_partitions," +
                                                  " partitions_only_in_source," +
                                                  " partitions_only_in_target," +
                                                  " matched_rows," +
                                                  " matched_values," +
                                                  " mismatched_values," +
                                                  " skipped_partitions) " +
                                                  "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                                  metadataKeyspace, Schema.JOB_RESULTS),
                                    jobId,
                                    table.toCqlValueString(),
                                    stats.getMatchedPartitions(),
                                    stats.getMismatchedPartitions(),
                                    stats.getOnlyInSource(),
                                    stats.getOnlyInTarget(),
                                    stats.getMatchedRows(),
                                    stats.getMatchedValues(),
                                    stats.getMismatchedValues(),
                                    stats.getSkippedPartitions());
            jobResultUpdateStatement.setIdempotent(true);
            // also retry with NoHostAvailableException
            retryStrategyProvider.get().retryIfNot(() -> session.execute(jobResultUpdateStatement),
                                                   QueryValidationException.class);
        }
    }

    /**
     * Best-effort removal of the job from the running-jobs table. Never throws:
     * this is often called from exception handlers, so failures are only logged.
     */
    public void markNotRunning(UUID jobId) {
        try
        {
            logger.info("Marking job {} as not running", jobId);
            ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
                                                         metadataKeyspace, Schema.RUNNING_JOBS),
                                           jobId);
            if (!rs.one().getBool("[applied]"))
            {
                // Fixed log format: the message previously mixed a printf-style "%s"
                // (which SLF4J never substitutes) with misaligned "{}" placeholders.
                logger.warn("Non-fatal: Unable to mark job {} as not running, check logs for errors " +
                            "during initialization as there may be no entry for this job in the {} table",
                            jobId, Schema.RUNNING_JOBS);
            }
        } catch (Exception e) {
            // Because this is called from another exception handler, we don't want to lose the original exception
            // just because we may not have been able to mark the job as not running. Just log here
            logger.error("Could not mark job {} as not running.", jobId, e);
        }
    }
}
/**
 * Table names and DDL for the diff-job metadata keyspace, plus one-time schema
 * initialization. All *_SCHEMA strings are format templates taking the keyspace,
 * the table name and (where present) a default TTL.
 */
static class Schema {
    // Per-token-range progress for each task; keyed by (job, bucket) + table + range.
    public static final String TASK_STATUS = "task_status";
    private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                     " job_id uuid," +
                                                     " bucket int," +
                                                     " qualified_table_name text," +
                                                     " start_token varchar," +
                                                     " end_token varchar," +
                                                     " matched_partitions bigint," +
                                                     " mismatched_partitions bigint, " +
                                                     " partitions_only_in_source bigint," +
                                                     " partitions_only_in_target bigint," +
                                                     " matched_rows bigint," +
                                                     " matched_values bigint," +
                                                     " mismatched_values bigint," +
                                                     " skipped_partitions bigint," +
                                                     " last_token varchar," +
                                                     " PRIMARY KEY((job_id, bucket), qualified_table_name, start_token, end_token))" +
                                                     " WITH default_time_to_live = %s";

    // Top-level job metadata: clusters, tables, bucket/task counts, start time.
    public static final String JOB_SUMMARY = "job_summary";
    private static final String JOB_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                     " job_id uuid," +
                                                     " job_start_time timeuuid," +
                                                     " buckets int," +
                                                     " qualified_table_names frozen<list<text>>," +
                                                     " source_cluster_name text," +
                                                     " source_cluster_desc text," +
                                                     " target_cluster_name text," +
                                                     " target_cluster_desc text," +
                                                     " total_tasks int," +
                                                     " PRIMARY KEY(job_id))" +
                                                     " WITH default_time_to_live = %s";

    // Final per-table results written when a job is finalized.
    public static final String JOB_RESULTS = "job_results";
    private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                     " job_id uuid," +
                                                     " qualified_table_name text," +
                                                     " matched_partitions bigint," +
                                                     " mismatched_partitions bigint," +
                                                     " partitions_only_in_source bigint," +
                                                     " partitions_only_in_target bigint," +
                                                     " matched_rows bigint," +
                                                     " matched_values bigint," +
                                                     " mismatched_values bigint," +
                                                     " skipped_partitions bigint," +
                                                     " PRIMARY KEY(job_id, qualified_table_name))" +
                                                     " WITH default_time_to_live = %s";

    // Counter of completed tasks per (job, bucket, table). Counter table, so no TTL.
    public static final String JOB_STATUS = "job_status";
    private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                    " job_id uuid," +
                                                    " bucket int," +
                                                    " qualified_table_name text," +
                                                    " completed counter," +
                                                    " PRIMARY KEY ((job_id, bucket), qualified_table_name))";

    // Individual mismatching partitions detected during comparison.
    public static final String MISMATCHES = "mismatches";
    private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                    " job_id uuid," +
                                                    " bucket int," +
                                                    " qualified_table_name text, " +
                                                    " mismatching_token varchar, " +
                                                    " mismatch_type text, " +
                                                    " PRIMARY KEY ((job_id, bucket), qualified_table_name, mismatching_token))" +
                                                    " WITH default_time_to_live = %s";

    // Token ranges that encountered at least one error.
    public static final String ERROR_SUMMARY = "task_errors";
    private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                       " job_id uuid," +
                                                       " bucket int," +
                                                       " qualified_table_name text," +
                                                       " start_token varchar," +
                                                       " end_token varchar," +
                                                       " PRIMARY KEY ((job_id, bucket), qualified_table_name, start_token, end_token))" +
                                                       " WITH default_time_to_live = %s";

    // Per-partition error detail, including which cluster sourced the error.
    public static final String ERROR_DETAIL = "partition_errors";
    private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                      " job_id uuid," +
                                                      " bucket int," +
                                                      " qualified_table_name text," +
                                                      " start_token varchar," +
                                                      " end_token varchar," +
                                                      " error_token varchar," +
                                                      " error_source varchar," +
                                                      " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" +
                                                      " WITH default_time_to_live = %s";

    // Lookup indexes: jobs by source cluster, target cluster, keyspace, and start time.
    public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
    private static final String SOURCE_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                              " source_cluster_name text," +
                                                              " job_id uuid," +
                                                              " PRIMARY KEY (source_cluster_name, job_id))" +
                                                              " WITH default_time_to_live = %s";

    public static final String TARGET_CLUSTER_INDEX = "target_cluster_index";
    private static final String TARGET_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                              " target_cluster_name text," +
                                                              " job_id uuid," +
                                                              " PRIMARY KEY (target_cluster_name, job_id))" +
                                                              " WITH default_time_to_live = %s";

    public static final String KEYSPACE_INDEX = "keyspace_index";
    private static final String KEYSPACE_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                        " keyspace_name text," +
                                                        " job_id uuid," +
                                                        " PRIMARY KEY(keyspace_name, job_id))" +
                                                        " WITH default_time_to_live = %s";

    public static final String JOB_START_INDEX = "job_start_index";
    private static final String JOB_START_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                         " job_start_date date," +
                                                         " job_start_hour int," +
                                                         " job_start_time timeuuid," +
                                                         " job_id uuid," +
                                                         " PRIMARY KEY ((job_start_date, job_start_hour), job_start_time))" +
                                                         " WITH default_time_to_live = %s";

    // Set of jobs currently running; used as a LWT-guarded mutex per job id.
    public static final String RUNNING_JOBS = "running_jobs";
    private static final String RUNNING_JOBS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                      " job_id uuid," +
                                                      " PRIMARY KEY (job_id))" +
                                                      " WITH default_time_to_live = %s";

    private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s";

    /**
     * Creates the metadata keyspace and all tables if {@code options.should_init}
     * is set. Each DDL statement is retried via the provided retry strategy;
     * Throwables.propagate rethrows any exhausted failure as an unchecked exception.
     */
    public static void maybeInitialize(Session session, MetadataKeyspaceOptions options, RetryStrategyProvider retryStrategyProvider) {
        if (!options.should_init)
            return;

        Consumer<String> retryQuery = query -> {
            try {
                retryStrategyProvider.get().retryIfNot(() -> session.execute(query),
                                                       NoHostAvailableException.class,
                                                       QueryValidationException.class);
            }
            catch (Exception exception) {
                Throwables.propagate(exception);
            }
        };
        logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
        retryQuery.accept(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
        retryQuery.accept(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
        retryQuery.accept(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
        retryQuery.accept(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
        retryQuery.accept(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
        retryQuery.accept(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
        retryQuery.accept(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
        retryQuery.accept(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
        retryQuery.accept(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
        retryQuery.accept(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
        retryQuery.accept(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
        retryQuery.accept(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
        retryQuery.accept(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
        logger.info("Schema initialized");
    }
}
}