
fix(connect): fix connect compact reorder #3247

Closed
woshigaopp wants to merge 3 commits into 1.6 from fix/KAFKA-17719-full-tombstone-v2

Conversation

@woshigaopp
Contributor

This pull request addresses two key issues in the Kafka Connect configuration storage: (1) preventing orphaned task configs and commit records from persisting after connector deletion, and (2) improving robustness when log compaction reorders records in the config topic. The changes ensure that all related records are properly removed when a connector is deleted and that deferred task configs are correctly applied if they arrive before the connector config due to log compaction.

Connector deletion cleanup:

  • Enhanced the removeConnectorConfig method to write tombstone records for all associated task config keys and the commit key, ensuring that orphaned task configs and commit records are eventually removed by log compaction after connector deletion. This addresses the root cause of KAFKA-16838.

Handling log compaction reordering:

  • Added the applyDeferredTaskConfigs method, which applies deferred task configs when the connector config arrives after task commit records due to log compaction reordering (KAFKA-17719). This method ensures consistency and proper application of task configs.
  • Modified processTasksCommitRecord to defer processing of task commit records if the connector config is not present, instead of immediately removing related data. It now logs the situation and stores the task count, marking the connector as inconsistent until the config arrives.
  • Updated processConnectorConfigRecord to call applyDeferredTaskConfigs when a connector config is received, allowing deferred task configs to be applied if they arrived earlier.
  • Improved processConnectorRemoval to remove connectors from the inconsistent set, ensuring that state is fully cleaned up when a connector is deleted.
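Taken together, the deferral flow above can be sketched as a small self-contained model. The class below and its collections only mirror the names used in this description (connectorConfigs, deferredTaskUpdates, inconsistent, and so on); it is an illustrative sketch, not the actual KafkaConfigBackingStore code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the defer-then-apply replay flow described above.
class DeferredConfigModel {
    final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    final Map<String, Integer> connectorTaskCounts = new HashMap<>();
    final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
    final Map<String, Map<Integer, Map<String, String>>> taskConfigs = new HashMap<>();
    final Set<String> inconsistent = new HashSet<>();

    void onTaskConfigRecord(String connector, int task, Map<String, String> config) {
        // Task configs are always staged first; a later commit record applies them.
        deferredTaskUpdates.computeIfAbsent(connector, c -> new HashMap<>()).put(task, config);
    }

    void onTasksCommitRecord(String connector, int taskCount) {
        connectorTaskCounts.put(connector, taskCount);
        if (!connectorConfigs.containsKey(connector)) {
            // Connector config not seen yet: defer instead of removing state,
            // and mark the connector inconsistent (the KAFKA-17719 reorder case).
            inconsistent.add(connector);
            return;
        }
        applyDeferredTaskConfigs(connector);
    }

    void onConnectorConfigRecord(String connector, Map<String, String> config) {
        connectorConfigs.put(connector, config);
        // The config may arrive after the commit due to compaction reordering.
        applyDeferredTaskConfigs(connector);
    }

    void applyDeferredTaskConfigs(String connector) {
        // Apply only once both a commit and the connector config have been seen.
        if (!connectorTaskCounts.containsKey(connector)) return;
        Map<Integer, Map<String, String>> deferred = deferredTaskUpdates.remove(connector);
        if (deferred == null) return;
        taskConfigs.put(connector, deferred);
        inconsistent.remove(connector);
    }
}
```

In the deleted-connector case, onConnectorConfigRecord is never called, so the deferred entries simply sit unused, matching the "deferred data remains inert" behavior described above.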

…r restart

Root cause: KAFKA-16838 fix added a null check in processTasksCommitRecord()
that calls processConnectorRemoval() when connectorConfigs.get() returns null.
This incorrectly triggers for live connectors when log compaction reorders
records such that task configs + commit appear before the connector config.

Fix approach (full tombstone + defer):
1. removeConnectorConfig() now writes tombstones for ALL related keys
   (connector, target-state, all task configs, commit) so that log compaction
   will eventually remove all data for deleted connectors. This eliminates
   the orphaned task config problem (KAFKA-16838) at the source.

2. processTasksCommitRecord() no longer calls processConnectorRemoval() when
   connector config is absent. Instead, it defers processing by storing the
   task count and marking the connector as inconsistent. This handles both:
   - Compaction reorder (KAFKA-17719): connector config arrives later,
     applyDeferredTaskConfigs() applies the deferred task configs.
   - Deleted connector with tombstone compacted away: deferred data remains
     inert since no connector config will ever arrive.

3. processConnectorConfigRecord() calls applyDeferredTaskConfigs() when a
   connector config arrives, to handle the compaction reorder case.

4. processConnectorRemoval() now also clears the inconsistent set.

This approach requires no in-memory state to distinguish between deleted
connectors and compaction reorder - all decisions are based on the actual
data present in the config topic.
@woshigaopp woshigaopp requested a review from superhx as a code owner March 11, 2026 03:10
Copilot AI review requested due to automatic review settings March 11, 2026 03:10

Copilot AI left a comment


Pull request overview

This PR improves Kafka Connect’s KafkaConfigBackingStore behavior around config-topic log compaction and connector deletion, aiming to (a) avoid orphaned task/commit records after connector deletion, and (b) correctly handle startup replay when compaction reorders task records ahead of the connector config.

Changes:

  • Extend connector deletion to write tombstones for task config keys and the connector commit key, not just connector/target-state keys.
  • Defer processing of task commit records when the connector config is not yet present, and apply deferred task configs once the connector config arrives.
  • Add a suite of unit tests covering compaction reordering, orphan handling, and deletion/recreation scenarios.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File Description
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java Adds broader tombstone writes on deletion; introduces deferred-apply logic for task configs/commits when connector config arrives late; cleans inconsistent state on removal.
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java Adds multiple new tests for compaction reordering and deletion/orphan scenarios, plus verification of new tombstone-writing behavior.


Comment on lines +563 to +585
if (taskCount != null) {
    // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
    // where task count was reduced (old task keys with higher indices still exist)
    Set<Integer> allTaskIds = new HashSet<>();
    for (int i = 0; i < taskCount; i++) {
        allTaskIds.add(i);
    }
    for (ConnectorTaskId taskId : taskConfigs.keySet()) {
        if (taskId.connector().equals(connector)) {
            allTaskIds.add(taskId.task());
        }
    }
    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
    if (deferred != null) {
        for (ConnectorTaskId taskId : deferred.keySet()) {
            allTaskIds.add(taskId.task());
        }
    }
    for (int taskId : allTaskIds) {
        keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
    }
    keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
}

Copilot AI Mar 11, 2026


In removeConnectorConfig, task/commit tombstones are only added when connectorTaskCounts.get(connector) is non-null. If the task count is missing (e.g., state not yet loaded, or corrupted/partially-compacted history), this will still leave an orphaned commit record and/or task configs in the config topic. Consider always writing the COMMIT_TASKS_KEY(connector) tombstone, and tombstoning any task keys discoverable from taskConfigs/deferredTaskUpdates even when taskCount is null (or default to tombstoning indices found in those maps).

Suggested change

if (taskCount != null) {
    // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
    // where task count was reduced (old task keys with higher indices still exist)
    Set<Integer> allTaskIds = new HashSet<>();
    for (int i = 0; i < taskCount; i++) {
        allTaskIds.add(i);
    }
    for (ConnectorTaskId taskId : taskConfigs.keySet()) {
        if (taskId.connector().equals(connector)) {
            allTaskIds.add(taskId.task());
        }
    }
    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
    if (deferred != null) {
        for (ConnectorTaskId taskId : deferred.keySet()) {
            allTaskIds.add(taskId.task());
        }
    }
    for (int taskId : allTaskIds) {
        keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
    }
    keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
}

// Collect all known task IDs for this connector from multiple sources:
// 1) The recorded task count (if available), and
// 2) Any existing entries in taskConfigs and deferredTaskUpdates.
Set<Integer> allTaskIds = new HashSet<>();
if (taskCount != null) {
    for (int i = 0; i < taskCount; i++) {
        allTaskIds.add(i);
    }
}
for (ConnectorTaskId taskId : taskConfigs.keySet()) {
    if (taskId.connector().equals(connector)) {
        allTaskIds.add(taskId.task());
    }
}
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
if (deferred != null) {
    for (ConnectorTaskId taskId : deferred.keySet()) {
        allTaskIds.add(taskId.task());
    }
}
for (int taskId : allTaskIds) {
    keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
}
keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

Contributor Author


In actual scenarios, when connectorTaskCounts.get(connector) is null, there will be no task configs or commit record for this connector in the config topic, so it is correct not to write tombstones.

Comment on lines +557 to +584
// Write tombstones for all task config keys and the commit key so that log compaction
// will eventually remove them. Without these tombstones, orphaned task configs and commit
// records would persist indefinitely in the config topic after the connector is deleted,
// which is the root cause of KAFKA-16838.
synchronized (lock) {
    Integer taskCount = connectorTaskCounts.get(connector);
    if (taskCount != null) {
        // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
        // where task count was reduced (old task keys with higher indices still exist)
        Set<Integer> allTaskIds = new HashSet<>();
        for (int i = 0; i < taskCount; i++) {
            allTaskIds.add(i);
        }
        for (ConnectorTaskId taskId : taskConfigs.keySet()) {
            if (taskId.connector().equals(connector)) {
                allTaskIds.add(taskId.task());
            }
        }
        Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
        if (deferred != null) {
            for (ConnectorTaskId taskId : deferred.keySet()) {
                allTaskIds.add(taskId.task());
            }
        }
        for (int taskId : allTaskIds) {
            keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
        }
        keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

Copilot AI Mar 11, 2026


This change introduces task-key tombstones on deletion, but processTaskConfigRecord currently logs an error and ignores records with null values. With the new deletion behavior, normal connector deletion (and restarts reading compacted tombstones) will generate error logs for each task tombstone, and the tombstone won’t actively clear any cached deferred/task config state if it were present. Consider handling task-config tombstones explicitly (treat null as an expected delete: remove the task from deferredTaskUpdates/taskConfigs and log at debug/trace).
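A minimal sketch of what such tombstone-aware handling might look like. The class and field names below are hypothetical stand-ins for illustration, not the real KafkaConfigBackingStore members:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: treat a null task-config value as an expected delete
// (tombstone) and quietly clear cached state, instead of logging an error.
class TaskConfigProcessor {
    final Map<String, Map<Integer, Map<String, String>>> taskConfigs = new HashMap<>();
    final Map<String, Map<Integer, Map<String, String>>> deferredTaskUpdates = new HashMap<>();

    void processTaskConfigRecord(String connector, int task, Map<String, String> value) {
        if (value == null) {
            // Tombstone: remove any cached task config and deferred update.
            remove(taskConfigs, connector, task);
            remove(deferredTaskUpdates, connector, task);
            return;
        }
        // Non-null value: stage the task config for a later commit record.
        deferredTaskUpdates.computeIfAbsent(connector, c -> new HashMap<>()).put(task, value);
    }

    private static void remove(Map<String, Map<Integer, Map<String, String>>> byConnector,
                               String connector, int task) {
        Map<Integer, Map<String, String>> byTask = byConnector.get(connector);
        if (byTask != null) {
            byTask.remove(task);
            if (byTask.isEmpty()) byConnector.remove(connector);
        }
    }
}
```

The point is only the null branch: a tombstone read back during replay becomes a silent cleanup rather than an error-level log line per deleted task.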

Contributor Author


This is not a new issue introduced by this change.

// will apply the deferred task configs at that point. If no connector config ever arrives
// (deleted connector), the deferred data remains inert.
@SuppressWarnings("unchecked")
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the connector config is missing, this branch calls intValue(((Map) value.value()).get("tasks")) without validating the presence/type of the tasks field. If the commit record is malformed (which is more plausible for orphaned/compacted data), intValue will throw a ConnectException that is not caught in the consumer callback, potentially killing config-log consumption. Please add defensive validation (or catch ConnectException) and log+return instead of throwing in this path.

Suggested change

int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

int newTaskCount;
try {
    newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
} catch (ConnectException e) {
    log.error(
        "Ignoring connector tasks configuration commit for connector '{}' because it "
            + "contains an invalid 'tasks' field: {}",
        connectorName, value.value(), e
    );
    return;
}

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a new issue introduced by this change; the normal branch contains the same unguarded calls.

Comment on lines +1135 to +1148
// Defer processing: store the task count and mark the connector as inconsistent.
// If the connector config arrives later (compaction reorder), applyDeferredTaskConfigs()
// will apply the deferred task configs at that point. If no connector config ever arrives
// (deleted connector), the deferred data remains inert.
@SuppressWarnings("unchecked")
int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
connectorTaskCounts.put(connectorName, newTaskCount);
inconsistent.add(connectorName);
log.warn(
    "Received task commit for connector '{}' (task count: {}) but its connector config "
        + "is not yet present. This may be due to log compaction reordering records "
        + "in the config topic (KAFKA-17719), or the connector may have been deleted. "
        + "Deferring task config application until the connector config is received.",
    connectorName, newTaskCount

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding connectorName to inconsistent when its connector config is absent changes the meaning of ClusterConfigState.inconsistentConnectors(). The javadoc for inconsistentConnectors() states entries “will appear in connectors() since their connector configuration is available” (ClusterConfigState.java:274-276), which will no longer hold for orphaned commits. Either keep inconsistent limited to connectors with configs (and track orphaned task/commit data separately), or update the public contract/docs and audit any callers that assume inconsistency implies a present connector config.

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc description is indeed no longer accurate, but there is no actual impact. inconsistentConnectors() has no direct callers in the production code, and the inconsistency check in the tasks() method is a redundant safety measure (Herder has already filtered it out via contains()).

// If a task commit was previously received before this connector config (due to log compaction
// reordering), the task configs were deferred. Now that the connector config has arrived,
// attempt to apply those deferred task configs. See KAFKA-17719.
applyDeferredTaskConfigs(connectorName);

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applyDeferredTaskConfigs is invoked on every connector-config record, including after the backing store has started. If the config topic contains legacy orphaned task configs/commit records (e.g., from deletions prior to this fix), those deferred updates can persist in-memory and then be applied later when a connector with the same name is (re)created, potentially resurrecting stale task configs. Since compaction reordering is only relevant during initial log replay, consider gating the call so deferred task configs are only applied while started == false (during startup catch-up), and/or clearing deferred state for connectors that still have no connector config once startup completes.

Suggested change

applyDeferredTaskConfigs(connectorName);

if (!started) {
    applyDeferredTaskConfigs(connectorName);
}

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reconfiguration mechanism of Herder is self-healing.

@woshigaopp woshigaopp changed the title fix(connect): kafka-17719 fix(connect): fix connect compact reorder Mar 12, 2026
@superhx
Collaborator

superhx commented Mar 30, 2026

This issue was fixed by #3249

@superhx superhx closed this Mar 30, 2026