From db9fc5d0b2100e5403b7443dfc63a4aacb515aed Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Wed, 1 Apr 2026 08:55:23 +0100 Subject: [PATCH 1/2] Add bolt-level timeout for fetcher threads to prevent stuck fetches Wraps protocol.getProtocolOutput() in a Future with a configurable hard timeout (fetcher.thread.timeout). When a fetch exceeds the timeout the future is cancelled, the URL is marked as FETCH_ERROR, and the thread moves on. Disabled by default (-1). Fixes #996 Co-Authored-By: Claude Opus 4.6 --- .../apache/stormcrawler/bolt/FetcherBolt.java | 67 ++++++++++++++++++- .../stormcrawler/bolt/SimpleFetcherBolt.java | 56 +++++++++++++++- core/src/main/resources/crawler-default.yaml | 3 + .../bolt/AbstractFetcherBoltTest.java | 47 +++++++++++++ 4 files changed, 171 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index 680c65103..b25b6fa2d 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -34,7 +34,14 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -80,6 +87,14 @@ public class FetcherBolt extends StatusEmitterBolt { */ public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue"; + /** + * Hard timeout in seconds for a single call to {@link Protocol#getProtocolOutput}. If a fetch + * exceeds this duration the thread is interrupted, the URL is marked as FETCH_ERROR, and the + * thread moves on to the next item. A value of {@code -1} (the default) disables the + * bolt-level timeout, relying solely on the protocol-level socket timeouts. + */ + public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout"; + /** Key name of the custom crawl delay for a queue that may be present in the metadata. */ private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay"; @@ -463,6 +478,15 @@ private class FetcherThread extends Thread { private long timeoutInQueues = -1; + /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */ + private long fetchTimeout = -1; + + /** + * Single-thread executor used to run the protocol call so that it can be interrupted via + * {@link Future#cancel(boolean)} when the bolt-level timeout fires. + */ + private final ExecutorService fetchExecutor; + // by default remains as is-pre 1.17 private String protocolMetadataPrefix = ""; @@ -476,11 +500,24 @@ public FetcherThread(Config conf, int num) { this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false); this.threadNum = num; timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues); + fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); protocolMetadataPrefix = ConfUtils.getString( conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix); + + if (fetchTimeout > 0) { + fetchExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "FetcherTimeout #" + num); + t.setDaemon(true); + return t; + }); + } else { + fetchExecutor = null; + } } @Override @@ -665,7 +702,35 @@ public void run() { continue; } - ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata); + final Metadata fetchMetadata = metadata; + ProtocolResponse response; + if (fetchExecutor != null) { + Future future = + fetchExecutor.submit( + () -> protocol.getProtocolOutput(fit.url, fetchMetadata)); + try { + response = future.get(fetchTimeout, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + throw new Exception( + "Fetch timed out after " + + fetchTimeout + + "s fetching " + + fit.url, + e); + } catch (CancellationException e) { + throw new Exception("Fetch cancelled for " + fit.url); + } catch (ExecutionException e) { + // unwrap the real cause so existing catch logic handles it + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new Exception(cause); + } + } else { + response = protocol.getProtocolOutput(fit.url, metadata); + } long timeFetching = System.currentTimeMillis() - start; diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java index 410f34f92..6c1d69ea6 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java @@ -28,7 +28,13 @@ import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; @@ -122,6 +128,11 @@ public class SimpleFetcherBolt extends StatusEmitterBolt { // by default remains as is-pre 1.17 private String protocolMetadataPrefix = ""; + /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */ + private long fetchTimeout = -1; + + private ExecutorService fetchExecutor; + private void checkConfiguration() { // ensure that a value has been set for the agent name and that that @@ -226,6 +237,18 @@ public Object getValueAndReset() { this.protocolMetadataPrefix = ConfUtils.getString( conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix); + + this.fetchTimeout = + ConfUtils.getLong(conf, FetcherBolt.FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); + if (fetchTimeout > 0) { + fetchExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "SimpleFetcherTimeout #" + taskId); + t.setDaemon(true); + return t; + }); + } } @Override @@ -238,6 +261,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { @Override public void cleanup() { protocolFactory.cleanup(); + if (fetchExecutor != null) { + fetchExecutor.shutdownNow(); + } } @Override @@ -425,7 +451,35 @@ public void execute(Tuple input) { activeThreads.incrementAndGet(); long start = System.currentTimeMillis(); - ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata); + final String fetchUrl = urlString; + final Metadata fetchMetadata = metadata; + ProtocolResponse response; + if (fetchExecutor != null) { + Future future = + fetchExecutor.submit( + () -> protocol.getProtocolOutput(fetchUrl, fetchMetadata)); + try { + response = future.get(fetchTimeout, TimeUnit.SECONDS); + } catch (TimeoutException e) { + future.cancel(true); + throw new Exception( + "Fetch timed out after " + + fetchTimeout + + "s fetching " + + urlString, + e); + } catch (CancellationException e) { + throw new Exception("Fetch cancelled for " + urlString); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw new Exception(cause); + } + } else { + response = protocol.getProtocolOutput(urlString, metadata); + } long timeFetching = System.currentTimeMillis() - start; final int byteLength = response.getContent().length; diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml index 3611e54fb..27e629791 100644 --- a/core/src/main/resources/crawler-default.yaml +++ b/core/src/main/resources/crawler-default.yaml @@ -29,6 +29,9 @@ config: fetcher.max.urls.in.queues: -1 fetcher.max.queue.size: -1 fetcher.timeout.queue: -1 + # hard timeout in seconds for a single protocol fetch at the bolt level; + # -1 disables (relies on protocol-level socket timeouts only) + fetcher.thread.timeout: -1 # max. crawl-delay accepted in robots.txt (in seconds) fetcher.max.crawl.delay: 30 # behavior of fetcher when the crawl-delay in the robots.txt diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java index 7de88b7ed..a06b76e12 100644 --- a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java +++ b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java @@ -37,8 +37,10 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; import org.apache.stormcrawler.Constants; +import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.Status; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -104,4 +106,49 @@ void test304(WireMockRuntimeInfo wmRuntimeInfo) { // index Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size()); } + + @Test + void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) { + // server delays response for 10 seconds — longer than the bolt timeout + stubFor( + get(urlMatching(".+")) + .willReturn(aResponse().withStatus(200).withFixedDelay(10_000))); + + TestOutputCollector output = new TestOutputCollector(); + Map config = new HashMap<>(); + config.put("http.agent.name", "this_is_only_a_test"); + // bolt-level timeout: 2 seconds + config.put("fetcher.thread.timeout", 2L); + // raise the socket timeout so the bolt timeout fires first + config.put("http.timeout", 30_000); + bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); + + Tuple tuple = mock(Tuple.class); + when(tuple.getSourceComponent()).thenReturn("source"); + when(tuple.getStringByField("url")) + .thenReturn("http://localhost:" + wmRuntimeInfo.getHttpPort() + "/slow"); + when(tuple.getValueByField("metadata")).thenReturn(null); + bolt.execute(tuple); + + // the bolt should ack within ~2s + margin, not wait the full 10s + await().atMost(8, TimeUnit.SECONDS) + .until(() -> output.getAckedTuples().size() > 0); + + Assertions.assertTrue(output.getAckedTuples().contains(tuple)); + + // should have emitted a FETCH_ERROR on the status stream + List> statusTuples = output.getEmitted(Constants.StatusStreamName); + Assertions.assertEquals(1, statusTuples.size()); + Status status = (Status) statusTuples.get(0).get(2); + Assertions.assertEquals(Status.FETCH_ERROR, status); + + // verify the metadata records the timeout reason + Metadata metadata = (Metadata) statusTuples.get(0).get(1); + String exception = metadata.getFirstValue("fetch.exception"); + Assertions.assertNotNull(exception); + Assertions.assertEquals("Socket timeout fetching", exception); + + // nothing on the default stream — no content was fetched + Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size()); + } } From 54f630803f9707ce44c38a364e9af3d5b2a9e71e Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Wed, 1 Apr 2026 08:57:58 +0100 Subject: [PATCH 2/2] Formatting fix again Signed-off-by: Julien Nioche --- .../main/java/org/apache/stormcrawler/bolt/FetcherBolt.java | 4 ++-- .../org/apache/stormcrawler/bolt/SimpleFetcherBolt.java | 6 +----- .../apache/stormcrawler/bolt/AbstractFetcherBoltTest.java | 3 +-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index b25b6fa2d..333f17653 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -90,8 +90,8 @@ public class FetcherBolt extends StatusEmitterBolt { /** * Hard timeout in seconds for a single call to {@link Protocol#getProtocolOutput}. If a fetch * exceeds this duration the thread is interrupted, the URL is marked as FETCH_ERROR, and the - * thread moves on to the next item. A value of {@code -1} (the default) disables the - * bolt-level timeout, relying solely on the protocol-level socket timeouts. + * thread moves on to the next item. A value of {@code -1} (the default) disables the bolt-level + * timeout, relying solely on the protocol-level socket timeouts. */ public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout"; diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java index 6c1d69ea6..6597d762a 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java @@ -463,11 +463,7 @@ public void execute(Tuple input) { } catch (TimeoutException e) { future.cancel(true); throw new Exception( - "Fetch timed out after " - + fetchTimeout - + "s fetching " - + urlString, - e); + "Fetch timed out after " + fetchTimeout + "s fetching " + urlString, e); } catch (CancellationException e) { throw new Exception("Fetch cancelled for " + urlString); } catch (ExecutionException e) { diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java index a06b76e12..8fc4d9025 100644 --- a/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java +++ b/core/src/test/java/org/apache/stormcrawler/bolt/AbstractFetcherBoltTest.java @@ -131,8 +131,7 @@ void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) { bolt.execute(tuple); // the bolt should ack within ~2s + margin, not wait the full 10s - await().atMost(8, TimeUnit.SECONDS) - .until(() -> output.getAckedTuples().size() > 0); + await().atMost(8, TimeUnit.SECONDS).until(() -> output.getAckedTuples().size() > 0); Assertions.assertTrue(output.getAckedTuples().contains(tuple));