diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index 680c65103..333f17653 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -34,7 +34,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -80,6 +87,14 @@ public class FetcherBolt extends StatusEmitterBolt { */ public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue"; + /** + * Hard timeout in seconds for a single call to {@link Protocol#getProtocolOutput}. If a fetch + * exceeds this duration the thread is interrupted, the URL is marked as FETCH_ERROR, and the + * thread moves on to the next item. A value of {@code -1} (the default) disables the bolt-level + * timeout, relying solely on the protocol-level socket timeouts. + */ + public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout"; + /** Key name of the custom crawl delay for a queue that may be present in the metadata. */ private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay"; @@ -463,6 +478,15 @@ private class FetcherThread extends Thread { private long timeoutInQueues = -1; + /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. 
*/ + private long fetchTimeout = -1; + + /** + * Single-thread executor used to run the protocol call so that it can be interrupted via + * {@link Future#cancel(boolean)} when the bolt-level timeout fires. + */ + private final ExecutorService fetchExecutor; + // by default remains as is-pre 1.17 private String protocolMetadataPrefix = ""; @@ -476,11 +500,24 @@ public FetcherThread(Config conf, int num) { this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false); this.threadNum = num; timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues); + fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); protocolMetadataPrefix = ConfUtils.getString( conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix); + + if (fetchTimeout > 0) { + fetchExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "FetcherTimeout #" + num); + t.setDaemon(true); + return t; + }); + } else { + fetchExecutor = null; + } } @Override @@ -665,7 +702,35 @@ public void run() { continue; } - ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata); + final Metadata fetchMetadata = metadata; + ProtocolResponse response; + if (fetchExecutor != null) { + Future<ProtocolResponse> future = + fetchExecutor.submit( + () -> protocol.getProtocolOutput(fit.url, fetchMetadata)); + try { + response = future.get(fetchTimeout, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + throw new Exception( + "Fetch timed out after " + + fetchTimeout + + "s fetching " + + fit.url, + e); + } catch (CancellationException e) { + throw new Exception("Fetch cancelled for " + fit.url, e); + } catch (ExecutionException e) { + // unwrap the real cause so existing catch logic handles it + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new Exception(cause); + } + } else { + response = protocol.getProtocolOutput(fit.url, metadata);
+ } long timeFetching = System.currentTimeMillis() - start; diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java index 410f34f92..6597d762a 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java @@ -28,7 +28,13 @@ import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; @@ -122,6 +128,11 @@ public class SimpleFetcherBolt extends StatusEmitterBolt { // by default remains as is-pre 1.17 private String protocolMetadataPrefix = ""; + /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. 
*/ + private long fetchTimeout = -1; + + private ExecutorService fetchExecutor; + private void checkConfiguration() { // ensure that a value has been set for the agent name and that that @@ -226,6 +237,18 @@ public Object getValueAndReset() { this.protocolMetadataPrefix = ConfUtils.getString( conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix); + + this.fetchTimeout = + ConfUtils.getLong(conf, FetcherBolt.FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); + if (fetchTimeout > 0) { + fetchExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "SimpleFetcherTimeout #" + taskId); + t.setDaemon(true); + return t; + }); + } } @Override @@ -238,6 +261,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { protocolFactory.cleanup(); + if (fetchExecutor != null) { + fetchExecutor.shutdownNow(); + } } @Override @@ -425,7 +451,31 @@ public void execute(Tuple input) { activeThreads.incrementAndGet(); long start = System.currentTimeMillis(); - ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata); + final String fetchUrl = urlString; + final Metadata fetchMetadata = metadata; + ProtocolResponse response; + if (fetchExecutor != null) { + Future<ProtocolResponse> future = + fetchExecutor.submit( + () -> protocol.getProtocolOutput(fetchUrl, fetchMetadata)); + try { + response = future.get(fetchTimeout, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + throw new Exception( + "Fetch timed out after " + fetchTimeout + "s fetching " + urlString, e); + } catch (CancellationException e) { + throw new Exception("Fetch cancelled for " + urlString, e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new Exception(cause); + } + } else { + response = protocol.getProtocolOutput(urlString, metadata); + } long timeFetching = System.currentTimeMillis() - start; final int byteLength =
response.getContent().length; diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml index 3611e54fb..27e629791 100644 --- a/core/src/main/resources/crawler-default.yaml +++ b/core/src/main/resources/crawler-default.yaml @@ -29,6 +29,9 @@ config: fetcher.max.urls.in.queues: -1 fetcher.max.queue.size: -1 fetcher.timeout.queue: -1 + # hard timeout in seconds for a single protocol fetch at the bolt level; + # -1 disables (relies on protocol-level socket timeouts only) + fetcher.thread.timeout: -1 # max. crawl-delay accepted in robots.txt (in seconds) fetcher.max.crawl.delay: 30 # behavior of fetcher when the crawl-delay in the robots.txt diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java index 7de88b7ed..8fc4d9025 100644 --- a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java +++ b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java @@ -37,8 +37,10 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; import org.apache.stormcrawler.Constants; +import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.Status; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -104,4 +106,48 @@ void test304(WireMockRuntimeInfo wmRuntimeInfo) { // index Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size()); } + + @Test + void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) { + // server delays response for 10 seconds — longer than the bolt timeout + stubFor( + get(urlMatching(".+")) + .willReturn(aResponse().withStatus(200).withFixedDelay(10_000))); + + TestOutputCollector output = new TestOutputCollector(); + Map<String, Object> config = new
HashMap<>(); + config.put("http.agent.name", "this_is_only_a_test"); + // bolt-level timeout: 2 seconds + config.put("fetcher.thread.timeout", 2L); + // raise the socket timeout so the bolt timeout fires first + config.put("http.timeout", 30_000); + bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + + Tuple tuple = mock(Tuple.class); + when(tuple.getSourceComponent()).thenReturn("source"); + when(tuple.getStringByField("url")) + .thenReturn("http://localhost:" + wmRuntimeInfo.getHttpPort() + "/slow"); + when(tuple.getValueByField("metadata")).thenReturn(null); + bolt.execute(tuple); + + // the bolt should ack within ~2s + margin, not wait the full 10s + await().atMost(8, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() > 0); + + Assertions.assertTrue(output.getAckedTuples().contains(tuple)); + + // should have emitted a FETCH_ERROR on the status stream + List<List<Object>> statusTuples = output.getEmitted(Constants.StatusStreamName); + Assertions.assertEquals(1, statusTuples.size()); + Status status = (Status) statusTuples.get(0).get(2); + Assertions.assertEquals(Status.FETCH_ERROR, status); + + // verify the metadata records the timeout reason; both bolts throw + // "Fetch timed out after <n>s fetching <url>", so match on the stable prefix + Metadata metadata = (Metadata) statusTuples.get(0).get(1); + String exception = metadata.getFirstValue("fetch.exception"); + Assertions.assertNotNull(exception); + Assertions.assertTrue(exception.contains("Fetch timed out"), exception); + + // nothing on the default stream — no content was fetched + Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size()); + } }