From 16c62bfe8e72a4596c109fc3f08290e9dac9d626 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 2 Apr 2026 18:57:26 +0100 Subject: [PATCH 1/2] Fix timer and client leak in OpenSearch JSONResourceWrapper The Timer and RestHighLevelClient were created inside an anonymous TimerTask with no way to cancel or close them. The timer thread runs forever and the client's connection pool is never shut down. Store both as instance fields and override cleanup() to cancel the timer and close the client. cleanup() is called by ParseFilters during bolt cleanup. Co-Authored-By: Claude Opus 4.6 --- .../parse/filter/JSONResourceWrapper.java | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java index e475afb2e..72bf2e9cf 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -70,6 +71,8 @@ public class JSONResourceWrapper extends ParseFilter { private static final Logger LOG = LoggerFactory.getLogger(JSONResourceWrapper.class); private ParseFilter delegatedParseFilter; + private Timer refreshTimer; + private RestHighLevelClient osClient; public void configure(@NotNull Map stormConf, @NotNull JsonNode filterParams) { @@ -126,15 +129,13 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode final JSONResource resource = (JSONResource) delegatedParseFilter; - new Timer() - .schedule( + refreshTimer = new Timer(); + refreshTimer.schedule( new TimerTask() { - private RestHighLevelClient esClient; - public void run() { - if (esClient == null) { + if (osClient == null) { try { - esClient = + osClient = OpenSearchConnection.getClient(stormConf, "config"); } catch (Exception e) { LOG.error( @@ -142,11 +143,11 @@ public void run() { e); } } - if (esClient != null) { + if (osClient != null) { LOG.info("Reloading json resources from OpenSearch"); try { GetResponse response = - esClient.get( + osClient.get( new GetRequest( "config", resource.getResourceFile()), @@ -168,4 +169,18 @@ public void run() { public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) { delegatedParseFilter.filter(URL, content, doc, parse); } + + @Override + public void cleanup() { + if (refreshTimer != null) { + refreshTimer.cancel(); + } + if (osClient != null) { + try { + osClient.close(); + } catch (IOException e) { + LOG.error("Exception when closing OpenSearch client", e); + } + } + } } From 44db6a1257c2437d54e9c5e6c7f848339e0d5cb1 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 2 Apr 2026 19:11:33 +0100 Subject: [PATCH 2/2] Reformat Signed-off-by: Julien Nioche --- .../parse/filter/JSONResourceWrapper.java | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java index 72bf2e9cf..41561d79e 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java @@ -131,38 +131,33 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode refreshTimer = new Timer(); refreshTimer.schedule( - new TimerTask() { - public void run() { - if (osClient == null) { - try { - osClient = - OpenSearchConnection.getClient(stormConf, "config"); - } catch (Exception e) { - LOG.error( - "Exception while creating OpenSearch connection", - e); - } - } - if (osClient != null) { - LOG.info("Reloading json resources from OpenSearch"); - try { - GetResponse response = - osClient.get( - new GetRequest( - "config", - resource.getResourceFile()), - RequestOptions.DEFAULT); - resource.loadJSONResources( - new ByteArrayInputStream( - response.getSourceAsBytes())); - } catch (Exception e) { - LOG.error("Can't load config from OpenSearch", e); - } - } + new TimerTask() { + public void run() { + if (osClient == null) { + try { + osClient = OpenSearchConnection.getClient(stormConf, "config"); + } catch (Exception e) { + LOG.error("Exception while creating OpenSearch connection", e); } - }, - 0, - refreshRate * 1000); + } + if (osClient != null) { + LOG.info("Reloading json resources from OpenSearch"); + try { + GetResponse response = + osClient.get( + new GetRequest( + "config", resource.getResourceFile()), + RequestOptions.DEFAULT); + resource.loadJSONResources( + new ByteArrayInputStream(response.getSourceAsBytes())); + } catch (Exception e) { + LOG.error("Can't load config from OpenSearch", e); + } + } + } + }, + 0, + refreshRate * 1000); } @Override