Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 22 additions & 31 deletions core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.storm.Config;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -54,13 +51,15 @@
import org.apache.storm.utils.Utils;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.metrics.ScopedCounter;
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
import org.apache.stormcrawler.persistence.Status;
import org.apache.stormcrawler.protocol.Protocol;
import org.apache.stormcrawler.protocol.ProtocolFactory;
import org.apache.stormcrawler.protocol.ProtocolResponse;
import org.apache.stormcrawler.protocol.RobotRules;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.PerSecondReducer;
import org.apache.stormcrawler.util.URLUtil;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,16 +96,16 @@ public class FetcherBolt extends StatusEmitterBolt {

private FetchItemQueues fetchQueues;

private MultiCountMetric eventCounter;
private MultiReducedMetric averagedMetrics;
private ScopedCounter eventCounter;
private ScopedReducedMetric averagedMetrics;

private ProtocolFactory protocolFactory;

private int taskId = -1;

boolean sitemapsAutoDiscovery = false;

private MultiReducedMetric perSecMetrics;
private ScopedReducedMetric perSecMetrics;

private File debugfiletrigger;

Expand Down Expand Up @@ -844,42 +843,34 @@ public void prepare(
// The data can be accessed by registering a "MetricConsumer" in the
// topology
this.eventCounter =
context.registerMetric(
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
CrawlerMetrics.registerCounter(
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);

// create gauges
context.registerMetric(
"activethreads",
() -> {
return activeThreads.get();
},
metricsTimeBucketSecs);
CrawlerMetrics.registerGauge(
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);

context.registerMetric(
CrawlerMetrics.registerGauge(
context,
stormConf,
"in_queues",
() -> {
return fetchQueues.inQueues.get();
},
() -> fetchQueues.inQueues.get(),
metricsTimeBucketSecs);

context.registerMetric(
CrawlerMetrics.registerGauge(
context,
stormConf,
"num_queues",
() -> {
return fetchQueues.queues.size();
},
() -> fetchQueues.queues.size(),
metricsTimeBucketSecs);

this.averagedMetrics =
context.registerMetric(
"fetcher_average_perdoc",
new MultiReducedMetric(new MeanReducer()),
metricsTimeBucketSecs);
CrawlerMetrics.registerMeanMetric(
context, stormConf, "fetcher_average_perdoc", metricsTimeBucketSecs);

this.perSecMetrics =
context.registerMetric(
"fetcher_average_persec",
new MultiReducedMetric(new PerSecondReducer()),
metricsTimeBucketSecs);
CrawlerMetrics.registerPerSecMetric(
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);

protocolFactory = ProtocolFactory.getInstance(conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -44,6 +43,8 @@
import org.apache.storm.tuple.Values;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.metrics.ScopedCounter;
import org.apache.stormcrawler.parse.DocumentFragmentBuilder;
import org.apache.stormcrawler.parse.JSoupFilter;
import org.apache.stormcrawler.parse.JSoupFilters;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class JSoupParserBolt extends StatusEmitterBolt {

private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(JSoupParserBolt.class);

private MultiCountMetric eventCounter;
private ScopedCounter eventCounter;

private ParseFilter parseFilters = null;

Expand Down Expand Up @@ -132,7 +133,7 @@ public void prepare(
super.prepare(conf, context, collector);

eventCounter =
context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 10);
CrawlerMetrics.registerCounter(context, conf, this.getClass().getSimpleName(), 10);

parseFilters = ParseFilters.fromConf(conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.MultiReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -46,13 +42,15 @@
import org.apache.storm.utils.Utils;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.metrics.ScopedCounter;
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
import org.apache.stormcrawler.persistence.Status;
import org.apache.stormcrawler.protocol.Protocol;
import org.apache.stormcrawler.protocol.ProtocolFactory;
import org.apache.stormcrawler.protocol.ProtocolResponse;
import org.apache.stormcrawler.protocol.RobotRules;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.PerSecondReducer;
import org.apache.stormcrawler.util.URLUtil;
import org.slf4j.LoggerFactory;

Expand All @@ -79,9 +77,9 @@ public class SimpleFetcherBolt extends StatusEmitterBolt {

private Config conf;

private MultiCountMetric eventCounter;
private MultiReducedMetric averagedMetrics;
private MultiReducedMetric perSecMetrics;
private ScopedCounter eventCounter;
private ScopedReducedMetric averagedMetrics;
private ScopedReducedMetric perSecMetrics;

private ProtocolFactory protocolFactory;

Expand Down Expand Up @@ -163,40 +161,26 @@ public void prepare(
int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);

this.eventCounter =
context.registerMetric(
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
CrawlerMetrics.registerCounter(
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);

this.averagedMetrics =
context.registerMetric(
"fetcher_average",
new MultiReducedMetric(new MeanReducer()),
metricsTimeBucketSecs);
CrawlerMetrics.registerMeanMetric(
context, stormConf, "fetcher_average", metricsTimeBucketSecs);

this.perSecMetrics =
context.registerMetric(
"fetcher_average_persec",
new MultiReducedMetric(new PerSecondReducer()),
metricsTimeBucketSecs);
CrawlerMetrics.registerPerSecMetric(
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);

// create gauges
context.registerMetric(
"activethreads",
new IMetric() {
@Override
public Object getValueAndReset() {
return activeThreads.get();
}
},
metricsTimeBucketSecs);
CrawlerMetrics.registerGauge(
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);

context.registerMetric(
CrawlerMetrics.registerGauge(
context,
stormConf,
"throttler_size",
new IMetric() {
@Override
public Object getValueAndReset() {
return throttler.estimatedSize();
}
},
throttler::estimatedSize,
metricsTimeBucketSecs);

protocolFactory = ProtocolFactory.getInstance(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -54,6 +53,7 @@
import org.apache.storm.tuple.Values;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.parse.Outlink;
import org.apache.stormcrawler.parse.ParseFilter;
import org.apache.stormcrawler.parse.ParseFilters;
Expand Down Expand Up @@ -86,7 +86,7 @@ public class SiteMapParserBolt extends StatusEmitterBolt {

private int maxOffsetGuess = 300;

private ReducedMetric averagedMetrics;
private Consumer<Number> averagedMetrics;

/** Delay in minutes used for scheduling sub-sitemaps. */
private int scheduleSitemapsWithDelay = -1;
Expand Down Expand Up @@ -194,7 +194,7 @@ private List<Outlink> parseSiteMap(
siteMap = parser.parseSiteMap(contentType, content, url1);
}
long end = System.currentTimeMillis();
averagedMetrics.update(end - start);
averagedMetrics.accept(end - start);

List<Outlink> links = new ArrayList<>();

Expand Down Expand Up @@ -341,10 +341,8 @@ public void prepare(
parseFilters = ParseFilters.fromConf(stormConf);
maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 300);
averagedMetrics =
context.registerMetric(
"sitemap_average_processing_time",
new ReducedMetric(new MeanReducer()),
30);
CrawlerMetrics.registerSingleMeanMetric(
context, stormConf, "sitemap_average_processing_time", 30);
scheduleSitemapsWithDelay =
ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay);
List<String> extensionsStrings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -35,6 +34,8 @@
import org.apache.storm.tuple.Values;
import org.apache.stormcrawler.Constants;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.metrics.ScopedCounter;
import org.apache.stormcrawler.util.ConfUtils;
import org.apache.stormcrawler.util.URLUtil;
import org.slf4j.Logger;
Expand All @@ -47,7 +48,7 @@ public class URLPartitionerBolt extends BaseRichBolt {

private OutputCollector collector;

private MultiCountMetric eventCounter;
private ScopedCounter eventCounter;

private Map<String, String> cache;

Expand Down Expand Up @@ -165,7 +166,8 @@ public void prepare(
// system stream
// The data can be accessed by registering a "MetricConsumer" in the
// topology
this.eventCounter = context.registerMetric("URLPartitioner", new MultiCountMetric(), 10);
this.eventCounter =
CrawlerMetrics.registerCounter(context, stormConf, "URLPartitioner", 10);

final int maxEntries = 500;
cache =
Expand Down
Loading