…r restart
Root cause: KAFKA-16838 fix added a null check in processTasksCommitRecord()
that calls processConnectorRemoval() when connectorConfigs.get() returns null.
This incorrectly triggers for live connectors when log compaction reorders
records such that task configs + commit appear before the connector config.
Fix approach (full tombstone + defer):
1. removeConnectorConfig() now writes tombstones for ALL related keys
(connector, target-state, all task configs, commit) so that log compaction
will eventually remove all data for deleted connectors. This eliminates
the orphaned task config problem (KAFKA-16838) at the source.
2. processTasksCommitRecord() no longer calls processConnectorRemoval() when
connector config is absent. Instead, it defers processing by storing the
task count and marking the connector as inconsistent. This handles both:
- Compaction reorder (KAFKA-17719): connector config arrives later,
applyDeferredTaskConfigs() applies the deferred task configs.
- Deleted connector with tombstone compacted away: deferred data remains
inert since no connector config will ever arrive.
3. processConnectorConfigRecord() calls applyDeferredTaskConfigs() when a
connector config arrives, to handle the compaction reorder case.
4. processConnectorRemoval() now also clears the inconsistent set.
This approach requires no in-memory state to distinguish between deleted
connectors and compaction reorder - all decisions are based on the actual
data present in the config topic.
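The four steps above can be sketched as a toy replay model. This is a minimal illustration under stated assumptions, not the actual KafkaConfigBackingStore code: records are reduced to plain strings and integers, the method names only mirror those in the commit message, and the `deferredCommits` set is a hypothetical marker added here to keep the example small.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of config-topic replay. Hypothetical; not the real KafkaConfigBackingStore. */
final class DeferredReplaySketch {
    final Map<String, String> connectorConfigs = new HashMap<>();
    final Map<String, Integer> connectorTaskCounts = new HashMap<>();
    final Map<String, Map<Integer, String>> taskConfigs = new HashMap<>();
    final Map<String, Map<Integer, String>> deferredTaskUpdates = new HashMap<>();
    final Set<String> inconsistent = new HashSet<>();
    // Hypothetical marker: connectors whose commit record arrived before their config.
    final Set<String> deferredCommits = new HashSet<>();

    void processTaskConfigRecord(String connector, int task, String config) {
        // Task configs are staged until a commit record makes them visible.
        deferredTaskUpdates.computeIfAbsent(connector, k -> new HashMap<>()).put(task, config);
    }

    void processTasksCommitRecord(String connector, int taskCount) {
        connectorTaskCounts.put(connector, taskCount);
        if (!connectorConfigs.containsKey(connector)) {
            // Config not seen yet: defer instead of treating this as a deletion.
            deferredCommits.add(connector);
            inconsistent.add(connector);
            return;
        }
        applyDeferredTaskConfigs(connector);
    }

    void processConnectorConfigRecord(String connector, String config) {
        connectorConfigs.put(connector, config);
        // Only apply staged task configs if a commit was deferred earlier.
        if (deferredCommits.remove(connector)) {
            applyDeferredTaskConfigs(connector);
        }
    }

    void processConnectorRemoval(String connector) {
        connectorConfigs.remove(connector);
        connectorTaskCounts.remove(connector);
        taskConfigs.remove(connector);
        deferredTaskUpdates.remove(connector);
        deferredCommits.remove(connector);
        inconsistent.remove(connector);
    }

    private void applyDeferredTaskConfigs(String connector) {
        int count = connectorTaskCounts.get(connector);
        Map<Integer, String> staged = deferredTaskUpdates.getOrDefault(connector, new HashMap<>());
        Map<Integer, String> applied = new HashMap<>();
        for (int i = 0; i < count; i++) {
            if (!staged.containsKey(i)) {
                inconsistent.add(connector); // incomplete task set: keep waiting
                return;
            }
            applied.put(i, staged.get(i));
        }
        taskConfigs.put(connector, applied);
        deferredTaskUpdates.remove(connector);
        inconsistent.remove(connector);
    }

    public static void main(String[] args) {
        DeferredReplaySketch store = new DeferredReplaySketch();
        // Compaction reorder: task config and commit are replayed before the connector config.
        store.processTaskConfigRecord("c", 0, "t0");
        store.processTasksCommitRecord("c", 1);
        System.out.println("before config: inconsistent=" + store.inconsistent);
        store.processConnectorConfigRecord("c", "cfg");
        System.out.println("after config: tasks=" + store.taskConfigs.get("c"));
    }
}
```

In this model a deleted connector whose tombstone has been compacted away simply never reaches processConnectorConfigRecord(), so its staged data is never promoted into taskConfigs.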
Pull request overview
This PR improves Kafka Connect’s KafkaConfigBackingStore behavior around config-topic log compaction and connector deletion, aiming to (a) avoid orphaned task/commit records after connector deletion, and (b) correctly handle startup replay when compaction reorders task records ahead of the connector config.
Changes:
- Extend connector deletion to write tombstones for task config keys and the connector commit key, not just connector/target-state keys.
- Defer processing of task commit records when the connector config is not yet present, and apply deferred task configs once the connector config arrives.
- Add a suite of unit tests covering compaction reordering, orphan handling, and deletion/recreation scenarios.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java | Adds broader tombstone writes on deletion; introduces deferred-apply logic for task configs/commits when connector config arrives late; cleans inconsistent state on removal. |
| connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java | Adds multiple new tests for compaction reordering and deletion/orphan scenarios, plus verification of new tombstone-writing behavior. |
    if (taskCount != null) {
        // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
        // where task count was reduced (old task keys with higher indices still exist)
        Set<Integer> allTaskIds = new HashSet<>();
        for (int i = 0; i < taskCount; i++) {
            allTaskIds.add(i);
        }
        for (ConnectorTaskId taskId : taskConfigs.keySet()) {
            if (taskId.connector().equals(connector)) {
                allTaskIds.add(taskId.task());
            }
        }
        Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
        if (deferred != null) {
            for (ConnectorTaskId taskId : deferred.keySet()) {
                allTaskIds.add(taskId.task());
            }
        }
        for (int taskId : allTaskIds) {
            keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
        }
        keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));
    }

In removeConnectorConfig, task/commit tombstones are only added when connectorTaskCounts.get(connector) is non-null. If the task count is missing (e.g., state not yet loaded, or corrupted/partially-compacted history), this will still leave an orphaned commit record and/or task configs in the config topic. Consider always writing the COMMIT_TASKS_KEY(connector) tombstone, and tombstoning any task keys discoverable from taskConfigs/deferredTaskUpdates even when taskCount is null (or default to tombstoning indices found in those maps).
Suggested change:

    // Collect all known task IDs for this connector from multiple sources:
    // 1) The recorded task count (if available), and
    // 2) Any existing entries in taskConfigs and deferredTaskUpdates.
    Set<Integer> allTaskIds = new HashSet<>();
    if (taskCount != null) {
        for (int i = 0; i < taskCount; i++) {
            allTaskIds.add(i);
        }
    }
    for (ConnectorTaskId taskId : taskConfigs.keySet()) {
        if (taskId.connector().equals(connector)) {
            allTaskIds.add(taskId.task());
        }
    }
    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
    if (deferred != null) {
        for (ConnectorTaskId taskId : deferred.keySet()) {
            allTaskIds.add(taskId.task());
        }
    }
    for (int taskId : allTaskIds) {
        keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
    }
    keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

In practice, when connectorTaskCounts has no entry for the connector (taskCount is null), the config topic contains no task config or commit record for it, so it is correct not to write tombstones.
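For comparison, the reviewer's alternative can be sketched as a pure function that computes which keys would be tombstoned. This is only an illustration of the suggestion, not the merged behavior: the key prefixes (`connector-`, `target-state-`, `task-`, `commit-`) are assumptions about the config topic's key layout, and `tombstoneKeys` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class TombstoneKeysSketch {
    /**
     * Returns the keys to tombstone for a connector. taskCount may be null;
     * knownTaskIds holds task indices found in cached or deferred task configs.
     */
    static List<String> tombstoneKeys(String connector, Integer taskCount, Set<Integer> knownTaskIds) {
        // Union of indices implied by the task count and indices actually observed.
        TreeSet<Integer> ids = new TreeSet<>(knownTaskIds);
        if (taskCount != null) {
            for (int i = 0; i < taskCount; i++) {
                ids.add(i);
            }
        }
        List<String> keys = new ArrayList<>();
        keys.add("connector-" + connector);     // assumed key layout
        keys.add("target-state-" + connector);  // assumed key layout
        for (int id : ids) {
            keys.add("task-" + connector + "-" + id);
        }
        // In this variant the commit key is always tombstoned, even without a task count.
        keys.add("commit-" + connector);
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(tombstoneKeys("c", 2, Set.of(5)));
        System.out.println(tombstoneKeys("c", null, Set.of()));
    }
}
```

The cost of this variant is one extra tombstone per deletion for connectors that never had tasks; whether that is worthwhile depends on whether a null task count can really coexist with task or commit records in the topic, which is the point under dispute in this thread.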
    // Write tombstones for all task config keys and the commit key so that log compaction
    // will eventually remove them. Without these tombstones, orphaned task configs and commit
    // records would persist indefinitely in the config topic after the connector is deleted,
    // which is the root cause of KAFKA-16838.
    synchronized (lock) {
        Integer taskCount = connectorTaskCounts.get(connector);
        if (taskCount != null) {
            // Also collect task IDs from taskConfigs and deferredTaskUpdates to handle the case
            // where task count was reduced (old task keys with higher indices still exist)
            Set<Integer> allTaskIds = new HashSet<>();
            for (int i = 0; i < taskCount; i++) {
                allTaskIds.add(i);
            }
            for (ConnectorTaskId taskId : taskConfigs.keySet()) {
                if (taskId.connector().equals(connector)) {
                    allTaskIds.add(taskId.task());
                }
            }
            Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connector);
            if (deferred != null) {
                for (ConnectorTaskId taskId : deferred.keySet()) {
                    allTaskIds.add(taskId.task());
                }
            }
            for (int taskId : allTaskIds) {
                keyValues.add(new ProducerKeyValue(TASK_KEY(new ConnectorTaskId(connector, taskId)), null));
            }
            keyValues.add(new ProducerKeyValue(COMMIT_TASKS_KEY(connector), null));

This change introduces task-key tombstones on deletion, but processTaskConfigRecord currently logs an error and ignores records with null values. With the new deletion behavior, normal connector deletion (and restarts reading compacted tombstones) will generate error logs for each task tombstone, and the tombstone won’t actively clear any cached deferred/task config state if it were present. Consider handling task-config tombstones explicitly (treat null as an expected delete: remove the task from deferredTaskUpdates/taskConfigs and log at debug/trace).
This is not a new issue introduced by this change.
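Regardless of where the behavior originated, the reviewer's proposal (treat a null task-config value as an expected delete) can be illustrated with a small sketch. The class, the string task keys, and the `tombstonesSeen` counter (standing in for a debug-level log line) are all hypothetical; the real store keys its maps by ConnectorTaskId.

```java
import java.util.HashMap;
import java.util.Map;

final class TaskTombstoneSketch {
    final Map<String, Map<String, String>> taskConfigs = new HashMap<>();
    final Map<String, Map<String, String>> deferredTaskUpdates = new HashMap<>();
    int tombstonesSeen = 0; // stand-in for a debug/trace log line

    void processTaskConfigRecord(String taskKey, Map<String, String> value) {
        if (value == null) {
            // Tombstone: an expected outcome of connector deletion, not an error.
            taskConfigs.remove(taskKey);
            deferredTaskUpdates.remove(taskKey);
            tombstonesSeen++;
            return;
        }
        // Normal record: stage it until the commit record arrives.
        deferredTaskUpdates.put(taskKey, value);
    }

    public static void main(String[] args) {
        TaskTombstoneSketch store = new TaskTombstoneSketch();
        store.processTaskConfigRecord("task-c-0", Map.of("k", "v"));
        store.processTaskConfigRecord("task-c-0", null);
        System.out.println("staged entries after tombstone: " + store.deferredTaskUpdates.size());
    }
}
```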
    // will apply the deferred task configs at that point. If no connector config ever arrives
    // (deleted connector), the deferred data remains inert.
    @SuppressWarnings("unchecked")
    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));

When the connector config is missing, this branch calls intValue(((Map) value.value()).get("tasks")) without validating the presence/type of the tasks field. If the commit record is malformed (which is more plausible for orphaned/compacted data), intValue will throw a ConnectException that is not caught in the consumer callback, potentially killing config-log consumption. Please add defensive validation (or catch ConnectException) and log+return instead of throwing in this path.
Suggested change:

    int newTaskCount;
    try {
        newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
    } catch (ConnectException e) {
        log.error(
            "Ignoring connector tasks configuration commit for connector '{}' because it "
                + "contains an invalid 'tasks' field: {}",
            connectorName, value.value(), e
        );
        return;
    }

This is not a new issue introduced by this change; the normal branch contains the same unguarded calls.
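Since the same unguarded call appears in both branches, one way to address it in a single place would be a validating parser that both branches share. The following is a sketch only: `parseTaskCount` is a hypothetical helper, and rejecting negative or out-of-range values is an assumption about what counts as malformed, not something the PR specifies.

```java
import java.util.Map;
import java.util.OptionalInt;

final class TaskCountParseSketch {
    /** Returns the "tasks" field as an int, or empty if the record is malformed. */
    static OptionalInt parseTaskCount(Object recordValue) {
        if (!(recordValue instanceof Map)) {
            return OptionalInt.empty(); // null or non-map record value
        }
        Object tasks = ((Map<?, ?>) recordValue).get("tasks");
        if (tasks instanceof Integer) {
            int n = (Integer) tasks;
            return n >= 0 ? OptionalInt.of(n) : OptionalInt.empty();
        }
        if (tasks instanceof Long) {
            long n = (Long) tasks;
            return (n >= 0 && n <= Integer.MAX_VALUE) ? OptionalInt.of((int) n) : OptionalInt.empty();
        }
        return OptionalInt.empty(); // missing field or unexpected type
    }

    public static void main(String[] args) {
        System.out.println(parseTaskCount(Map.of("tasks", 3)).isPresent());
        System.out.println(parseTaskCount(Map.of("tasks", "three")).isPresent());
    }
}
```

A caller would log and return when the result is empty, which keeps a malformed commit record from propagating an exception out of the consumer callback.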
    // Defer processing: store the task count and mark the connector as inconsistent.
    // If the connector config arrives later (compaction reorder), applyDeferredTaskConfigs()
    // will apply the deferred task configs at that point. If no connector config ever arrives
    // (deleted connector), the deferred data remains inert.
    @SuppressWarnings("unchecked")
    int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
    connectorTaskCounts.put(connectorName, newTaskCount);
    inconsistent.add(connectorName);
    log.warn(
        "Received task commit for connector '{}' (task count: {}) but its connector config "
            + "is not yet present. This may be due to log compaction reordering records "
            + "in the config topic (KAFKA-17719), or the connector may have been deleted. "
            + "Deferring task config application until the connector config is received.",
        connectorName, newTaskCount

Adding connectorName to inconsistent when its connector config is absent changes the meaning of ClusterConfigState.inconsistentConnectors(). The javadoc for inconsistentConnectors() states entries “will appear in connectors() since their connector configuration is available” (ClusterConfigState.java:274-276), which will no longer hold for orphaned commits. Either keep inconsistent limited to connectors with configs (and track orphaned task/commit data separately), or update the public contract/docs and audit any callers that assume inconsistency implies a present connector config.
The Javadoc description is indeed no longer accurate, but there is no actual impact. inconsistentConnectors() has no direct callers in the production code, and the inconsistency check in the tasks() method is a redundant safety measure (Herder has already filtered it out via contains()).
    // If a task commit was previously received before this connector config (due to log compaction
    // reordering), the task configs were deferred. Now that the connector config has arrived,
    // attempt to apply those deferred task configs. See KAFKA-17719.
    applyDeferredTaskConfigs(connectorName);

applyDeferredTaskConfigs is invoked on every connector-config record, including after the backing store has started. If the config topic contains legacy orphaned task configs/commit records (e.g., from deletions prior to this fix), those deferred updates can persist in-memory and then be applied later when a connector with the same name is (re)created, potentially resurrecting stale task configs. Since compaction reordering is only relevant during initial log replay, consider gating the call so deferred task configs are only applied while started == false (during startup catch-up), and/or clearing deferred state for connectors that still have no connector config once startup completes.
Suggested change:

    if (!started) {
        applyDeferredTaskConfigs(connectorName);
    }

The reconfiguration mechanism of Herder is self-healing.
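For readers weighing the two positions, the reviewer's gating idea can be sketched as follows. This is an illustration only, under the assumption that deferred commits are tracked per connector name; `StartupGateSketch`, `markStarted`, and the two count maps are hypothetical, and the sketch does not model the herder reconfiguration that the reply relies on.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class StartupGateSketch {
    final Set<String> connectors = new HashSet<>();
    final Map<String, Integer> deferredTaskCounts = new HashMap<>();
    final Map<String, Integer> appliedTaskCounts = new HashMap<>();
    boolean started = false;

    void processTasksCommitRecord(String connector, int taskCount) {
        if (connectors.contains(connector)) {
            appliedTaskCounts.put(connector, taskCount);
        } else {
            deferredTaskCounts.put(connector, taskCount); // config not seen yet
        }
    }

    void processConnectorConfigRecord(String connector) {
        connectors.add(connector);
        Integer deferred = deferredTaskCounts.remove(connector);
        // Gate: compaction reordering only matters during the initial replay.
        if (deferred != null && !started) {
            appliedTaskCounts.put(connector, deferred);
        }
    }

    void markStarted() {
        started = true;
        // Anything still deferred has no connector config, so it is treated as orphaned.
        deferredTaskCounts.clear();
    }

    public static void main(String[] args) {
        StartupGateSketch store = new StartupGateSketch();
        store.processTasksCommitRecord("orphan", 3);
        store.markStarted();
        store.processConnectorConfigRecord("orphan"); // recreated after startup
        System.out.println("stale count applied: " + store.appliedTaskCounts.containsKey("orphan"));
    }
}
```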
This issue was fixed by #3249
This pull request addresses two key issues in the Kafka Connect configuration storage: (1) preventing orphaned task configs and commit records from persisting after connector deletion, and (2) improving robustness when log compaction reorders records in the config topic. The changes ensure that all related records are properly removed when a connector is deleted and that deferred task configs are correctly applied if they arrive before the connector config due to log compaction.
Connector deletion cleanup:
- Updated the removeConnectorConfig method to write tombstone records for all associated task config keys and the commit key, ensuring that orphaned task configs and commit records are eventually removed by log compaction after connector deletion. This addresses the root cause of KAFKA-16838.

Handling log compaction reordering:
- Added the applyDeferredTaskConfigs method, which applies deferred task configs when the connector config arrives after task commit records due to log compaction reordering (KAFKA-17719). This method ensures consistency and proper application of task configs.
- Modified processTasksCommitRecord to defer processing of task commit records if the connector config is not present, instead of immediately removing related data. It now logs the situation and stores the task count, marking the connector as inconsistent until the config arrives.
- Updated processConnectorConfigRecord to call applyDeferredTaskConfigs when a connector config is received, allowing deferred task configs to be applied if they arrived earlier.
- Updated processConnectorRemoval to remove connectors from the inconsistent set, ensuring that state is fully cleaned up when a connector is deleted.