Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,14 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -80,6 +87,14 @@ public class FetcherBolt extends StatusEmitterBolt {
*/
public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue";

/**
 * Configuration key for the hard, bolt-level timeout (in seconds) applied to a single call to
 * {@link Protocol#getProtocolOutput}. When enabled, the protocol call runs on a dedicated
 * executor thread; if it does not complete within this duration, that worker is interrupted via
 * {@link java.util.concurrent.Future#cancel(boolean)}, the URL is marked as FETCH_ERROR, and the
 * fetcher moves on to the next item.
 *
 * <p>The timeout is only active for values strictly greater than zero. The default of {@code -1}
 * (and likewise {@code 0} or any other non-positive value) disables it, relying solely on the
 * protocol-level socket timeouts. The same key is also read by {@code SimpleFetcherBolt}.
 *
 * <p>NOTE(review): cancellation only delivers an interrupt; whether a given protocol
 * implementation actually aborts a blocked fetch on interrupt cannot be told from here — confirm
 * that a stuck call cannot keep the single worker thread busy for subsequent fetches.
 */
public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout";

/** Key name of the custom crawl delay for a queue that may be present in the metadata. */
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";

Expand Down Expand Up @@ -463,6 +478,15 @@ private class FetcherThread extends Thread {

private long timeoutInQueues = -1;

/**
 * Hard timeout in seconds for a single protocol fetch, read from {@link
 * #FETCH_TIMEOUT_PARAM_KEY}. Only values greater than zero enable the timeout; the default of
 * -1 (or any non-positive value) means disabled.
 */
private long fetchTimeout = -1;

/**
 * Single-thread executor used to run the protocol call so that it can be interrupted via
 * {@link Future#cancel(boolean)} when the bolt-level timeout fires. It is {@code null} when the
 * timeout is disabled, in which case the protocol is invoked directly on the fetcher thread.
 * Its worker is created as a daemon thread.
 *
 * <p>NOTE(review): no explicit shutdown of this executor is visible in this part of the file —
 * confirm it is released when the fetcher thread terminates.
 */
private final ExecutorService fetchExecutor;

// by default remains as is-pre 1.17
private String protocolMetadataPrefix = "";

Expand All @@ -476,11 +500,24 @@ public FetcherThread(Config conf, int num) {
this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false);
this.threadNum = num;
timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues);
fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
protocolMetadataPrefix =
ConfUtils.getString(
conf,
ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM,
protocolMetadataPrefix);

if (fetchTimeout > 0) {
fetchExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "FetcherTimeout #" + num);
t.setDaemon(true);
return t;
});
} else {
fetchExecutor = null;
}
}

@Override
Expand Down Expand Up @@ -665,7 +702,35 @@ public void run() {
continue;
}

ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata);
final Metadata fetchMetadata = metadata;
ProtocolResponse response;
if (fetchExecutor != null) {
Future<ProtocolResponse> future =
fetchExecutor.submit(
() -> protocol.getProtocolOutput(fit.url, fetchMetadata));
try {
response = future.get(fetchTimeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new Exception(
"Fetch timed out after "
+ fetchTimeout
+ "s fetching "
+ fit.url,
e);
} catch (CancellationException e) {
throw new Exception("Fetch cancelled for " + fit.url);
} catch (ExecutionException e) {
// unwrap the real cause so existing catch logic handles it
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new Exception(cause);
}
} else {
response = protocol.getProtocolOutput(fit.url, metadata);
}

long timeFetching = System.currentTimeMillis() - start;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -122,6 +128,11 @@ public class SimpleFetcherBolt extends StatusEmitterBolt {
// by default remains as is-pre 1.17
private String protocolMetadataPrefix = "";

/**
 * Hard timeout in seconds for a single protocol fetch, read from {@code
 * FetcherBolt.FETCH_TIMEOUT_PARAM_KEY}. Only values greater than zero enable the timeout; the
 * default of -1 (or any non-positive value) means disabled.
 */
private long fetchTimeout = -1;

/**
 * Single-thread executor on which the protocol call is run when the bolt-level timeout is
 * enabled, so that an overlong fetch can be cancelled. Remains {@code null} when the timeout is
 * disabled; when present it is shut down in {@link #cleanup()}.
 */
private ExecutorService fetchExecutor;

private void checkConfiguration() {

// ensure that a value has been set for the agent name and that that
Expand Down Expand Up @@ -226,6 +237,18 @@ public Object getValueAndReset() {
this.protocolMetadataPrefix =
ConfUtils.getString(
conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMetadataPrefix);

this.fetchTimeout =
ConfUtils.getLong(conf, FetcherBolt.FETCH_TIMEOUT_PARAM_KEY, fetchTimeout);
if (fetchTimeout > 0) {
fetchExecutor =
Executors.newSingleThreadExecutor(
r -> {
Thread t = new Thread(r, "SimpleFetcherTimeout #" + taskId);
t.setDaemon(true);
return t;
});
}
}

@Override
Expand All @@ -238,6 +261,9 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
/**
 * Releases the resources held by this bolt: cleans up the protocol factory and, when the
 * bolt-level fetch timeout is enabled, stops the executor used to run protocol calls.
 *
 * <p>The executor is shut down in a {@code finally} block so that its worker thread is released
 * even if the protocol factory cleanup throws.
 */
@Override
public void cleanup() {
    try {
        protocolFactory.cleanup();
    } finally {
        // the executor only exists when a positive fetch timeout was configured
        if (fetchExecutor != null) {
            // shutdownNow() also interrupts a fetch that may still be in flight
            fetchExecutor.shutdownNow();
        }
    }
}

@Override
Expand Down Expand Up @@ -425,7 +451,31 @@ public void execute(Tuple input) {
activeThreads.incrementAndGet();

long start = System.currentTimeMillis();
ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata);
final String fetchUrl = urlString;
final Metadata fetchMetadata = metadata;
ProtocolResponse response;
if (fetchExecutor != null) {
Future<ProtocolResponse> future =
fetchExecutor.submit(
() -> protocol.getProtocolOutput(fetchUrl, fetchMetadata));
try {
response = future.get(fetchTimeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new Exception(
"Fetch timed out after " + fetchTimeout + "s fetching " + urlString, e);
} catch (CancellationException e) {
throw new Exception("Fetch cancelled for " + urlString);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
throw new Exception(cause);
}
} else {
response = protocol.getProtocolOutput(urlString, metadata);
}
long timeFetching = System.currentTimeMillis() - start;

final int byteLength = response.getContent().length;
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/resources/crawler-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ config:
fetcher.max.urls.in.queues: -1
fetcher.max.queue.size: -1
fetcher.timeout.queue: -1
# hard timeout in seconds for a single protocol fetch at the bolt level;
# -1 (or any value <= 0) disables it, leaving only the protocol-level socket timeouts
fetcher.thread.timeout: -1
# max. crawl-delay accepted in robots.txt (in seconds)
fetcher.max.crawl.delay: 30
# behavior of fetcher when the crawl-delay in the robots.txt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.TestOutputCollector;
import org.apache.stormcrawler.TestUtil;
import org.apache.stormcrawler.persistence.Status;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -104,4 +106,48 @@ void test304(WireMockRuntimeInfo wmRuntimeInfo) {
// index
Assertions.assertEquals(0, output.getEmitted(Utils.DEFAULT_STREAM_ID).size());
}

/**
 * Verifies the bolt-level fetch timeout ({@code fetcher.thread.timeout}): when the server takes
 * longer to answer than the configured timeout, the tuple must still be acked promptly, exactly
 * one FETCH_ERROR must be emitted on the status stream with a {@code fetch.exception} entry in
 * its metadata, and nothing must reach the default stream.
 *
 * <p>The expected {@code fetch.exception} text is produced by the bolt's error handling, which
 * is not part of this test class.
 */
@Test
void testThreadTimeout(WireMockRuntimeInfo wmRuntimeInfo) {
    // any GET is answered with a 200, but only after 10 seconds — far beyond the bolt timeout
    stubFor(
            get(urlMatching(".+"))
                    .willReturn(aResponse().withStatus(200).withFixedDelay(10_000)));

    final String slowUrl = "http://localhost:" + wmRuntimeInfo.getHttpPort() + "/slow";

    Map<String, Object> conf = new HashMap<>();
    conf.put("http.agent.name", "this_is_only_a_test");
    // give up on a fetch after 2 seconds at the bolt level ...
    conf.put("fetcher.thread.timeout", 2L);
    // ... and keep the socket timeout high enough that it cannot fire first
    conf.put("http.timeout", 30_000);

    TestOutputCollector collector = new TestOutputCollector();
    bolt.prepare(conf, TestUtil.getMockedTopologyContext(), new OutputCollector(collector));

    Tuple input = mock(Tuple.class);
    when(input.getSourceComponent()).thenReturn("source");
    when(input.getStringByField("url")).thenReturn(slowUrl);
    when(input.getValueByField("metadata")).thenReturn(null);
    bolt.execute(input);

    // the ack is expected roughly 2s in (plus margin), not after the full 10s server delay
    await().atMost(8, TimeUnit.SECONDS).until(() -> collector.getAckedTuples().size() > 0);
    Assertions.assertTrue(collector.getAckedTuples().contains(input));

    // exactly one status update, and it must be a FETCH_ERROR
    List<List<Object>> statusEmissions = collector.getEmitted(Constants.StatusStreamName);
    Assertions.assertEquals(1, statusEmissions.size());
    List<Object> statusFields = statusEmissions.get(0);
    Status reportedStatus = (Status) statusFields.get(2);
    Assertions.assertEquals(Status.FETCH_ERROR, reportedStatus);

    // the metadata attached to the status update carries the reason for the failure
    Metadata errorMetadata = (Metadata) statusFields.get(1);
    String reason = errorMetadata.getFirstValue("fetch.exception");
    Assertions.assertNotNull(reason);
    Assertions.assertEquals("Socket timeout fetching", reason);

    // no content was fetched, so the default stream stays empty
    Assertions.assertEquals(0, collector.getEmitted(Utils.DEFAULT_STREAM_ID).size());
}
}