Skip to content

Commit b257bbc

Browse files
committed
#585 Add Storm V2 metrics support with backward-compatible bridge

Introduce a CrawlerMetrics factory that routes metric registration to Storm V1, V2 (Codahale/Dropwizard), or both APIs based on the config property `stormcrawler.metrics.version` ("v1" default, "v2", "both"). This enables gradual migration from deprecated V1 metrics without breaking existing deployments or dashboards.

- New metrics bridge infrastructure in core (ScopedCounter, ScopedReducedMetric interfaces with V1/V2/Dual implementations)
- Migrated all bolt/spout metric registration across core and all external modules (opensearch, sql, solr, aws, tika, warc, urlfrontier)
- Added V2 ScheduledStormReporter implementations for OpenSearch, SQL, and Solr that write the same document schema as V1 MetricsConsumer
1 parent fbefb88 commit b257bbc

39 files changed

Lines changed: 1476 additions & 187 deletions

File tree

core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@
4141
import org.apache.commons.lang3.StringUtils;
4242
import org.apache.http.HttpHeaders;
4343
import org.apache.storm.Config;
44-
import org.apache.storm.metric.api.MeanReducer;
45-
import org.apache.storm.metric.api.MultiCountMetric;
46-
import org.apache.storm.metric.api.MultiReducedMetric;
4744
import org.apache.storm.task.OutputCollector;
4845
import org.apache.storm.task.TopologyContext;
4946
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -54,13 +51,15 @@
5451
import org.apache.storm.utils.Utils;
5552
import org.apache.stormcrawler.Constants;
5653
import org.apache.stormcrawler.Metadata;
54+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
55+
import org.apache.stormcrawler.metrics.ScopedCounter;
56+
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
5757
import org.apache.stormcrawler.persistence.Status;
5858
import org.apache.stormcrawler.protocol.Protocol;
5959
import org.apache.stormcrawler.protocol.ProtocolFactory;
6060
import org.apache.stormcrawler.protocol.ProtocolResponse;
6161
import org.apache.stormcrawler.protocol.RobotRules;
6262
import org.apache.stormcrawler.util.ConfUtils;
63-
import org.apache.stormcrawler.util.PerSecondReducer;
6463
import org.apache.stormcrawler.util.URLUtil;
6564
import org.slf4j.LoggerFactory;
6665

@@ -97,16 +96,16 @@ public class FetcherBolt extends StatusEmitterBolt {
9796

9897
private FetchItemQueues fetchQueues;
9998

100-
private MultiCountMetric eventCounter;
101-
private MultiReducedMetric averagedMetrics;
99+
private ScopedCounter eventCounter;
100+
private ScopedReducedMetric averagedMetrics;
102101

103102
private ProtocolFactory protocolFactory;
104103

105104
private int taskId = -1;
106105

107106
boolean sitemapsAutoDiscovery = false;
108107

109-
private MultiReducedMetric perSecMetrics;
108+
private ScopedReducedMetric perSecMetrics;
110109

111110
private File debugfiletrigger;
112111

@@ -844,42 +843,34 @@ public void prepare(
844843
// The data can be accessed by registering a "MetricConsumer" in the
845844
// topology
846845
this.eventCounter =
847-
context.registerMetric(
848-
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
846+
CrawlerMetrics.registerCounter(
847+
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);
849848

850849
// create gauges
851-
context.registerMetric(
852-
"activethreads",
853-
() -> {
854-
return activeThreads.get();
855-
},
856-
metricsTimeBucketSecs);
850+
CrawlerMetrics.registerGauge(
851+
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);
857852

858-
context.registerMetric(
853+
CrawlerMetrics.registerGauge(
854+
context,
855+
stormConf,
859856
"in_queues",
860-
() -> {
861-
return fetchQueues.inQueues.get();
862-
},
857+
() -> fetchQueues.inQueues.get(),
863858
metricsTimeBucketSecs);
864859

865-
context.registerMetric(
860+
CrawlerMetrics.registerGauge(
861+
context,
862+
stormConf,
866863
"num_queues",
867-
() -> {
868-
return fetchQueues.queues.size();
869-
},
864+
() -> fetchQueues.queues.size(),
870865
metricsTimeBucketSecs);
871866

872867
this.averagedMetrics =
873-
context.registerMetric(
874-
"fetcher_average_perdoc",
875-
new MultiReducedMetric(new MeanReducer()),
876-
metricsTimeBucketSecs);
868+
CrawlerMetrics.registerMeanMetric(
869+
context, stormConf, "fetcher_average_perdoc", metricsTimeBucketSecs);
877870

878871
this.perSecMetrics =
879-
context.registerMetric(
880-
"fetcher_average_persec",
881-
new MultiReducedMetric(new PerSecondReducer()),
882-
metricsTimeBucketSecs);
872+
CrawlerMetrics.registerPerSecMetric(
873+
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);
883874

884875
protocolFactory = ProtocolFactory.getInstance(conf);
885876

core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.stream.Stream;
3636
import org.apache.commons.lang3.StringUtils;
3737
import org.apache.http.HttpHeaders;
38-
import org.apache.storm.metric.api.MultiCountMetric;
3938
import org.apache.storm.task.OutputCollector;
4039
import org.apache.storm.task.TopologyContext;
4140
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -44,6 +43,8 @@
4443
import org.apache.storm.tuple.Values;
4544
import org.apache.stormcrawler.Constants;
4645
import org.apache.stormcrawler.Metadata;
46+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
47+
import org.apache.stormcrawler.metrics.ScopedCounter;
4748
import org.apache.stormcrawler.parse.DocumentFragmentBuilder;
4849
import org.apache.stormcrawler.parse.JSoupFilter;
4950
import org.apache.stormcrawler.parse.JSoupFilters;
@@ -82,7 +83,7 @@ public class JSoupParserBolt extends StatusEmitterBolt {
8283

8384
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(JSoupParserBolt.class);
8485

85-
private MultiCountMetric eventCounter;
86+
private ScopedCounter eventCounter;
8687

8788
private ParseFilter parseFilters = null;
8889

@@ -132,7 +133,7 @@ public void prepare(
132133
super.prepare(conf, context, collector);
133134

134135
eventCounter =
135-
context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 10);
136+
CrawlerMetrics.registerCounter(context, conf, this.getClass().getSimpleName(), 10);
136137

137138
parseFilters = ParseFilters.fromConf(conf);
138139

core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,6 @@
3333
import org.apache.commons.lang3.StringUtils;
3434
import org.apache.http.HttpHeaders;
3535
import org.apache.storm.Config;
36-
import org.apache.storm.metric.api.IMetric;
37-
import org.apache.storm.metric.api.MeanReducer;
38-
import org.apache.storm.metric.api.MultiCountMetric;
39-
import org.apache.storm.metric.api.MultiReducedMetric;
4036
import org.apache.storm.task.OutputCollector;
4137
import org.apache.storm.task.TopologyContext;
4238
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -46,13 +42,15 @@
4642
import org.apache.storm.utils.Utils;
4743
import org.apache.stormcrawler.Constants;
4844
import org.apache.stormcrawler.Metadata;
45+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
46+
import org.apache.stormcrawler.metrics.ScopedCounter;
47+
import org.apache.stormcrawler.metrics.ScopedReducedMetric;
4948
import org.apache.stormcrawler.persistence.Status;
5049
import org.apache.stormcrawler.protocol.Protocol;
5150
import org.apache.stormcrawler.protocol.ProtocolFactory;
5251
import org.apache.stormcrawler.protocol.ProtocolResponse;
5352
import org.apache.stormcrawler.protocol.RobotRules;
5453
import org.apache.stormcrawler.util.ConfUtils;
55-
import org.apache.stormcrawler.util.PerSecondReducer;
5654
import org.apache.stormcrawler.util.URLUtil;
5755
import org.slf4j.LoggerFactory;
5856

@@ -79,9 +77,9 @@ public class SimpleFetcherBolt extends StatusEmitterBolt {
7977

8078
private Config conf;
8179

82-
private MultiCountMetric eventCounter;
83-
private MultiReducedMetric averagedMetrics;
84-
private MultiReducedMetric perSecMetrics;
80+
private ScopedCounter eventCounter;
81+
private ScopedReducedMetric averagedMetrics;
82+
private ScopedReducedMetric perSecMetrics;
8583

8684
private ProtocolFactory protocolFactory;
8785

@@ -163,40 +161,26 @@ public void prepare(
163161
int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);
164162

165163
this.eventCounter =
166-
context.registerMetric(
167-
"fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);
164+
CrawlerMetrics.registerCounter(
165+
context, stormConf, "fetcher_counter", metricsTimeBucketSecs);
168166

169167
this.averagedMetrics =
170-
context.registerMetric(
171-
"fetcher_average",
172-
new MultiReducedMetric(new MeanReducer()),
173-
metricsTimeBucketSecs);
168+
CrawlerMetrics.registerMeanMetric(
169+
context, stormConf, "fetcher_average", metricsTimeBucketSecs);
174170

175171
this.perSecMetrics =
176-
context.registerMetric(
177-
"fetcher_average_persec",
178-
new MultiReducedMetric(new PerSecondReducer()),
179-
metricsTimeBucketSecs);
172+
CrawlerMetrics.registerPerSecMetric(
173+
context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs);
180174

181175
// create gauges
182-
context.registerMetric(
183-
"activethreads",
184-
new IMetric() {
185-
@Override
186-
public Object getValueAndReset() {
187-
return activeThreads.get();
188-
}
189-
},
190-
metricsTimeBucketSecs);
176+
CrawlerMetrics.registerGauge(
177+
context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs);
191178

192-
context.registerMetric(
179+
CrawlerMetrics.registerGauge(
180+
context,
181+
stormConf,
193182
"throttler_size",
194-
new IMetric() {
195-
@Override
196-
public Object getValueAndReset() {
197-
return throttler.estimatedSize();
198-
}
199-
},
183+
throttler::estimatedSize,
200184
metricsTimeBucketSecs);
201185

202186
protocolFactory = ProtocolFactory.getInstance(conf);

core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@
4242
import java.util.Locale;
4343
import java.util.Map;
4444
import java.util.TimeZone;
45+
import java.util.function.Consumer;
4546
import org.apache.commons.lang3.StringUtils;
4647
import org.apache.http.HttpHeaders;
47-
import org.apache.storm.metric.api.MeanReducer;
48-
import org.apache.storm.metric.api.ReducedMetric;
4948
import org.apache.storm.task.OutputCollector;
5049
import org.apache.storm.task.TopologyContext;
5150
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -54,6 +53,7 @@
5453
import org.apache.storm.tuple.Values;
5554
import org.apache.stormcrawler.Constants;
5655
import org.apache.stormcrawler.Metadata;
56+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
5757
import org.apache.stormcrawler.parse.Outlink;
5858
import org.apache.stormcrawler.parse.ParseFilter;
5959
import org.apache.stormcrawler.parse.ParseFilters;
@@ -86,7 +86,7 @@ public class SiteMapParserBolt extends StatusEmitterBolt {
8686

8787
private int maxOffsetGuess = 300;
8888

89-
private ReducedMetric averagedMetrics;
89+
private Consumer<Number> averagedMetrics;
9090

9191
/** Delay in minutes used for scheduling sub-sitemaps. */
9292
private int scheduleSitemapsWithDelay = -1;
@@ -194,7 +194,7 @@ private List<Outlink> parseSiteMap(
194194
siteMap = parser.parseSiteMap(contentType, content, url1);
195195
}
196196
long end = System.currentTimeMillis();
197-
averagedMetrics.update(end - start);
197+
averagedMetrics.accept(end - start);
198198

199199
List<Outlink> links = new ArrayList<>();
200200

@@ -341,10 +341,8 @@ public void prepare(
341341
parseFilters = ParseFilters.fromConf(stormConf);
342342
maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 300);
343343
averagedMetrics =
344-
context.registerMetric(
345-
"sitemap_average_processing_time",
346-
new ReducedMetric(new MeanReducer()),
347-
30);
344+
CrawlerMetrics.registerSingleMeanMetric(
345+
context, stormConf, "sitemap_average_processing_time", 30);
348346
scheduleSitemapsWithDelay =
349347
ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay);
350348
List<String> extensionsStrings =

core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.LinkedHashMap;
2626
import java.util.Map;
2727
import org.apache.commons.lang3.StringUtils;
28-
import org.apache.storm.metric.api.MultiCountMetric;
2928
import org.apache.storm.task.OutputCollector;
3029
import org.apache.storm.task.TopologyContext;
3130
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -35,6 +34,8 @@
3534
import org.apache.storm.tuple.Values;
3635
import org.apache.stormcrawler.Constants;
3736
import org.apache.stormcrawler.Metadata;
37+
import org.apache.stormcrawler.metrics.CrawlerMetrics;
38+
import org.apache.stormcrawler.metrics.ScopedCounter;
3839
import org.apache.stormcrawler.util.ConfUtils;
3940
import org.apache.stormcrawler.util.URLUtil;
4041
import org.slf4j.Logger;
@@ -47,7 +48,7 @@ public class URLPartitionerBolt extends BaseRichBolt {
4748

4849
private OutputCollector collector;
4950

50-
private MultiCountMetric eventCounter;
51+
private ScopedCounter eventCounter;
5152

5253
private Map<String, String> cache;
5354

@@ -165,7 +166,8 @@ public void prepare(
165166
// system stream
166167
// The data can be accessed by registering a "MetricConsumer" in the
167168
// topology
168-
this.eventCounter = context.registerMetric("URLPartitioner", new MultiCountMetric(), 10);
169+
this.eventCounter =
170+
CrawlerMetrics.registerCounter(context, stormConf, "URLPartitioner", 10);
169171

170172
final int maxEntries = 500;
171173
cache =

0 commit comments

Comments (0)