diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index 680c65103..df97bde70 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -41,9 +41,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.storm.Config; -import org.apache.storm.metric.api.MeanReducer; -import org.apache.storm.metric.api.MultiCountMetric; -import org.apache.storm.metric.api.MultiReducedMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -54,13 +51,15 @@ import org.apache.storm.utils.Utils; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.protocol.Protocol; import org.apache.stormcrawler.protocol.ProtocolFactory; import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.protocol.RobotRules; import org.apache.stormcrawler.util.ConfUtils; -import org.apache.stormcrawler.util.PerSecondReducer; import org.apache.stormcrawler.util.URLUtil; import org.slf4j.LoggerFactory; @@ -97,8 +96,8 @@ public class FetcherBolt extends StatusEmitterBolt { private FetchItemQueues fetchQueues; - private MultiCountMetric eventCounter; - private MultiReducedMetric averagedMetrics; + private ScopedCounter eventCounter; + private ScopedReducedMetric averagedMetrics; private ProtocolFactory protocolFactory; @@ -106,7 +105,7 @@ public class FetcherBolt extends StatusEmitterBolt { boolean sitemapsAutoDiscovery = false; - private MultiReducedMetric perSecMetrics; + 
private ScopedReducedMetric perSecMetrics; private File debugfiletrigger; @@ -844,42 +843,34 @@ public void prepare( // The data can be accessed by registering a "MetricConsumer" in the // topology this.eventCounter = - context.registerMetric( - "fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs); + CrawlerMetrics.registerCounter( + context, stormConf, "fetcher_counter", metricsTimeBucketSecs); // create gauges - context.registerMetric( - "activethreads", - () -> { - return activeThreads.get(); - }, - metricsTimeBucketSecs); + CrawlerMetrics.registerGauge( + context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs); - context.registerMetric( + CrawlerMetrics.registerGauge( + context, + stormConf, "in_queues", - () -> { - return fetchQueues.inQueues.get(); - }, + () -> fetchQueues.inQueues.get(), metricsTimeBucketSecs); - context.registerMetric( + CrawlerMetrics.registerGauge( + context, + stormConf, "num_queues", - () -> { - return fetchQueues.queues.size(); - }, + () -> fetchQueues.queues.size(), metricsTimeBucketSecs); this.averagedMetrics = - context.registerMetric( - "fetcher_average_perdoc", - new MultiReducedMetric(new MeanReducer()), - metricsTimeBucketSecs); + CrawlerMetrics.registerMeanMetric( + context, stormConf, "fetcher_average_perdoc", metricsTimeBucketSecs); this.perSecMetrics = - context.registerMetric( - "fetcher_average_persec", - new MultiReducedMetric(new PerSecondReducer()), - metricsTimeBucketSecs); + CrawlerMetrics.registerPerSecMetric( + context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs); protocolFactory = ProtocolFactory.getInstance(conf); diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java index b412fd9c0..4f9eedfe1 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/JSoupParserBolt.java @@ -35,7 +35,6 @@ 
import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -44,6 +43,8 @@ import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.parse.DocumentFragmentBuilder; import org.apache.stormcrawler.parse.JSoupFilter; import org.apache.stormcrawler.parse.JSoupFilters; @@ -82,7 +83,7 @@ public class JSoupParserBolt extends StatusEmitterBolt { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(JSoupParserBolt.class); - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private ParseFilter parseFilters = null; @@ -132,7 +133,7 @@ public void prepare( super.prepare(conf, context, collector); eventCounter = - context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 10); + CrawlerMetrics.registerCounter(context, conf, this.getClass().getSimpleName(), 10); parseFilters = ParseFilters.fromConf(conf); diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java index 410f34f92..fafc7bf6d 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java @@ -33,10 +33,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.storm.Config; -import org.apache.storm.metric.api.IMetric; -import org.apache.storm.metric.api.MeanReducer; -import org.apache.storm.metric.api.MultiCountMetric; -import 
org.apache.storm.metric.api.MultiReducedMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -46,13 +42,15 @@ import org.apache.storm.utils.Utils; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.protocol.Protocol; import org.apache.stormcrawler.protocol.ProtocolFactory; import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.protocol.RobotRules; import org.apache.stormcrawler.util.ConfUtils; -import org.apache.stormcrawler.util.PerSecondReducer; import org.apache.stormcrawler.util.URLUtil; import org.slf4j.LoggerFactory; @@ -79,9 +77,9 @@ public class SimpleFetcherBolt extends StatusEmitterBolt { private Config conf; - private MultiCountMetric eventCounter; - private MultiReducedMetric averagedMetrics; - private MultiReducedMetric perSecMetrics; + private ScopedCounter eventCounter; + private ScopedReducedMetric averagedMetrics; + private ScopedReducedMetric perSecMetrics; private ProtocolFactory protocolFactory; @@ -163,40 +161,26 @@ public void prepare( int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10); this.eventCounter = - context.registerMetric( - "fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs); + CrawlerMetrics.registerCounter( + context, stormConf, "fetcher_counter", metricsTimeBucketSecs); this.averagedMetrics = - context.registerMetric( - "fetcher_average", - new MultiReducedMetric(new MeanReducer()), - metricsTimeBucketSecs); + CrawlerMetrics.registerMeanMetric( + context, stormConf, "fetcher_average", metricsTimeBucketSecs); this.perSecMetrics = - context.registerMetric( - 
"fetcher_average_persec", - new MultiReducedMetric(new PerSecondReducer()), - metricsTimeBucketSecs); + CrawlerMetrics.registerPerSecMetric( + context, stormConf, "fetcher_average_persec", metricsTimeBucketSecs); // create gauges - context.registerMetric( - "activethreads", - new IMetric() { - @Override - public Object getValueAndReset() { - return activeThreads.get(); - } - }, - metricsTimeBucketSecs); + CrawlerMetrics.registerGauge( + context, stormConf, "activethreads", activeThreads::get, metricsTimeBucketSecs); - context.registerMetric( + CrawlerMetrics.registerGauge( + context, + stormConf, "throttler_size", - new IMetric() { - @Override - public Object getValueAndReset() { - return throttler.estimatedSize(); - } - }, + throttler::estimatedSize, metricsTimeBucketSecs); protocolFactory = ProtocolFactory.getInstance(conf); diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java index 466fa691f..9dbd505d2 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/SiteMapParserBolt.java @@ -42,10 +42,9 @@ import java.util.Locale; import java.util.Map; import java.util.TimeZone; +import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; -import org.apache.storm.metric.api.MeanReducer; -import org.apache.storm.metric.api.ReducedMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -54,6 +53,7 @@ import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.parse.Outlink; import org.apache.stormcrawler.parse.ParseFilter; import org.apache.stormcrawler.parse.ParseFilters; @@ -86,7 +86,7 
@@ public class SiteMapParserBolt extends StatusEmitterBolt { private int maxOffsetGuess = 300; - private ReducedMetric averagedMetrics; + private Consumer averagedMetrics; /** Delay in minutes used for scheduling sub-sitemaps. */ private int scheduleSitemapsWithDelay = -1; @@ -194,7 +194,7 @@ private List parseSiteMap( siteMap = parser.parseSiteMap(contentType, content, url1); } long end = System.currentTimeMillis(); - averagedMetrics.update(end - start); + averagedMetrics.accept(end - start); List links = new ArrayList<>(); @@ -341,10 +341,8 @@ public void prepare( parseFilters = ParseFilters.fromConf(stormConf); maxOffsetGuess = ConfUtils.getInt(stormConf, "sitemap.offset.guess", 300); averagedMetrics = - context.registerMetric( - "sitemap_average_processing_time", - new ReducedMetric(new MeanReducer()), - 30); + CrawlerMetrics.registerSingleMeanMetric( + context, stormConf, "sitemap_average_processing_time", 30); scheduleSitemapsWithDelay = ConfUtils.getInt(stormConf, "sitemap.schedule.delay", scheduleSitemapsWithDelay); List extensionsStrings = diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java index ad0deb529..cb510babc 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/URLPartitionerBolt.java @@ -25,7 +25,6 @@ import java.util.LinkedHashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -35,6 +34,8 @@ import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import 
org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.URLUtil; import org.slf4j.Logger; @@ -47,7 +48,7 @@ public class URLPartitionerBolt extends BaseRichBolt { private OutputCollector collector; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private Map cache; @@ -165,7 +166,8 @@ public void prepare( // system stream // The data can be accessed by registering a "MetricConsumer" in the // topology - this.eventCounter = context.registerMetric("URLPartitioner", new MultiCountMetric(), 10); + this.eventCounter = + CrawlerMetrics.registerCounter(context, stormConf, "URLPartitioner", 10); final int maxEntries = 500; cache = diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/CrawlerMetrics.java b/core/src/main/java/org/apache/stormcrawler/metrics/CrawlerMetrics.java new file mode 100644 index 000000000..2b6bec6f0 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/CrawlerMetrics.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.metrics; + +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.MeanReducer; +import org.apache.storm.metric.api.MultiCountMetric; +import org.apache.storm.metric.api.MultiReducedMetric; +import org.apache.storm.metric.api.ReducedMetric; +import org.apache.storm.task.TopologyContext; +import org.apache.stormcrawler.metrics.v1.V1CounterMetric; +import org.apache.stormcrawler.metrics.v1.V1ReducedMetric; +import org.apache.stormcrawler.metrics.v2.V2CounterMetric; +import org.apache.stormcrawler.metrics.v2.V2HistogramReducedMetric; +import org.apache.stormcrawler.metrics.v2.V2MeterReducedMetric; +import org.apache.stormcrawler.util.CollectionMetric; +import org.apache.stormcrawler.util.ConfUtils; +import org.apache.stormcrawler.util.PerSecondReducer; + +/** + * Factory for creating metric instances that route to Storm V1, V2, or both metric APIs. The + * metrics version is controlled by the configuration property {@value #METRICS_VERSION_KEY}. + * + *
 + * <ul>
 + *   <li>{@code "v1"} (default) — uses the legacy Storm V1 metrics API
 + *   <li>{@code "v2"} — uses the Codahale/Dropwizard V2 metrics API
 + *   <li>{@code "both"} — registers with both V1 and V2 simultaneously for parallel operation during
 + *       migration
 + * </ul>
+ */ +public final class CrawlerMetrics { + + /** Configuration key for selecting the metrics version: "v1", "v2", or "both". */ + public static final String METRICS_VERSION_KEY = "stormcrawler.metrics.version"; + + private static final String VERSION_V1 = "v1"; + private static final String VERSION_V2 = "v2"; + private static final String VERSION_BOTH = "both"; + + private CrawlerMetrics() {} + + /** + * Registers a scoped counter metric. + * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. "fetcher_counter") + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @return a {@link ScopedCounter} that can be used via {@code scope("x").incrBy(n)} + */ + public static ScopedCounter registerCounter( + TopologyContext context, + Map stormConf, + String name, + int timeBucketSecs) { + + String version = getVersion(stormConf); + + switch (version) { + case VERSION_V2: + return new V2CounterMetric(name, context); + + case VERSION_BOTH: + ScopedCounter v1 = createV1Counter(context, name, timeBucketSecs); + ScopedCounter v2 = new V2CounterMetric(name, context); + return new DualCounterMetric(v1, v2); + + case VERSION_V1: + default: + return createV1Counter(context, name, timeBucketSecs); + } + } + + /** + * Registers a scoped reduced metric backed by a {@link MeanReducer}. In V2 mode, this uses a + * Codahale {@link com.codahale.metrics.Histogram}. + * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. 
"fetcher_average") + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @return a {@link ScopedReducedMetric} that can be used via {@code scope("x").update(val)} + */ + public static ScopedReducedMetric registerMeanMetric( + TopologyContext context, + Map stormConf, + String name, + int timeBucketSecs) { + + String version = getVersion(stormConf); + + switch (version) { + case VERSION_V2: + return new V2HistogramReducedMetric(name, context); + + case VERSION_BOTH: + ScopedReducedMetric v1 = createV1MeanMetric(context, name, timeBucketSecs); + ScopedReducedMetric v2 = new V2HistogramReducedMetric(name, context); + return new DualReducedMetric(v1, v2); + + case VERSION_V1: + default: + return createV1MeanMetric(context, name, timeBucketSecs); + } + } + + /** + * Registers a scoped reduced metric backed by a {@link PerSecondReducer}. In V2 mode, this uses + * a Codahale {@link com.codahale.metrics.Meter} which natively tracks rate per second. + * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. "fetcher_average_persec") + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @return a {@link ScopedReducedMetric} that can be used via {@code scope("x").update(val)} + */ + public static ScopedReducedMetric registerPerSecMetric( + TopologyContext context, + Map stormConf, + String name, + int timeBucketSecs) { + + String version = getVersion(stormConf); + + switch (version) { + case VERSION_V2: + return new V2MeterReducedMetric(name, context); + + case VERSION_BOTH: + ScopedReducedMetric v1 = createV1PerSecMetric(context, name, timeBucketSecs); + ScopedReducedMetric v2 = new V2MeterReducedMetric(name, context); + return new DualReducedMetric(v1, v2); + + case VERSION_V1: + default: + return createV1PerSecMetric(context, name, timeBucketSecs); + } + } + + /** + * Registers a gauge metric. 
+ * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. "activethreads") + * @param supplier a supplier providing the current gauge value + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @param the gauge value type + */ + public static void registerGauge( + TopologyContext context, + Map stormConf, + String name, + Supplier supplier, + int timeBucketSecs) { + + String version = getVersion(stormConf); + + boolean registerV1 = VERSION_V1.equals(version) || VERSION_BOTH.equals(version); + boolean registerV2 = VERSION_V2.equals(version) || VERSION_BOTH.equals(version); + + if (registerV1) { + context.registerMetric( + name, + new IMetric() { + @Override + public Object getValueAndReset() { + return supplier.get(); + } + }, + timeBucketSecs); + } + + if (registerV2) { + context.registerGauge(name, supplier::get); + } + } + + /** + * Registers a single (non-scoped) mean metric, e.g. for tracking average processing time. In V1 + * mode, this uses a {@link ReducedMetric} with {@link MeanReducer}. In V2 mode, this uses a + * Codahale {@link com.codahale.metrics.Histogram}. + * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. 
"sitemap_average_processing_time") + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @return a {@link Consumer} that accepts numeric values to update the metric + */ + public static Consumer registerSingleMeanMetric( + TopologyContext context, + Map stormConf, + String name, + int timeBucketSecs) { + + String version = getVersion(stormConf); + boolean useV1 = VERSION_V1.equals(version) || VERSION_BOTH.equals(version); + boolean useV2 = VERSION_V2.equals(version) || VERSION_BOTH.equals(version); + + Consumer v1Consumer = null; + Consumer v2Consumer = null; + + if (useV1) { + ReducedMetric metric = + context.registerMetric( + name, new ReducedMetric(new MeanReducer()), timeBucketSecs); + v1Consumer = value -> metric.update(value); + } + + if (useV2) { + com.codahale.metrics.Histogram histogram = context.registerHistogram(name); + v2Consumer = value -> histogram.update(value.longValue()); + } + + if (useV1 && useV2) { + final Consumer f1 = v1Consumer; + final Consumer f2 = v2Consumer; + return value -> { + f1.accept(value); + f2.accept(value); + }; + } + return useV2 ? v2Consumer : v1Consumer; + } + + /** + * Registers a collection metric for tracking timing measurements. In V1 mode, this uses {@link + * CollectionMetric}. In V2 mode, this uses a Codahale {@link com.codahale.metrics.Histogram}. + * + * @param context the Storm topology context + * @param stormConf the Storm configuration map + * @param name the metric name (e.g. 
"spout_query_time_msec") + * @param timeBucketSecs the V1 reporting interval in seconds (ignored for V2-only mode) + * @return a {@link Consumer} that accepts long measurement values + */ + public static Consumer registerCollectionMetric( + TopologyContext context, + Map stormConf, + String name, + int timeBucketSecs) { + + String version = getVersion(stormConf); + boolean useV1 = VERSION_V1.equals(version) || VERSION_BOTH.equals(version); + boolean useV2 = VERSION_V2.equals(version) || VERSION_BOTH.equals(version); + + Consumer v1Consumer = null; + Consumer v2Consumer = null; + + if (useV1) { + CollectionMetric metric = new CollectionMetric(); + context.registerMetric(name, metric, timeBucketSecs); + v1Consumer = metric::addMeasurement; + } + + if (useV2) { + com.codahale.metrics.Histogram histogram = context.registerHistogram(name); + v2Consumer = histogram::update; + } + + if (useV1 && useV2) { + final Consumer f1 = v1Consumer; + final Consumer f2 = v2Consumer; + return value -> { + f1.accept(value); + f2.accept(value); + }; + } + return useV2 ? 
v2Consumer : v1Consumer; + } + + private static ScopedCounter createV1Counter( + TopologyContext context, String name, int timeBucketSecs) { + MultiCountMetric metric = + context.registerMetric(name, new MultiCountMetric(), timeBucketSecs); + return new V1CounterMetric(metric); + } + + private static ScopedReducedMetric createV1MeanMetric( + TopologyContext context, String name, int timeBucketSecs) { + MultiReducedMetric metric = + context.registerMetric( + name, new MultiReducedMetric(new MeanReducer()), timeBucketSecs); + return new V1ReducedMetric(metric); + } + + private static ScopedReducedMetric createV1PerSecMetric( + TopologyContext context, String name, int timeBucketSecs) { + MultiReducedMetric metric = + context.registerMetric( + name, new MultiReducedMetric(new PerSecondReducer()), timeBucketSecs); + return new V1ReducedMetric(metric); + } + + private static String getVersion(Map stormConf) { + return ConfUtils.getString(stormConf, METRICS_VERSION_KEY, VERSION_V1); + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/DualCounterMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/DualCounterMetric.java new file mode 100644 index 000000000..eb81595c7 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/DualCounterMetric.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics; + +/** + * Forwards counter operations to both a V1 and a V2 {@link ScopedCounter} simultaneously, allowing + * both metric pipelines to receive data during migration. + */ +public class DualCounterMetric implements ScopedCounter { + + private final ScopedCounter v1; + private final ScopedCounter v2; + + public DualCounterMetric(ScopedCounter v1, ScopedCounter v2) { + this.v1 = v1; + this.v2 = v2; + } + + @Override + public CountHandle scope(String scopeName) { + CountHandle v1Handle = v1.scope(scopeName); + CountHandle v2Handle = v2.scope(scopeName); + return incrementBy -> { + v1Handle.incrBy(incrementBy); + v2Handle.incrBy(incrementBy); + }; + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/DualReducedMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/DualReducedMetric.java new file mode 100644 index 000000000..da12f634c --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/DualReducedMetric.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics; + +/** + * Forwards reduced metric operations to both a V1 and a V2 {@link ScopedReducedMetric} + * simultaneously, allowing both metric pipelines to receive data during migration. + */ +public class DualReducedMetric implements ScopedReducedMetric { + + private final ScopedReducedMetric v1; + private final ScopedReducedMetric v2; + + public DualReducedMetric(ScopedReducedMetric v1, ScopedReducedMetric v2) { + this.v1 = v1; + this.v2 = v2; + } + + @Override + public ReduceHandle scope(String scopeName) { + ReduceHandle v1Handle = v1.scope(scopeName); + ReduceHandle v2Handle = v2.scope(scopeName); + return value -> { + v1Handle.update(value); + v2Handle.update(value); + }; + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/ScopedCounter.java b/core/src/main/java/org/apache/stormcrawler/metrics/ScopedCounter.java new file mode 100644 index 000000000..9c53e05f0 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/ScopedCounter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics; + +/** + * Abstraction for a scoped counter metric, compatible with both Storm V1 {@link + * org.apache.storm.metric.api.MultiCountMetric} and V2 Codahale {@link + * com.codahale.metrics.Counter} APIs. + */ +public interface ScopedCounter { + + /** Returns a counter handle for the given scope name. */ + CountHandle scope(String scopeName); + + /** A handle to increment a counter within a specific scope. */ + interface CountHandle { + + void incrBy(long incrementBy); + + default void incr() { + incrBy(1); + } + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/ScopedReducedMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/ScopedReducedMetric.java new file mode 100644 index 000000000..60280dbec --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/ScopedReducedMetric.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics; + +/** + * Abstraction for a scoped reduced metric (mean, per-second rate, etc.), compatible with both Storm + * V1 {@link org.apache.storm.metric.api.MultiReducedMetric} and V2 Codahale metric APIs. + */ +public interface ScopedReducedMetric { + + /** Returns a metric handle for the given scope name. */ + ReduceHandle scope(String scopeName); + + /** A handle to update a reduced metric within a specific scope. */ + interface ReduceHandle { + + void update(Object value); + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1CounterMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1CounterMetric.java new file mode 100644 index 000000000..43875478b --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1CounterMetric.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics.v1; + +import org.apache.storm.metric.api.MultiCountMetric; +import org.apache.stormcrawler.metrics.ScopedCounter; + +/** V1 implementation of {@link ScopedCounter} backed by {@link MultiCountMetric}. */ +public class V1CounterMetric implements ScopedCounter { + + private final MultiCountMetric delegate; + + public V1CounterMetric(MultiCountMetric delegate) { + this.delegate = delegate; + } + + @Override + public CountHandle scope(String scopeName) { + return delegate.scope(scopeName)::incrBy; + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1ReducedMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1ReducedMetric.java new file mode 100644 index 000000000..9b6889733 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/v1/V1ReducedMetric.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.metrics.v1; + +import org.apache.storm.metric.api.MultiReducedMetric; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; + +/** V1 implementation of {@link ScopedReducedMetric} backed by {@link MultiReducedMetric}. */ +public class V1ReducedMetric implements ScopedReducedMetric { + + private final MultiReducedMetric delegate; + + public V1ReducedMetric(MultiReducedMetric delegate) { + this.delegate = delegate; + } + + @Override + public ReduceHandle scope(String scopeName) { + return delegate.scope(scopeName)::update; + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2CounterMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2CounterMetric.java new file mode 100644 index 000000000..96d1e6be4 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2CounterMetric.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.metrics.v2; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.task.TopologyContext; +import org.apache.stormcrawler.metrics.ScopedCounter; + +/** + * V2 implementation of {@link ScopedCounter} using Codahale {@link com.codahale.metrics.Counter} + * instances registered via the Storm V2 metrics API. + */ +public class V2CounterMetric implements ScopedCounter { + + private final String baseName; + private final TopologyContext context; + private final ConcurrentMap handles = new ConcurrentHashMap<>(); + + public V2CounterMetric(String baseName, TopologyContext context) { + this.baseName = baseName; + this.context = context; + } + + @Override + public CountHandle scope(String scopeName) { + return handles.computeIfAbsent( + scopeName, + name -> { + String metricName = baseName + "." + name; + com.codahale.metrics.Counter counter = context.registerCounter(metricName); + return counter::inc; + }); + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2HistogramReducedMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2HistogramReducedMetric.java new file mode 100644 index 000000000..5552050d2 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2HistogramReducedMetric.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics.v2; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.task.TopologyContext; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; + +/** + * V2 implementation of {@link ScopedReducedMetric} using Codahale {@link + * com.codahale.metrics.Histogram} instances. Used as the V2 replacement for {@code + * MultiReducedMetric(MeanReducer)}. + */ +public class V2HistogramReducedMetric implements ScopedReducedMetric { + + private final String baseName; + private final TopologyContext context; + private final ConcurrentMap handles = new ConcurrentHashMap<>(); + + public V2HistogramReducedMetric(String baseName, TopologyContext context) { + this.baseName = baseName; + this.context = context; + } + + @Override + public ReduceHandle scope(String scopeName) { + return handles.computeIfAbsent( + scopeName, + name -> { + String metricName = baseName + "." 
+ name; + com.codahale.metrics.Histogram histogram = + context.registerHistogram(metricName); + return value -> histogram.update(toLong(value)); + }); + } + + private static long toLong(Object value) { + if (value instanceof Number) { + return ((Number) value).longValue(); + } + throw new IllegalArgumentException( + "V2HistogramReducedMetric: unsupported value type " + value.getClass()); + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2MeterReducedMetric.java b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2MeterReducedMetric.java new file mode 100644 index 000000000..d276609a3 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/metrics/v2/V2MeterReducedMetric.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.metrics.v2; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.storm.task.TopologyContext; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; + +/** + * V2 implementation of {@link ScopedReducedMetric} using Codahale {@link + * com.codahale.metrics.Meter} instances. 
Used as the V2 replacement for {@code + * MultiReducedMetric(PerSecondReducer)}. + */ +public class V2MeterReducedMetric implements ScopedReducedMetric { + + private final String baseName; + private final TopologyContext context; + private final ConcurrentMap handles = new ConcurrentHashMap<>(); + + public V2MeterReducedMetric(String baseName, TopologyContext context) { + this.baseName = baseName; + this.context = context; + } + + @Override + public ReduceHandle scope(String scopeName) { + return handles.computeIfAbsent( + scopeName, + name -> { + String metricName = baseName + "." + name; + com.codahale.metrics.Meter meter = context.registerMeter(metricName); + return value -> meter.mark(toLong(value)); + }); + } + + private static long toLong(Object value) { + if (value instanceof Number) { + return ((Number) value).longValue(); + } + throw new IllegalArgumentException( + "V2MeterReducedMetric: unsupported value type " + value.getClass()); + } +} diff --git a/core/src/main/java/org/apache/stormcrawler/persistence/AbstractQueryingSpout.java b/core/src/main/java/org/apache/stormcrawler/persistence/AbstractQueryingSpout.java index e13e953b7..a1a164a29 100644 --- a/core/src/main/java/org/apache/stormcrawler/persistence/AbstractQueryingSpout.java +++ b/core/src/main/java/org/apache/stormcrawler/persistence/AbstractQueryingSpout.java @@ -26,15 +26,16 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.storm.metric.api.MultiCountMetric; +import java.util.function.Consumer; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import 
org.apache.stormcrawler.persistence.urlbuffer.URLBuffer; -import org.apache.stormcrawler.util.CollectionMetric; import org.apache.stormcrawler.util.ConfUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,7 @@ public abstract class AbstractQueryingSpout extends BaseRichSpout { private long timestampEmptyBuffer = -1; - protected MultiCountMetric eventCounter; + protected ScopedCounter eventCounter; protected URLBuffer buffer; @@ -94,7 +95,7 @@ public abstract class AbstractQueryingSpout extends BaseRichSpout { /** Required for implementations doing asynchronous calls. */ protected AtomicBoolean isInQuery = new AtomicBoolean(false); - protected CollectionMetric queryTimes; + protected Consumer queryTimes; @Override public void open( @@ -111,18 +112,21 @@ public void open( beingProcessed = new InProcessMap<>(ttlPurgatory, TimeUnit.SECONDS); - eventCounter = context.registerMetric("counters", new MultiCountMetric(), 10); + eventCounter = CrawlerMetrics.registerCounter(context, stormConf, "counters", 10); buffer = URLBuffer.createInstance(stormConf); - context.registerMetric("buffer_size", () -> buffer.size(), 10); - context.registerMetric("numQueues", () -> buffer.numQueues(), 10); + CrawlerMetrics.registerGauge(context, stormConf, "buffer_size", buffer::size, 10); + CrawlerMetrics.registerGauge(context, stormConf, "numQueues", buffer::numQueues, 10); - context.registerMetric("beingProcessed", () -> beingProcessed.size(), 10); - context.registerMetric("inPurgatory", () -> beingProcessed.inCache(), 10); + CrawlerMetrics.registerGauge( + context, stormConf, "beingProcessed", beingProcessed::size, 10); + CrawlerMetrics.registerGauge( + context, stormConf, "inPurgatory", beingProcessed::inCache, 10); - queryTimes = new CollectionMetric(); - context.registerMetric("spout_query_time_msec", queryTimes, 10); + queryTimes = + CrawlerMetrics.registerCollectionMetric( + context, stormConf, "spout_query_time_msec", 10); resetFetchDateAfterNSecs = 
ConfUtils.getInt(stormConf, resetFetchDateParamName, resetFetchDateAfterNSecs); diff --git a/core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java b/core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java index 0eb2d2d72..ee3f648ff 100644 --- a/core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java @@ -23,11 +23,9 @@ import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Date; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import org.apache.commons.lang3.time.DateUtils; -import org.apache.storm.metric.api.IMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -37,6 +35,7 @@ import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.MetadataTransfer; import org.slf4j.Logger; @@ -109,21 +108,28 @@ public void prepare( String spec = ConfUtils.getString(stormConf, cacheConfigParamName); cache = Caffeine.from(spec).build(); - context.registerMetric( - "cache", - new IMetric() { - @Override - public Object getValueAndReset() { - Map statsMap = new HashMap<>(); - statsMap.put("hits", cacheHits); - statsMap.put("misses", cacheMisses); - statsMap.put("size", cache.estimatedSize()); - cacheHits = 0; - cacheMisses = 0; - return statsMap; - } + CrawlerMetrics.registerGauge( + context, + stormConf, + "cache.hits", + () -> { + long v = cacheHits; + cacheHits = 0; + return v; }, 30); + CrawlerMetrics.registerGauge( + context, + stormConf, + "cache.misses", + () -> { + long v = cacheMisses; + cacheMisses = 0; + return v; + }, + 30); + 
CrawlerMetrics.registerGauge( + context, stormConf, "cache.size", cache::estimatedSize, 30); } maxFetchErrors = ConfUtils.getInt(stormConf, maxFetchErrorsParamName, 3); diff --git a/core/src/main/java/org/apache/stormcrawler/spout/MemorySpout.java b/core/src/main/java/org/apache/stormcrawler/spout/MemorySpout.java index 4c4480bbc..221dae683 100644 --- a/core/src/main/java/org/apache/stormcrawler/spout/MemorySpout.java +++ b/core/src/main/java/org/apache/stormcrawler/spout/MemorySpout.java @@ -31,6 +31,7 @@ import org.apache.storm.tuple.Fields; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.StringTabScheme; import org.slf4j.Logger; @@ -99,7 +100,7 @@ public void open( scheme.deserialize(ByteBuffer.wrap(u.getBytes(StandardCharsets.UTF_8))); add((String) tuple.get(0), (Metadata) tuple.get(1), now); } - context.registerMetric("queue_size", () -> queue.size(), 10); + CrawlerMetrics.registerGauge(context, conf, "queue_size", queue::size, 10); } @Override diff --git a/core/src/test/java/org/apache/stormcrawler/TestUtil.java b/core/src/test/java/org/apache/stormcrawler/TestUtil.java index c226cfc1f..058ef862d 100644 --- a/core/src/test/java/org/apache/stormcrawler/TestUtil.java +++ b/core/src/test/java/org/apache/stormcrawler/TestUtil.java @@ -39,15 +39,7 @@ private TestUtil() {} public static TopologyContext getMockedTopologyContext() { TopologyContext context = mock(TopologyContext.class); - when(context.registerMetric(anyString(), any(IMetric.class), anyInt())) - .thenAnswer( - new Answer() { - - @Override - public IMetric answer(InvocationOnMock invocation) throws Throwable { - return invocation.getArgument(1, IMetric.class); - } - }); + mockMetricRegistration(context); return context; } @@ -63,15 +55,7 @@ public static TopologyContext getMockedTopologyContextWithBucket( int taskIndex, int 
totalTasks, String componentId) { TopologyContext context = mock(TopologyContext.class); - // Mock metric registration - when(context.registerMetric(anyString(), any(IMetric.class), anyInt())) - .thenAnswer( - new Answer() { - @Override - public IMetric answer(InvocationOnMock invocation) throws Throwable { - return invocation.getArgument(1, IMetric.class); - } - }); + mockMetricRegistration(context); // Mock task information for bucket assignment when(context.getThisTaskIndex()).thenReturn(taskIndex); @@ -87,6 +71,33 @@ public IMetric answer(InvocationOnMock invocation) throws Throwable { return context; } + /** Sets up mock responses for both V1 and V2 metric registration on a TopologyContext. */ + private static void mockMetricRegistration(TopologyContext context) { + // V1 metric registration + when(context.registerMetric(anyString(), any(IMetric.class), anyInt())) + .thenAnswer( + new Answer() { + @Override + public IMetric answer(InvocationOnMock invocation) throws Throwable { + return invocation.getArgument(1, IMetric.class); + } + }); + + // V2 metric registration + when(context.registerCounter(anyString())) + .thenAnswer(invocation -> new com.codahale.metrics.Counter()); + when(context.registerHistogram(anyString())) + .thenAnswer( + invocation -> + new com.codahale.metrics.Histogram( + new com.codahale.metrics.ExponentiallyDecayingReservoir())); + when(context.registerMeter(anyString())) + .thenAnswer(invocation -> new com.codahale.metrics.Meter()); + when(context.registerGauge(anyString(), any(com.codahale.metrics.Gauge.class))) + .thenAnswer( + invocation -> invocation.getArgument(1, com.codahale.metrics.Gauge.class)); + } + public static Tuple getMockedTestTuple(String url, String content, Metadata metadata) { Tuple tuple = mock(Tuple.class); when(tuple.getStringByField("url")).thenReturn(url); diff --git a/core/src/test/java/org/apache/stormcrawler/metrics/DualCounterMetricTest.java 
b/core/src/test/java/org/apache/stormcrawler/metrics/DualCounterMetricTest.java new file mode 100644 index 000000000..28cbebd71 --- /dev/null +++ b/core/src/test/java/org/apache/stormcrawler/metrics/DualCounterMetricTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + +class DualCounterMetricTest { + + @Test + void scopeDelegatesToBothCounters() { + AtomicLong v1Total = new AtomicLong(); + AtomicLong v2Total = new AtomicLong(); + + ScopedCounter v1 = scopeName -> v1Total::addAndGet; + ScopedCounter v2 = scopeName -> v2Total::addAndGet; + + DualCounterMetric dual = new DualCounterMetric(v1, v2); + ScopedCounter.CountHandle handle = dual.scope("test"); + + handle.incrBy(5); + assertEquals(5, v1Total.get()); + assertEquals(5, v2Total.get()); + + handle.incrBy(3); + assertEquals(8, v1Total.get()); + assertEquals(8, v2Total.get()); + } +} diff --git a/core/src/test/java/org/apache/stormcrawler/metrics/DualReducedMetricTest.java b/core/src/test/java/org/apache/stormcrawler/metrics/DualReducedMetricTest.java new file mode 100644 index 000000000..0f9cf5c52 --- /dev/null +++ b/core/src/test/java/org/apache/stormcrawler/metrics/DualReducedMetricTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class DualReducedMetricTest { + + @Test + void scopeDelegatesToBothReducedMetrics() { + List v1Values = new ArrayList<>(); + List v2Values = new ArrayList<>(); + + ScopedReducedMetric v1 = scopeName -> v1Values::add; + ScopedReducedMetric v2 = scopeName -> v2Values::add; + + DualReducedMetric dual = new DualReducedMetric(v1, v2); + ScopedReducedMetric.ReduceHandle handle = dual.scope("test"); + + handle.update(42); + handle.update(100); + + assertEquals(List.of(42, 100), v1Values); + assertEquals(List.of(42, 100), v2Values); + } +} diff --git a/external/aws/src/main/java/org/apache/stormcrawler/aws/bolt/CloudSearchIndexerBolt.java b/external/aws/src/main/java/org/apache/stormcrawler/aws/bolt/CloudSearchIndexerBolt.java index 2491f5163..24f6bac6c 100644 --- a/external/aws/src/main/java/org/apache/stormcrawler/aws/bolt/CloudSearchIndexerBolt.java +++ b/external/aws/src/main/java/org/apache/stormcrawler/aws/bolt/CloudSearchIndexerBolt.java @@ -52,7 +52,6 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.storm.Config; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; @@ -60,6 +59,8 @@ import org.apache.storm.utils.TupleUtils; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; import org.slf4j.Logger; @@ -91,7 +92,7 @@ public class CloudSearchIndexerBolt extends AbstractIndexerBolt { private OutputCollector _collector; - 
private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private Map csfields = new HashMap<>(); @@ -105,8 +106,7 @@ public void prepare( super.prepare(conf, context, collector); _collector = collector; - this.eventCounter = - context.registerMetric("CloudSearchIndexer", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, conf, "CloudSearchIndexer", 10); maxTimeBuffered = ConfUtils.getInt(conf, CloudSearchConstants.MAX_TIME_BUFFERED, 10); diff --git a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/AbstractS3CacheBolt.java b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/AbstractS3CacheBolt.java index 94ab205ee..351b35674 100644 --- a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/AbstractS3CacheBolt.java +++ b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/AbstractS3CacheBolt.java @@ -25,12 +25,12 @@ import com.amazonaws.services.s3.AmazonS3Client; import java.util.Map; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.util.ConfUtils; public abstract class AbstractS3CacheBolt extends BaseRichBolt { @@ -45,7 +45,7 @@ public abstract class AbstractS3CacheBolt extends BaseRichBolt { public static final String INCACHE = S3_PREFIX + "inCache"; protected OutputCollector _collector; - protected MultiCountMetric eventCounter; + protected ScopedCounter eventCounter; protected AmazonS3Client client; diff --git a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3CacheChecker.java b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3CacheChecker.java index 52bc447fa..c560dc00c 100644 --- 
a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3CacheChecker.java +++ b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3CacheChecker.java @@ -23,7 +23,6 @@ import java.net.URLEncoder; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -31,6 +30,7 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.util.ConfUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector String message = "Bucket " + bucketName + " does not exist"; throw new RuntimeException(message); } - this.eventCounter = context.registerMetric("s3cache_counter", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, conf, "s3cache_counter", 10); } @Override diff --git a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3Cacher.java b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3Cacher.java index d8b0a271c..ce1203949 100644 --- a/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3Cacher.java +++ b/external/aws/src/main/java/org/apache/stormcrawler/aws/s3/S3Cacher.java @@ -25,12 +25,12 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.Map; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.util.ConfUtils; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +61,8 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector throw new RuntimeException(message); } this.eventCounter = - context.registerMetric( - getMetricPrefix() + "s3cache_counter", new MultiCountMetric(), 10); + CrawlerMetrics.registerCounter( + context, conf, getMetricPrefix() + "s3cache_counter", 10); } @Override diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index c67b90951..f0fdec9ea 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -36,6 +36,7 @@ import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.OpenSearchConnection; import org.apache.stormcrawler.util.ConfUtils; @@ -103,7 +104,7 @@ public void prepare( .removalListener(this) .build(); - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), 10); + CrawlerMetrics.registerGauge(context, conf, "waitAck", waitAck::estimatedSize, 10); } public void onRemoval( diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index 04de31cae..39b8bf053 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -37,8 +37,6 @@ import java.util.concurrent.locks.ReentrantLock; import 
java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.metric.api.MultiCountMetric; -import org.apache.storm.metric.api.MultiReducedMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; @@ -46,12 +44,14 @@ import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.IndexCreation; import org.apache.stormcrawler.opensearch.OpenSearchConnection; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; -import org.apache.stormcrawler.util.PerSecondReducer; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.opensearch.action.DocWriteRequest; @@ -93,11 +93,11 @@ public class IndexerBolt extends AbstractIndexerBolt // overwritten private boolean create = false; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private OpenSearchConnection connection; - private MultiReducedMetric perSecMetrics; + private ScopedReducedMetric perSecMetrics; private Cache> waitAck; @@ -130,13 +130,10 @@ public void prepare( throw new RuntimeException(e1); } - this.eventCounter = context.registerMetric("OpensearchIndexer", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, conf, "OpensearchIndexer", 10); this.perSecMetrics = - context.registerMetric( - "Indexer_average_persec", - new MultiReducedMetric(new PerSecondReducer()), - 10); + CrawlerMetrics.registerPerSecMetric(context, conf, "Indexer_average_persec", 10); waitAck = Caffeine.newBuilder() @@ 
-144,7 +141,7 @@ public void prepare( .removalListener(this) .build(); - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), 10); + CrawlerMetrics.registerGauge(context, conf, "waitAck", waitAck::estimatedSize, 10); // use the default status schema if none has been specified try { diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporter.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporter.java new file mode 100644 index 000000000..3957ba6ab --- /dev/null +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporter.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.opensearch.metrics; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Timer; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import org.apache.storm.metrics2.reporters.ScheduledStormReporter; +import org.apache.stormcrawler.opensearch.IndexCreation; +import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.util.ConfUtils; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.core.xcontent.XContentBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Storm V2 metrics reporter that writes metrics to an OpenSearch index with the same document + * structure as the V1 {@link MetricsConsumer}. This allows existing OpenSearch dashboards to work + * unchanged during migration from V1 to V2 metrics. + * + *

<p>Configuration in storm.yaml: + * + * <pre>

+ *   storm.metrics.reporters:
+ *     - class: "org.apache.stormcrawler.opensearch.metrics.MetricsReporter"
+ *       report.period: 10
+ *       report.period.units: "SECONDS"
+ * </pre>
+ */ +public class MetricsReporter extends ScheduledStormReporter { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsReporter.class); + + private static final String OSBoltType = "metrics"; + + private static final String OSMetricsIndexNameParamName = + "opensearch." + OSBoltType + ".index.name"; + + private static final String DATE_FORMAT_KEY = "opensearch.metrics.date.format"; + + @Override + public void prepare( + MetricRegistry metricsRegistry, + Map topoConf, + Map reporterConf) { + + String indexName = ConfUtils.getString(topoConf, OSMetricsIndexNameParamName, "metrics"); + String stormId = (String) topoConf.getOrDefault("storm.id", "unknown"); + + SimpleDateFormat dateFormat = null; + String dateFormatStr = ConfUtils.getString(topoConf, DATE_FORMAT_KEY, null); + if (dateFormatStr != null) { + dateFormat = new SimpleDateFormat(dateFormatStr, Locale.ROOT); + } + + OpenSearchConnection connection; + try { + connection = OpenSearchConnection.getConnection(topoConf, OSBoltType); + } catch (Exception e) { + LOG.error("Can't connect to OpenSearch", e); + throw new RuntimeException(e); + } + + try { + IndexCreation.checkOrCreateIndexTemplate(connection.getClient(), OSBoltType, LOG); + } catch (IOException e) { + throw new RuntimeException(e); + } + + TimeUnit reportPeriodUnit = getReportPeriodUnit(reporterConf); + long reportPeriod = getReportPeriod(reporterConf); + + reporter = + new OpenSearchScheduledReporter( + metricsRegistry, indexName, stormId, dateFormat, connection); + + reporter.start(reportPeriod, reportPeriodUnit); + } + + private ScheduledReporter reporter; + + @Override + public void start() { + // already started in prepare() + } + + @Override + public void stop() { + if (reporter != null) { + reporter.stop(); + } + } + + /** + * Inner ScheduledReporter that writes Codahale metrics to OpenSearch in the same format as the + * V1 MetricsConsumer. 
+ */ + private static class OpenSearchScheduledReporter extends ScheduledReporter { + + private final String indexName; + private final String stormId; + private final SimpleDateFormat dateFormat; + private final OpenSearchConnection connection; + + OpenSearchScheduledReporter( + MetricRegistry registry, + String indexName, + String stormId, + SimpleDateFormat dateFormat, + OpenSearchConnection connection) { + super( + registry, + "opensearch-metrics-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS); + this.indexName = indexName; + this.stormId = stormId; + this.dateFormat = dateFormat; + this.connection = connection; + } + + @Override + @SuppressWarnings("rawtypes") + public void report( + SortedMap gauges, + SortedMap counters, + SortedMap histograms, + SortedMap meters, + SortedMap timers) { + + Date now = new Date(); + + for (Map.Entry entry : gauges.entrySet()) { + Object value = entry.getValue().getValue(); + if (value instanceof Number) { + indexDataPoint(now, entry.getKey(), ((Number) value).doubleValue()); + } else if (value instanceof Map) { + for (Map.Entry mapEntry : ((Map) value).entrySet()) { + if (mapEntry.getValue() instanceof Number) { + indexDataPoint( + now, + entry.getKey() + "." 
+ mapEntry.getKey(), + ((Number) mapEntry.getValue()).doubleValue()); + } + } + } + } + + for (Map.Entry entry : counters.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getCount()); + } + + for (Map.Entry entry : histograms.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getSnapshot().getMean()); + } + + for (Map.Entry entry : meters.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getOneMinuteRate()); + } + + for (Map.Entry entry : timers.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getSnapshot().getMean()); + } + } + + private String getIndexName(Date timestamp) { + if (dateFormat == null) { + return indexName; + } + return indexName + "-" + dateFormat.format(timestamp); + } + + private void indexDataPoint(Date timestamp, String name, double value) { + try { + XContentBuilder builder = jsonBuilder().startObject(); + builder.field("stormId", stormId); + builder.field("name", name); + builder.field("value", value); + builder.field("timestamp", timestamp); + builder.endObject(); + + IndexRequest indexRequest = + new IndexRequest(getIndexName(timestamp)).source(builder); + connection.addToProcessor(indexRequest); + } catch (Exception e) { + LOG.error("Problem when building request for OpenSearch", e); + } + } + } +} diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/StatusMetricsBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/StatusMetricsBolt.java index 56edf6967..30c0ebad9 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/StatusMetricsBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/metrics/StatusMetricsBolt.java @@ -26,6 +26,7 @@ import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; +import org.apache.stormcrawler.metrics.CrawlerMetrics; import 
org.apache.stormcrawler.opensearch.Constants; import org.apache.stormcrawler.opensearch.OpenSearchConnection; import org.apache.stormcrawler.util.ConfUtils; @@ -106,12 +107,8 @@ public void prepare( throw new RuntimeException(e1); } - context.registerMetric( - "status.count", - () -> { - return latestStatusCounts; - }, - freqStats); + CrawlerMetrics.registerGauge( + context, stormConf, "status.count", () -> latestStatusCounts, freqStats); listeners = new StatusActionListener[6]; diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java index 2eb97102f..dd1eee983 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/AggregationSpout.java @@ -301,7 +301,7 @@ public void onResponse(SearchResponse response) { alreadyprocessed, ((float) timeTaken / numhits)); - queryTimes.addMeasurement(timeTaken); + queryTimes.accept(timeTaken); eventCounter.scope("already_being_processed").incrBy(alreadyprocessed); eventCounter.scope("ES_queries").incrBy(1); eventCounter.scope("ES_docs").incrBy(numhits); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index bd178f7db..df1598f07 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -35,12 +35,13 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.metric.api.MultiCountMetric; 
-import org.apache.storm.metric.api.MultiReducedMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.apache.stormcrawler.metrics.ScopedReducedMetric; import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.Constants; import org.apache.stormcrawler.opensearch.IndexCreation; @@ -48,7 +49,6 @@ import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; -import org.apache.stormcrawler.util.PerSecondReducer; import org.apache.stormcrawler.util.URLPartitioner; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -100,9 +100,9 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt // Be fair due to cache timeout private final ReentrantLock waitAckLock = new ReentrantLock(true); - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; - private MultiReducedMetric receivedPerSecMetrics; + private ScopedReducedMetric receivedPerSecMetrics; public StatusUpdaterBolt() { super(); @@ -174,18 +174,17 @@ public void prepare( int metrics_time_bucket_secs = 30; // create gauge for waitAck - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), metrics_time_bucket_secs); + CrawlerMetrics.registerGauge( + context, stormConf, "waitAck", waitAck::estimatedSize, metrics_time_bucket_secs); // benchmarking - average number of items received back by Elastic per second this.receivedPerSecMetrics = - context.registerMetric( - "average_persec", - new MultiReducedMetric(new PerSecondReducer()), - metrics_time_bucket_secs); + CrawlerMetrics.registerPerSecMetric( + context, stormConf, "average_persec", 
metrics_time_bucket_secs); this.eventCounter = - context.registerMetric( - "counters", new MultiCountMetric(), metrics_time_bucket_secs); + CrawlerMetrics.registerCounter( + context, stormConf, "counters", metrics_time_bucket_secs); try { connection = OpenSearchConnection.getConnection(stormConf, OSBoltType, this); diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporterTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporterTest.java new file mode 100644 index 000000000..8c0c9413e --- /dev/null +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/metrics/MetricsReporterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.opensearch.metrics; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.MetricRegistry; +import java.util.HashMap; +import java.util.Map; +import org.apache.stormcrawler.opensearch.bolt.AbstractOpenSearchTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +class MetricsReporterTest extends AbstractOpenSearchTest { + + @Test + @Timeout(60) + void prepareAndReportMetrics() { + MetricRegistry registry = new MetricRegistry(); + Counter counter = registry.counter("test.counter"); + counter.inc(42); + + Map topoConf = new HashMap<>(); + topoConf.put( + "opensearch.metrics.addresses", + opensearchContainer.getHost() + ":" + opensearchContainer.getFirstMappedPort()); + + Map reporterConf = new HashMap<>(); + reporterConf.put("report.period", 60L); + reporterConf.put("report.period.units", "SECONDS"); + + MetricsReporter reporter = new MetricsReporter(); + assertDoesNotThrow(() -> reporter.prepare(registry, topoConf, reporterConf)); + assertNotNull(reporter); + reporter.stop(); + } +} diff --git a/external/solr/src/main/java/org/apache/stormcrawler/solr/bolt/IndexerBolt.java b/external/solr/src/main/java/org/apache/stormcrawler/solr/bolt/IndexerBolt.java index 2b10dc838..c14854c71 100644 --- a/external/solr/src/main/java/org/apache/stormcrawler/solr/bolt/IndexerBolt.java +++ b/external/solr/src/main/java/org/apache/stormcrawler/solr/bolt/IndexerBolt.java @@ -22,13 +22,14 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.solr.common.SolrInputDocument; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; import 
org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.solr.SolrConnection; import org.slf4j.Logger; @@ -43,7 +44,7 @@ public class IndexerBolt extends AbstractIndexerBolt { private OutputCollector _collector; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private SolrConnection connection; @@ -61,7 +62,7 @@ public void prepare( throw new RuntimeException(e); } - this.eventCounter = context.registerMetric("SolrIndexerBolt", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, conf, "SolrIndexerBolt", 10); } @Override diff --git a/external/solr/src/main/java/org/apache/stormcrawler/solr/metrics/MetricsReporter.java b/external/solr/src/main/java/org/apache/stormcrawler/solr/metrics/MetricsReporter.java new file mode 100644 index 000000000..f8cfd4b0a --- /dev/null +++ b/external/solr/src/main/java/org/apache/stormcrawler/solr/metrics/MetricsReporter.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.solr.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Timer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.common.SolrInputDocument; +import org.apache.storm.metrics2.reporters.ScheduledStormReporter; +import org.apache.stormcrawler.solr.SolrConnection; +import org.apache.stormcrawler.util.ConfUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Storm V2 metrics reporter that writes metrics to Solr with the same document structure as the V1 + * {@link MetricsConsumer}. + * + *

<p>Configuration in storm.yaml: + * + * <pre>

+ *   storm.metrics.reporters:
+ *     - class: "org.apache.stormcrawler.solr.metrics.MetricsReporter"
+ *       report.period: 10
+ *       report.period.units: "SECONDS"
+ * </pre>
+ */ +public class MetricsReporter extends ScheduledStormReporter { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsReporter.class); + + private static final String BOLT_TYPE = "metrics"; + + private static final String SolrTTLParamName = "solr.metrics.ttl"; + private static final String SolrTTLFieldParamName = "solr.metrics.ttl.field"; + + private ScheduledReporter reporter; + + @Override + public void prepare( + MetricRegistry metricsRegistry, + Map topoConf, + Map reporterConf) { + + String ttlField = ConfUtils.getString(topoConf, SolrTTLFieldParamName, "__ttl__"); + String ttl = ConfUtils.getString(topoConf, SolrTTLParamName, null); + + SolrConnection connection; + try { + connection = SolrConnection.getConnection(topoConf, BOLT_TYPE); + } catch (Exception e) { + LOG.error("Can't connect to Solr: {}", e); + throw new RuntimeException(e); + } + + TimeUnit reportPeriodUnit = getReportPeriodUnit(reporterConf); + long reportPeriod = getReportPeriod(reporterConf); + + reporter = new SolrScheduledReporter(metricsRegistry, connection, ttlField, ttl); + reporter.start(reportPeriod, reportPeriodUnit); + } + + @Override + public void start() { + // already started in prepare() + } + + @Override + public void stop() { + if (reporter != null) { + reporter.stop(); + } + } + + private static class SolrScheduledReporter extends ScheduledReporter { + + private final SolrConnection connection; + private final String ttlField; + private final String ttl; + private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT); + + SolrScheduledReporter( + MetricRegistry registry, SolrConnection connection, String ttlField, String ttl) { + super( + registry, + "solr-metrics-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS); + this.connection = connection; + this.ttlField = ttlField; + this.ttl = ttl; + } + + @Override + @SuppressWarnings("rawtypes") + public void report( + SortedMap gauges, + SortedMap counters, 
+ SortedMap histograms, + SortedMap meters, + SortedMap timers) { + + Date now = new Date(); + + for (Map.Entry entry : gauges.entrySet()) { + Object value = entry.getValue().getValue(); + if (value instanceof Number) { + indexDataPoint(now, entry.getKey(), ((Number) value).doubleValue()); + } + } + + for (Map.Entry entry : counters.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getCount()); + } + + for (Map.Entry entry : histograms.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getSnapshot().getMean()); + } + + for (Map.Entry entry : meters.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getOneMinuteRate()); + } + + for (Map.Entry entry : timers.entrySet()) { + indexDataPoint(now, entry.getKey(), entry.getValue().getSnapshot().getMean()); + } + } + + private void indexDataPoint(Date timestamp, String name, double value) { + try { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("name", name); + doc.addField("value", value); + doc.addField("timestamp", df.format(timestamp)); + + if (ttl != null) { + doc.addField(ttlField, ttl); + } + + connection.addAsync(doc); + } catch (Exception e) { + LOG.error("Problem building a document to Solr", e); + } + } + } +} diff --git a/external/solr/src/main/java/org/apache/stormcrawler/solr/persistence/SolrSpout.java b/external/solr/src/main/java/org/apache/stormcrawler/solr/persistence/SolrSpout.java index 10b437c54..813c401ce 100644 --- a/external/solr/src/main/java/org/apache/stormcrawler/solr/persistence/SolrSpout.java +++ b/external/solr/src/main/java/org/apache/stormcrawler/solr/persistence/SolrSpout.java @@ -202,7 +202,7 @@ protected void handleSuccess(QueryResponse response) { markQueryReceivedNow(); - queryTimes.addMeasurement(timeTaken); + queryTimes.accept(timeTaken); SolrDocumentList docs = new SolrDocumentList(); diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java 
b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java index 2ae45ae7d..53c320ad8 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/IndexerBolt.java @@ -28,13 +28,14 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.indexing.AbstractIndexerBolt; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; import org.slf4j.Logger; @@ -49,7 +50,7 @@ public class IndexerBolt extends AbstractIndexerBolt { private OutputCollector collector; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private Connection connection; @@ -65,7 +66,7 @@ public void prepare( super.prepare(conf, context, collector); this.collector = collector; - this.eventCounter = context.registerMetric("SQLIndexer", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, conf, "SQLIndexer", 10); this.tableName = ConfUtils.getString(conf, SQL_INDEX_TABLE_PARAM_NAME); diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java index 8c69c511f..0129cf3dd 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/SQLSpout.java @@ -221,7 +221,7 @@ public void close() { private long recordQueryTiming(long timeStartQuery) { long timeTaken = System.currentTimeMillis() - 
timeStartQuery; - queryTimes.addMeasurement(timeTaken); + queryTimes.accept(timeTaken); return timeTaken; } diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java index e2b0e91ed..80642dba0 100644 --- a/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/StatusUpdaterBolt.java @@ -32,11 +32,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; @@ -53,7 +54,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt { private static final Timestamp NEVER = Timestamp.valueOf("3000-01-01 00:00:00"); - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private Connection connection; @@ -87,7 +88,7 @@ public void prepare( partitioner = new URLPartitioner(); partitioner.configure(stormConf); - this.eventCounter = context.registerMetric("counter", new MultiCountMetric(), 10); + this.eventCounter = CrawlerMetrics.registerCounter(context, stormConf, "counter", 10); final String tableName = ConfUtils.getString(stormConf, Constants.SQL_STATUS_TABLE_PARAM_NAME, "urls"); diff --git a/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsReporter.java b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsReporter.java new file mode 
100644 index 000000000..c8e6240c2 --- /dev/null +++ b/external/sql/src/main/java/org/apache/stormcrawler/sql/metrics/MetricsReporter.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.sql.metrics; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Timer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import org.apache.storm.metrics2.reporters.ScheduledStormReporter; +import org.apache.stormcrawler.sql.Constants; +import org.apache.stormcrawler.sql.SQLUtil; +import org.apache.stormcrawler.util.ConfUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Storm V2 metrics reporter that writes metrics to a SQL table with the same schema as the V1 + * {@link MetricsConsumer}. + * + *

<p>Configuration in storm.yaml: + * + * <pre>

+ *   storm.metrics.reporters:
+ *     - class: "org.apache.stormcrawler.sql.metrics.MetricsReporter"
+ *       report.period: 10
+ *       report.period.units: "SECONDS"
+ * </pre>
+ */ +public class MetricsReporter extends ScheduledStormReporter { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsReporter.class); + + private ScheduledReporter reporter; + + @Override + public void prepare( + MetricRegistry metricsRegistry, + Map topoConf, + Map reporterConf) { + + String tableName = + ConfUtils.getString(topoConf, Constants.SQL_METRICS_TABLE_PARAM_NAME, "metrics"); + + Connection connection; + try { + connection = SQLUtil.getConnection(topoConf); + } catch (SQLException ex) { + LOG.error(ex.getMessage(), ex); + throw new RuntimeException(ex); + } + + String query = + "INSERT INTO " + tableName + " (name, value, timestamp)" + " values (?, ?, ?)"; + + TimeUnit reportPeriodUnit = getReportPeriodUnit(reporterConf); + long reportPeriod = getReportPeriod(reporterConf); + + reporter = new SQLScheduledReporter(metricsRegistry, connection, query); + reporter.start(reportPeriod, reportPeriodUnit); + } + + @Override + public void start() { + // already started in prepare() + } + + @Override + public void stop() { + if (reporter != null) { + reporter.stop(); + } + } + + private static class SQLScheduledReporter extends ScheduledReporter { + + private final Connection connection; + private final String query; + + SQLScheduledReporter(MetricRegistry registry, Connection connection, String query) { + super( + registry, + "sql-metrics-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS); + this.connection = connection; + this.query = query; + } + + @Override + @SuppressWarnings("rawtypes") + public void report( + SortedMap gauges, + SortedMap counters, + SortedMap histograms, + SortedMap meters, + SortedMap timers) { + + Timestamp now = Timestamp.from(Instant.now()); + + try { + PreparedStatement preparedStmt = connection.prepareStatement(query); + + for (Map.Entry entry : gauges.entrySet()) { + Object value = entry.getValue().getValue(); + if (value instanceof Number) { + addDataPoint( + preparedStmt, now, 
entry.getKey(), ((Number) value).doubleValue()); + } + } + + for (Map.Entry entry : counters.entrySet()) { + addDataPoint(preparedStmt, now, entry.getKey(), entry.getValue().getCount()); + } + + for (Map.Entry entry : histograms.entrySet()) { + addDataPoint( + preparedStmt, + now, + entry.getKey(), + entry.getValue().getSnapshot().getMean()); + } + + for (Map.Entry entry : meters.entrySet()) { + addDataPoint( + preparedStmt, now, entry.getKey(), entry.getValue().getOneMinuteRate()); + } + + for (Map.Entry entry : timers.entrySet()) { + addDataPoint( + preparedStmt, + now, + entry.getKey(), + entry.getValue().getSnapshot().getMean()); + } + + preparedStmt.executeBatch(); + preparedStmt.close(); + } catch (SQLException ex) { + LOG.error(ex.getMessage(), ex); + } + } + + private void addDataPoint( + PreparedStatement preparedStmt, Timestamp timestamp, String name, double value) + throws SQLException { + preparedStmt.setString(1, name); + preparedStmt.setDouble(2, value); + preparedStmt.setObject(3, timestamp); + preparedStmt.addBatch(); + } + } +} diff --git a/external/tika/src/main/java/org/apache/stormcrawler/tika/ParserBolt.java b/external/tika/src/main/java/org/apache/stormcrawler/tika/ParserBolt.java index 2586114a0..46179df97 100644 --- a/external/tika/src/main/java/org/apache/stormcrawler/tika/ParserBolt.java +++ b/external/tika/src/main/java/org/apache/stormcrawler/tika/ParserBolt.java @@ -32,7 +32,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.html.dom.HTMLDocumentImpl; import org.apache.http.HttpHeaders; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -43,6 +42,8 @@ import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.filtering.URLFilters; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import 
org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.parse.Outlink; import org.apache.stormcrawler.parse.ParseData; import org.apache.stormcrawler.parse.ParseFilter; @@ -83,7 +84,7 @@ public class ParserBolt extends BaseRichBolt { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ParserBolt.class); - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; private boolean upperCaseElementNames = true; private Class htmlMapperClass = IdentityHtmlMapper.class; @@ -144,7 +145,7 @@ public void prepare( this.collector = collector; this.eventCounter = - context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 10); + CrawlerMetrics.registerCounter(context, conf, this.getClass().getSimpleName(), 10); this.metadataTransfer = MetadataTransfer.getInstance(conf); } diff --git a/external/urlfrontier/pom.xml b/external/urlfrontier/pom.xml index 1e4c99d5c..fa3c75618 100644 --- a/external/urlfrontier/pom.xml +++ b/external/urlfrontier/pom.xml @@ -40,7 +40,7 @@ under the License. 
2.5 true 0.50 - 0.50 + 0.49 0.53 0.42 0.45 diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java index 87c8fa04a..34ba3a4c9 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java @@ -56,11 +56,12 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; @@ -93,7 +94,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt // Faster ways of locking until n messages are processed private Semaphore inFlightSemaphore; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; /** Globally set crawlID * */ private String globalCrawlID; @@ -119,7 +120,8 @@ public void prepare( throttleTimeMS = ConfUtils.getLong(stormConf, URLFRONTIER_THROTTLING_TIME_MS_KEY, 10); eventCounter = - context.registerMetric(this.getClass().getSimpleName(), new MultiCountMetric(), 30); + CrawlerMetrics.registerCounter( + context, stormConf, this.getClass().getSimpleName(), 30); maxMessagesInFlight = ConfUtils.getInt( diff --git a/external/warc/src/main/java/org/apache/stormcrawler/warc/WARCSpout.java b/external/warc/src/main/java/org/apache/stormcrawler/warc/WARCSpout.java index 87308846a..e2e5e5b49 
100644 --- a/external/warc/src/main/java/org/apache/stormcrawler/warc/WARCSpout.java +++ b/external/warc/src/main/java/org/apache/stormcrawler/warc/WARCSpout.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.Optional; import org.apache.hadoop.conf.Configuration; -import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -39,6 +38,8 @@ import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.metrics.CrawlerMetrics; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.spout.FileSpout; @@ -78,7 +79,7 @@ public class WARCSpout extends FileSpout { private WarcRequest precedingWarcRequest; private Optional record; - private MultiCountMetric eventCounter; + private ScopedCounter eventCounter; protected transient Configuration hdfsConfig; @@ -413,8 +414,8 @@ record = Optional.empty(); int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10); eventCounter = - context.registerMetric( - "warc_spout_counter", new MultiCountMetric(), metricsTimeBucketSecs); + CrawlerMetrics.registerCounter( + context, conf, "warc_spout_counter", metricsTimeBucketSecs); hdfsConfig = new Configuration();