diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java
index 5d04a80fd..1591c14d9 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/FeedParserBolt.java
@@ -248,6 +248,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     @Override
     public void cleanup() {
+        super.cleanup();
         if (parseFilters != null) {
             parseFilters.cleanup();
         }
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
index 333f17653..3b8c89a5c 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
@@ -1000,6 +1000,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     @Override
     public void cleanup() {
+        super.cleanup();
         protocolFactory.cleanup();
     }
 
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java
index b412fd9c0..73b3f4536 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java
@@ -570,6 +570,7 @@ protected List toOutlinks(
 
     @Override
     public void cleanup() {
+        super.cleanup();
         if (parseFilters != null) {
             parseFilters.cleanup();
         }
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java
index 6597d762a..7beef050b 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java
@@ -260,6 +260,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
 
     @Override
     public void cleanup() {
+        super.cleanup();
         protocolFactory.cleanup();
         if (fetchExecutor != null) {
             fetchExecutor.shutdownNow();
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java
index 466fa691f..4e77b0f20 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java
@@ -379,6 +379,7 @@ private boolean sniff(byte[] content) {
 
     @Override
     public void cleanup() {
+        super.cleanup();
         if (parseFilters != null) {
             parseFilters.cleanup();
         }
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java
index aa3acefa8..45abeb100 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/StatusEmitterBolt.java
@@ -118,4 +118,15 @@ protected Outlink filterOutlink(
     protected boolean allowRedirs() {
         return allowRedirs;
     }
+
+    /**
+     * Releases the resources held by the URL filters. Subclasses overriding this method must
+     * call {@code super.cleanup()} so that the filters still get cleaned up.
+     */
+    @Override
+    public void cleanup() {
+        if (urlFilters != null) {
+            urlFilters.cleanup();
+        }
+    }
 }
diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java
index 416d4c57e..0ae7be7bc 100644
--- a/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java
+++ b/core/src/main/java/org/apache/stormcrawler/bolt/URLFilterBolt.java
@@ -122,4 +122,12 @@ public void prepare(
             urlFilters = URLFilters.fromConf(stormConf);
         }
     }
+
+    /** Releases the resources held by the URL filters. */
+    @Override
+    public void cleanup() {
+        if (urlFilters != null) {
+            urlFilters.cleanup();
+        }
+    }
 }
diff --git a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java
index e61f33480..8c50071e4 100644
--- a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java
+++ b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilter.java
@@ -18,6 +18,7 @@
 package org.apache.stormcrawler.filtering;
 
 import java.net.URL;
+import org.apache.storm.task.IBolt;
 import org.apache.stormcrawler.Metadata;
 import org.apache.stormcrawler.util.AbstractConfigurable;
 import org.jetbrains.annotations.NotNull;
@@ -46,4 +47,12 @@ public abstract String filter(
             @Nullable URL sourceUrl,
             @Nullable Metadata sourceMetadata,
             @NotNull String urlToFilter);
+
+    /**
+     * Might be used to clean any resources associated with this {@link URLFilter}. See {@link
+     * IBolt#cleanup()} for more details.
+     */
+    public void cleanup() {
+        // nothing to do here
+    }
 }
diff --git a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java
index 1df51a83b..7453c9bf8 100644
--- a/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java
+++ b/core/src/main/java/org/apache/stormcrawler/filtering/URLFilters.java
@@ -142,5 +142,13 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode
         filters = list.toArray(new URLFilter[0]);
     }
 
+    /** Propagates the cleanup to every filter of the chain. */
+    @Override
+    public void cleanup() {
+        for (URLFilter filter : filters) {
+            filter.cleanup();
+        }
+    }
+
     /** Utility to check the filtering of a URL. */
     public static void main(String[] args) throws ParseException {
diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java
index 900223fa0..c1a04007d 100644
--- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java
+++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/filtering/JSONURLFilterWrapper.java
@@ -19,6 +19,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.net.URL;
 import java.util.Map;
 import java.util.Timer;
@@ -71,4 +72,6 @@ public class JSONURLFilterWrapper extends URLFilter {
     private static final Logger LOG = LoggerFactory.getLogger(JSONURLFilterWrapper.class);
 
     private URLFilter delegatedURLFilter;
+    private Timer refreshTimer;
+    private volatile RestHighLevelClient osClient;
 
@@ -127,42 +130,35 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode
 
         final JSONResource resource = (JSONResource) delegatedURLFilter;
 
-        new Timer()
-                .schedule(
-                        new TimerTask() {
-                            private RestHighLevelClient osClient;
-
-                            public void run() {
-                                if (osClient == null) {
-                                    try {
-                                        osClient =
-                                                OpenSearchConnection.getClient(stormConf, "config");
-                                    } catch (Exception e) {
-                                        LOG.error(
-                                                "Exception while creating OpenSearch connection",
-                                                e);
-                                    }
-                                }
-                                if (osClient != null) {
-                                    LOG.info("Reloading json resources from OpenSearch");
-                                    try {
-                                        GetResponse response =
-                                                osClient.get(
-                                                        new GetRequest(
-                                                                "config",
-                                                                resource.getResourceFile()),
-                                                        RequestOptions.DEFAULT);
-                                        resource.loadJSONResources(
-                                                new ByteArrayInputStream(
-                                                        response.getSourceAsBytes()));
-                                    } catch (Exception e) {
-                                        LOG.error("Can't load config from OpenSearch", e);
-                                    }
-                                }
+        refreshTimer = new Timer();
+        refreshTimer.schedule(
+                new TimerTask() {
+                    public void run() {
+                        if (osClient == null) {
+                            try {
+                                osClient = OpenSearchConnection.getClient(stormConf, "config");
+                            } catch (Exception e) {
+                                LOG.error("Exception while creating OpenSearch connection", e);
                             }
-                        },
-                        0,
-                        refreshRate * 1000);
+                        }
+                        if (osClient != null) {
+                            LOG.info("Reloading json resources from OpenSearch");
+                            try {
+                                GetResponse response =
+                                        osClient.get(
+                                                new GetRequest(
+                                                        "config", resource.getResourceFile()),
+                                                RequestOptions.DEFAULT);
+                                resource.loadJSONResources(
+                                        new ByteArrayInputStream(response.getSourceAsBytes()));
+                            } catch (Exception e) {
+                                LOG.error("Can't load config from OpenSearch", e);
+                            }
+                        }
+                    }
+                },
+                0,
+                refreshRate * 1000);
     }
 
     @Override
@@ -172,4 +168,28 @@ public void run() {
             @NotNull String urlToFilter) {
         return delegatedURLFilter.filter(sourceUrl, sourceMetadata, urlToFilter);
     }
+
+    /**
+     * Stops the periodic reload, closes the OpenSearch client and cleans up the delegated
+     * filter. The refresh task runs on the timer thread, so a reload already in progress may
+     * still fail once the client is closed; the task catches and logs that failure.
+     */
+    @Override
+    public void cleanup() {
+        if (refreshTimer != null) {
+            refreshTimer.cancel();
+        }
+        // read the field once: it is assigned by the timer thread
+        final RestHighLevelClient client = osClient;
+        if (client != null) {
+            try {
+                client.close();
+            } catch (IOException e) {
+                LOG.error("Exception when closing OpenSearch client", e);
+            }
+        }
+        if (delegatedURLFilter != null) {
+            delegatedURLFilter.cleanup();
+        }
+    }
 }