From 12d5aa12445a00065e439dc283faa3cad66d5b78 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 2 Apr 2026 19:05:08 +0100 Subject: [PATCH 1/2] Fix timer and client leak in OpenSearch JSONURLFilterWrapper Add cleanup() lifecycle method to URLFilter (mirroring ParseFilter.cleanup()) and wire it through URLFilters, StatusEmitterBolt, URLFilterBolt, and all bolt subclasses via super.cleanup() calls. The Timer and RestHighLevelClient in JSONURLFilterWrapper were created inside an anonymous TimerTask with no way to cancel or close them. The timer thread runs forever and the client's connection pool is never shut down. Store both as instance fields and override cleanup() to cancel the timer and close the client. Co-Authored-By: Claude Opus 4.6 --- .../stormcrawler/bolt/FeedParserBolt.java | 1 + .../apache/stormcrawler/bolt/FetcherBolt.java | 1 + .../stormcrawler/bolt/JSoupParserBolt.java | 1 + .../stormcrawler/bolt/SimpleFetcherBolt.java | 1 + .../stormcrawler/bolt/SiteMapParserBolt.java | 1 + .../stormcrawler/bolt/StatusEmitterBolt.java | 7 ++++++ .../stormcrawler/bolt/URLFilterBolt.java | 7 ++++++ .../stormcrawler/filtering/URLFilter.java | 9 ++++++++ .../stormcrawler/filtering/URLFilters.java | 7 ++++++ .../filtering/JSONURLFilterWrapper.java | 23 +++++++++++++++---- 10 files changed, 54 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java index 5d04a80fd..1591c14d9 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java @@ -248,6 +248,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { + super.cleanup(); if (parseFilters != null) { parseFilters.cleanup(); } diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index 333f17653..3b8c89a5c 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -1000,6 +1000,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { + super.cleanup(); protocolFactory.cleanup(); } diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java index b412fd9c0..73b3f4536 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java @@ -570,6 +570,7 @@ protected List toOutlinks( @Override public void cleanup() { + super.cleanup(); if (parseFilters != null) { parseFilters.cleanup(); } diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java index 6597d762a..7beef050b 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java @@ -260,6 +260,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { + super.cleanup(); protocolFactory.cleanup(); if (fetchExecutor != null) { fetchExecutor.shutdownNow(); diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java index 466fa691f..4e77b0f20 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java @@ -379,6 +379,7 @@ private boolean sniff(byte[] content) { @Override public void cleanup() { + super.cleanup(); if (parseFilters != null) { parseFilters.cleanup(); } diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java index aa3acefa8..45abeb100 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java @@ -118,4 +118,11 @@ protected Outlink filterOutlink( protected boolean allowRedirs() { return allowRedirs; } + + @Override + public void cleanup() { + if (urlFilters != null) { + urlFilters.cleanup(); + } + } } diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java index 416d4c57e..0ae7be7bc 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java @@ -122,4 +122,11 @@ public void prepare( urlFilters = URLFilters.fromConf(stormConf); } } + + @Override + public void cleanup() { + if (urlFilters != null) { + urlFilters.cleanup(); + } + } } diff --git a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java index e61f33480..8c50071e4 100644 --- a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java +++ b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java @@ -18,6 +18,7 @@ package org.apache.stormcrawler.filtering; import java.net.URL; +import org.apache.storm.task.IBolt; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.util.AbstractConfigurable; import org.jetbrains.annotations.NotNull; @@ -46,4 +47,12 @@ public abstract String filter( @Nullable URL sourceUrl, @Nullable Metadata sourceMetadata, @NotNull String urlToFilter); + + /** + * Might be used to clean any resources associated with this {@link URLFilter}. See {@link + * IBolt#cleanup()} for more details. + */ + public void cleanup() { + // nothing to do here + } } diff --git a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java index 1df51a83b..7453c9bf8 100644 --- a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java +++ b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java @@ -142,6 +142,13 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode filters = list.toArray(new URLFilter[0]); } + @Override + public void cleanup() { + for (URLFilter filter : filters) { + filter.cleanup(); + } + } + /** Utility to check the filtering of a URL. */ public static void main(String[] args) throws ParseException { diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java index 900223fa0..523a86f93 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.net.URL; import java.util.Map; import java.util.Timer; @@ -71,6 +72,8 @@ public class JSONURLFilterWrapper extends URLFilter { private static final Logger LOG = LoggerFactory.getLogger(JSONURLFilterWrapper.class); private URLFilter delegatedURLFilter; + private Timer refreshTimer; + private RestHighLevelClient osClient; public void configure(@NotNull Map stormConf, @NotNull JsonNode filterParams) { @@ -127,11 +130,9 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode final JSONResource resource = (JSONResource) delegatedURLFilter; - new Timer() - .schedule( + refreshTimer = new Timer(); + refreshTimer.schedule( new TimerTask() { - private RestHighLevelClient osClient; - public void run() { if (osClient == null) { try { @@ -172,4 +173,18 @@ public void run() { @NotNull String urlToFilter) { return delegatedURLFilter.filter(sourceUrl, sourceMetadata, urlToFilter); } + + @Override + public void cleanup() { + if (refreshTimer != null) { + refreshTimer.cancel(); + } + if (osClient != null) { + try { + osClient.close(); + } catch (IOException e) { + LOG.error("Exception when closing OpenSearch client", e); + } + } + } } From f5c9b4335e8c4ed042f3894ddb9dcab3eb596ae3 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 2 Apr 2026 19:19:31 +0100 Subject: [PATCH 2/2] Reformat Signed-off-by: Julien Nioche --- .../filtering/JSONURLFilterWrapper.java | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java index 523a86f93..c1a04007d 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java @@ -132,38 +132,33 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode refreshTimer = new Timer(); refreshTimer.schedule( - new TimerTask() { - public void run() { - if (osClient == null) { - try { - osClient = - OpenSearchConnection.getClient(stormConf, "config"); - } catch (Exception e) { - LOG.error( - "Exception while creating OpenSearch connection", - e); - } - } - if (osClient != null) { - LOG.info("Reloading json resources from OpenSearch"); - try { - GetResponse response = - osClient.get( - new GetRequest( - "config", - resource.getResourceFile()), - RequestOptions.DEFAULT); - resource.loadJSONResources( - new ByteArrayInputStream( - response.getSourceAsBytes())); - } catch (Exception e) { - LOG.error("Can't load config from OpenSearch", e); - } - } + new TimerTask() { + public void run() { + if (osClient == null) { + try { + osClient = OpenSearchConnection.getClient(stormConf, "config"); + } catch (Exception e) { + LOG.error("Exception while creating OpenSearch connection", e); } - }, - 0, - refreshRate * 1000); + } + if (osClient != null) { + LOG.info("Reloading json resources from OpenSearch"); + try { + GetResponse response = + osClient.get( + new GetRequest( + "config", resource.getResourceFile()), + RequestOptions.DEFAULT); + resource.loadJSONResources( + new ByteArrayInputStream(response.getSourceAsBytes())); + } catch (Exception e) { + LOG.error("Can't load config from OpenSearch", e); + } + } + } + }, + 0, + refreshRate * 1000); } @Override