Skip to content

Commit cdac7e4

Browse files
jnioche and claude authored
Add cleanup() to URLFilter and fix timer/client leak in JSONURLFilterWrapper (#1867)
* Fix timer and client leak in OpenSearch JSONURLFilterWrapper

  Add cleanup() lifecycle method to URLFilter (mirroring ParseFilter.cleanup()) and wire it through URLFilters, StatusEmitterBolt, URLFilterBolt, and all bolt subclasses via super.cleanup() calls.

  In JSONURLFilterWrapper the Timer was created inline without keeping a reference to it, and the RestHighLevelClient was held inside an anonymous TimerTask, so there was no way to cancel the timer or close the client. The timer thread runs forever and the client's connection pool is never shut down. Store both as instance fields and override cleanup() to cancel the timer and close the client.

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Reformat

  Signed-off-by: Julien Nioche <julien@digitalpebble.com>

---------

Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 29d3c26 commit cdac7e4

10 files changed

Lines changed: 80 additions & 35 deletions

File tree

core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
248248

249249
@Override
250250
public void cleanup() {
251+
super.cleanup();
251252
if (parseFilters != null) {
252253
parseFilters.cleanup();
253254
}

core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
10001000

10011001
/**
 * Releases the resources held by this bolt: runs the superclass cleanup first, then cleans up
 * the protocol factory.
 */
@Override
10021002
public void cleanup() {
1003+
// Added by this commit so that the parent class's cleanup logic is no longer skipped.
super.cleanup();
10031004
// NOTE(review): no null guard here, unlike the urlFilters/parseFilters checks elsewhere in
// this commit - confirm protocolFactory is always set before cleanup() can be called.
protocolFactory.cleanup();
10041005
}
10051006

core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ protected List<Outlink> toOutlinks(
570570

571571
@Override
572572
public void cleanup() {
573+
super.cleanup();
573574
if (parseFilters != null) {
574575
parseFilters.cleanup();
575576
}

core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
260260

261261
@Override
262262
public void cleanup() {
263+
super.cleanup();
263264
protocolFactory.cleanup();
264265
if (fetchExecutor != null) {
265266
fetchExecutor.shutdownNow();

core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ private boolean sniff(byte[] content) {
379379

380380
@Override
381381
public void cleanup() {
382+
super.cleanup();
382383
if (parseFilters != null) {
383384
parseFilters.cleanup();
384385
}

core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,11 @@ protected Outlink filterOutlink(
118118
/** Returns the current value of the {@code allowRedirs} field. */
protected boolean allowRedirs() {
119119
return allowRedirs;
120120
}
121+
122+
/**
 * Cleans up the URL filters used by this bolt. Does nothing when {@code urlFilters} is null.
 */
@Override
123+
public void cleanup() {
124+
// Guard against the filters never having been set (presumably when prepare() did not run
// or failed - confirm against prepare()).
if (urlFilters != null) {
125+
urlFilters.cleanup();
126+
}
127+
}
121128
}

core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,11 @@ public void prepare(
122122
urlFilters = URLFilters.fromConf(stormConf);
123123
}
124124
}
125+
126+
/**
 * Cleans up the URL filters used by this bolt. Does nothing when {@code urlFilters} is null.
 */
@Override
127+
public void cleanup() {
128+
// urlFilters is assigned inside a conditional branch of prepare(), hence the null guard.
if (urlFilters != null) {
129+
urlFilters.cleanup();
130+
}
131+
}
125132
}

core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.stormcrawler.filtering;
1919

2020
import java.net.URL;
21+
import org.apache.storm.task.IBolt;
2122
import org.apache.stormcrawler.Metadata;
2223
import org.apache.stormcrawler.util.AbstractConfigurable;
2324
import org.jetbrains.annotations.NotNull;
@@ -46,4 +47,12 @@ public abstract String filter(
4647
@Nullable URL sourceUrl,
4748
@Nullable Metadata sourceMetadata,
4849
@NotNull String urlToFilter);
50+
51+
/**
52+
* Releases any resources associated with this {@link URLFilter}; the default implementation
53+
* does nothing. See {@link IBolt#cleanup()} for more details.
54+
*/
55+
public void cleanup() {
56+
// no-op by default; subclasses that hold resources override this method
57+
}
4958
}

core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@ public void configure(@NotNull Map<String, Object> stormConf, @NotNull JsonNode
142142
filters = list.toArray(new URLFilter[0]);
143143
}
144144

145+
/**
 * Calls {@code cleanup()} on every wrapped filter, in the order they appear in the {@code
 * filters} array.
 */
@Override
146+
public void cleanup() {
147+
// NOTE(review): an exception thrown by one filter's cleanup() ends the loop, so the
// remaining filters would not be cleaned up - confirm whether that is acceptable.
for (URLFilter filter : filters) {
148+
filter.cleanup();
149+
}
150+
}
151+
145152
/** Utility to check the filtering of a URL. */
146153
public static void main(String[] args) throws ParseException {
147154

external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import java.io.ByteArrayInputStream;
22+
import java.io.IOException;
2223
import java.net.URL;
2324
import java.util.Map;
2425
import java.util.Timer;
@@ -71,6 +72,8 @@ public class JSONURLFilterWrapper extends URLFilter {
7172
private static final Logger LOG = LoggerFactory.getLogger(JSONURLFilterWrapper.class);
7273

7374
private URLFilter delegatedURLFilter;
75+
private Timer refreshTimer;
76+
private RestHighLevelClient osClient;
7477

7578
public void configure(@NotNull Map<String, Object> stormConf, @NotNull JsonNode filterParams) {
7679

@@ -127,42 +130,35 @@ public void configure(@NotNull Map<String, Object> stormConf, @NotNull JsonNode
127130

128131
final JSONResource resource = (JSONResource) delegatedURLFilter;
129132

130-
new Timer()
131-
.schedule(
132-
new TimerTask() {
133-
private RestHighLevelClient osClient;
134-
135-
public void run() {
136-
if (osClient == null) {
137-
try {
138-
osClient =
139-
OpenSearchConnection.getClient(stormConf, "config");
140-
} catch (Exception e) {
141-
LOG.error(
142-
"Exception while creating OpenSearch connection",
143-
e);
144-
}
145-
}
146-
if (osClient != null) {
147-
LOG.info("Reloading json resources from OpenSearch");
148-
try {
149-
GetResponse response =
150-
osClient.get(
151-
new GetRequest(
152-
"config",
153-
resource.getResourceFile()),
154-
RequestOptions.DEFAULT);
155-
resource.loadJSONResources(
156-
new ByteArrayInputStream(
157-
response.getSourceAsBytes()));
158-
} catch (Exception e) {
159-
LOG.error("Can't load config from OpenSearch", e);
160-
}
161-
}
133+
refreshTimer = new Timer();
134+
refreshTimer.schedule(
135+
new TimerTask() {
136+
public void run() {
137+
if (osClient == null) {
138+
try {
139+
osClient = OpenSearchConnection.getClient(stormConf, "config");
140+
} catch (Exception e) {
141+
LOG.error("Exception while creating OpenSearch connection", e);
162142
}
163-
},
164-
0,
165-
refreshRate * 1000);
143+
}
144+
if (osClient != null) {
145+
LOG.info("Reloading json resources from OpenSearch");
146+
try {
147+
GetResponse response =
148+
osClient.get(
149+
new GetRequest(
150+
"config", resource.getResourceFile()),
151+
RequestOptions.DEFAULT);
152+
resource.loadJSONResources(
153+
new ByteArrayInputStream(response.getSourceAsBytes()));
154+
} catch (Exception e) {
155+
LOG.error("Can't load config from OpenSearch", e);
156+
}
157+
}
158+
}
159+
},
160+
0,
161+
refreshRate * 1000);
166162
}
167163

168164
@Override
@@ -172,4 +168,18 @@ public void run() {
172168
@NotNull String urlToFilter) {
173169
return delegatedURLFilter.filter(sourceUrl, sourceMetadata, urlToFilter);
174170
}
171+
172+
/**
 * Stops the periodic reload of the JSON resources and closes the OpenSearch client.
 *
 * <p>Cancels {@code refreshTimer} if one was created and closes {@code osClient} if one was
 * opened. An {@link IOException} raised while closing the client is logged and not rethrown.
 */
@Override
173+
public void cleanup() {
174+
// Cancel first so that no further refresh runs get scheduled.
if (refreshTimer != null) {
175+
refreshTimer.cancel();
176+
}
177+
// NOTE(review): osClient is assigned from the TimerTask and is not declared volatile, and
// Timer.cancel() does not wait for a run that is already executing - so a client created by
// an in-flight run might not be seen (and closed) here. Confirm whether this matters.
if (osClient != null) {
178+
try {
179+
osClient.close();
180+
} catch (IOException e) {
181+
LOG.error("Exception when closing OpenSearch client", e);
182+
}
183+
}
184+
}
175185
}

0 commit comments

Comments
 (0)