diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java index e475afb2e..41561d79e 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/parse/filter/JSONResourceWrapper.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -70,6 +71,8 @@ public class JSONResourceWrapper extends ParseFilter { private static final Logger LOG = LoggerFactory.getLogger(JSONResourceWrapper.class); private ParseFilter delegatedParseFilter; + private Timer refreshTimer; + private RestHighLevelClient osClient; public void configure(@NotNull Map stormConf, @NotNull JsonNode filterParams) { @@ -126,46 +129,53 @@ public void configure(@NotNull Map stormConf, @NotNull JsonNode final JSONResource resource = (JSONResource) delegatedParseFilter; - new Timer() - .schedule( - new TimerTask() { - private RestHighLevelClient esClient; - - public void run() { - if (esClient == null) { - try { - esClient = - OpenSearchConnection.getClient(stormConf, "config"); - } catch (Exception e) { - LOG.error( - "Exception while creating OpenSearch connection", - e); - } - } - if (esClient != null) { - LOG.info("Reloading json resources from OpenSearch"); - try { - GetResponse response = - esClient.get( - new GetRequest( - "config", - resource.getResourceFile()), - RequestOptions.DEFAULT); - resource.loadJSONResources( - new ByteArrayInputStream( - response.getSourceAsBytes())); - } catch (Exception e) { - LOG.error("Can't load config from OpenSearch", e); - } - } + refreshTimer = new Timer(); + refreshTimer.schedule( + new TimerTask() { + public void run() { + if (osClient == null) { + try { + osClient = OpenSearchConnection.getClient(stormConf, "config"); + } catch (Exception e) { + LOG.error("Exception while creating OpenSearch connection", e); } - }, - 0, - refreshRate * 1000); + } + if (osClient != null) { + LOG.info("Reloading json resources from OpenSearch"); + try { + GetResponse response = + osClient.get( + new GetRequest( + "config", resource.getResourceFile()), + RequestOptions.DEFAULT); + resource.loadJSONResources( + new ByteArrayInputStream(response.getSourceAsBytes())); + } catch (Exception e) { + LOG.error("Can't load config from OpenSearch", e); + } + } + } + }, + 0, + refreshRate * 1000); } @Override public void filter(String URL, byte[] content, DocumentFragment doc, ParseResult parse) { delegatedParseFilter.filter(URL, content, doc, parse); } + + @Override + public void cleanup() { + if (refreshTimer != null) { + refreshTimer.cancel(); + } + if (osClient != null) { + try { + osClient.close(); + } catch (IOException e) { + LOG.error("Exception when closing OpenSearch client", e); + } + } + } }