FilterException.java
@@ -9,6 +9,8 @@
  * Any exception thrown by execution of a {@code DocumentWriteSetFilter} will be wrapped in this exception and
  * rethrown by the {@code WriteBatcher}, allowing failure listeners to distinguish filter exceptions from other
  * exceptions that may occur during batch processing.
+ *
+ * @since 8.1.0
  */
 public class FilterException extends DataMovementException {
 
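A minimal sketch of how a failure listener might use this distinction; the writeBatcher and logger variables are assumed to exist in scope, while onBatchFailure is the existing WriteBatcher listener hook:

    writeBatcher.onBatchFailure((batch, throwable) -> {
        if (throwable instanceof FilterException) {
            // A DocumentWriteSetFilter threw; the original exception is available as the cause.
            logger.error("Filter failed for batch: {}", throwable.getCause().getMessage());
        } else {
            logger.error("Batch write failed: {}", throwable.getMessage());
        }
    });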
IncrementalWriteConfig.java
@@ -14,17 +14,10 @@
  *
  * @since 8.1.0
  */
-public class IncrementalWriteConfig {
-
-    private final String hashKeyName;
-    private final String timestampKeyName;
-    private final boolean canonicalizeJson;
-    private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
-    private final String[] jsonExclusions;
-    private final String[] xmlExclusions;
-    private final Map<String, String> xmlNamespaces;
-    private final String schemaName;
-    private final String viewName;
+public record IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                     Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
+                                     String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces,
+                                     String schemaName, String viewName) {
 
     public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
                                   Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
@@ -41,39 +34,11 @@ public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean
         this.viewName = viewName;
     }
 
-    public String getHashKeyName() {
-        return hashKeyName;
-    }
-
-    public String getTimestampKeyName() {
-        return timestampKeyName;
-    }
-
-    public boolean isCanonicalizeJson() {
-        return canonicalizeJson;
-    }
-
-    public Consumer<DocumentWriteOperation[]> getSkippedDocumentsConsumer() {
-        return skippedDocumentsConsumer;
-    }
-
-    public String[] getJsonExclusions() {
-        return jsonExclusions;
-    }
-
-    public String[] getXmlExclusions() {
-        return xmlExclusions;
-    }
-
-    public Map<String, String> getXmlNamespaces() {
+    @Override
+    public Map<String, String> xmlNamespaces() {
         return xmlNamespaces != null ? xmlNamespaces : Collections.emptyMap();
     }
-
-    public String getSchemaName() {
-        return schemaName;
-    }
-
-    public String getViewName() {
-        return viewName;
-    }
 }

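With the conversion to a record, callers use record-style accessors rather than the removed JavaBean getters. A brief sketch; the argument values below are placeholders:

    IncrementalWriteConfig config = new IncrementalWriteConfig(
        "incrementalWriteHash", "incrementalWriteTimestamp", true,
        null, null, null, null, null, null);
    String hashKey = config.hashKeyName();                   // formerly getHashKeyName()
    Map<String, String> namespaces = config.xmlNamespaces(); // overridden to return an empty map instead of null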
IncrementalWriteEvalFilter.java
This file was deleted.

IncrementalWriteFilter.java
@@ -48,7 +48,6 @@ public static class Builder {
         private String hashKeyName = "incrementalWriteHash";
         private String timestampKeyName = "incrementalWriteTimestamp";
         private boolean canonicalizeJson = true;
-        private boolean useEvalQuery = false;
         private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
         private String[] jsonExclusions;
         private String[] xmlExclusions;
@@ -87,14 +86,6 @@ public Builder canonicalizeJson(boolean canonicalizeJson) {
             return this;
         }
 
-        /**
-         * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
-         */
-        public Builder useEvalQuery(boolean useEvalQuery) {
-            this.useEvalQuery = useEvalQuery;
-            return this;
-        }
-
         /**
          * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
         */
@@ -161,12 +152,9 @@ public IncrementalWriteFilter build() {
                 skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces, schemaName, viewName);
 
             if (schemaName != null && viewName != null) {
-                return new IncrementalWriteViewFilter(config);
-            }
-            if (useEvalQuery) {
-                return new IncrementalWriteEvalFilter(config);
+                return new IncrementalWriteFromViewFilter(config);
             }
-            return new IncrementalWriteOpticFilter(config);
+            return new IncrementalWriteFromLexiconsFilter(config);
         }
 
         private void validateJsonExclusions() {
@@ -254,19 +242,19 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<String, Long>
 
         if (existingHash != null) {
             if (!existingHash.equals(contentHash)) {
-                newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
-            } else if (config.getSkippedDocumentsConsumer() != null) {
+                newWriteSet.add(addHashToMetadata(doc, config.hashKeyName(), contentHash, config.timestampKeyName(), timestamp));
+            } else if (config.skippedDocumentsConsumer() != null) {
                 skippedDocuments.add(doc);
             } else {
                 // No consumer, so skip the document silently.
             }
         } else {
-            newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
+            newWriteSet.add(addHashToMetadata(doc, config.hashKeyName(), contentHash, config.timestampKeyName(), timestamp));
         }
     }
 
-    if (!skippedDocuments.isEmpty() && config.getSkippedDocumentsConsumer() != null) {
-        config.getSkippedDocumentsConsumer().accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
+    if (!skippedDocuments.isEmpty() && config.skippedDocumentsConsumer() != null) {
+        config.skippedDocumentsConsumer().accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
     }
 
     return newWriteSet;
@@ -283,11 +271,11 @@ private String serializeContent(DocumentWriteOperation doc) {
         format = baseHandle.getFormat();
     }
 
-    if (config.isCanonicalizeJson() && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
+    if (config.canonicalizeJson() && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
         JsonCanonicalizer jc;
         try {
-            if (config.getJsonExclusions() != null && config.getJsonExclusions().length > 0) {
-                content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.getJsonExclusions());
+            if (config.jsonExclusions() != null && config.jsonExclusions().length > 0) {
+                content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.jsonExclusions());
             }
             jc = new JsonCanonicalizer(content);
             return jc.getEncodedString();
@@ -298,9 +286,9 @@ private String serializeContent(DocumentWriteOperation doc) {
             logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
                 doc.getUri(), e.getMessage());
         }
-    } else if (config.getXmlExclusions() != null && config.getXmlExclusions().length > 0) {
+    } else if (config.xmlExclusions() != null && config.xmlExclusions().length > 0) {
         try {
-            content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.getXmlNamespaces(), config.getXmlExclusions());
+            content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.xmlNamespaces(), config.xmlExclusions());
         } catch (Exception e) {
             logger.warn("Unable to apply XML exclusions for URI {}, using original content for hashing; cause: {}",
                 doc.getUri(), e.getMessage());
@@ -341,4 +329,10 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
     }
 
 
+    protected static String[] getUrisInBatch(DocumentWriteSet writeSet) {
+        return writeSet.stream()
+            .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
+            .map(DocumentWriteOperation::getUri)
+            .toArray(String[]::new);
+    }
 }
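A sketch of how build() now chooses an implementation. The Builder entry point and the schemaName/viewName setters are assumptions mirroring the fields above; only canonicalizeJson and skippedDocumentsConsumer appear in this diff:

    IncrementalWriteFilter filter = new IncrementalWriteFilter.Builder() // entry point assumed
        .canonicalizeJson(true)
        .skippedDocumentsConsumer(skipped -> {
            for (DocumentWriteOperation op : skipped) {
                System.out.println("Skipped unchanged document: " + op.getUri());
            }
        })
        .schemaName("mySchema") // assumed setters; with both schema and view set,
        .viewName("myView")     // build() returns IncrementalWriteFromViewFilter,
        .build();               // otherwise IncrementalWriteFromLexiconsFilter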
IncrementalWriteOpticFilter.java → IncrementalWriteFromLexiconsFilter.java
@@ -12,32 +12,26 @@
 import java.util.Map;
 
 /**
- * Uses an Optic query to get the existing hash values for a set of URIs.
+ * Uses an Optic fromLexicons query that depends on a field range index to retrieve URIs and
+ * hash values.
  *
  * @since 8.1.0
  */
-class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
+class IncrementalWriteFromLexiconsFilter extends IncrementalWriteFilter {
 
-    IncrementalWriteOpticFilter(IncrementalWriteConfig config) {
+    IncrementalWriteFromLexiconsFilter(IncrementalWriteConfig config) {
         super(config);
     }
 
     @Override
     public DocumentWriteSet apply(Context context) {
-        final String[] uris = context.getDocumentWriteSet().stream()
-            .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
-            .map(DocumentWriteOperation::getUri)
-            .toArray(String[]::new);
-
-        // It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
-        // a serialized query instead. That doesn't allow a user to override the query though.
-        RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+        final String[] uris = getUrisInBatch(context.getDocumentWriteSet());
 
         try {
-            Map<String, Long> existingHashes = rowTemplate.query(op ->
+            Map<String, Long> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
                 op.fromLexicons(Map.of(
                     "uri", op.cts.uriReference(),
-                    "hash", op.cts.fieldReference(getConfig().getHashKeyName())
+                    "hash", op.cts.fieldReference(getConfig().hashKeyName())
                 )).where(
                     op.cts.documentQuery(op.xs.stringSeq(uris))
                 ),
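For reference, the same lookup can be reproduced standalone with the RowManager API; this sketch assumes a DatabaseClient named client, example URIs, and a field named incrementalWriteHash backed by a range index, per the javadoc above:

    // requires: com.marklogic.client.row.RowManager, com.marklogic.client.expression.PlanBuilder, java.util.Map
    RowManager rowManager = client.newRowManager();
    PlanBuilder op = rowManager.newPlanBuilder();
    PlanBuilder.ModifyPlan plan = op.fromLexicons(Map.of(
            "uri", op.cts.uriReference(),
            "hash", op.cts.fieldReference("incrementalWriteHash")))
        .where(op.cts.documentQuery(op.xs.stringSeq("/doc1.json", "/doc2.json")));
    rowManager.resultRows(plan).forEach(row ->
        System.out.println(row.getString("uri") + " -> " + row.getString("hash")));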
IncrementalWriteViewFilter.java → IncrementalWriteFromViewFilter.java
@@ -4,7 +4,6 @@
 package com.marklogic.client.datamovement.filter;
 
 import com.marklogic.client.FailedRequestException;
-import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
 import com.marklogic.client.row.RowTemplate;
 
@@ -15,36 +14,29 @@
  * Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
  * This implementation requires a TDE template to be deployed that contains at minimum a "uri" column
  * and a column matching the configured hash key name, plus any other columns desired.
+ * The query uses a {@code where} with a {@code cts.documentQuery} to filter rows by URI, which is
+ * significantly faster than filtering via {@code op.in}.
  *
  * @since 8.1.0
  */
-class IncrementalWriteViewFilter extends IncrementalWriteFilter {
+class IncrementalWriteFromViewFilter extends IncrementalWriteFilter {
 
-    IncrementalWriteViewFilter(IncrementalWriteConfig config) {
+    IncrementalWriteFromViewFilter(IncrementalWriteConfig config) {
         super(config);
     }
 
     @Override
    public DocumentWriteSet apply(Context context) {
-        final String[] uris = context.getDocumentWriteSet().stream()
-            .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
-            .map(DocumentWriteOperation::getUri)
-            .toArray(String[]::new);
-
-        RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+        final String[] uris = getUrisInBatch(context.getDocumentWriteSet());
 
         try {
-            Map<String, Long> existingHashes = rowTemplate.query(op ->
-                op.fromView(getConfig().getSchemaName(), getConfig().getViewName(), "")
+            Map<String, Long> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
+                op.fromView(getConfig().schemaName(), getConfig().viewName(), "")
                     .where(op.cts.documentQuery(op.xs.stringSeq(uris)))
                 ,
                 rows -> {
                     Map<String, Long> map = new HashMap<>();
                     rows.forEach(row -> {
                         String uri = row.getString("uri");
-                        String hashString = row.getString(getConfig().getHashKeyName());
+                        String hashString = row.getString(getConfig().hashKeyName());
                         if (hashString != null && !hashString.isEmpty()) {
                             long existingHash = Long.parseUnsignedLong(hashString);
                             map.put(uri, existingHash);
@@ -59,7 +51,7 @@ public DocumentWriteSet apply(Context context) {
 
         return filterDocuments(context, uri -> existingHashes.get(uri));
     } catch (FailedRequestException e) {
-        String message = "Unable to query for existing incremental write hashes from view " + getConfig().getSchemaName() + "." + getConfig().getViewName() + "; cause: " + e.getMessage();
+        String message = "Unable to query for existing incremental write hashes from view " + getConfig().schemaName() + "." + getConfig().viewName() + "; cause: " + e.getMessage();
         throw new FailedRequestException(message, e.getFailedRequest());
     }
 }
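Likewise, the fromView lookup can be verified standalone against a deployed TDE view; mySchema, myView, and the example URIs are assumptions, and the hash column name must match the configured hash key:

    RowManager rowManager = client.newRowManager();
    PlanBuilder op = rowManager.newPlanBuilder();
    PlanBuilder.ModifyPlan plan = op.fromView("mySchema", "myView", "")
        .where(op.cts.documentQuery(op.xs.stringSeq("/doc1.json", "/doc2.json")));
    rowManager.resultRows(plan).forEach(row ->
        System.out.println(row.getString("uri") + " -> " + row.getString("incrementalWriteHash")));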