From 191bb6e7eff4caa7457aa840a9f3cad6d5f5f704 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Thu, 26 Jan 2023 23:12:48 +0100 Subject: [PATCH 1/6] Init repo --- .github/ISSUE_TEMPLATE/bug_report.md | 21 ++++++ .github/ISSUE_TEMPLATE/feature_request.md | 10 +++ .github/ISSUE_TEMPLATE/question.md | 10 +++ .github/workflows/bench.yml | 60 ++++++++++------ .github/workflows/cloc.yml | 17 +++-- .github/workflows/golangci-lint.yml | 13 +++- .github/workflows/gorelease.yml | 19 ++++-- .github/workflows/test-unit.yml | 83 ++++++++++++++++------- .golangci.yml | 16 +++++ LICENSE | 2 +- Makefile | 2 +- README.md | 14 ++-- dev_test.go | 2 +- go.mod | 4 +- go.sum | 10 +-- run_me.sh | 27 -------- 16 files changed, 202 insertions(+), 108 deletions(-) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md create mode 100644 .github/ISSUE_TEMPLATE/question.md delete mode 100755 run_me.sh diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..b3c2870 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,21 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: '' +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +If feasible/relevant, please provide a code snippet (inline or with Go playground) to reproduce the issue. + + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**Additional context** +Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..94c7c73 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,10 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: '' +assignees: '' + +--- + +Please use discussions https://github.com/bool64/throttle/discussions/categories/ideas to share feature ideas. diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md new file mode 100644 index 0000000..c23b1c7 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/question.md @@ -0,0 +1,10 @@ +--- +name: Question +about: Any question about features or usage +title: '' +labels: '' +assignees: '' + +--- + +Please use discussions https://github.com/bool64/throttle/discussions/categories/q-a to make your question more discoverable by other folks. diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 152d4a6..a85f316 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -12,20 +12,26 @@ on: description: 'New Ref' required: true +# Cancel the workflow in progress in newer build is about to start. +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + env: GO111MODULE: "on" CACHE_BENCHMARK: "off" # Enables benchmark result reuse between runs, may skew latency results. RUN_BASE_BENCHMARK: "on" # Runs benchmark for PR base in case benchmark result is missing. - GO_VERSION: 1.17.x + GO_VERSION: 1.19.x jobs: bench: runs-on: ubuntu-latest steps: - name: Install Go stable if: env.GO_VERSION != 'tip' - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: go-version: ${{ env.GO_VERSION }} + - name: Install Go tip if: env.GO_VERSION == 'tip' run: | @@ -35,10 +41,12 @@ jobs: tar -C ~/sdk/gotip -xzf gotip.tar.gz ~/sdk/gotip/bin/go version echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV + - name: Checkout code uses: actions/checkout@v2 with: ref: ${{ (github.event.inputs.new != '') && github.event.inputs.new || github.event.ref }} + - name: Go cache uses: actions/cache@v2 with: @@ -51,14 +59,16 @@ jobs: key: ${{ runner.os }}-go-cache-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go-cache + - name: Restore benchstat uses: actions/cache@v2 with: path: ~/go/bin/benchstat - key: ${{ runner.os }}-benchstat + key: ${{ runner.os }}-benchstat-legacy + - name: Restore base benchmark result + id: base-benchmark if: env.CACHE_BENCHMARK == 'on' - id: benchmark-base uses: actions/cache@v2 with: path: | @@ -66,32 +76,40 @@ jobs: bench-main.txt # Use base sha for PR or new commit hash for master/main push in benchmark result key. key: ${{ runner.os }}-bench-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }} - - name: Checkout base code - if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') - uses: actions/checkout@v2 - with: - ref: ${{ (github.event.pull_request.base.sha != '' ) && github.event.pull_request.base.sha || github.event.inputs.old }} - path: __base - - name: Run base benchmark - if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') + + - name: Run benchmark run: | + export REF_NAME=new + make bench + OUTPUT=$(make bench-stat-diff) + echo "${OUTPUT}" + echo "diff<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + OUTPUT=$(make bench-stat) + echo "${OUTPUT}" + echo "result<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + + - name: Run benchmark for base code + if: env.RUN_BASE_BENCHMARK == 'on' && steps.base-benchmark.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') + run: | + git fetch origin master ${{ github.event.pull_request.base.sha }} + HEAD=$(git rev-parse HEAD) + git reset --hard ${{ github.event.pull_request.base.sha }} export REF_NAME=master - cd __base - make | grep bench-run && (BENCH_COUNT=5 make bench-run bench-stat && cp bench-master.txt ../bench-master.txt) || echo "No benchmarks in base" - - name: Benchmark + make bench-run bench-stat + git reset --hard $HEAD + + - name: Benchmark stats id: bench run: | export REF_NAME=new - BENCH_COUNT=5 make bench OUTPUT=$(make bench-stat-diff) echo "${OUTPUT}" - OUTPUT="${OUTPUT//$'\n'/%0A}" - echo "::set-output name=diff::$OUTPUT" + echo "diff<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT OUTPUT=$(make bench-stat) echo "${OUTPUT}" - OUTPUT="${OUTPUT//$'\n'/%0A}" - echo "::set-output name=result::$OUTPUT" - - name: Comment Benchmark Result + echo "result<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + + - name: Comment benchmark result continue-on-error: true uses: marocchino/sticky-pull-request-comment@v2 with: diff --git a/.github/workflows/cloc.yml b/.github/workflows/cloc.yml index 7002b22..927e099 100644 --- a/.github/workflows/cloc.yml +++ b/.github/workflows/cloc.yml @@ -2,6 +2,12 @@ name: cloc on: pull_request: + +# Cancel the workflow in progress in newer build is about to start. +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + jobs: cloc: runs-on: ubuntu-latest @@ -15,16 +21,17 @@ jobs: with: ref: ${{ github.event.pull_request.base.sha }} path: base - - name: Count Lines Of Code + - name: Count lines of code id: loc run: | - curl -sLO https://github.com/vearutop/sccdiff/releases/download/v1.0.1/linux_amd64.tar.gz && tar xf linux_amd64.tar.gz + curl -sLO https://github.com/vearutop/sccdiff/releases/download/v1.0.3/linux_amd64.tar.gz && tar xf linux_amd64.tar.gz + sccdiff_hash=$(git hash-object ./sccdiff) + [ "$sccdiff_hash" == "ae8a07b687bd3dba60861584efe724351aa7ff63" ] || (echo "::error::unexpected hash for sccdiff, possible tampering: $sccdiff_hash" && exit 1) OUTPUT=$(cd pr && ../sccdiff -basedir ../base) echo "${OUTPUT}" - OUTPUT="${OUTPUT//$'\n'/%0A}" - echo "::set-output name=diff::$OUTPUT" + echo "diff<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT - - name: Comment Code Lines + - name: Comment lines of code continue-on-error: true uses: marocchino/sticky-pull-request-comment@v2 with: diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 58c0ec6..fbf8167 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -8,17 +8,26 @@ on: - master - main pull_request: + +# Cancel the workflow in progress in newer build is about to start. +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + jobs: golangci: name: golangci-lint runs-on: ubuntu-latest steps: + - uses: actions/setup-go@v3 + with: + go-version: 1.19.x - uses: actions/checkout@v2 - name: golangci-lint - uses: golangci/golangci-lint-action@v2.5.2 + uses: golangci/golangci-lint-action@v3.3.1 with: # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.43.0 + version: v1.50.1 # Optional: working directory, useful for monorepos # working-directory: somedir diff --git a/.github/workflows/gorelease.yml b/.github/workflows/gorelease.yml index 94c6966..5d9fae0 100644 --- a/.github/workflows/gorelease.yml +++ b/.github/workflows/gorelease.yml @@ -2,15 +2,21 @@ name: gorelease on: pull_request: + +# Cancel the workflow in progress in newer build is about to start. +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + env: - GO_VERSION: 1.17.x + GO_VERSION: 1.19.x jobs: gorelease: runs-on: ubuntu-latest steps: - name: Install Go stable if: env.GO_VERSION != 'tip' - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: go-version: ${{ env.GO_VERSION }} - name: Install Go tip @@ -29,16 +35,15 @@ jobs: with: path: | ~/go/bin/gorelease - key: ${{ runner.os }}-gorelease + key: ${{ runner.os }}-gorelease-generic - name: Gorelease id: gorelease run: | test -e ~/go/bin/gorelease || go install golang.org/x/exp/cmd/gorelease@latest - OUTPUT=$(gorelease || exit 0) + OUTPUT=$(gorelease 2>&1 || exit 0) echo "${OUTPUT}" - OUTPUT="${OUTPUT//$'\n'/%0A}" - echo "::set-output name=report::$OUTPUT" - - name: Comment Report + echo "report<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + - name: Comment report continue-on-error: true uses: marocchino/sticky-pull-request-comment@v2 with: diff --git a/.github/workflows/test-unit.yml b/.github/workflows/test-unit.yml index 6442393..17cca25 100644 --- a/.github/workflows/test-unit.yml +++ b/.github/workflows/test-unit.yml @@ -6,22 +6,30 @@ on: - master - main pull_request: + +# Cancel the workflow in progress in newer build is about to start. +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + env: GO111MODULE: "on" RUN_BASE_COVERAGE: "on" # Runs test for PR base in case base test coverage is missing. - COV_GO_VERSION: 1.17.x # Version of Go to collect coverage + COV_GO_VERSION: 1.18.x # Version of Go to collect coverage + TARGET_DELTA_COV: 90 # Target coverage of changed lines, in percents jobs: test: strategy: matrix: - go-version: [ 1.16.x, 1.17.x, tip ] + go-version: [ 1.16.x, 1.17.x, 1.18.x, 1.19.x ] runs-on: ubuntu-latest steps: - name: Install Go stable if: matrix.go-version != 'tip' - uses: actions/setup-go@v2 + uses: actions/setup-go@v3 with: go-version: ${{ matrix.go-version }} + - name: Install Go tip if: matrix.go-version == 'tip' run: | @@ -31,8 +39,10 @@ jobs: tar -C ~/sdk/gotip -xzf gotip.tar.gz ~/sdk/gotip/bin/go version echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV + - name: Checkout code uses: actions/checkout@v2 + - name: Go cache uses: actions/cache@v2 with: @@ -45,44 +55,55 @@ jobs: key: ${{ runner.os }}-go-cache-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go-cache + - name: Restore base test coverage id: base-coverage - if: matrix.go-version == env.COV_GO_VERSION + if: matrix.go-version == env.COV_GO_VERSION && github.event.pull_request.base.sha != '' uses: actions/cache@v2 with: path: | unit-base.txt # Use base sha for PR or new commit hash for master/main push in test result key. key: ${{ runner.os }}-unit-test-coverage-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }} - - name: Checkout base code - if: matrix.go-version == env.COV_GO_VERSION && env.RUN_BASE_COVERAGE == 'on' && steps.base-coverage.outputs.cache-hit != 'true' && github.event.pull_request.base.sha != '' - uses: actions/checkout@v2 - with: - ref: ${{ github.event.pull_request.base.sha }} - path: __base + - name: Run test for base code if: matrix.go-version == env.COV_GO_VERSION && env.RUN_BASE_COVERAGE == 'on' && steps.base-coverage.outputs.cache-hit != 'true' && github.event.pull_request.base.sha != '' run: | - cd __base - make | grep test-unit && (make test-unit && go tool cover -func=./unit.coverprofile | sed -e 's/.go:[0-9]*:\t/.go\t/g' | sed -e 's/\t\t*/\t/g' > ../unit-base.txt) || echo "No test-unit in base" + git fetch origin master ${{ github.event.pull_request.base.sha }} + HEAD=$(git rev-parse HEAD) + git reset --hard ${{ github.event.pull_request.base.sha }} + (make test-unit && go tool cover -func=./unit.coverprofile > unit-base.txt) || echo "No test-unit in base" + git reset --hard $HEAD + - name: Test id: test run: | make test-unit - go tool cover -func=./unit.coverprofile | sed -e 's/.go:[0-9]*:\t/.go\t/g' | sed -e 's/\t\t*/\t/g' > unit.txt - OUTPUT=$(test -e unit-base.txt && (diff unit-base.txt unit.txt || exit 0) || cat unit.txt) - echo "${OUTPUT}" - OUTPUT="${OUTPUT//$'\n'/%0A}" + go tool cover -func=./unit.coverprofile > unit.txt TOTAL=$(grep 'total:' unit.txt) echo "${TOTAL}" - echo "::set-output name=diff::$OUTPUT" - echo "::set-output name=total::$TOTAL" - - name: Store base coverage - if: ${{ github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' }} - run: cp unit.txt unit-base.txt - - name: Comment Test Coverage + echo "total=$TOTAL" >> $GITHUB_OUTPUT + + - name: Annotate missing test coverage + id: annotate + if: matrix.go-version == env.COV_GO_VERSION && github.event.pull_request.base.sha != '' + run: | + curl -sLO https://github.com/vearutop/gocovdiff/releases/download/v1.3.6/linux_amd64.tar.gz && tar xf linux_amd64.tar.gz + gocovdiff_hash=$(git hash-object ./gocovdiff) + [ "$gocovdiff_hash" == "8e507e0d671d4d6dfb3612309b72b163492f28eb" ] || (echo "::error::unexpected hash for gocovdiff, possible tampering: $gocovdiff_hash" && exit 1) + git fetch origin master ${{ github.event.pull_request.base.sha }} + REP=$(./gocovdiff -cov unit.coverprofile -gha-annotations gha-unit.txt -delta-cov-file delta-cov-unit.txt -target-delta-cov ${TARGET_DELTA_COV}) + echo "${REP}" + cat gha-unit.txt + DIFF=$(test -e unit-base.txt && ./gocovdiff -func-cov unit.txt -func-base-cov unit-base.txt || echo "Missing base coverage file") + TOTAL=$(cat delta-cov-unit.txt) + echo "rep<> $GITHUB_OUTPUT && echo "$REP" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + echo "diff<> $GITHUB_OUTPUT && echo "$DIFF" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + echo "total<> $GITHUB_OUTPUT && echo "$TOTAL" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + + - name: Comment test coverage continue-on-error: true - if: matrix.go-version == env.COV_GO_VERSION + if: matrix.go-version == env.COV_GO_VERSION && github.event.pull_request.base.sha != '' uses: marocchino/sticky-pull-request-comment@v2 with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -90,13 +111,23 @@ jobs: message: | ### Unit Test Coverage ${{ steps.test.outputs.total }} + ${{ steps.annotate.outputs.total }} +
Coverage of changed lines + + ${{ steps.annotate.outputs.rep }} + +
+
Coverage diff with base branch - ```diff - ${{ steps.test.outputs.diff }} - ``` + ${{ steps.annotate.outputs.diff }} +
+ - name: Store base coverage + if: ${{ github.ref == 'refs/heads/master' || github.ref == 'refs/heads/main' }} + run: cp unit.txt unit-base.txt + - name: Upload code coverage if: matrix.go-version == env.COV_GO_VERSION uses: codecov/codecov-action@v1 diff --git a/.golangci.yml b/.golangci.yml index 97710e1..219b387 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -37,6 +37,14 @@ linters: - tagliatelle - errname - ireturn + - exhaustruct + - nonamedreturns + - nosnakecase + - structcheck + - varcheck + - deadcode + - testableexamples + - dupword issues: exclude-use-default: false @@ -48,5 +56,13 @@ issues: - noctx - funlen - dupl + - structcheck + - unused + - unparam + - nosnakecase path: "_test.go" + - linters: + - errcheck # Error checking omitted for brevity. + - gosec + path: "example_" diff --git a/LICENSE b/LICENSE index 3b9e882..d444da3 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 bool64 +Copyright (c) 2023 Viacheslav Poturaev Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile index 4b37b06..ec4a06c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -#GOLANGCI_LINT_VERSION := "v1.43.0" # Optional configuration to pinpoint golangci-lint version. +#GOLANGCI_LINT_VERSION := "v1.50.1" # Optional configuration to pinpoint golangci-lint version. # The head of Makefile determines location of dev-go to include standard targets. GO ?= go diff --git a/README.md b/README.md index 7ece3e4..5fdf614 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ -# go-template +# throttle -[![Build Status](https://github.com/bool64/go-template/workflows/test-unit/badge.svg)](https://github.com/bool64/go-template/actions?query=branch%3Amaster+workflow%3Atest-unit) -[![Coverage Status](https://codecov.io/gh/bool64/go-template/branch/master/graph/badge.svg)](https://codecov.io/gh/bool64/go-template) -[![GoDevDoc](https://img.shields.io/badge/dev-doc-00ADD8?logo=go)](https://pkg.go.dev/github.com/bool64/go-template) -[![Time Tracker](https://wakatime.com/badge/github/bool64/go-template.svg)](https://wakatime.com/badge/github/bool64/go-template) -![Code lines](https://sloc.xyz/github/bool64/go-template/?category=code) -![Comments](https://sloc.xyz/github/bool64/go-template/?category=comments) +[![Build Status](https://github.com/bool64/throttle/workflows/test-unit/badge.svg)](https://github.com/bool64/throttle/actions?query=branch%3Amaster+workflow%3Atest-unit) +[![Coverage Status](https://codecov.io/gh/bool64/throttle/branch/master/graph/badge.svg)](https://codecov.io/gh/bool64/throttle) +[![GoDevDoc](https://img.shields.io/badge/dev-doc-00ADD8?logo=go)](https://pkg.go.dev/github.com/bool64/throttle) +[![Time Tracker](https://wakatime.com/badge/github/bool64/throttle.svg)](https://wakatime.com/badge/github/bool64/throttle) +![Code lines](https://sloc.xyz/github/bool64/throttle/?category=code) +![Comments](https://sloc.xyz/github/bool64/throttle/?category=comments) diff --git a/dev_test.go b/dev_test.go index 99a9b88..46153b0 100644 --- a/dev_test.go +++ b/dev_test.go @@ -1,3 +1,3 @@ -package mypackage_test +package throttle_test import _ "github.com/bool64/dev" // Include CI/Dev scripts to project. diff --git a/go.mod b/go.mod index 2067a03..d2093e1 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ -module github.com/bool64/go-template +module github.com/bool64/throttle go 1.17 -require github.com/bool64/dev v0.2.5 +require github.com/bool64/dev v0.2.24 diff --git a/go.sum b/go.sum index ed7b77d..75049d8 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,2 @@ -github.com/bool64/dev v0.2.5 h1:H0bylghwcjDBBhEwSFTjArEO9Dr8cCaB54QSOF7esOA= -github.com/bool64/dev v0.2.5/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +github.com/bool64/dev v0.2.24 h1:xptlKivPh870W3Xc9szPcM7wkFmTMuHT8rc0nu7dITk= +github.com/bool64/dev v0.2.24/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= diff --git a/run_me.sh b/run_me.sh deleted file mode 100755 index 42e0530..0000000 --- a/run_me.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env bash -set -e - -# Init script to kick-start your project -url=$(git remote get-url origin) - -url_nopro=${url#*//} -url_noatsign=${url_nopro#*@} - -gh_repo=${url_noatsign#"github.com:"} -gh_repo=${gh_repo#"github.com/"} -gh_repo=${gh_repo%".git"} - -copyright="$(date +%Y) $(git config user.name)" -project_name=$(basename $gh_repo) - -echo "## Replacing all go-template references by $project_name" -find . -type f -not -name run_me.sh -print0 | xargs -0 perl -i -pe "s|2021 bool64|$copyright|g" -find . -type f -not -name run_me.sh -print0 | xargs -0 perl -i -pe "s|bool64/go-template|$gh_repo|g" -find . -type f -not -name run_me.sh -print0 | xargs -0 perl -i -pe "s|go-template|$project_name|g" - -echo "## Removing this script" -rm ./run_me.sh - -echo "## Please check the @TODO's:" -git grep TODO | grep -v run_me.sh - From 9c26b7f82d6a7d3da082a8b457939d33d3c22b68 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Thu, 26 Jan 2023 23:24:35 +0100 Subject: [PATCH 2/6] Initial code --- bench_test.go | 35 ++++ crawler/README.md | 9 + crawler/main.go | 209 ++++++++++++++++++++++ crawler/progress.go | 151 ++++++++++++++++ example_test.go | 1 + go.mod | 18 +- go.sum | 81 +++++++++ throttle.go | 411 ++++++++++++++++++++++++++++++++++++++++++++ throttle_test.go | 83 +++++++++ 9 files changed, 997 insertions(+), 1 deletion(-) create mode 100644 bench_test.go create mode 100644 crawler/README.md create mode 100644 crawler/main.go create mode 100644 crawler/progress.go create mode 100644 example_test.go create mode 100644 throttle.go create mode 100644 throttle_test.go diff --git a/bench_test.go b/bench_test.go new file mode 100644 index 0000000..e030a97 --- /dev/null +++ b/bench_test.go @@ -0,0 +1,35 @@ +package throttle_test + +import ( + "testing" + "time" + + throttle "github.com/bool64/throttle" +) + +func BenchmarkThrottle_Acquire(b *testing.B) { + s := throttle.NewThrottle(func(cfg *throttle.Config) { + cfg.MaxConcurrency = 1000 + cfg.MinConcurrency = 20 + }) + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.Acquire() + go s.Release(time.Millisecond, false) + } +} + +func BenchmarkThrottle_Acquire_rateLimit(b *testing.B) { + s := throttle.NewThrottle(func(cfg *throttle.Config) { + cfg.MaxConcurrency = 1000 + cfg.MinConcurrency = 20 + cfg.MaxRate = 10000000 // Rate limit is unrealistically high to exhibit baseline performance impact. + }) + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + s.Acquire() + go s.Release(time.Millisecond, false) + } +} diff --git a/crawler/README.md b/crawler/README.md new file mode 100644 index 0000000..6d81b22 --- /dev/null +++ b/crawler/README.md @@ -0,0 +1,9 @@ +## Usage + +``` +./crawler gr_replay_24.raw.gz > gr_replay_24.raw-result.log +``` + +``` +./crawler rgrr_replay_24.log.gz | gzip -c > rgrr_replay_24-result.log.gz +``` \ No newline at end of file diff --git a/crawler/main.go b/crawler/main.go new file mode 100644 index 0000000..bd1bd6f --- /dev/null +++ b/crawler/main.go @@ -0,0 +1,209 @@ +package main + +import ( + "bufio" + "compress/gzip" + "fmt" + "io" + "log" + "net" + "net/url" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/bool64/logz" + throttle "github.com/bool64/throttle" + "github.com/valyala/fasthttp" + "github.com/vearutop/dynhist-go" + "golang.org/x/time/rate" +) + +func main() { + if len(os.Args) == 1 { + println("usage: crawler ") + return + } + + println("fetching through", os.Args[1]) + + f, err := os.Open(os.Args[1]) + if err != nil { + log.Fatal(err) + } + defer f.Close() + + cr := &CountingReader{ + Reader: f, + } + + var r io.Reader = cr + + if strings.HasSuffix(os.Args[1], ".gz") { + r, err = gzip.NewReader(cr) + if err != nil { + println("gzip reader:", err.Error()) + return + } + } + + fi, err := f.Stat() + if err != nil { + println("file stats:", err.Error()) + return + } + + total := fi.Size() + + p := Progress{ + Interval: 10 * time.Second, + } + + p.Start(total, cr, "sending") + + scanner := bufio.NewScanner(r) + + buf := make([]byte, 1e7) + scanner.Buffer(buf, len(buf)) + + maxConcurrency := 1000 + latencyPercentile := 90.0 + t := throttle.NewThrottle(func(cfg *throttle.Config) { + cfg.MaxConcurrency = int64(maxConcurrency) + cfg.InitialConcurrency = 100 + cfg.MinConcurrency = 20 + + cfg.LimitLatency = 300 * time.Millisecond + cfg.LimitLatencyPercentile = latencyPercentile + cfg.LimitFailedPercent = 5 + + cfg.Interval = 20 * time.Second + cfg.ReceiveUpdate = func(grown bool, newRate rate.Limit, newConcurrency int64) { + println("concurrency updated:", newConcurrency) + } + }) + + statuses := dynhist.Collector{} + errs := logz.Observer{} + errs.MaxSamples = 2 + errs.MaxCardinality = 10 + + printStat := func() { + println(fmt.Sprintf( + "latency %.0f%%: %.1f ms, in progress: %d, failed: %.2f%%", + latencyPercentile, t.Latency(latencyPercentile)*1000, t.InProgress(), t.FailedPercent(), + )) + + println("statuses:") + println(statuses.String()) + + entries := errs.GetEntries() + if len(entries) > 0 { + // Order by count desc. + sort.Slice(entries, func(i, j int) bool { + return entries[i].Count > entries[j].Count + }) + + println("top errors:") + for _, e := range entries { + println(e.Count, e.Message) + } + println() + } + } + + go func() { + for { + time.Sleep(10 * time.Second) + + printStat() + } + }() + + sp := sync.Pool{ + New: func() any { + return make([]byte, 0, 100) + }, + } + + c := fasthttp.Client{} + c.MaxConnsPerHost = maxConcurrency + c.Dial = func(addr string) (net.Conn, error) { + return net.Dial("tcp", addr) + } + + for scanner.Scan() { + if err := scanner.Err(); err != nil { + log.Fatal(err) + } + + line := string(scanner.Bytes()) + + if len(line) == 0 { + statuses.Add(601) + continue + } + + if strings.HasPrefix(line, "FAILED: ") { + p := strings.LastIndex(line, " http") + if p == -1 { + statuses.Add(603) + continue + } + + line = line[p+1:] + } + + if _, err := url.Parse(line); err != nil { + statuses.Add(602) + errs.ObserveMessage(err.Error(), nil) + continue + } + + t.Acquire() + s := time.Now() + + go func() { + failed := false + defer func() { + p.CountLine() + t.Release(time.Since(s), failed) + }() + + body := sp.Get().([]byte) + + statusCode, body, err := c.Get(body, line) + sp.Put(body[:0]) + + if err != nil { + failed = true + + errs.ObserveMessage(err.Error(), line) + + fmt.Println("FAILED:", err.Error(), line) + + statuses.Add(600) + + return + } + + if statusCode >= 500 { + failed = true + } + + // mu.Lock() + // fmt.Println(statusCode, line) + // mu.Unlock() + + statuses.Add(float64(statusCode)) + }() + } + + // Wait for remaining requests in progress. + t.WaitInProgress() + p.Stop() + + printStat() +} diff --git a/crawler/progress.go b/crawler/progress.go new file mode 100644 index 0000000..cd82869 --- /dev/null +++ b/crawler/progress.go @@ -0,0 +1,151 @@ +package main + +import ( + "fmt" + "io" + "sync/atomic" + "time" +) + +// ProgressStatus describes current progress. +type ProgressStatus struct { + Task string + DonePercent float64 + LinesCompleted int64 + SpeedMBPS float64 + SpeedLPS float64 + Elapsed time.Duration + Remaining time.Duration +} + +// Progress reports reading performance. +type Progress struct { + Interval time.Duration + Print func(status ProgressStatus) + done chan bool + lines int64 + task string + cr *CountingReader + prnt func(s ProgressStatus) + start time.Time + tot float64 + + prevBytes int64 + prevElapsed int64 + prevLines int64 +} + +// Start spawns background progress reporter. +func (p *Progress) Start(total int64, cr *CountingReader, task string) { + p.done = make(chan bool) + p.lines = 0 + p.task = task + p.cr = cr + + interval := p.Interval + if interval == 0 { + interval = time.Second + } + + p.prnt = p.Print + if p.prnt == nil { + p.prnt = func(s ProgressStatus) { + if s.Task != "" { + s.Task += ": " + } + + println(fmt.Sprintf(s.Task+"%.1f%% bytes read, %d lines processed, %.1f l/s, %.2f MB/s, elapsed %s, remaining %s", + s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())) + } + } + + p.start = time.Now() + p.tot = float64(total) + done := p.done + t := time.NewTicker(interval) + + go func() { + for { + select { + case <-t.C: + p.printStatus(false) + + case <-done: + t.Stop() + + return + } + } + }() +} + +func (p *Progress) printStatus(last bool) { + s := ProgressStatus{} + s.Task = p.task + s.LinesCompleted = atomic.LoadInt64(&p.lines) + + b := p.cr.Bytes() + s.DonePercent = 100 * float64(b) / p.tot + s.Elapsed = time.Since(p.start) + + prevLines := atomic.LoadInt64(&p.prevLines) + prevElapsed := atomic.LoadInt64(&p.prevElapsed) + prevBytes := atomic.LoadInt64(&p.prevBytes) + + atomic.StoreInt64(&p.prevLines, s.LinesCompleted) + atomic.StoreInt64(&p.prevElapsed, int64(s.Elapsed)) + atomic.StoreInt64(&p.prevBytes, b) + + deltaElapsed := s.Elapsed - time.Duration(prevElapsed) + deltaBytes := b - prevBytes + deltaLines := s.LinesCompleted - prevLines + + s.SpeedMBPS = (float64(deltaBytes) / deltaElapsed.Seconds()) / (1024 * 1024) + s.SpeedLPS = float64(deltaLines) / deltaElapsed.Seconds() + + s.Remaining = time.Duration(float64(100*s.Elapsed)/s.DonePercent) - s.Elapsed + s.Remaining = s.Remaining.Truncate(time.Second) + + if s.Remaining > 100*time.Millisecond || last { + p.prnt(s) + } +} + +// CountLine increments line counter. +func (p *Progress) CountLine() int64 { + return atomic.AddInt64(&p.lines, 1) +} + +// Lines returns number of counted lines. +func (p *Progress) Lines() int64 { + return atomic.LoadInt64(&p.lines) +} + +// Stop stops progress reporting. +func (p *Progress) Stop() { + p.printStatus(true) + + close(p.done) +} + +// CountingReader wraps io.Reader to count bytes. +type CountingReader struct { + Reader io.Reader + + readBytes int64 +} + +// Read reads and counts bytes. +func (cr *CountingReader) Read(p []byte) (n int, err error) { + n, err = cr.Reader.Read(p) + + atomic.AddInt64(&cr.readBytes, int64(n)) + + return n, err +} + +// Bytes returns number of read bytes. +func (cr *CountingReader) Bytes() int64 { + return atomic.LoadInt64(&cr.readBytes) +} diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..fe2e267 --- /dev/null +++ b/example_test.go @@ -0,0 +1 @@ +package throttle_test diff --git a/go.mod b/go.mod index d2093e1..644bc42 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,20 @@ module github.com/bool64/throttle go 1.17 -require github.com/bool64/dev v0.2.24 +require ( + github.com/bool64/dev v0.2.24 + github.com/bool64/logz v1.1.0 + github.com/stretchr/testify v1.8.1 + github.com/valyala/fasthttp v1.44.0 + github.com/vearutop/dynhist-go v1.2.0 + golang.org/x/time v0.3.0 +) + +require ( + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 75049d8..9552de7 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,83 @@ +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/bool64/ctxd v1.0.0/go.mod h1:+rjDVFNOJeO+xlvMqQfG0p53CzuRB7FhPSo5nWSkpQ0= +github.com/bool64/dev v0.1.25/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= +github.com/bool64/dev v0.1.28/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= +github.com/bool64/dev v0.1.41/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= +github.com/bool64/dev v0.2.22/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= github.com/bool64/dev v0.2.24 h1:xptlKivPh870W3Xc9szPcM7wkFmTMuHT8rc0nu7dITk= github.com/bool64/dev v0.2.24/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/logz v1.1.0 h1:X2cOSXinrr7CX4p+7PLBc9Sw2Vw9ykqPqbuaoV+gX/Y= +github.com/bool64/logz v1.1.0/go.mod h1:PzoWm5F2NKpG3D5MNdAps55DVlLKMPl69Wg2YBZ3DJ4= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/swaggest/usecase v0.1.5/go.mod h1:uubX4ZbjQK1Bnl0xX9hOYpb/IUiSoVKk/yQImawbNMU= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.44.0 h1:R+gLUhldIsfg1HokMuQjdQ5bh9nuXHPIfvkYUu9eR5Q= +github.com/valyala/fasthttp v1.44.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/vearutop/dynhist-go v1.0.0/go.mod h1:7Cgyu5Ww8FwdB+Y+zawRz9cQT5oXAxw294L9lQ+JI/k= +github.com/vearutop/dynhist-go v1.2.0 h1:y+kQ3LBSp5Vvpv2vOGWVt14TLBXYp8p2qxzfONvbMwE= +github.com/vearutop/dynhist-go v1.2.0/go.mod h1:yC+DkFgBUuqBW+qb32mrwwGoc5I4bAlwB0dHhxk4rF8= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/throttle.go b/throttle.go new file mode 100644 index 0000000..b388a49 --- /dev/null +++ b/throttle.go @@ -0,0 +1,411 @@ +// Package throttle implements a feedback-driven rate and concurrency limiter +// to keep application in bounds of healthy state. +package throttle + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/vearutop/dynhist-go" + "golang.org/x/time/rate" +) + +// Config controls Throttle behavior. +type Config struct { + // Interval is a duration between two consecutive feedback checks, default 15s. + Interval time.Duration + + // GrowthSkipCycles is a number of feedback cycles to skip before growing rate and concurrency, default 3. + // The more the value, less often Throttle will be adjusted in a healthy state. + GrowthSkipCycles int + + // LimitHeapInUse is maximum heap in use size (bytes) of a healthy system. + LimitHeapInUse uint64 + + // LimitLatency is a percentile latency of a healthy system. + LimitLatency time.Duration + + // LimitLatencyPercentile is the percentile of latency to use for LimitLatency, default 90.0 (90%). + LimitLatencyPercentile float64 + + // LimitFailedPercent is the fraction of failed operations of a healthy system, in percents, example: 2.0 (2%). + LimitFailedPercent float64 + + // LimitReached is a user-defined function to indicate unhealthy state. + LimitReached func() bool + + // StepFracConcurrency is a fraction of current concurrency value to add or remove during performance tweak cycle, + // default 0.1 (10%). + StepFracConcurrency float64 + + // StepFracRate is a fraction of current rate value to add or remove during performance tweak cycle, + // default 0.1 (10%). + StepFracRate float64 + + // MinConcurrency is the lower bound of concurrency limiting, default runtime.NumCPU(). + MinConcurrency int64 + + // InitialConcurrency is the starting concurrency value, must be between MinConcurrency and MaxConcurrency, + // default MaxConcurrency. + InitialConcurrency int64 + + // MaxConcurrency is the maximum number of operation that can be in progress at the same time, default 1000. + MaxConcurrency int64 + + // MinRate is the lower bound of rate limiting. + MinRate rate.Limit + + // InitialRate is the starting rate limit value, must be between MinRate and MaxRate, default MaxRate. + InitialRate rate.Limit + + // MaxRate is the maximum allowed rate of operations. + MaxRate rate.Limit + + // ReceiveUpdate collects new values of rate and concurrency limits when they are updated. + ReceiveUpdate func(grown bool, newRate rate.Limit, newConcurrency int64) +} + +// Throttle is an adaptive rate and concurrency limiter. +// +// Rate of operations is maintained to keep the system in healthy state based on feedback cycles. +// Feedback is collected once in Config.Interval, if any of health checks are failing, +// Throttle would reduce concurrency by a fraction Config.StepFracConcurrency once and +// rate by a fraction Config.StepFracRate once. Target values are bounded below with Config.MinConcurrency +// and Config.MinRate. +type Throttle struct { + *throttle +} + +type throttle struct { + sem chan struct{} // Semaphore channel for concurrency limit. + blocked int64 // Number of blocked slots in semaphore. + + latency *dynhist.Collector // Latency histogram collector for percentile calculation. + hCycles int64 // Number of healthy cycles passed since last perf adjustment. + inProgress int64 // Current number of operations in progress. + failed int64 // Number of failed operations. + total int64 // Total number of performed operations. + lastCycle int64 // Timestamp of last feedback cycle. + + cfg Config + + rlExists bool + mu sync.RWMutex + rl *rate.Limiter +} + +// Option is applied to Config. +type Option func(cfg *Config) + +// NewThrottle creates a configured Throttle instance. +func NewThrottle(options ...Option) *Throttle { + t := &throttle{} + + // T is wrapping t to employ runtime.SetFinalizer to finish feedback goroutine. + T := &Throttle{throttle: t} + + t.cfg.Interval = 15 * time.Second + t.cfg.GrowthSkipCycles = 3 + t.cfg.LimitLatency = 10 * time.Second + t.cfg.LimitLatencyPercentile = 90.0 + t.cfg.MaxConcurrency = 1000 + t.cfg.MinConcurrency = int64(runtime.NumCPU()) + t.cfg.StepFracConcurrency = 0.1 + t.cfg.StepFracRate = 0.1 + + for _, o := range options { + o(&t.cfg) + } + + if t.cfg.MaxConcurrency != 0 { + t.sem = make(chan struct{}, t.cfg.MaxConcurrency) + + if t.cfg.InitialConcurrency != 0 { + t.blocked = t.cfg.MaxConcurrency - t.cfg.InitialConcurrency + for i := 0; i < int(t.blocked); i++ { + t.sem <- struct{}{} + } + } + } + + if t.cfg.MaxRate != 0 { + t.rlExists = true + + if t.cfg.InitialRate == 0 { + t.cfg.InitialRate = t.cfg.MaxRate + } + + t.rl = rate.NewLimiter(t.cfg.InitialRate, 1) + } + + if t.cfg.LimitLatency != 0 { + t.latency = &dynhist.Collector{BucketsLimit: 50, WeightFunc: dynhist.ExpWidth(1.5, 1)} + } + + done := make(chan struct{}) + + go func() { + for { + interval := t.cfg.Interval + + select { + case <-time.After(interval): + t.runFeedbackCycle() + + case <-done: + return + } + } + }() + + runtime.SetFinalizer(T, func(_ *Throttle) { + close(done) + }) + + return T +} + +// Go runs a concurrent job. +func (t *Throttle) Go(f func() error) { + t.Acquire() + go func() { + var ( + s = time.Now() + err error + ) + + defer func() { + t.Release(time.Since(s), err != nil) + if r := recover(); r != nil { + } + }() + err = f() + }() +} + +// Acquire waits until a permit is available and blocks one in the pool. +func (t *Throttle) Acquire() { + if t.rlExists { + t.mu.RLock() + rl := t.rl + t.mu.RUnlock() + + _ = rl.Wait(context.Background()) //nolint:errcheck // Does not fail with background context. + } + + if t.sem != nil { + t.sem <- struct{}{} + } + + atomic.AddInt64(&t.inProgress, 1) +} + +// Release returns permit back to the pool. +func (t *Throttle) Release(elapsed time.Duration, failed bool) { + if t.sem != nil { + <-t.sem + } + + t.latency.Add(elapsed.Seconds()) + atomic.AddInt64(&t.inProgress, -1) + atomic.AddInt64(&t.total, 1) + if failed { + atomic.AddInt64(&t.failed, 1) + } +} + +func (t *throttle) runFeedbackCycle() { + atomic.StoreInt64(&t.lastCycle, time.Now().UnixNano()) + + if t.cfg.LimitHeapInUse != 0 { + m := runtime.MemStats{} + runtime.ReadMemStats(&m) + + if m.HeapInuse >= t.cfg.LimitHeapInUse { + t.slower() + + return + } + } + + if t.cfg.LimitLatency != 0 { + lat := t.latency.Percentile(t.cfg.LimitLatencyPercentile) + if lat > t.cfg.LimitLatency.Seconds() { + t.slower() + + return + } + } + + if t.cfg.LimitFailedPercent != 0 { + failed := atomic.LoadInt64(&t.failed) + total := atomic.LoadInt64(&t.total) + + if 100*float64(failed)/float64(total) > t.cfg.LimitFailedPercent { + t.slower() + + return + } + } + + if t.cfg.LimitReached != nil { + if t.cfg.LimitReached() { + t.slower() + + return + } + } + + t.faster() +} + +// slower tightens limits to reduce throughput and allow the system to cool down into a healthy state. +func (t *throttle) slower() { + atomic.StoreInt64(&t.hCycles, 0) + + newRate := rate.Limit(0) + newConcurrency := int64(0) + + if t.sem != nil { + blocked := atomic.LoadInt64(&t.blocked) + active := t.cfg.MaxConcurrency - blocked + delta := int64(float64(active) * t.cfg.StepFracConcurrency) + + if active-delta <= t.cfg.MinConcurrency { + delta = active - t.cfg.MinConcurrency + } + + newConcurrency = active - delta + + if delta > 0 { + for i := 0; i < int(delta); i++ { + t.sem <- struct{}{} + } + + atomic.AddInt64(&t.blocked, delta) + } + } + + if t.rlExists { + newRate = t.updateRateLimiter(true) + } + + if t.cfg.ReceiveUpdate != nil { + t.cfg.ReceiveUpdate(false, newRate, newConcurrency) + } +} + +// faster loosens the limits to allow for more throughput. +func (t *throttle) faster() { + vv := atomic.AddInt64(&t.hCycles, 1) + if vv <= int64(t.cfg.GrowthSkipCycles) { + return + } + + newRate := rate.Limit(0) + newConcurrency := int64(0) + + if t.sem != nil { + blocked := atomic.LoadInt64(&t.blocked) + if blocked == 0 { + return + } + + active := t.cfg.MaxConcurrency - blocked + delta := int64(float64(active) * t.cfg.StepFracConcurrency) + + if active+delta >= t.cfg.MaxConcurrency { + delta = t.cfg.MaxConcurrency - active + } + + newConcurrency = active + delta + + if delta != 0 { + for i := 0; i < int(delta); i++ { + <-t.sem + } + + atomic.AddInt64(&t.blocked, -delta) + } + } + + if t.rlExists { + newRate = t.updateRateLimiter(false) + } + + if t.cfg.ReceiveUpdate != nil { + t.cfg.ReceiveUpdate(true, newRate, newConcurrency) + } +} + +func (t *throttle) updateRateLimiter(slower bool) rate.Limit { + t.mu.Lock() + defer t.mu.Unlock() + + lim := t.rl.Limit() + delta := float64(lim) * t.cfg.StepFracRate + + if slower { + delta = -delta + } + + newLim := lim + rate.Limit(delta) + if newLim > t.cfg.MaxRate { + newLim = t.cfg.MaxRate + } else if newLim < t.cfg.MinRate { + newLim = t.cfg.MinRate + } + + if newLim != lim { + t.rl = rate.NewLimiter(newLim, 1) + } + + return newLim +} + +// InProgress returns the number of operations that are currently in progress. +func (t *Throttle) InProgress() int64 { + return atomic.LoadInt64(&t.inProgress) +} + +// Latency returns percentile latency in seconds, percent value of 10.0 means 10%. +func (t *Throttle) Latency(percent float64) float64 { + if t.latency == nil { + return 0 + } + + return t.latency.Percentile(percent) +} + +func (t *Throttle) LatencyHistogram() *dynhist.Collector { + return t.latency +} + +func (t *Throttle) Concurrency() int64 { + return t.cfg.MaxConcurrency - atomic.LoadInt64(&t.blocked) +} + +func (t *Throttle) FailedPercent() float64 { + return 100 * float64(atomic.LoadInt64(&t.failed)) / float64(atomic.LoadInt64(&t.total)) +} + +func (t *Throttle) WaitInProgress() { + if t.sem == nil { + return + } + + inProgress := t.InProgress() + + // Fill the semaphore to wait for operations in progress. + for i := 0; i < int(inProgress); i++ { + t.sem <- struct{}{} + } + + // Release temporary filling. + for i := 0; i < int(inProgress); i++ { + <-t.sem + } +} diff --git a/throttle_test.go b/throttle_test.go new file mode 100644 index 0000000..c5e1451 --- /dev/null +++ b/throttle_test.go @@ -0,0 +1,83 @@ +package throttle_test + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + throttle "github.com/bool64/throttle" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" +) + +func TestSemaphor_Acquire(t *testing.T) { + var concurrency int64 + + var s *throttle.Throttle + s = throttle.NewThrottle(func(cfg *throttle.Config) { + cfg.Interval = 100 * time.Millisecond + cfg.StepFracConcurrency = 0.9 + cfg.LimitLatencyPercentile = 93 + cfg.MaxConcurrency = 100 + cfg.MinConcurrency = 20 + cfg.MaxRate = 1000 + cfg.MinRate = 20 + cfg.LimitLatency = 5 * time.Millisecond + cfg.ReceiveUpdate = func(grown bool, r rate.Limit, c int64) { + atomic.StoreInt64(&concurrency, c) + fmt.Println("UUU", grown, float64(r), c, s.Throughput()) + } + }) + + wg := sync.WaitGroup{} + + for i := 0; i < 10000; i++ { + s.Acquire() + + wg.Add(1) + go func() { + defer func() { wg.Done() }() + time.Sleep(time.Millisecond) + s.Release(time.Millisecond) + }() + } + + for i := 0; i < 1000; i++ { + s.Acquire() + + wg.Add(1) + go func() { + defer func() { wg.Done() }() + time.Sleep(10 * time.Millisecond) + s.Release(10 * time.Millisecond) + }() + } + + require.GreaterOrEqual(t, 10*time.Millisecond.Seconds(), s.Latency(90)) + assert.Equal(t, int64(20), atomic.LoadInt64(&concurrency)) // Min concurrency. + println("now healthy") + + for i := 0; i < 20000; i++ { + if i%1000 == 0 { + println(i, s.Latency(90)) + } + + s.Acquire() + + wg.Add(1) + go func() { + defer func() { wg.Done() }() + time.Sleep(1 * time.Microsecond) + s.Release(1 * time.Microsecond) + }() + } + + wg.Wait() + + runtime.GC() + runtime.GC() +} From c25e76341fe27ff8ffe3b36afcbbb29685a8ff3b Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Tue, 14 Apr 2026 14:17:29 +0200 Subject: [PATCH 3/6] Initial implementation --- .github/workflows/bench.yml | 91 +++----- .github/workflows/cloc.yml | 4 +- .github/workflows/golangci-lint.yml | 29 +-- .github/workflows/gorelease.yml | 20 +- .github/workflows/test-unit.yml | 40 ++-- .golangci.yml | 144 +++++++----- Makefile | 8 +- {crawler => cmd/crawler}/README.md | 4 + cmd/crawler/main.go | 337 ++++++++++++++++++++++++++++ crawler/main.go | 209 ----------------- crawler/progress.go | 151 ------------- go.mod | 21 +- go.sum | 95 +++----- throttle.go | 25 ++- throttle_test.go | 11 +- 15 files changed, 550 insertions(+), 639 deletions(-) rename {crawler => cmd/crawler}/README.md (68%) create mode 100644 cmd/crawler/main.go delete mode 100644 crawler/main.go delete mode 100644 crawler/progress.go diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index a85f316..ef52f43 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -12,43 +12,27 @@ on: description: 'New Ref' required: true -# Cancel the workflow in progress in newer build is about to start. -concurrency: - group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} - cancel-in-progress: true - env: GO111MODULE: "on" CACHE_BENCHMARK: "off" # Enables benchmark result reuse between runs, may skew latency results. RUN_BASE_BENCHMARK: "on" # Runs benchmark for PR base in case benchmark result is missing. - GO_VERSION: 1.19.x jobs: bench: + strategy: + matrix: + go-version: [ stable ] runs-on: ubuntu-latest steps: - - name: Install Go stable - if: env.GO_VERSION != 'tip' - uses: actions/setup-go@v3 + - name: Install Go + uses: actions/setup-go@v5 with: - go-version: ${{ env.GO_VERSION }} - - - name: Install Go tip - if: env.GO_VERSION == 'tip' - run: | - curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz - ls -lah gotip.tar.gz - mkdir -p ~/sdk/gotip - tar -C ~/sdk/gotip -xzf gotip.tar.gz - ~/sdk/gotip/bin/go version - echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV - + go-version: ${{ matrix.go-version }} - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: ref: ${{ (github.event.inputs.new != '') && github.event.inputs.new || github.event.ref }} - - name: Go cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: # In order: # * Module download cache @@ -59,58 +43,49 @@ jobs: key: ${{ runner.os }}-go-cache-${{ hashFiles('**/go.sum') }} restore-keys: | ${{ runner.os }}-go-cache - - name: Restore benchstat - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: ~/go/bin/benchstat - key: ${{ runner.os }}-benchstat-legacy - + key: ${{ runner.os }}-benchstat - name: Restore base benchmark result - id: base-benchmark if: env.CACHE_BENCHMARK == 'on' - uses: actions/cache@v2 + id: benchmark-base + uses: actions/cache@v4 with: path: | bench-master.txt bench-main.txt # Use base sha for PR or new commit hash for master/main push in benchmark result key. key: ${{ runner.os }}-bench-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }} - - - name: Run benchmark - run: | - export REF_NAME=new - make bench - OUTPUT=$(make bench-stat-diff) - echo "${OUTPUT}" - echo "diff<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT - OUTPUT=$(make bench-stat) - echo "${OUTPUT}" - echo "result<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT - - - name: Run benchmark for base code - if: env.RUN_BASE_BENCHMARK == 'on' && steps.base-benchmark.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') + - name: Checkout base code + if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') + uses: actions/checkout@v4 + with: + ref: ${{ (github.event.pull_request.base.sha != '' ) && github.event.pull_request.base.sha || github.event.inputs.old }} + path: __base + - name: Run base benchmark + if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') run: | - git fetch origin master ${{ github.event.pull_request.base.sha }} - HEAD=$(git rev-parse HEAD) - git reset --hard ${{ github.event.pull_request.base.sha }} export REF_NAME=master - make bench-run bench-stat - git reset --hard $HEAD - - - name: Benchmark stats + cd __base + make | grep bench-run && (BENCH_COUNT=5 make bench-run bench-stat && cp bench-master.txt ../bench-master.txt) || echo "No benchmarks in base" + - name: Benchmark id: bench run: | export REF_NAME=new + BENCH_COUNT=5 make bench OUTPUT=$(make bench-stat-diff) - echo "${OUTPUT}" - echo "diff<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT + OUTPUT="${OUTPUT//'%'/'%25'}" + OUTPUT="${OUTPUT//$'\n'/'%0A'}" + OUTPUT="${OUTPUT//$'\r'/'%0D'}" + echo "::set-output name=diff::$OUTPUT" OUTPUT=$(make bench-stat) - echo "${OUTPUT}" - echo "result<> $GITHUB_OUTPUT && echo "$OUTPUT" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT - - - name: Comment benchmark result - continue-on-error: true + OUTPUT="${OUTPUT//'%'/'%25'}" + OUTPUT="${OUTPUT//$'\n'/'%0A'}" + OUTPUT="${OUTPUT//$'\r'/'%0D'}" + echo "::set-output name=result::$OUTPUT" + - name: Comment Benchmark Result uses: marocchino/sticky-pull-request-comment@v2 with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/cloc.yml b/.github/workflows/cloc.yml index 927e099..e0c3825 100644 --- a/.github/workflows/cloc.yml +++ b/.github/workflows/cloc.yml @@ -13,11 +13,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: path: pr - name: Checkout base code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: ref: ${{ github.event.pull_request.base.sha }} path: base diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index fbf8167..34c9ed4 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -19,30 +19,11 @@ jobs: name: golangci-lint runs-on: ubuntu-latest steps: - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 with: - go-version: 1.19.x - - uses: actions/checkout@v2 + go-version: stable + - uses: actions/checkout@v4 - name: golangci-lint - uses: golangci/golangci-lint-action@v3.3.1 + uses: golangci/golangci-lint-action@v9.2.0 with: - # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version. - version: v1.50.1 - - # Optional: working directory, useful for monorepos - # working-directory: somedir - - # Optional: golangci-lint command line arguments. - # args: --issues-exit-code=0 - - # Optional: show only new issues if it's a pull request. The default value is `false`. - # only-new-issues: true - - # Optional: if set to true then the action will use pre-installed Go. - # skip-go-installation: true - - # Optional: if set to true then the action don't cache or restore ~/go/pkg. - # skip-pkg-cache: true - - # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. - # skip-build-cache: true \ No newline at end of file + version: v2.11.3 diff --git a/.github/workflows/gorelease.yml b/.github/workflows/gorelease.yml index 5d9fae0..531f975 100644 --- a/.github/workflows/gorelease.yml +++ b/.github/workflows/gorelease.yml @@ -9,29 +9,19 @@ concurrency: cancel-in-progress: true env: - GO_VERSION: 1.19.x + GO_VERSION: stable jobs: gorelease: runs-on: ubuntu-latest steps: - - name: Install Go stable - if: env.GO_VERSION != 'tip' - uses: actions/setup-go@v3 + - name: Install Go + uses: actions/setup-go@v5 with: go-version: ${{ env.GO_VERSION }} - - name: Install Go tip - if: env.GO_VERSION == 'tip' - run: | - curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz - ls -lah gotip.tar.gz - mkdir -p ~/sdk/gotip - tar -C ~/sdk/gotip -xzf gotip.tar.gz - ~/sdk/gotip/bin/go version - echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Gorelease cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | ~/go/bin/gorelease diff --git a/.github/workflows/test-unit.yml b/.github/workflows/test-unit.yml index 17cca25..978f679 100644 --- a/.github/workflows/test-unit.yml +++ b/.github/workflows/test-unit.yml @@ -15,36 +15,25 @@ concurrency: env: GO111MODULE: "on" RUN_BASE_COVERAGE: "on" # Runs test for PR base in case base test coverage is missing. - COV_GO_VERSION: 1.18.x # Version of Go to collect coverage + COV_GO_VERSION: stable # Version of Go to collect coverage TARGET_DELTA_COV: 90 # Target coverage of changed lines, in percents jobs: test: strategy: matrix: - go-version: [ 1.16.x, 1.17.x, 1.18.x, 1.19.x ] + go-version: [ stable, oldstable ] runs-on: ubuntu-latest steps: - - name: Install Go stable - if: matrix.go-version != 'tip' - uses: actions/setup-go@v3 + - name: Install Go + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go-version }} - - name: Install Go tip - if: matrix.go-version == 'tip' - run: | - curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz - ls -lah gotip.tar.gz - mkdir -p ~/sdk/gotip - tar -C ~/sdk/gotip -xzf gotip.tar.gz - ~/sdk/gotip/bin/go version - echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV - - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Go cache - uses: actions/cache@v2 + uses: actions/cache@v4 with: # In order: # * Module download cache @@ -59,7 +48,7 @@ jobs: - name: Restore base test coverage id: base-coverage if: matrix.go-version == env.COV_GO_VERSION && github.event.pull_request.base.sha != '' - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: | unit-base.txt @@ -88,14 +77,15 @@ jobs: id: annotate if: matrix.go-version == env.COV_GO_VERSION && github.event.pull_request.base.sha != '' run: | - curl -sLO https://github.com/vearutop/gocovdiff/releases/download/v1.3.6/linux_amd64.tar.gz && tar xf linux_amd64.tar.gz + curl -sLO https://github.com/vearutop/gocovdiff/releases/download/v1.4.2/linux_amd64.tar.gz && tar xf linux_amd64.tar.gz && rm linux_amd64.tar.gz gocovdiff_hash=$(git hash-object ./gocovdiff) - [ "$gocovdiff_hash" == "8e507e0d671d4d6dfb3612309b72b163492f28eb" ] || (echo "::error::unexpected hash for gocovdiff, possible tampering: $gocovdiff_hash" && exit 1) - git fetch origin master ${{ github.event.pull_request.base.sha }} - REP=$(./gocovdiff -cov unit.coverprofile -gha-annotations gha-unit.txt -delta-cov-file delta-cov-unit.txt -target-delta-cov ${TARGET_DELTA_COV}) + [ "$gocovdiff_hash" == "c37862c73a677e5a9c069470287823ab5bbf0244" ] || (echo "::error::unexpected hash for gocovdiff, possible tampering: $gocovdiff_hash" && exit 1) + # Fetch PR diff from GitHub API. + curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" -H "Accept: application/vnd.github.v3.diff" https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }} > pull_request.diff + REP=$(./gocovdiff -diff pull_request.diff -mod github.com/$GITHUB_REPOSITORY -cov unit.coverprofile -gha-annotations gha-unit.txt -delta-cov-file delta-cov-unit.txt -target-delta-cov ${TARGET_DELTA_COV}) echo "${REP}" cat gha-unit.txt - DIFF=$(test -e unit-base.txt && ./gocovdiff -func-cov unit.txt -func-base-cov unit-base.txt || echo "Missing base coverage file") + DIFF=$(test -e unit-base.txt && ./gocovdiff -mod github.com/$GITHUB_REPOSITORY -func-cov unit.txt -func-base-cov unit-base.txt || echo "Missing base coverage file") TOTAL=$(cat delta-cov-unit.txt) echo "rep<> $GITHUB_OUTPUT && echo "$REP" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT echo "diff<> $GITHUB_OUTPUT && echo "$DIFF" >> $GITHUB_OUTPUT && echo "EOF" >> $GITHUB_OUTPUT @@ -130,7 +120,7 @@ jobs: - name: Upload code coverage if: matrix.go-version == env.COV_GO_VERSION - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v5 with: - file: ./unit.coverprofile + files: ./unit.coverprofile flags: unittests diff --git a/.golangci.yml b/.golangci.yml index 219b387..ea6e9bf 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,68 +1,94 @@ -# See https://github.com/golangci/golangci-lint/blob/master/.golangci.example.yml +# See https://golangci-lint.run/docs/linters/configuration/ +version: "2" run: tests: true - -linters-settings: - errcheck: - check-type-assertions: true - check-blank: true - gocyclo: - min-complexity: 20 - dupl: - threshold: 100 - misspell: - locale: US - unused: - check-exported: false - unparam: - check-exported: true - linters: - enable-all: true + default: all disable: - - lll - - maligned - - gochecknoglobals - - gomnd - - wrapcheck - - paralleltest + - maintidx + - funlen + - cyclop + - gocyclo + - modernize + - gocognit + - noctx + - nlreturn + - wsl + - embeddedstructfieldcheck + - nilnil + - noinlineerr + - wsl_v5 + - funcorder + - copyloopvar + - depguard + - dupword + - errname + - exhaustruct - forbidigo - - exhaustivestruct - - interfacer # deprecated - forcetypeassert - - scopelint # deprecated - - ifshort # too many false positives - - golint # deprecated - - varnamelen - - tagliatelle - - errname + - gochecknoglobals + - intrange - ireturn - - exhaustruct + - lll + - mnd - nonamedreturns - - nosnakecase - - structcheck - - varcheck - - deadcode + - paralleltest + - recvcheck + - tagalign + - tagliatelle - testableexamples - - dupword - -issues: - exclude-use-default: false - exclude-rules: - - linters: - - gomnd - - goconst - - goerr113 - - noctx - - funlen - - dupl - - structcheck - - unused - - unparam - - nosnakecase - path: "_test.go" - - linters: - - errcheck # Error checking omitted for brevity. - - gosec - path: "example_" - + - testifylint + - varnamelen + - wrapcheck + settings: + dupl: + threshold: 100 + errcheck: + check-type-assertions: true + check-blank: true + gocyclo: + min-complexity: 20 + misspell: + locale: US + unparam: + check-exported: true + cyclop: + max-complexity: 15 + funlen: + lines: 70 + exclusions: + generated: lax + rules: + - linters: + - gosec + - dupl + - funlen + - goconst + - mnd + - noctx + - unparam + - unused + path: _test.go + - linters: + - errcheck + - gosec + path: example_ + - linters: + - revive + text: 'unused-parameter: parameter' + paths: + - third_party$ + - builtin$ + - examples$ +formatters: + enable: + - gci + - gofmt + - gofumpt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/Makefile b/Makefile index ec4a06c..22f4832 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -#GOLANGCI_LINT_VERSION := "v1.50.1" # Optional configuration to pinpoint golangci-lint version. +#GOLANGCI_LINT_VERSION := "v2.11.3" # Optional configuration to pinpoint golangci-lint version. # The head of Makefile determines location of dev-go to include standard targets. GO ?= go @@ -27,10 +27,14 @@ ifeq ($(DEVGO_PATH),) endif endif +BUILD_PKG = ./cmd/crawler +BUILD_LDFLAGS=-s -w + -include $(DEVGO_PATH)/makefiles/main.mk -include $(DEVGO_PATH)/makefiles/lint.mk -include $(DEVGO_PATH)/makefiles/test-unit.mk --include $(DEVGO_PATH)/makefiles/bench.mk +-include $(DEVGO_PATH)/makefiles/build.mk +-include $(DEVGO_PATH)/makefiles/release-assets.mk -include $(DEVGO_PATH)/makefiles/reset-ci.mk # Add your custom targets here. diff --git a/crawler/README.md b/cmd/crawler/README.md similarity index 68% rename from crawler/README.md rename to cmd/crawler/README.md index 6d81b22..7969515 100644 --- a/crawler/README.md +++ b/cmd/crawler/README.md @@ -6,4 +6,8 @@ ``` ./crawler rgrr_replay_24.log.gz | gzip -c > rgrr_replay_24-result.log.gz +``` + +``` +./crawler links.log.zst | gzip -c > rgrr_replay_24-result.log.gz ``` \ No newline at end of file diff --git a/cmd/crawler/main.go b/cmd/crawler/main.go new file mode 100644 index 0000000..f24d88c --- /dev/null +++ b/cmd/crawler/main.go @@ -0,0 +1,337 @@ +// Package main provides crawler tool. +package main + +import ( + "bufio" + "compress/gzip" + "flag" + "fmt" + "io" + "log" + "net" + "net/url" + "os" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bool64/logz" + "github.com/bool64/progress" + throttle "github.com/bool64/throttle" + "github.com/klauspost/compress/zstd" + "github.com/valyala/fasthttp" + "github.com/vearutop/dynhist-go" + "golang.org/x/time/rate" +) + +func main() { + skipLines := flag.Int("skip-lines", 0, "Skip first lines.") + minConcurrency := flag.Int("min-concurrency", 1, "Min concurrent requests.") + startConcurrency := flag.Int("start-concurrency", 20, "Start concurrent requests.") + maxConcurrency := flag.Int("max-concurrency", 200, "Max concurrent requests.") + + minRate := flag.Int("min-rate", 0, "Min rate limit, rps (0 = unlimited).") + startRate := flag.Int("start-rate", 0, "Start rate limit, rps (0 = unlimited).") + maxRate := flag.Int("max-rate", 0, "Max rate limit, rps (0 = unlimited).") + + latencyPercentile := flag.Float64("latency-percentile", 90.0, "Percentile latency of requests.") + throttleLatency := flag.Int("throttle-latency", 500, "Min latency of requests (at percentile) to start throttling, ms.") + throttleErrors := flag.Float64("throttle-errors", 5.0, "Min fraction of failed requests to start throttling, %.") + + adaptInterval := flag.Int("adapt-interval", 5, "Interval to reevaluate concurrency and rate limit, s.") + statusInterval := flag.Int("status-interval", 1, "Interval to print status, s.") + + flag.Parse() + + if len(flag.Args()) == 0 { + println("usage: crawler ") + flag.Usage() + return + } + + fn := flag.Arg(0) + + println("fetching through", fn) + + f, err := os.Open(fn) //nolint:gosec // File is opened by user. + if err != nil { + log.Fatal(err) + } + defer func() { + if err := f.Close(); err != nil { + println("file close:", err.Error()) + } + }() + + var r io.Reader = f + + if strings.HasSuffix(fn, ".gz") { + r, err = gzip.NewReader(r) + if err != nil { + println("gzip reader:", err.Error()) + return + } + } + + if strings.HasSuffix(fn, ".zst") { + r, err = zstd.NewReader(r) + if err != nil { + println("zstd reader:", err.Error()) + return + } + } + + fi, err := f.Stat() + if err != nil { + println("file stats:", err.Error()) + return + } + + total := fi.Size() + + cr := progress.NewCountingReader(r) + + p := progress.Progress{ + Interval: time.Duration(*statusInterval) * time.Second, + } + + scanner := bufio.NewScanner(cr) + + buf := make([]byte, 1e7) // 10MB buffer. + scanner.Buffer(buf, len(buf)) + currentConcurrency := int64(*startConcurrency) + + t := throttle.NewThrottle(func(cfg *throttle.Config) { + cfg.MaxConcurrency = int64(*maxConcurrency) + cfg.InitialConcurrency = int64(*startConcurrency) + cfg.MinConcurrency = int64(*minConcurrency) + + cfg.MinRate = rate.Limit(*minRate) + cfg.InitialRate = rate.Limit(*startRate) + cfg.MaxRate = rate.Limit(*maxRate) + + cfg.LimitLatency = time.Duration(*throttleLatency) * time.Millisecond + cfg.LimitLatencyPercentile = *latencyPercentile + cfg.LimitFailedPercent = *throttleErrors + + cfg.Interval = time.Duration(*adaptInterval) * time.Second + cfg.ReceiveUpdate = func(grown bool, newRate rate.Limit, newConcurrency int64) { + atomic.StoreInt64(¤tConcurrency, newConcurrency) + update := "faster" + if !grown { + update = "slower" + } + + rateLimit := "unlimited" + if newRate > 0 { + rateLimit = fmt.Sprintf("%.1f rps", newRate) + } + + println(fmt.Sprintf("limits updated: %s, rate limit: %s, concurrency limit: %d", update, rateLimit, newConcurrency)) + } + }) + + var ( + requestsFinished int64 + totalLines int64 + ) + + statuses := dynhist.Collector{} + errs := logz.Observer{} + errs.MaxSamples = 2 + errs.MaxCardinality = 10 + + printStat := func() { + println(fmt.Sprintf( + "latency %.0f%%: %.1f ms, concurrency: %d, in progress: %d, failed: %.2f%%", + *latencyPercentile, t.Latency(*latencyPercentile)*1000, atomic.LoadInt64(¤tConcurrency), t.InProgress(), t.FailedPercent(), + )) + + println("statuses:") + println(StatusesString(&statuses)) + + entries := errs.GetEntries() + if len(entries) > 0 { + // Order by count desc. + sort.Slice(entries, func(i, j int) bool { + return entries[i].Count > entries[j].Count + }) + + println("top errors:") + for _, e := range entries { + println(e.Count, e.Message) + } + println() + } + } + + p.IncrementalSpeed = true + p.Print = func(s progress.Status) { + if tl := atomic.LoadInt64(&totalLines); tl > 0 { + s.DonePercent = 100 * float64(s.LinesCompleted) / float64(tl) + s.Remaining = time.Duration((100.0 - s.DonePercent) * float64(s.Elapsed) / (s.DonePercent)).Round(time.Second) + } + + println(fmt.Sprintf(s.Task+": %.1f%% bytes read, %d requests processed, %.1f rps, elapsed %s, remaining %s", + s.DonePercent, s.LinesCompleted, s.SpeedLPS, + s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())) + + printStat() + } + + p.Start(func(t *progress.Task) { + t.TotalBytes = func() int64 { + return total + } + t.CurrentBytes = func() int64 { + cb := cr.Bytes() + if cb == total { + atomic.StoreInt64(&totalLines, cr.Lines()) + } + + return cb + } + t.CurrentLines = func() int64 { + // return cr.Lines() + return atomic.LoadInt64(&requestsFinished) + } + t.Task = "fetching" + }) + + sp := sync.Pool{ + New: func() any { + b := make([]byte, 0, 100) + + return &b + }, + } + + c := fasthttp.Client{} + c.MaxConnsPerHost = *maxConcurrency + c.Dial = func(addr string) (net.Conn, error) { + return net.Dial("tcp", addr) + } + + lines := 0 + for scanner.Scan() { + if err := scanner.Err(); err != nil { + break + } + + lines++ + + if *skipLines > 0 && lines < *skipLines { + continue + } + + line := string(scanner.Bytes()) + + if len(line) == 0 { + statuses.Add(601) + continue + } + + if strings.HasPrefix(line, "FAILED: ") { + p := strings.LastIndex(line, "URL: http") + if p == -1 { + statuses.Add(603) + continue + } + + line = line[p+5:] + } + + if _, err := url.Parse(line); err != nil { + statuses.Add(602) + errs.ObserveMessage(err.Error(), nil) + continue + } + + t.Acquire() + s := time.Now() + + go func() { + failed := false + defer func() { + atomic.AddInt64(&requestsFinished, 1) + t.Release(time.Since(s), !failed) + }() + + b := sp.Get().(*[]byte) //nolint:errcheck + body := (*b)[:0] + + statusCode, body, err := c.Get(body, line) + sp.Put(&body) + + if err != nil { + statuses.Add(600) + + failed = true + + errs.ObserveMessage(err.Error(), line) + + fmt.Println("FAILED: ", err.Error(), ", URL: ", line) + + return + } + + if statusCode >= 500 { + failed = true + } + + statuses.Add(float64(statusCode)) + }() + } + + // Wait for remaining requests in progress. + + println("waiting in progress to finish") + t.WaitInProgress() + p.Stop() + + printStat() + + if err := scanner.Err(); err != nil { + println(err.Error()) + } +} + +// StatusesString renders buckets value. +func StatusesString(c *dynhist.Collector) string { + c.Lock() + defer c.Unlock() + + if len(c.Buckets) == 0 { + return "" + } + + cLen := printfLen("%d", c.Count) + + var res strings.Builder + + fmt.Fprintf(&res, "[%s] %*s total%%", "sta", cLen, "cnt") + + fmt.Fprintf(&res, " (%d requests)\n", c.Count) + + for _, b := range c.Buckets { + percent := float64(100*b.Count) / float64(c.Count) + + fmt.Fprintf(&res, "[%d] %*d %5.2f%%", int(b.Min), cLen, b.Count, percent) + + if dots := strings.Repeat(".", int(percent)); len(dots) > 0 { + fmt.Fprint(&res, " ", dots) + } + + fmt.Fprintln(&res) + } + + return res.String() +} + +func printfLen(format string, val interface{}) int { + s := fmt.Sprintf(format, val) + + return len(s) +} diff --git a/crawler/main.go b/crawler/main.go deleted file mode 100644 index bd1bd6f..0000000 --- a/crawler/main.go +++ /dev/null @@ -1,209 +0,0 @@ -package main - -import ( - "bufio" - "compress/gzip" - "fmt" - "io" - "log" - "net" - "net/url" - "os" - "sort" - "strings" - "sync" - "time" - - "github.com/bool64/logz" - throttle "github.com/bool64/throttle" - "github.com/valyala/fasthttp" - "github.com/vearutop/dynhist-go" - "golang.org/x/time/rate" -) - -func main() { - if len(os.Args) == 1 { - println("usage: crawler ") - return - } - - println("fetching through", os.Args[1]) - - f, err := os.Open(os.Args[1]) - if err != nil { - log.Fatal(err) - } - defer f.Close() - - cr := &CountingReader{ - Reader: f, - } - - var r io.Reader = cr - - if strings.HasSuffix(os.Args[1], ".gz") { - r, err = gzip.NewReader(cr) - if err != nil { - println("gzip reader:", err.Error()) - return - } - } - - fi, err := f.Stat() - if err != nil { - println("file stats:", err.Error()) - return - } - - total := fi.Size() - - p := Progress{ - Interval: 10 * time.Second, - } - - p.Start(total, cr, "sending") - - scanner := bufio.NewScanner(r) - - buf := make([]byte, 1e7) - scanner.Buffer(buf, len(buf)) - - maxConcurrency := 1000 - latencyPercentile := 90.0 - t := throttle.NewThrottle(func(cfg *throttle.Config) { - cfg.MaxConcurrency = int64(maxConcurrency) - cfg.InitialConcurrency = 100 - cfg.MinConcurrency = 20 - - cfg.LimitLatency = 300 * time.Millisecond - cfg.LimitLatencyPercentile = latencyPercentile - cfg.LimitFailedPercent = 5 - - cfg.Interval = 20 * time.Second - cfg.ReceiveUpdate = func(grown bool, newRate rate.Limit, newConcurrency int64) { - println("concurrency updated:", newConcurrency) - } - }) - - statuses := dynhist.Collector{} - errs := logz.Observer{} - errs.MaxSamples = 2 - errs.MaxCardinality = 10 - - printStat := func() { - println(fmt.Sprintf( - "latency %.0f%%: %.1f ms, in progress: %d, failed: %.2f%%", - latencyPercentile, t.Latency(latencyPercentile)*1000, t.InProgress(), t.FailedPercent(), - )) - - println("statuses:") - println(statuses.String()) - - entries := errs.GetEntries() - if len(entries) > 0 { - // Order by count desc. - sort.Slice(entries, func(i, j int) bool { - return entries[i].Count > entries[j].Count - }) - - println("top errors:") - for _, e := range entries { - println(e.Count, e.Message) - } - println() - } - } - - go func() { - for { - time.Sleep(10 * time.Second) - - printStat() - } - }() - - sp := sync.Pool{ - New: func() any { - return make([]byte, 0, 100) - }, - } - - c := fasthttp.Client{} - c.MaxConnsPerHost = maxConcurrency - c.Dial = func(addr string) (net.Conn, error) { - return net.Dial("tcp", addr) - } - - for scanner.Scan() { - if err := scanner.Err(); err != nil { - log.Fatal(err) - } - - line := string(scanner.Bytes()) - - if len(line) == 0 { - statuses.Add(601) - continue - } - - if strings.HasPrefix(line, "FAILED: ") { - p := strings.LastIndex(line, " http") - if p == -1 { - statuses.Add(603) - continue - } - - line = line[p+1:] - } - - if _, err := url.Parse(line); err != nil { - statuses.Add(602) - errs.ObserveMessage(err.Error(), nil) - continue - } - - t.Acquire() - s := time.Now() - - go func() { - failed := false - defer func() { - p.CountLine() - t.Release(time.Since(s), failed) - }() - - body := sp.Get().([]byte) - - statusCode, body, err := c.Get(body, line) - sp.Put(body[:0]) - - if err != nil { - failed = true - - errs.ObserveMessage(err.Error(), line) - - fmt.Println("FAILED:", err.Error(), line) - - statuses.Add(600) - - return - } - - if statusCode >= 500 { - failed = true - } - - // mu.Lock() - // fmt.Println(statusCode, line) - // mu.Unlock() - - statuses.Add(float64(statusCode)) - }() - } - - // Wait for remaining requests in progress. - t.WaitInProgress() - p.Stop() - - printStat() -} diff --git a/crawler/progress.go b/crawler/progress.go deleted file mode 100644 index cd82869..0000000 --- a/crawler/progress.go +++ /dev/null @@ -1,151 +0,0 @@ -package main - -import ( - "fmt" - "io" - "sync/atomic" - "time" -) - -// ProgressStatus describes current progress. -type ProgressStatus struct { - Task string - DonePercent float64 - LinesCompleted int64 - SpeedMBPS float64 - SpeedLPS float64 - Elapsed time.Duration - Remaining time.Duration -} - -// Progress reports reading performance. -type Progress struct { - Interval time.Duration - Print func(status ProgressStatus) - done chan bool - lines int64 - task string - cr *CountingReader - prnt func(s ProgressStatus) - start time.Time - tot float64 - - prevBytes int64 - prevElapsed int64 - prevLines int64 -} - -// Start spawns background progress reporter. -func (p *Progress) Start(total int64, cr *CountingReader, task string) { - p.done = make(chan bool) - p.lines = 0 - p.task = task - p.cr = cr - - interval := p.Interval - if interval == 0 { - interval = time.Second - } - - p.prnt = p.Print - if p.prnt == nil { - p.prnt = func(s ProgressStatus) { - if s.Task != "" { - s.Task += ": " - } - - println(fmt.Sprintf(s.Task+"%.1f%% bytes read, %d lines processed, %.1f l/s, %.2f MB/s, elapsed %s, remaining %s", - s.DonePercent, s.LinesCompleted, s.SpeedLPS, s.SpeedMBPS, - s.Elapsed.Round(10*time.Millisecond).String(), s.Remaining.String())) - } - } - - p.start = time.Now() - p.tot = float64(total) - done := p.done - t := time.NewTicker(interval) - - go func() { - for { - select { - case <-t.C: - p.printStatus(false) - - case <-done: - t.Stop() - - return - } - } - }() -} - -func (p *Progress) printStatus(last bool) { - s := ProgressStatus{} - s.Task = p.task - s.LinesCompleted = atomic.LoadInt64(&p.lines) - - b := p.cr.Bytes() - s.DonePercent = 100 * float64(b) / p.tot - s.Elapsed = time.Since(p.start) - - prevLines := atomic.LoadInt64(&p.prevLines) - prevElapsed := atomic.LoadInt64(&p.prevElapsed) - prevBytes := atomic.LoadInt64(&p.prevBytes) - - atomic.StoreInt64(&p.prevLines, s.LinesCompleted) - atomic.StoreInt64(&p.prevElapsed, int64(s.Elapsed)) - atomic.StoreInt64(&p.prevBytes, b) - - deltaElapsed := s.Elapsed - time.Duration(prevElapsed) - deltaBytes := b - prevBytes - deltaLines := s.LinesCompleted - prevLines - - s.SpeedMBPS = (float64(deltaBytes) / deltaElapsed.Seconds()) / (1024 * 1024) - s.SpeedLPS = float64(deltaLines) / deltaElapsed.Seconds() - - s.Remaining = time.Duration(float64(100*s.Elapsed)/s.DonePercent) - s.Elapsed - s.Remaining = s.Remaining.Truncate(time.Second) - - if s.Remaining > 100*time.Millisecond || last { - p.prnt(s) - } -} - -// CountLine increments line counter. -func (p *Progress) CountLine() int64 { - return atomic.AddInt64(&p.lines, 1) -} - -// Lines returns number of counted lines. -func (p *Progress) Lines() int64 { - return atomic.LoadInt64(&p.lines) -} - -// Stop stops progress reporting. -func (p *Progress) Stop() { - p.printStatus(true) - - close(p.done) -} - -// CountingReader wraps io.Reader to count bytes. -type CountingReader struct { - Reader io.Reader - - readBytes int64 -} - -// Read reads and counts bytes. -func (cr *CountingReader) Read(p []byte) (n int, err error) { - n, err = cr.Reader.Read(p) - - atomic.AddInt64(&cr.readBytes, int64(n)) - - return n, err -} - -// Bytes returns number of read bytes. -func (cr *CountingReader) Bytes() int64 { - return atomic.LoadInt64(&cr.readBytes) -} diff --git a/go.mod b/go.mod index 644bc42..fc9cd4d 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,24 @@ module github.com/bool64/throttle -go 1.17 +go 1.25.0 require ( - github.com/bool64/dev v0.2.24 - github.com/bool64/logz v1.1.0 - github.com/stretchr/testify v1.8.1 - github.com/valyala/fasthttp v1.44.0 - github.com/vearutop/dynhist-go v1.2.0 - golang.org/x/time v0.3.0 + github.com/bool64/dev v0.2.45 + github.com/bool64/logz v1.3.2 + github.com/bool64/progress v0.3.23 + github.com/klauspost/compress v1.18.5 + github.com/stretchr/testify v1.11.1 + github.com/valyala/fasthttp v1.70.0 + github.com/vearutop/dynhist-go v1.2.3 + golang.org/x/time v0.15.0 ) require ( - github.com/andybalholm/brotli v1.0.4 // indirect + github.com/andybalholm/brotli v1.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/vearutop/lograte v1.2.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9552de7..69de9ff 100644 --- a/go.sum +++ b/go.sum @@ -1,83 +1,38 @@ -github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= -github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/bool64/ctxd v1.0.0/go.mod h1:+rjDVFNOJeO+xlvMqQfG0p53CzuRB7FhPSo5nWSkpQ0= -github.com/bool64/dev v0.1.25/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= -github.com/bool64/dev v0.1.28/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= -github.com/bool64/dev v0.1.41/go.mod h1:cTHiTDNc8EewrQPy3p1obNilpMpdmlUesDkFTF2zRWU= -github.com/bool64/dev v0.2.22/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= -github.com/bool64/dev v0.2.24 h1:xptlKivPh870W3Xc9szPcM7wkFmTMuHT8rc0nu7dITk= -github.com/bool64/dev v0.2.24/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= -github.com/bool64/logz v1.1.0 h1:X2cOSXinrr7CX4p+7PLBc9Sw2Vw9ykqPqbuaoV+gX/Y= -github.com/bool64/logz v1.1.0/go.mod h1:PzoWm5F2NKpG3D5MNdAps55DVlLKMPl69Wg2YBZ3DJ4= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro= +github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/bool64/dev v0.2.45 h1:3nLKhAS/6Oklk3Mt2lHYSN/Cb4tdAD77KLwzeP+6eYE= +github.com/bool64/dev v0.2.45/go.mod h1:iJbh1y/HkunEPhgebWRNcs8wfGq7sjvJ6W5iabL8ACg= +github.com/bool64/logz v1.3.2 h1:v18HgZ7Q0WbV2jk3V4fwVpThZcmf/eNFN1EMeLLhFpc= +github.com/bool64/logz v1.3.2/go.mod h1:4+kS446aeINYBwX5QsOgz8mBBwW/9IEI1Q/FV3wHYDU= +github.com/bool64/progress v0.3.23 h1:1Cxn2uJj7X6auVsTuc8PbdSAmwA7c3wAf8r6E1XuU1M= +github.com/bool64/progress v0.3.23/go.mod h1:tlm4jzG5g58ELXeeqabBR0/Hy5hHpJNyHAwrCsOJ+bo= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/swaggest/usecase v0.1.5/go.mod h1:uubX4ZbjQK1Bnl0xX9hOYpb/IUiSoVKk/yQImawbNMU= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.44.0 h1:R+gLUhldIsfg1HokMuQjdQ5bh9nuXHPIfvkYUu9eR5Q= -github.com/valyala/fasthttp v1.44.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= -github.com/vearutop/dynhist-go v1.0.0/go.mod h1:7Cgyu5Ww8FwdB+Y+zawRz9cQT5oXAxw294L9lQ+JI/k= -github.com/vearutop/dynhist-go v1.2.0 h1:y+kQ3LBSp5Vvpv2vOGWVt14TLBXYp8p2qxzfONvbMwE= -github.com/vearutop/dynhist-go v1.2.0/go.mod h1:yC+DkFgBUuqBW+qb32mrwwGoc5I4bAlwB0dHhxk4rF8= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.19.0/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +github.com/valyala/fasthttp v1.70.0 h1:LAhMGcWk13QZWm85+eg8ZBNbrq5mnkWFGbHMUJHIdXA= +github.com/valyala/fasthttp v1.70.0/go.mod h1:oDZEHHkJ/Buyklg6uURmYs19442zFSnCIfX3j1FY3pE= +github.com/vearutop/dynhist-go v1.2.3 h1:EIMWszSDm6b7zmqySgx8zW2qNctE3IXUJggGlDFwJBE= +github.com/vearutop/dynhist-go v1.2.3/go.mod h1:liiiYiwAi8ixC3DbkxooEhASTF6ysJSXy+piCrBtxEg= +github.com/vearutop/lograte v1.2.2 h1:uB6VPc+ZYiljWi97aqATYVacv64iG1qI3vpr4owtZ3Y= +github.com/vearutop/lograte v1.2.2/go.mod h1:WowwfH/eQVjXGfx104R/w7RCWfuiPyhr6Azd0+A3he4= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/throttle.go b/throttle.go index b388a49..4d25542 100644 --- a/throttle.go +++ b/throttle.go @@ -142,7 +142,7 @@ func NewThrottle(options ...Option) *Throttle { } if t.cfg.LimitLatency != 0 { - t.latency = &dynhist.Collector{BucketsLimit: 50, WeightFunc: dynhist.ExpWidth(1.5, 1)} + t.latency = &dynhist.Collector{BucketsLimit: 300, WeightFunc: dynhist.ExpWidth(1.5, 1)} } done := make(chan struct{}) @@ -169,20 +169,21 @@ func NewThrottle(options ...Option) *Throttle { } // Go runs a concurrent job. -func (t *Throttle) Go(f func() error) { +func (t *Throttle) Go(f func() (passed bool)) { t.Acquire() go func() { var ( - s = time.Now() - err error + s = time.Now() + passed bool ) defer func() { - t.Release(time.Since(s), err != nil) if r := recover(); r != nil { + atomic.AddInt64(&t.failed, 1) } + t.Release(time.Since(s), passed) }() - err = f() + passed = f() }() } @@ -204,7 +205,7 @@ func (t *Throttle) Acquire() { } // Release returns permit back to the pool. -func (t *Throttle) Release(elapsed time.Duration, failed bool) { +func (t *Throttle) Release(elapsed time.Duration, passed bool) { if t.sem != nil { <-t.sem } @@ -212,7 +213,7 @@ func (t *Throttle) Release(elapsed time.Duration, failed bool) { t.latency.Add(elapsed.Seconds()) atomic.AddInt64(&t.inProgress, -1) atomic.AddInt64(&t.total, 1) - if failed { + if !passed { atomic.AddInt64(&t.failed, 1) } } @@ -317,6 +318,10 @@ func (t *throttle) faster() { active := t.cfg.MaxConcurrency - blocked delta := int64(float64(active) * t.cfg.StepFracConcurrency) + if delta < 1 { + delta = 1 + } + if active+delta >= t.cfg.MaxConcurrency { delta = t.cfg.MaxConcurrency - active } @@ -380,18 +385,22 @@ func (t *Throttle) Latency(percent float64) float64 { return t.latency.Percentile(percent) } +// LatencyHistogram returns latency histogram collector. func (t *Throttle) LatencyHistogram() *dynhist.Collector { return t.latency } +// Concurrency returns the number of available slots in the semaphore. func (t *Throttle) Concurrency() int64 { return t.cfg.MaxConcurrency - atomic.LoadInt64(&t.blocked) } +// FailedPercent returns the fraction of failed operations of a healthy system, in percents. func (t *Throttle) FailedPercent() float64 { return 100 * float64(atomic.LoadInt64(&t.failed)) / float64(atomic.LoadInt64(&t.total)) } +// WaitInProgress blocks until all operations in progress are finished. func (t *Throttle) WaitInProgress() { if t.sem == nil { return diff --git a/throttle_test.go b/throttle_test.go index c5e1451..a145388 100644 --- a/throttle_test.go +++ b/throttle_test.go @@ -1,7 +1,6 @@ package throttle_test import ( - "fmt" "runtime" "sync" "sync/atomic" @@ -17,8 +16,7 @@ import ( func TestSemaphor_Acquire(t *testing.T) { var concurrency int64 - var s *throttle.Throttle - s = throttle.NewThrottle(func(cfg *throttle.Config) { + s := throttle.NewThrottle(func(cfg *throttle.Config) { cfg.Interval = 100 * time.Millisecond cfg.StepFracConcurrency = 0.9 cfg.LimitLatencyPercentile = 93 @@ -29,7 +27,6 @@ func TestSemaphor_Acquire(t *testing.T) { cfg.LimitLatency = 5 * time.Millisecond cfg.ReceiveUpdate = func(grown bool, r rate.Limit, c int64) { atomic.StoreInt64(&concurrency, c) - fmt.Println("UUU", grown, float64(r), c, s.Throughput()) } }) @@ -42,7 +39,7 @@ func TestSemaphor_Acquire(t *testing.T) { go func() { defer func() { wg.Done() }() time.Sleep(time.Millisecond) - s.Release(time.Millisecond) + s.Release(time.Millisecond, true) }() } @@ -53,7 +50,7 @@ func TestSemaphor_Acquire(t *testing.T) { go func() { defer func() { wg.Done() }() time.Sleep(10 * time.Millisecond) - s.Release(10 * time.Millisecond) + s.Release(10*time.Millisecond, true) }() } @@ -72,7 +69,7 @@ func TestSemaphor_Acquire(t *testing.T) { go func() { defer func() { wg.Done() }() time.Sleep(1 * time.Microsecond) - s.Release(1 * time.Microsecond) + s.Release(1*time.Microsecond, true) }() } From c9ae764ed5b1068e03ccc0642c3a45f5beba44fb Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Tue, 14 Apr 2026 14:18:05 +0200 Subject: [PATCH 4/6] Initial implementation --- .github/workflows/bench.yml | 107 --------------------------- .github/workflows/release-assets.yml | 58 +++++++++++++++ 2 files changed, 58 insertions(+), 107 deletions(-) delete mode 100644 .github/workflows/bench.yml create mode 100644 .github/workflows/release-assets.yml diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml deleted file mode 100644 index ef52f43..0000000 --- a/.github/workflows/bench.yml +++ /dev/null @@ -1,107 +0,0 @@ -# This script is provided by github.com/bool64/dev. -name: bench -on: - pull_request: - workflow_dispatch: - inputs: - old: - description: 'Old Ref' - required: false - default: 'master' - new: - description: 'New Ref' - required: true - -env: - GO111MODULE: "on" - CACHE_BENCHMARK: "off" # Enables benchmark result reuse between runs, may skew latency results. - RUN_BASE_BENCHMARK: "on" # Runs benchmark for PR base in case benchmark result is missing. -jobs: - bench: - strategy: - matrix: - go-version: [ stable ] - runs-on: ubuntu-latest - steps: - - name: Install Go - uses: actions/setup-go@v5 - with: - go-version: ${{ matrix.go-version }} - - name: Checkout code - uses: actions/checkout@v4 - with: - ref: ${{ (github.event.inputs.new != '') && github.event.inputs.new || github.event.ref }} - - name: Go cache - uses: actions/cache@v4 - with: - # In order: - # * Module download cache - # * Build cache (Linux) - path: | - ~/go/pkg/mod - ~/.cache/go-build - key: ${{ runner.os }}-go-cache-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go-cache - - name: Restore benchstat - uses: actions/cache@v4 - with: - path: ~/go/bin/benchstat - key: ${{ runner.os }}-benchstat - - name: Restore base benchmark result - if: env.CACHE_BENCHMARK == 'on' - id: benchmark-base - uses: actions/cache@v4 - with: - path: | - bench-master.txt - bench-main.txt - # Use base sha for PR or new commit hash for master/main push in benchmark result key. - key: ${{ runner.os }}-bench-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }} - - name: Checkout base code - if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') - uses: actions/checkout@v4 - with: - ref: ${{ (github.event.pull_request.base.sha != '' ) && github.event.pull_request.base.sha || github.event.inputs.old }} - path: __base - - name: Run base benchmark - if: env.RUN_BASE_BENCHMARK == 'on' && steps.benchmark-base.outputs.cache-hit != 'true' && (github.event.pull_request.base.sha != '' || github.event.inputs.old != '') - run: | - export REF_NAME=master - cd __base - make | grep bench-run && (BENCH_COUNT=5 make bench-run bench-stat && cp bench-master.txt ../bench-master.txt) || echo "No benchmarks in base" - - name: Benchmark - id: bench - run: | - export REF_NAME=new - BENCH_COUNT=5 make bench - OUTPUT=$(make bench-stat-diff) - OUTPUT="${OUTPUT//'%'/'%25'}" - OUTPUT="${OUTPUT//$'\n'/'%0A'}" - OUTPUT="${OUTPUT//$'\r'/'%0D'}" - echo "::set-output name=diff::$OUTPUT" - OUTPUT=$(make bench-stat) - OUTPUT="${OUTPUT//'%'/'%25'}" - OUTPUT="${OUTPUT//$'\n'/'%0A'}" - OUTPUT="${OUTPUT//$'\r'/'%0D'}" - echo "::set-output name=result::$OUTPUT" - - name: Comment Benchmark Result - uses: marocchino/sticky-pull-request-comment@v2 - with: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - header: bench - message: | - ### Benchmark Result -
Benchmark diff with base branch - - ``` - ${{ steps.bench.outputs.diff }} - ``` -
- -
Benchmark result - - ``` - ${{ steps.bench.outputs.result }} - ``` -
diff --git a/.github/workflows/release-assets.yml b/.github/workflows/release-assets.yml new file mode 100644 index 0000000..49dbb2e --- /dev/null +++ b/.github/workflows/release-assets.yml @@ -0,0 +1,58 @@ +# This script is provided by github.com/bool64/dev. + +# This script uploads application binaries as GitHub release assets. +name: release-assets +on: + release: + types: + - created + +permissions: + contents: write # Required to upload assets + +env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GO_VERSION: stable + TAG_NAME: ${{ github.event.release.tag_name }} +jobs: + build: + name: Upload Release Assets + runs-on: ubuntu-latest + steps: + - name: Install Go + uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + - name: Checkout code + uses: actions/checkout@v4 + - name: Build artifacts + run: | + make release-assets + + - name: Upload linux_amd64.tar.gz + if: hashFiles('linux_amd64.tar.gz') != '' + run: gh release upload "$TAG_NAME" linux_amd64.tar.gz --clobber + + - name: Upload linux_amd64_dbg.tar.gz + if: hashFiles('linux_amd64_dbg.tar.gz') != '' + run: gh release upload "$TAG_NAME" linux_amd64_dbg.tar.gz --clobber + + - name: Upload linux_arm64.tar.gz + if: hashFiles('linux_arm64.tar.gz') != '' + run: gh release upload "$TAG_NAME" linux_arm64.tar.gz --clobber + + - name: Upload linux_arm.tar.gz + if: hashFiles('linux_arm.tar.gz') != '' + run: gh release upload "$TAG_NAME" linux_arm.tar.gz --clobber + + - name: Upload darwin_amd64.tar.gz + if: hashFiles('darwin_amd64.tar.gz') != '' + run: gh release upload "$TAG_NAME" darwin_amd64.tar.gz --clobber + + - name: Upload darwin_arm64.tar.gz + if: hashFiles('darwin_arm64.tar.gz') != '' + run: gh release upload "$TAG_NAME" darwin_arm64.tar.gz --clobber + + - name: Upload windows_amd64.zip + if: hashFiles('windows_amd64.zip') != '' + run: gh release upload "$TAG_NAME" windows_amd64.zip --clobber From 481fd88f12a2786b6bcf8c5467393058340b74c6 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Thu, 7 May 2026 10:53:13 +0200 Subject: [PATCH 5/6] Initial implementation --- throttle_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/throttle_test.go b/throttle_test.go index a145388..3f6b469 100644 --- a/throttle_test.go +++ b/throttle_test.go @@ -25,6 +25,7 @@ func TestSemaphor_Acquire(t *testing.T) { cfg.MaxRate = 1000 cfg.MinRate = 20 cfg.LimitLatency = 5 * time.Millisecond + cfg.Interval = 100 * time.Millisecond cfg.ReceiveUpdate = func(grown bool, r rate.Limit, c int64) { atomic.StoreInt64(&concurrency, c) } @@ -32,7 +33,7 @@ func TestSemaphor_Acquire(t *testing.T) { wg := sync.WaitGroup{} - for i := 0; i < 10000; i++ { + for i := 0; i < 1000; i++ { s.Acquire() wg.Add(1) @@ -43,7 +44,7 @@ func TestSemaphor_Acquire(t *testing.T) { }() } - for i := 0; i < 1000; i++ { + for i := 0; i < 100; i++ { s.Acquire() wg.Add(1) @@ -58,8 +59,8 @@ func TestSemaphor_Acquire(t *testing.T) { assert.Equal(t, int64(20), atomic.LoadInt64(&concurrency)) // Min concurrency. println("now healthy") - for i := 0; i < 20000; i++ { - if i%1000 == 0 { + for i := 0; i < 2000; i++ { + if i%100 == 0 { println(i, s.Latency(90)) } From dacc21cfcfc3e2699dbef26e657744cf403a1740 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Thu, 7 May 2026 11:16:19 +0200 Subject: [PATCH 6/6] Initial implementation --- throttle_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/throttle_test.go b/throttle_test.go index 3f6b469..5a2403d 100644 --- a/throttle_test.go +++ b/throttle_test.go @@ -55,6 +55,7 @@ func TestSemaphor_Acquire(t *testing.T) { }() } + time.Sleep(200 * time.Millisecond) require.GreaterOrEqual(t, 10*time.Millisecond.Seconds(), s.Latency(90)) assert.Equal(t, int64(20), atomic.LoadInt64(&concurrency)) // Min concurrency. println("now healthy")