Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.apache.stormcrawler.proxy.SCProxy;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.CookieConverter;
import org.jetbrains.annotations.Nullable;
import org.slf4j.LoggerFactory;

/** Uses Apache httpclient to handle http and https. */
Expand All @@ -82,6 +81,11 @@ public class HttpProtocol extends AbstractHttpProtocol

private HttpClientBuilder builder;

private CloseableHttpClient client;

private String userAgent;
private Collection<BasicHeader> defaultHeaders;

private RequestConfig requestConfig;
private RequestConfig.Builder requestConfigBuilder;

Expand All @@ -102,9 +106,9 @@ public void configure(final Config conf) {

globalMaxContent = ConfUtils.getInt(conf, "http.content.limit", -1);

String userAgent = getAgentString(conf);
userAgent = getAgentString(conf);

Collection<BasicHeader> defaultHeaders = new LinkedList<>();
defaultHeaders = new LinkedList<>();

String accept = ConfUtils.getString(conf, "http.accept");
if (StringUtils.isNotBlank(accept)) {
Expand Down Expand Up @@ -153,6 +157,8 @@ public void configure(final Config conf) {
.setCookieSpec(CookieSpecs.STANDARD);

requestConfig = requestConfigBuilder.build();

client = builder.build();
}

@Override
Expand All @@ -163,8 +169,8 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except
// set default request config to global config
RequestConfig reqConfig = requestConfig;

// default to the shared builder for non-proxy requests
HttpClientBuilder clientBuilder = builder;
// default to the shared client for non-proxy requests
CloseableHttpClient httpClient = client;

// conditionally add a dynamic proxy
if (proxyManager != null) {
Expand All @@ -173,10 +179,12 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except
if (proxOptional.isPresent()) {
SCProxy prox = proxOptional.get();

// create local copies of builders to avoid polluting shared
// state across concurrent requests
// create a new builder with the same defaults (user-agent,
// headers) to avoid losing them in proxied requests
HttpClientBuilder localBuilder =
HttpClients.custom()
.setUserAgent(userAgent)
.setDefaultHeaders(defaultHeaders)
.setConnectionManager(CONNECTION_MANAGER)
.setConnectionManagerShared(true)
.disableRedirectHandling()
Expand Down Expand Up @@ -215,7 +223,7 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except
System.currentTimeMillis() - buildStart);

LOG.debug("fetching with " + prox.toString());
clientBuilder = localBuilder;
httpClient = localBuilder.build();
}
}

Expand Down Expand Up @@ -268,11 +276,7 @@ public ProtocolResponse getProtocolOutput(String url, Metadata md) throws Except

request.setConfig(reqConfig);

// no need to release the connection explicitly as this is handled
// automatically. The client itself must be closed though.
try (CloseableHttpClient httpclient = clientBuilder.build()) {
return httpclient.execute(request, responseHandler);
}
return httpClient.execute(request, responseHandler);
}

private void addCookiesToRequest(HttpRequestBase request, Metadata md) {
Expand Down Expand Up @@ -353,7 +357,6 @@ public ProtocolResponse handleResponse(final HttpResponse response) throws IOExc
};
}

@Nullable
private static byte[] toByteArray(
final HttpEntity entity, int maxContent, MutableBoolean trimmed) throws IOException {

Expand All @@ -363,35 +366,46 @@ private static byte[] toByteArray(

final InputStream instream = entity.getContent();
if (instream == null) {
return null;
}
Args.check(
(entity.getContentLength() <= Constants.MAX_ARRAY_SIZE)
|| (maxContent >= 0 && maxContent <= Constants.MAX_ARRAY_SIZE),
"HTTP entity too large to be buffered in memory");
int reportedLength = (int) entity.getContentLength();
// set default size for buffer: 100 KB
int bufferInitSize = 102400;
if (reportedLength != -1) {
bufferInitSize = reportedLength;
}
// avoid init of too large a buffer when we will trim anyway
if (maxContent != -1 && bufferInitSize > maxContent) {
bufferInitSize = maxContent;
return new byte[] {};
}
final ByteArrayBuffer buffer = new ByteArrayBuffer(bufferInitSize);
final byte[] tmp = new byte[4096];
int lengthRead;
while ((lengthRead = instream.read(tmp)) != -1) {
// check whether we need to trim
if (maxContent != -1 && buffer.length() + lengthRead > maxContent) {
buffer.append(tmp, 0, maxContent - buffer.length());
trimmed.setValue(true);
break;
try (instream) {
Args.check(
(entity.getContentLength() <= Constants.MAX_ARRAY_SIZE)
|| (maxContent >= 0 && maxContent <= Constants.MAX_ARRAY_SIZE),
"HTTP entity too large to be buffered in memory");
int reportedLength = (int) entity.getContentLength();
// set default size for buffer: 100 KB
int bufferInitSize = 102400;
if (reportedLength != -1) {
bufferInitSize = reportedLength;
}
buffer.append(tmp, 0, lengthRead);
// avoid init of too large a buffer when we will trim anyway
if (maxContent != -1 && bufferInitSize > maxContent) {
bufferInitSize = maxContent;
}
final ByteArrayBuffer buffer = new ByteArrayBuffer(bufferInitSize);
final byte[] tmp = new byte[4096];
int lengthRead;
while ((lengthRead = instream.read(tmp)) != -1) {
// check whether we need to trim
if (maxContent != -1 && buffer.length() + lengthRead > maxContent) {
buffer.append(tmp, 0, maxContent - buffer.length());
trimmed.setValue(true);
break;
}
buffer.append(tmp, 0, lengthRead);
}
return buffer.toByteArray();
}
}

/**
 * Closes the HTTP client held in the {@code client} field.
 *
 * <p>Does nothing when no client is present. An {@link IOException} raised while closing is
 * logged and not rethrown, so cleanup never fails the caller.
 */
@Override
public void cleanup() {
    // client is assigned in configure(); if that was never reached there is nothing to close
    // and dereferencing the field would throw a NullPointerException.
    if (client == null) {
        return;
    }
    try {
        client.close();
    } catch (IOException e) {
        LOG.error("Error closing HTTP client", e);
    }
    // NOTE(review): whether close() also shuts down the connection manager depends on how the
    // client was built (setConnectionManagerShared) - confirm against configure().
}

public static void main(String[] args) throws Exception {
Expand Down