From 1661088b48a2bc4421b8f543371bc251d2c70510 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 2 Apr 2026 18:46:42 +0100 Subject: [PATCH] Fix client leak on BulkProcessor or Sniffer construction failure In getConnection(), if BulkProcessor.builder().build() or Sniffer.builder().build() throws, the already-created RestHighLevelClient (and its connection pool/threads) is leaked. Similarly, if Sniffer construction fails, the BulkProcessor is also leaked. Wrap construction in try-catch to close already-created resources on failure. Co-Authored-By: Claude Opus 4.6 --- .../opensearch/OpenSearchConnection.java | 56 ++++++++++++------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java index c3662a098..58d38df24 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/OpenSearchConnection.java @@ -286,27 +286,45 @@ public static OpenSearchConnection getConnection( new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory( bufferSize * 1024 * 1024)); - final BulkProcessor bulkProcessor = - BulkProcessor.builder( - (request, bulkListener) -> - client.bulkAsync( - request, - requestOptionsBuilder.build(), - bulkListener), - listener) - .setFlushInterval(flushInterval) - .setBulkActions(bulkActions) - .setConcurrentRequests(concurrentRequests) - .build(); - - boolean sniff = - ConfUtils.getBoolean(stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true); + BulkProcessor bulkProcessor = null; Sniffer sniffer = null; - if (sniff) { - sniffer = Sniffer.builder(client.getLowLevelClient()).build(); - } + try { + bulkProcessor = + BulkProcessor.builder( + (request, bulkListener) -> + client.bulkAsync( + request, + requestOptionsBuilder.build(), + bulkListener), + listener) + .setFlushInterval(flushInterval) + .setBulkActions(bulkActions) + .setConcurrentRequests(concurrentRequests) + .build(); + + boolean sniff = + ConfUtils.getBoolean( + stormConf, Constants.PARAMPREFIX, dottedType, "sniff", true); + if (sniff) { + sniffer = Sniffer.builder(client.getLowLevelClient()).build(); + } - return new OpenSearchConnection(client, bulkProcessor, sniffer); + return new OpenSearchConnection(client, bulkProcessor, sniffer); + } catch (Exception e) { + if (bulkProcessor != null) { + try { + bulkProcessor.close(); + } catch (Exception suppressed) { + e.addSuppressed(suppressed); + } + } + try { + client.close(); + } catch (IOException suppressed) { + e.addSuppressed(suppressed); + } + throw e; + } } private boolean isClosed = false;