Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/batch-processor",
"comment": "allow to set optional custom PrometheusServer in run()",
"type": "minor"
}
],
"packageName": "@subsquid/batch-processor"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-processor-tools",
"comment": "allow constructing RunnerMetrics with a custom blocksCountInRange(range) instead of only request ranges",
"type": "minor"
}
],
"packageName": "@subsquid/util-internal-processor-tools"
}
5 changes: 2 additions & 3 deletions processor/batch-processor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
"dependencies": {
"@subsquid/logger": "^1.5.0",
"@subsquid/util-internal": "^3.2.0",
"@subsquid/util-internal-processor-tools": "^4.3.0",
"@subsquid/util-internal-counters": "^1.3.2",
"@subsquid/util-internal-prometheus-server": "^1.3.0",
"@subsquid/util-internal-range": "^0.3.0",
"prom-client": "^14.2.0"
"@subsquid/util-internal-range": "^0.3.0"
},
"devDependencies": {
"@types/node": "^18.18.14",
Expand Down
111 changes: 0 additions & 111 deletions processor/batch-processor/src/metrics.ts

This file was deleted.

74 changes: 38 additions & 36 deletions processor/batch-processor/src/run.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {createLogger} from '@subsquid/logger'
import {last, runProgram, Throttler} from '@subsquid/util-internal'
import {createPrometheusServer} from '@subsquid/util-internal-prometheus-server'
import * as prom from 'prom-client'
import {PrometheusServer, RunnerMetrics} from '@subsquid/util-internal-processor-tools'
import {Database, HashAndHeight} from './database'
import {DataSource} from './datasource'
import {Metrics} from './metrics'
import {formatHead, getItemsCount} from './util'

export {PrometheusServer}


const log = createLogger('sqd:batch-processor')

Expand All @@ -31,6 +31,10 @@ interface BlockBase {
header: HashAndHeight
}

interface RunOptions {
prometheus?: PrometheusServer
}


/**
* Run data processing.
Expand All @@ -49,27 +53,32 @@ interface BlockBase {
export function run<Block extends BlockBase, Store>(
src: DataSource<Block>,
db: Database<Store>,
dataHandler: (ctx: DataHandlerContext<Block, Store>) => Promise<void>
dataHandler: (ctx: DataHandlerContext<Block, Store>) => Promise<void>,
opts?: RunOptions
): void {
runProgram(() => {
return new Processor(src, db, dataHandler).run()
return new Processor(src, db, dataHandler, opts).run()
}, err => {
log.fatal(err)
log.fatal(err)
})
}


class Processor<B extends BlockBase, S> {
private metrics = new Metrics()
private metrics: RunnerMetrics
private chainHeight: Throttler<number>
private statusReportTimer?: any
private hasStatusNews = false

constructor(
private src: DataSource<B>,
private db: Database<S>,
private handler: (ctx: DataHandlerContext<B, S>) => Promise<void>
private handler: (ctx: DataHandlerContext<B, S>) => Promise<void>,
private readonly opts?: RunOptions
) {
this.metrics = new RunnerMetrics(
src.getBlocksCountInRange?.bind(src) ?? ((range) => Math.max(0, range.to - range.from + 1)),
)
this.chainHeight = new Throttler(() => this.src.getFinalizedHeight(), 30_000)
}

Expand Down Expand Up @@ -101,34 +110,27 @@ class Processor<B extends BlockBase, S> {
}

private async initMetrics(state: HashAndHeight): Promise<void> {
await this.updateProgressMetrics(await this.chainHeight.get(), state)
this.updateProgressMetrics(await this.chainHeight.get(), state)
let port = process.env.PROCESSOR_PROMETHEUS_PORT || process.env.PROMETHEUS_PORT
if (port == null) return
prom.collectDefaultMetrics()
this.metrics.install()
let server = await createPrometheusServer(prom.register, port)
log.info(`prometheus metrics are served on port ${server.port}`)

let prometheusServer: PrometheusServer | undefined
if (this.opts?.prometheus != null) {
prometheusServer = this.opts.prometheus
} else if (port != null) {
prometheusServer = new PrometheusServer()
prometheusServer.setPort(port)
}
if (prometheusServer == null) return

prometheusServer.addRunnerMetrics(this.metrics)
let listening = await prometheusServer.serve()
log.info(`prometheus metrics are served on port ${listening.port}`)
}

private updateProgressMetrics(chainHeight: number, state: HashAndHeight, time?: bigint): void {
this.metrics.setChainHeight(chainHeight)
this.metrics.setLastProcessedBlock(state.height)
let left: number
let processed: number
if (this.src.getBlocksCountInRange) {
left = this.src.getBlocksCountInRange({
from: this.metrics.getLastProcessedBlock() + 1,
to: this.metrics.getChainHeight()
})
processed = this.src.getBlocksCountInRange({
from: 0,
to: this.metrics.getChainHeight()
}) - left
} else {
left = this.metrics.getChainHeight() - this.metrics.getLastProcessedBlock()
processed = this.metrics.getLastProcessedBlock()
}
this.metrics.updateProgress(processed, left, time)
this.metrics.updateProgress(time)
}

private async processBatch(prevHead: HashAndHeight, blocks: B[]): Promise<HashAndHeight> {
Expand All @@ -144,15 +146,15 @@ class Processor<B extends BlockBase, S> {
let mappingStartTime = process.hrtime.bigint()

await this.db.transact({
prevHead,
nextHead,
prevHead,
nextHead,
isOnTop
}, store => {
return this.handler({
store,
blocks,
return this.handler({
store,
blocks,
isHead: isOnTop
})
})
})

let mappingEndTime = process.hrtime.bigint()
Expand Down
1 change: 1 addition & 0 deletions util/util-internal-processor-tools/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export * from './datasource'
export * from './filter'
export * from './prometheus'
export * from './runner'
export * from './runner-metrics'
export {shortHash, formatId} from './util'
35 changes: 18 additions & 17 deletions util/util-internal-processor-tools/src/runner-metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {def} from '@subsquid/util-internal'
import {Progress, Speed} from '@subsquid/util-internal-counters'
import {assertRangeList, getSize, Range, RangeList} from '@subsquid/util-internal-range'
import {assertRangeList, FiniteRange, getSize, Range} from '@subsquid/util-internal-range'
import {timeInterval} from './util'


Expand All @@ -10,8 +9,19 @@ export class RunnerMetrics {
private mappingSpeed = new Speed({windowSize: 5})
private mappingItemSpeed = new Speed({windowSize: 5})
private blockProgress = new Progress({initialValue: 0})

constructor(private requests: {range: Range}[]) {}
private blocksCountInRange: (range: FiniteRange) => number

constructor(requests: {range: Range}[])
constructor(blocksCountInRange: (range: FiniteRange) => number)
constructor(requestsOrCounter: {range: Range}[] | ((range: FiniteRange) => number)) {
if (typeof requestsOrCounter === 'function') {
this.blocksCountInRange = requestsOrCounter
} else {
let ranges = requestsOrCounter.map(r => r.range)
assertRangeList(ranges)
this.blocksCountInRange = range => getSize(ranges, range)
}
}

setChainHeight(height: number): void {
this.chainHeight = Math.max(height, this.lastBlock)
Expand All @@ -38,25 +48,16 @@ export class RunnerMetrics {
this.mappingItemSpeed.push(batchItemSize || 1, batchMappingStartTime, batchMappingEndTime)
}

@def
private getRequestedBlockRanges(): RangeList {
let ranges = this.requests.map(req => req.range)
assertRangeList(ranges)
return ranges
private get head(): number {
return Math.max(this.chainHeight, this.lastBlock, 0)
}

getEstimatedTotalBlocksCount(): number {
return getSize(this.getRequestedBlockRanges(), {
from: 0,
to: Math.max(this.chainHeight, this.lastBlock)
})
return this.blocksCountInRange({from: 0, to: this.head})
}

getEstimatedBlocksLeft(): number {
let count = getSize(this.getRequestedBlockRanges(), {
from: this.lastBlock,
to: Math.max(this.chainHeight, this.lastBlock)
})
let count = this.blocksCountInRange({from: Math.max(this.lastBlock, 0), to: this.head})
return count == 1 ? 0 : count
}

Expand Down
Loading