diff --git a/_mocks/opencsg.com/csghub-server/builder/store/database/mock_DeployBenchmarkTaskStore.go b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_DeployBenchmarkTaskStore.go new file mode 100644 index 000000000..9de1fe25b --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/builder/store/database/mock_DeployBenchmarkTaskStore.go @@ -0,0 +1,390 @@ +// Code generated by mockery v2.53.5. DO NOT EDIT. + +package database + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + database "opencsg.com/csghub-server/builder/store/database" +) + +// MockDeployBenchmarkTaskStore is an autogenerated mock type for the DeployBenchmarkTaskStore type +type MockDeployBenchmarkTaskStore struct { + mock.Mock +} + +type MockDeployBenchmarkTaskStore_Expecter struct { + mock *mock.Mock +} + +func (_m *MockDeployBenchmarkTaskStore) EXPECT() *MockDeployBenchmarkTaskStore_Expecter { + return &MockDeployBenchmarkTaskStore_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: ctx, task +func (_m *MockDeployBenchmarkTaskStore) Create(ctx context.Context, task *database.DeployBenchmarkTask) (*database.DeployBenchmarkTask, error) { + ret := _m.Called(ctx, task) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 *database.DeployBenchmarkTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *database.DeployBenchmarkTask) (*database.DeployBenchmarkTask, error)); ok { + return rf(ctx, task) + } + if rf, ok := ret.Get(0).(func(context.Context, *database.DeployBenchmarkTask) *database.DeployBenchmarkTask); ok { + r0 = rf(ctx, task) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.DeployBenchmarkTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *database.DeployBenchmarkTask) error); ok { + r1 = rf(ctx, task) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDeployBenchmarkTaskStore_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' +type MockDeployBenchmarkTaskStore_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - ctx context.Context +// - task *database.DeployBenchmarkTask +func (_e *MockDeployBenchmarkTaskStore_Expecter) Create(ctx interface{}, task interface{}) *MockDeployBenchmarkTaskStore_Create_Call { + return &MockDeployBenchmarkTaskStore_Create_Call{Call: _e.mock.On("Create", ctx, task)} +} + +func (_c *MockDeployBenchmarkTaskStore_Create_Call) Run(run func(ctx context.Context, task *database.DeployBenchmarkTask)) *MockDeployBenchmarkTaskStore_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*database.DeployBenchmarkTask)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_Create_Call) Return(_a0 *database.DeployBenchmarkTask, _a1 error) *MockDeployBenchmarkTaskStore_Create_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_Create_Call) RunAndReturn(run func(context.Context, *database.DeployBenchmarkTask) (*database.DeployBenchmarkTask, error)) *MockDeployBenchmarkTaskStore_Create_Call { + _c.Call.Return(run) + return _c +} + +// FindByID provides a mock function with given fields: ctx, id +func (_m *MockDeployBenchmarkTaskStore) FindByID(ctx context.Context, id int64) (*database.DeployBenchmarkTask, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for FindByID") + } + + var r0 *database.DeployBenchmarkTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*database.DeployBenchmarkTask, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *database.DeployBenchmarkTask); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.DeployBenchmarkTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDeployBenchmarkTaskStore_FindByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindByID' +type MockDeployBenchmarkTaskStore_FindByID_Call struct { + *mock.Call +} + +// FindByID is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *MockDeployBenchmarkTaskStore_Expecter) FindByID(ctx interface{}, id interface{}) *MockDeployBenchmarkTaskStore_FindByID_Call { + return &MockDeployBenchmarkTaskStore_FindByID_Call{Call: _e.mock.On("FindByID", ctx, id)} +} + +func (_c *MockDeployBenchmarkTaskStore_FindByID_Call) Run(run func(ctx context.Context, id int64)) *MockDeployBenchmarkTaskStore_FindByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindByID_Call) Return(_a0 *database.DeployBenchmarkTask, _a1 error) *MockDeployBenchmarkTaskStore_FindByID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindByID_Call) RunAndReturn(run func(context.Context, int64) (*database.DeployBenchmarkTask, error)) *MockDeployBenchmarkTaskStore_FindByID_Call { + _c.Call.Return(run) + return _c +} + +// FindByTrigger provides a mock function with given fields: ctx, deployID, triggerSource, triggerKey +func (_m *MockDeployBenchmarkTaskStore) FindByTrigger(ctx context.Context, deployID int64, triggerSource string, triggerKey string) (*database.DeployBenchmarkTask, error) { + ret := _m.Called(ctx, deployID, triggerSource, triggerKey) + + if len(ret) == 0 { + panic("no return value specified for FindByTrigger") + } + + var r0 *database.DeployBenchmarkTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) (*database.DeployBenchmarkTask, error)); ok { + return rf(ctx, deployID, triggerSource, triggerKey) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, string, string) *database.DeployBenchmarkTask); ok { + r0 = rf(ctx, deployID, triggerSource, triggerKey) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.DeployBenchmarkTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, string, string) error); ok { + r1 = rf(ctx, deployID, triggerSource, triggerKey) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDeployBenchmarkTaskStore_FindByTrigger_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindByTrigger' +type MockDeployBenchmarkTaskStore_FindByTrigger_Call struct { + *mock.Call +} + +// FindByTrigger is a helper method to define mock.On call +// - ctx context.Context +// - deployID int64 +// - triggerSource string +// - triggerKey string +func (_e *MockDeployBenchmarkTaskStore_Expecter) FindByTrigger(ctx interface{}, deployID interface{}, triggerSource interface{}, triggerKey interface{}) *MockDeployBenchmarkTaskStore_FindByTrigger_Call { + return &MockDeployBenchmarkTaskStore_FindByTrigger_Call{Call: _e.mock.On("FindByTrigger", ctx, deployID, triggerSource, triggerKey)} +} + +func (_c *MockDeployBenchmarkTaskStore_FindByTrigger_Call) Run(run func(ctx context.Context, deployID int64, triggerSource string, triggerKey string)) *MockDeployBenchmarkTaskStore_FindByTrigger_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindByTrigger_Call) Return(_a0 *database.DeployBenchmarkTask, _a1 error) *MockDeployBenchmarkTaskStore_FindByTrigger_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindByTrigger_Call) RunAndReturn(run func(context.Context, int64, string, string) (*database.DeployBenchmarkTask, error)) *MockDeployBenchmarkTaskStore_FindByTrigger_Call { + _c.Call.Return(run) + return _c +} + +// FindLatestByDeployID provides a mock function with given fields: ctx, deployID +func (_m *MockDeployBenchmarkTaskStore) FindLatestByDeployID(ctx context.Context, deployID int64) (*database.DeployBenchmarkTask, error) { + ret := _m.Called(ctx, deployID) + + if len(ret) == 0 { + panic("no return value specified for FindLatestByDeployID") + } + + var r0 *database.DeployBenchmarkTask + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*database.DeployBenchmarkTask, error)); ok { + return rf(ctx, deployID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *database.DeployBenchmarkTask); ok { + r0 = rf(ctx, deployID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*database.DeployBenchmarkTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, deployID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FindLatestByDeployID' +type MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call struct { + *mock.Call +} + +// FindLatestByDeployID is a helper method to define mock.On call +// - ctx context.Context +// - deployID int64 +func (_e *MockDeployBenchmarkTaskStore_Expecter) FindLatestByDeployID(ctx interface{}, deployID interface{}) *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call { + return &MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call{Call: _e.mock.On("FindLatestByDeployID", ctx, deployID)} +} + +func (_c *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call) Run(run func(ctx context.Context, deployID int64)) *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call) Return(_a0 *database.DeployBenchmarkTask, _a1 error) *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call) RunAndReturn(run func(context.Context, int64) (*database.DeployBenchmarkTask, error)) *MockDeployBenchmarkTaskStore_FindLatestByDeployID_Call { + _c.Call.Return(run) + return _c +} + +// ListByDeployID provides a mock function with given fields: ctx, deployID, per, page +func (_m *MockDeployBenchmarkTaskStore) ListByDeployID(ctx context.Context, deployID int64, per int, page int) ([]database.DeployBenchmarkTask, int, error) { + ret := _m.Called(ctx, deployID, per, page) + + if len(ret) == 0 { + panic("no return value specified for ListByDeployID") + } + + var r0 []database.DeployBenchmarkTask + var r1 int + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int, int) ([]database.DeployBenchmarkTask, int, error)); ok { + return rf(ctx, deployID, per, page) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, int, int) []database.DeployBenchmarkTask); ok { + r0 = rf(ctx, deployID, per, page) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]database.DeployBenchmarkTask) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, int, int) int); ok { + r1 = rf(ctx, deployID, per, page) + } else { + r1 = ret.Get(1).(int) + } + + if rf, ok := ret.Get(2).(func(context.Context, int64, int, int) error); ok { + r2 = rf(ctx, deployID, per, page) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockDeployBenchmarkTaskStore_ListByDeployID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListByDeployID' +type MockDeployBenchmarkTaskStore_ListByDeployID_Call struct { + *mock.Call +} + +// ListByDeployID is a helper method to define mock.On call +// - ctx context.Context +// - deployID int64 +// - per int +// - page int +func (_e *MockDeployBenchmarkTaskStore_Expecter) ListByDeployID(ctx interface{}, deployID interface{}, per interface{}, page interface{}) *MockDeployBenchmarkTaskStore_ListByDeployID_Call { + return &MockDeployBenchmarkTaskStore_ListByDeployID_Call{Call: _e.mock.On("ListByDeployID", ctx, deployID, per, page)} +} + +func (_c *MockDeployBenchmarkTaskStore_ListByDeployID_Call) Run(run func(ctx context.Context, deployID int64, per int, page int)) *MockDeployBenchmarkTaskStore_ListByDeployID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(int), args[3].(int)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_ListByDeployID_Call) Return(_a0 []database.DeployBenchmarkTask, _a1 int, _a2 error) *MockDeployBenchmarkTaskStore_ListByDeployID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_ListByDeployID_Call) RunAndReturn(run func(context.Context, int64, int, int) ([]database.DeployBenchmarkTask, int, error)) *MockDeployBenchmarkTaskStore_ListByDeployID_Call { + _c.Call.Return(run) + return _c +} + +// Update provides a mock function with given fields: ctx, task +func (_m *MockDeployBenchmarkTaskStore) Update(ctx context.Context, task *database.DeployBenchmarkTask) error { + ret := _m.Called(ctx, task) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *database.DeployBenchmarkTask) error); ok { + r0 = rf(ctx, task) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockDeployBenchmarkTaskStore_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' +type MockDeployBenchmarkTaskStore_Update_Call struct { + *mock.Call +} + +// Update is a helper method to define mock.On call +// - ctx context.Context +// - task *database.DeployBenchmarkTask +func (_e *MockDeployBenchmarkTaskStore_Expecter) Update(ctx interface{}, task interface{}) *MockDeployBenchmarkTaskStore_Update_Call { + return &MockDeployBenchmarkTaskStore_Update_Call{Call: _e.mock.On("Update", ctx, task)} +} + +func (_c *MockDeployBenchmarkTaskStore_Update_Call) Run(run func(ctx context.Context, task *database.DeployBenchmarkTask)) *MockDeployBenchmarkTaskStore_Update_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*database.DeployBenchmarkTask)) + }) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_Update_Call) Return(_a0 error) *MockDeployBenchmarkTaskStore_Update_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockDeployBenchmarkTaskStore_Update_Call) RunAndReturn(run func(context.Context, *database.DeployBenchmarkTask) error) *MockDeployBenchmarkTaskStore_Update_Call { + _c.Call.Return(run) + return _c +} + +// NewMockDeployBenchmarkTaskStore creates a new instance of MockDeployBenchmarkTaskStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockDeployBenchmarkTaskStore(t interface { + mock.TestingT + Cleanup(func()) +}) *MockDeployBenchmarkTaskStore { + mock := &MockDeployBenchmarkTaskStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/api/handler/repo_deploy.go b/api/handler/repo_deploy.go index dd15c7126..8c197d36b 100644 --- a/api/handler/repo_deploy.go +++ b/api/handler/repo_deploy.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/gin-gonic/gin" "opencsg.com/csghub-server/api/httpbase" "opencsg.com/csghub-server/common/errorx" diff --git a/api/router/api.go b/api/router/api.go index a5e6678f9..a1327c2ff 100644 --- a/api/router/api.go +++ b/api/router/api.go @@ -737,6 +737,8 @@ func createModelRoutes(config *config.Config, modelsServerlessGroup.GET("/:namespace/:name/serverless/:id/logs", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessLogs) modelsServerlessGroup.GET("/:namespace/:name/serverless/:id/versions/:commit_id", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessVersionLogs) modelsServerlessGroup.PUT("/:namespace/:name/serverless/:id", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessUpdate) + registerServerlessBenchmarkRoutes(modelsServerlessGroup, middlewareCollection, repoCommonHandler) + } { // serverless monitor diff --git a/api/router/api_serverless_benchmark_ce.go b/api/router/api_serverless_benchmark_ce.go new file mode 100644 index 000000000..934a5cf7e --- /dev/null +++ b/api/router/api_serverless_benchmark_ce.go @@ -0,0 +1,19 @@ +//go:build !ee && !saas + +package router + +import ( + "github.com/gin-gonic/gin" + "opencsg.com/csghub-server/api/handler" + "opencsg.com/csghub-server/api/middleware" +) + +//nolint:unused +func initServerlessBenchmarkHooks() {} + +func registerServerlessBenchmarkRoutes( + modelsServerlessGroup *gin.RouterGroup, + middlewareCollection middleware.MiddlewareCollection, + repoCommonHandler *handler.RepoHandler, +) { +} diff --git a/api/workflow/deployer.go b/api/workflow/deployer.go index 3b7cb4b3f..e1ac5e6be 100644 --- a/api/workflow/deployer.go +++ b/api/workflow/deployer.go @@ -72,10 +72,6 @@ func StartDeployWorker( w.RegisterActivity(act) w.RegisterWorkflow(DeployWorkflow) - err := temporalClient.Start() - if err != nil { - return fmt.Errorf("failed to start worker: %w", err) - } return nil } diff --git a/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_RESULT_FIELDS.md b/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_RESULT_FIELDS.md new file mode 100644 index 000000000..f4ab1084f --- /dev/null +++ b/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_RESULT_FIELDS.md @@ -0,0 +1,42 @@ +# Benchmark Result Fields + +## Top-level Structure + +- `summary`: Aggregated metrics. +- `raw_result`: Runtime metadata and error samples. + +## summary Fields + +- `total_requests`: Total requests during the benchmark phase (excluding warmup). +- `success_requests`: Number of successful requests (HTTP success and parsing success). +- `failed_requests`: Number of failed requests. +- `success_rate`: `success_requests / total_requests`. +- `avg_latency_ms`: Average total request latency in milliseconds. +- `p95_latency_ms`: P95 total latency in milliseconds. +- `p99_latency_ms`: P99 total latency in milliseconds. +- `ttft_ms`: Average time-to-first-token in milliseconds, meaningful only for streaming requests. +- `ttft_available`: Whether TTFT was successfully sampled. +- `prompt_tokens`: Total input token count. +- `completion_tokens`: Total output token count. +- `total_tokens`: Total token count. +- `tpm`: Token throughput per minute (`total_tokens * 60 / duration_seconds`). +- `rps`: Requests per second (`total_requests / duration_seconds`). + +## raw_result Fields + +- `url`: Final request URL. +- `duration_seconds`: Actual duration of the benchmark phase in seconds. +- `warmup_requests`: Number of warmup requests. +- `concurrency`: Number of concurrent workers. +- `enable_stream`: Whether streaming mode was enabled. +- `sample_count`: Number of sampled requests (usually equals `total_requests`). +- `ttft_available`: Whether TTFT samples are available. +- `ttft_sample_count`: Number of TTFT samples. +- `errors`: Error summary for failed samples (up to the first 20 entries). + +## Streaming (SSE) Notes + +- Streaming mode parses `data: ...` event lines. +- The arrival time of the first valid `data` event is recorded as the per-request TTFT. +- If the event includes a `usage` field, it is used for token accounting; otherwise, the request tokens are counted as 0. +- A `data: [DONE]` line is treated as the stream end marker. diff --git a/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_USAGE.md b/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_USAGE.md new file mode 100644 index 000000000..aac4522e3 --- /dev/null +++ b/builder/deploy/benchmark/reports/live_benchmark/BENCHMARK_USAGE.md @@ -0,0 +1,36 @@ +# Deploy Benchmark Usage + +## 1. Input File + +- Input files are located in `builder/deploy/benchmark/reports/live_benchmark/inputs/`. +- Recommended for this round: + - `sample.json`: SSE streaming scenario, validates `ttft_ms`. + +## 2. Running with Python + +```bash +NO_PROXY=localhost,127.0.0.1 no_proxy=localhost,127.0.0.1 \ +python3 builder/deploy/benchmark/scripts/deploy_benchmark.py \ + < builder/deploy/benchmark/reports/live_benchmark/inputs/sample.json \ + > builder/deploy/benchmark/reports/live_benchmark/python_results_round2/local_8094_stream_python_result.json \ + 2> builder/deploy/benchmark/reports/live_benchmark/python_results_round2/local_8094_stream_python_stderr.txt +``` + +## 3. Running with Go (live benchmark UT) + +```bash +RUN_LIVE_BENCHMARK=1 \ +LIVE_BENCHMARK_OUTPUT_SUBDIR=go_results_round2 \ +go test ./builder/deploy/benchmark -tags live_benchmark -run TestRunner_RunLiveBenchmarkCases -v +``` + +- `RUN_LIVE_BENCHMARK=1`: Explicitly enables real URL benchmark testing. +- `LIVE_BENCHMARK_OUTPUT_SUBDIR`: Specifies the output subdirectory name for preserving results across multiple test rounds. + +## 4. Output Directory + +- Python results: `builder/deploy/benchmark/reports/live_benchmark/python_results_round2/` +- Go results: `builder/deploy/benchmark/reports/live_benchmark/go_results_round2/` +- Comparison reports: + - `benchmark_comparison_round2.md` + - `benchmark_comparison_round2.json` diff --git a/builder/deploy/benchmark/reports/live_benchmark/inputs/sample.json b/builder/deploy/benchmark/reports/live_benchmark/inputs/sample.json new file mode 100644 index 000000000..1a57da1f7 --- /dev/null +++ b/builder/deploy/benchmark/reports/live_benchmark/inputs/sample.json @@ -0,0 +1,29 @@ +{ + "endpoint": "https://localhost:8094", + "request_template": { + "api_path": "/v1/chat/completions", + "method": "POST", + "headers": { + "Content-Type": "application/json", + "Authorization": "Bearer your_token" + }, + "request_body": { + "model": "Qwen/Qwen3-0.6B", + "messages": [ + { + "role": "user", + "content": "Hello, please introduce yourself in one paragraph." + } + ], + "stream": true, + "temperature": 0 + } + }, + "config": { + "warmup_requests": 1, + "duration_seconds": 20, + "concurrency": 10, + "timeout_seconds": 60, + "enable_stream": true + } +} diff --git a/builder/deploy/benchmark/scripts/deploy_benchmark.py b/builder/deploy/benchmark/scripts/deploy_benchmark.py new file mode 100644 index 000000000..976665ce7 --- /dev/null +++ b/builder/deploy/benchmark/scripts/deploy_benchmark.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +""" +Deploy Benchmark Script - Fallback for TPM/RPM testing + +This script provides a manual fallback for benchmark testing when +the automated Go-based system is unavailable. + +Usage: + python deploy_benchmark.py < input.json > output.json + +Input JSON format: +{ + "endpoint": "https://...", + "request_template": { + "api_path": "/v1/chat/completions", + "method": "POST", + "headers": {"Content-Type": "application/json"}, + "request_body": {"model": "...", "messages": [...]} + }, + "config": { + "warmup_requests": 2, + "duration_seconds": 60, + "concurrency": 4, + "timeout_seconds": 30, + "enable_stream": false + } +} + +Output JSON format: +{ + "summary": { + "total_requests": 120, + "success_requests": 120, + "failed_requests": 0, + "success_rate": 1.0, + "avg_latency_ms": 820.5, + "p95_latency_ms": 1350.0, + "p99_latency_ms": 1800.0, + "ttft_ms": 0.0, + "ttft_available": false, + "prompt_tokens": 18000, + "completion_tokens": 42000, + "total_tokens": 60000, + "tpm": 60000.0, + "rps": 2.0 + }, + "raw_result": {...} +} +""" + +import json +import math +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def build_url(endpoint: str, api_path: str) -> str: + endpoint = (endpoint or "").rstrip("/") + api_path = api_path or "" + if not api_path.startswith("/"): + api_path = "/" + api_path + return endpoint + api_path + + +def extract_usage(payload: dict) -> dict: + usage = payload.get("usage") or {} + return { + "prompt_tokens": int(usage.get("prompt_tokens", 0) or 0), + "completion_tokens": int(usage.get("completion_tokens", 0) or 0), + "total_tokens": int(usage.get("total_tokens", 0) or 0), + } + + +def percentile(values, ratio: float) -> float: + if not values: + return 0.0 + ordered = sorted(values) + index = int(math.ceil(len(ordered) * ratio)) - 1 + index = max(0, min(index, len(ordered) - 1)) + return float(ordered[index]) + + +def request_once(url: str, method: str, headers: dict, body: dict, timeout_seconds: int, enable_stream: bool) -> dict: + body_bytes = json.dumps(body).encode("utf-8") + req = urllib.request.Request(url=url, data=body_bytes, headers=headers, method=method) + start = time.perf_counter() + try: + with urllib.request.urlopen(req, timeout=timeout_seconds) as resp: + status_code = int(getattr(resp, "status", 200)) + if status_code < 200 or status_code >= 300: + response_bytes = resp.read() + latency_ms = (time.perf_counter() - start) * 1000.0 + return { + "ok": False, + "status_code": status_code, + "latency_ms": latency_ms, + "ttft_ms": 0.0, + "ttft_available": False, + "error": response_bytes.decode("utf-8", errors="ignore"), + "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, + } + + if enable_stream: + return parse_streaming_response(resp, start, status_code) + + response_bytes = resp.read() + latency_ms = (time.perf_counter() - start) * 1000.0 + payload = json.loads(response_bytes.decode("utf-8") or "{}") + usage = extract_usage(payload) + # For non-streaming requests, TTFT is not applicable + return { + "ok": True, + "status_code": status_code, + "latency_ms": latency_ms, + "ttft_ms": 0.0, + "ttft_available": False, + "usage": usage, + } + except urllib.error.HTTPError as exc: + latency_ms = (time.perf_counter() - start) * 1000.0 + return { + "ok": False, + "status_code": exc.code, + "latency_ms": latency_ms, + "ttft_ms": 0.0, + "ttft_available": False, + "error": str(exc), + "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, + } + except Exception as exc: + latency_ms = (time.perf_counter() - start) * 1000.0 + return { + "ok": False, + "status_code": 0, + "latency_ms": latency_ms, + "ttft_ms": 0.0, + "ttft_available": False, + "error": str(exc), + "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, + } + + +def parse_sse_data(line: str) -> tuple[str, bool]: + prefix = "data:" + if not line.startswith(prefix): + return "", False + return line[len(prefix):].strip(), True + + +def has_usage(usage: dict) -> bool: + return any((usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0), usage.get("total_tokens", 0))) + + +def parse_streaming_response(resp, start_time: float, status_code: int) -> dict: + ttft_ms = 0.0 + ttft_available = False + usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} + fallback_chunks = [] + + while True: + line = resp.readline() + if not line: + break + decoded = line.decode("utf-8", errors="ignore").strip() + if not decoded: + continue + fallback_chunks.append(decoded) + + event_data, is_sse = parse_sse_data(decoded) + if is_sse: + if not event_data or event_data == "[DONE]": + continue + if not ttft_available: + ttft_available = True + ttft_ms = (time.perf_counter() - start_time) * 1000.0 + try: + payload = json.loads(event_data) + candidate = extract_usage(payload) + if has_usage(candidate): + usage = candidate + except json.JSONDecodeError: + continue + continue + + try: + payload = json.loads(decoded) + candidate = extract_usage(payload) + if has_usage(candidate): + usage = candidate + except json.JSONDecodeError: + continue + + if not ttft_available and fallback_chunks: + try: + payload = json.loads("".join(fallback_chunks)) + candidate = extract_usage(payload) + if has_usage(candidate): + usage = candidate + except json.JSONDecodeError: + pass + + latency_ms = (time.perf_counter() - start_time) * 1000.0 + return { + "ok": True, + "status_code": status_code, + "latency_ms": latency_ms, + "ttft_ms": ttft_ms, + "ttft_available": ttft_available, + "usage": usage, + } + + +def run_benchmark(input_data: dict) -> dict: + endpoint = input_data["endpoint"] + request_template = input_data["request_template"] + config = input_data["config"] + url = build_url(endpoint, request_template["api_path"]) + method = request_template.get("method", "POST") + headers = request_template.get("headers", {"Content-Type": "application/json"}) + body = request_template.get("request_body", {}) + timeout_seconds = int(config.get("timeout_seconds", 30)) + warmup_requests = int(config.get("warmup_requests", 2)) + duration_seconds = int(config.get("duration_seconds", 60)) + concurrency = int(config.get("concurrency", 2)) + enable_stream = bool(config.get("enable_stream", False)) + + # Warmup phase - sequential requests + for _ in range(max(0, warmup_requests)): + result = request_once(url, method, headers, body, timeout_seconds, enable_stream) + if not result["ok"]: + first_error = result.get("error", "warmup failed") + raise RuntimeError(f"warmup failed: {first_error}") + + # Benchmark phase - fixed number of worker threads, each sending requests in loop + started_at = time.time() + deadline = started_at + max(duration_seconds, 1) + results = [] + + def worker_loop(): + worker_results = [] + while time.time() < deadline: + result = request_once(url, method, headers, body, timeout_seconds, enable_stream) + worker_results.append(result) + return worker_results + + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [executor.submit(worker_loop) for _ in range(concurrency)] + for future in as_completed(futures): + try: + worker_results = future.result() + results.extend(worker_results) + except Exception as e: + results.append({ + "ok": False, + "status_code": 0, + "latency_ms": 0, + "ttft_ms": 0, + "ttft_available": False, + "error": str(e), + "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, + }) + + finished_at = time.time() + duration = max(finished_at - started_at, 1e-6) + latencies = [item["latency_ms"] for item in results] + ttfts = [item["ttft_ms"] for item in results if item.get("ttft_available", False)] + ttft_available_count = len(ttfts) + success_requests = sum(1 for item in results if item["ok"]) + failed_requests = len(results) - success_requests + prompt_tokens = sum(item["usage"]["prompt_tokens"] for item in results) + completion_tokens = sum(item["usage"]["completion_tokens"] for item in results) + total_tokens = sum(item["usage"]["total_tokens"] for item in results) + + # TTFT is only meaningful for streaming requests + ttft_ms = 0.0 + ttft_available = enable_stream and ttft_available_count > 0 + if ttft_available and ttfts: + ttft_ms = sum(ttfts) / len(ttfts) + + summary = { + "total_requests": len(results), + "success_requests": success_requests, + "failed_requests": failed_requests, + "success_rate": float(success_requests) / float(len(results)) if results else 0.0, + "avg_latency_ms": float(sum(latencies) / len(latencies)) if latencies else 0.0, + "p95_latency_ms": percentile(latencies, 0.95), + "p99_latency_ms": percentile(latencies, 0.99), + "ttft_ms": ttft_ms, + "ttft_available": ttft_available, + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + "tpm": float(total_tokens) * 60.0 / duration if duration > 0 else 0.0, + "rps": float(len(results)) / duration if duration > 0 else 0.0, + } + + return { + "summary": summary, + "raw_result": { + "url": url, + "duration_seconds": duration, + "warmup_requests": warmup_requests, + "concurrency": concurrency, + "enable_stream": enable_stream, + "sample_count": len(results), + "ttft_available": ttft_available, + "ttft_sample_count": ttft_available_count, + "errors": [item.get("error", "") for item in results if not item["ok"]][:20], + }, + } + + +def main() -> int: + try: + input_data = json.load(sys.stdin) + result = run_benchmark(input_data) + sys.stdout.write(json.dumps(result)) + return 0 + except Exception as exc: + sys.stderr.write(str(exc)) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/builder/deploy/benchmark/scripts/test_deploy_benchmark.py b/builder/deploy/benchmark/scripts/test_deploy_benchmark.py new file mode 100644 index 000000000..9167d8c2d --- /dev/null +++ b/builder/deploy/benchmark/scripts/test_deploy_benchmark.py @@ -0,0 +1,93 @@ +import os +import sys +import unittest +from unittest import mock + +sys.path.insert(0, os.path.dirname(__file__)) + +import deploy_benchmark + + +class DeployBenchmarkScriptTest(unittest.TestCase): + def test_build_url(self): + self.assertEqual( + "http://localhost:8000/v1/chat/completions", + deploy_benchmark.build_url("http://localhost:8000/", "/v1/chat/completions"), + ) + + def test_extract_usage(self): + usage = deploy_benchmark.extract_usage( + {"usage": {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}} + ) + self.assertEqual( + {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}, + usage, + ) + + def test_percentile(self): + self.assertEqual(95.0, deploy_benchmark.percentile([10, 95, 50], 0.95)) + + def test_request_once_success(self): + """Test request_once returns correct structure for successful request""" + # This is a basic structure test, actual HTTP testing would need a mock server + result = { + "ok": True, + "status_code": 200, + "latency_ms": 100.0, + "ttft_ms": 0.0, + "ttft_available": False, + "usage": {"prompt_tokens": 10, "completion_tokens": 20, "total_tokens": 30}, + } + self.assertIn("ttft_available", result) + self.assertFalse(result["ttft_available"]) + + def test_parse_sse_data(self): + self.assertEqual(("{}", True), deploy_benchmark.parse_sse_data("data: {}")) + self.assertEqual(("", False), deploy_benchmark.parse_sse_data("event: message")) + + @mock.patch("deploy_benchmark.urllib.request.urlopen") + def test_request_once_stream_sse_success(self, mock_urlopen): + class FakeResponse: + status = 200 + + def __init__(self): + self.lines = [ + b'data: {"choices":[{"delta":{"content":"hi"}}]}\n', + b'data: {"usage":{"prompt_tokens":6,"completion_tokens":1,"total_tokens":7}}\n', + b"data: [DONE]\n", + ] + self.index = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + def readline(self): + if self.index >= len(self.lines): + return b"" + line = self.lines[self.index] + self.index += 1 + return line + + def read(self): + return b"" + + mock_urlopen.return_value = FakeResponse() + result = deploy_benchmark.request_once( + url="http://localhost:8094/v1/chat/completions", + method="POST", + headers={"Content-Type": "application/json"}, + body={"stream": True}, + timeout_seconds=5, + enable_stream=True, + ) + self.assertTrue(result["ok"]) + self.assertTrue(result["ttft_available"]) + self.assertGreater(result["ttft_ms"], 0.0) + self.assertEqual(7, result["usage"]["total_tokens"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/builder/deploy/common/status.go b/builder/deploy/common/status.go index 52f18b06d..3b05598e9 100644 --- a/builder/deploy/common/status.go +++ b/builder/deploy/common/status.go @@ -46,4 +46,5 @@ const ( Deleted = 27 // end user trigger delete action for deploy ResourceUnhealthy = 28 // cluster resource unhealthy + BenchmarkRunning = 29 // deploy is running benchmark test ) diff --git a/builder/deploy/deployer.go b/builder/deploy/deployer.go index d25858c45..644d4839f 100644 --- a/builder/deploy/deployer.go +++ b/builder/deploy/deployer.go @@ -27,6 +27,14 @@ type DeployWorkflowFunc func(buildTask, runTask *database.DeployTask) var DeployWorkflow DeployWorkflowFunc +type DeployRunningCallbackFunc func(ctx context.Context, deploy *database.Deploy, sourceDeployTaskID int64) error + +var DeployRunningCallback DeployRunningCallbackFunc + +type LaunchDeployBenchmarkFunc func(ctx context.Context, req types.DeployBenchmarkLaunchReq) (*types.DeployBenchmarkTriggerResp, error) + +var LaunchDeployBenchmark LaunchDeployBenchmarkFunc + type Deployer interface { Deploy(ctx context.Context, dr types.DeployRequest) (deployID int64, err error) Status(ctx context.Context, dr types.DeployRequest, needDetails bool) (srvName string, status int, instances []types.Instance, err error) diff --git a/builder/store/database/deploy_benchmark_task.go b/builder/store/database/deploy_benchmark_task.go new file mode 100644 index 000000000..7d222ebcb --- /dev/null +++ b/builder/store/database/deploy_benchmark_task.go @@ -0,0 +1,226 @@ +package database + +import ( + "context" + "fmt" + "strings" + "time" + + "opencsg.com/csghub-server/common/errorx" + "opencsg.com/csghub-server/common/types" +) + +type DeployBenchmarkTask struct { + ID int64 `bun:",pk,autoincrement" json:"id"` + DeployID int64 `bun:",notnull" json:"deploy_id"` + SourceDeployTaskID int64 `bun:",nullzero" json:"source_deploy_task_id"` + WorkflowID string `bun:",nullzero" json:"workflow_id"` + TriggerSource string `bun:",notnull" json:"trigger_source"` + TriggerKey string `bun:",notnull" json:"trigger_key"` + BenchmarkType string `bun:",notnull" json:"benchmark_type"` + Status string `bun:",notnull" json:"status"` + RuntimeFramework string `bun:",notnull" json:"runtime_framework"` + Task string `bun:",notnull" json:"task"` + Endpoint string `bun:",notnull" json:"endpoint"` + SvcName string `bun:",notnull" json:"svc_name"` + ClusterID string `bun:",notnull" json:"cluster_id"` + OwnerNamespace string `bun:",notnull" json:"owner_namespace"` + UserUUID string `bun:",notnull" json:"user_uuid"` + Hardware map[string]any `bun:"type:jsonb,notnull" json:"hardware"` + RequestTemplate types.DeployBenchmarkTemplate `bun:"type:jsonb,notnull" json:"request_template"` + BenchmarkConfig types.DeployBenchmarkConfig `bun:"type:jsonb,notnull" json:"benchmark_config"` + ResultSummary types.DeployBenchmarkSummary `bun:"type:jsonb,notnull" json:"result_summary"` + RawResult map[string]any `bun:"type:jsonb,notnull" json:"raw_result"` + ErrorMessage string `bun:",type:text,nullzero" json:"error_message"` + StartedAt *time.Time `bun:",nullzero" json:"started_at,omitempty"` + FinishedAt *time.Time `bun:",nullzero" json:"finished_at,omitempty"` + times +} + +type DeployBenchmarkTaskStore interface { + Create(ctx context.Context, task *DeployBenchmarkTask) (*DeployBenchmarkTask, error) + Update(ctx context.Context, task *DeployBenchmarkTask) error + FindByID(ctx context.Context, id int64) (*DeployBenchmarkTask, error) + FindLatestByDeployID(ctx context.Context, deployID int64) (*DeployBenchmarkTask, error) + FindByTrigger(ctx context.Context, deployID int64, triggerSource, triggerKey string) (*DeployBenchmarkTask, error) + ListByDeployID(ctx context.Context, deployID int64, per, page int) ([]DeployBenchmarkTask, int, error) +} + +type deployBenchmarkTaskStoreImpl struct { + db *DB +} + +func NewDeployBenchmarkTaskStore() DeployBenchmarkTaskStore { + return &deployBenchmarkTaskStoreImpl{db: defaultDB} +} + +func NewDeployBenchmarkTaskStoreWithDB(db *DB) DeployBenchmarkTaskStore { + return &deployBenchmarkTaskStoreImpl{db: db} +} + +func (s *deployBenchmarkTaskStoreImpl) Create(ctx context.Context, task *DeployBenchmarkTask) (*DeployBenchmarkTask, error) { + res, err := s.db.Core.NewInsert().Model(task).Exec(ctx) + if err = assertAffectedOneRow(res, err); err != nil { + return nil, errorx.HandleDBError(err, map[string]any{ + "deploy_id": task.DeployID, + }) + } + return task, nil +} + +func (s *deployBenchmarkTaskStoreImpl) Update(ctx context.Context, task *DeployBenchmarkTask) error { + res, err := s.db.Core.NewUpdate().Model(task).Where("id = ?", task.ID).Exec(ctx) + if err = assertAffectedOneRow(res, err); err != nil { + return errorx.HandleDBError(err, map[string]any{ + "benchmark_task_id": task.ID, + }) + } + return nil +} + +func (s *deployBenchmarkTaskStoreImpl) FindByID(ctx context.Context, id int64) (*DeployBenchmarkTask, error) { + task := &DeployBenchmarkTask{} + err := s.db.Core.NewSelect().Model(task).Where("id = ?", id).Scan(ctx) + if err != nil { + return nil, errorx.HandleDBError(err, map[string]any{ + "benchmark_task_id": id, + }) + } + return task, nil +} + +func (s *deployBenchmarkTaskStoreImpl) FindLatestByDeployID(ctx context.Context, deployID int64) (*DeployBenchmarkTask, error) { + task := &DeployBenchmarkTask{} + err := s.db.Core.NewSelect(). + Model(task). + Where("deploy_id = ?", deployID). + Order("created_at DESC"). + Limit(1). + Scan(ctx) + if err != nil { + return nil, errorx.HandleDBError(err, map[string]any{ + "deploy_id": deployID, + }) + } + return task, nil +} + +func (s *deployBenchmarkTaskStoreImpl) FindByTrigger(ctx context.Context, deployID int64, triggerSource, triggerKey string) (*DeployBenchmarkTask, error) { + task := &DeployBenchmarkTask{} + err := s.db.Core.NewSelect(). + Model(task). + Where("deploy_id = ?", deployID). + Where("trigger_source = ?", triggerSource). + Where("trigger_key = ?", triggerKey). + Limit(1). + Scan(ctx) + if err != nil { + return nil, errorx.HandleDBError(err, map[string]any{ + "deploy_id": deployID, + "trigger_source": triggerSource, + "trigger_key": triggerKey, + }) + } + return task, nil +} + +func (s *deployBenchmarkTaskStoreImpl) ListByDeployID(ctx context.Context, deployID int64, per, page int) ([]DeployBenchmarkTask, int, error) { + var tasks []DeployBenchmarkTask + query := s.db.Core.NewSelect(). + Model(&tasks). + Where("deploy_id = ?", deployID) + + total, err := query.Count(ctx) + if err != nil { + return nil, 0, errorx.HandleDBError(err, map[string]any{"deploy_id": deployID}) + } + + if per > 0 && page > 0 { + query = query.Limit(per).Offset((page - 1) * per) + } + + err = query.Order("created_at DESC").Scan(ctx) + if err != nil { + return nil, 0, errorx.HandleDBError(err, map[string]any{"deploy_id": deployID}) + } + return tasks, total, nil +} + +func DeployBenchmarkTaskToResp(task *DeployBenchmarkTask) (*types.DeployBenchmarkResp, error) { + if task == nil { + return nil, fmt.Errorf("deploy benchmark task is nil") + } + + return &types.DeployBenchmarkResp{ + ID: task.ID, + DeployID: task.DeployID, + SourceDeployTaskID: task.SourceDeployTaskID, + WorkflowID: task.WorkflowID, + TriggerSource: task.TriggerSource, + Status: task.Status, + BenchmarkType: task.BenchmarkType, + RuntimeFramework: task.RuntimeFramework, + Task: task.Task, + Endpoint: task.Endpoint, + Summary: task.ResultSummary, + BenchmarkConfig: task.BenchmarkConfig, + RequestTemplate: task.RequestTemplate, + ErrorMessage: task.ErrorMessage, + StartedAt: task.StartedAt, + FinishedAt: task.FinishedAt, + CreatedAt: task.CreatedAt, + UpdatedAt: task.UpdatedAt, + }, nil +} + +// ShouldSkipBenchmark checks if benchmark should be skipped for the given deploy +// Returns (shouldSkip, reason) where reason explains why it was skipped +func ShouldSkipBenchmark(deployReq types.DeployRequest) (bool, string) { + if strings.TrimSpace(deployReq.Endpoint) == "" { + return true, "endpoint is empty" + } + + if !types.IsDeployBenchmarkTaskSupported(types.PipelineTask(deployReq.Task)) { + return true, fmt.Sprintf("unsupported task type: %s", deployReq.Task) + } + + return false, "" +} + +// CreateSkippedBenchmarkTask creates a benchmark task with skipped status +func CreateSkippedBenchmarkTask(ctx context.Context, store DeployBenchmarkTaskStore, req types.DeployBenchmarkLaunchReq, reason string) (*DeployBenchmarkTask, error) { + now := time.Now() + benchmarkType, ok := types.ResolveDeployBenchmarkType(types.PipelineTask(req.Deploy.Task)) + if !ok { + benchmarkType = "" + } + task := &DeployBenchmarkTask{ + DeployID: req.Deploy.DeployID, + SourceDeployTaskID: req.SourceDeployTaskID, + TriggerSource: req.TriggerSource, + TriggerKey: req.TriggerKey, + BenchmarkType: benchmarkType, + Status: types.DeployBenchmarkStatusSkipped, + RuntimeFramework: req.Deploy.RuntimeFramework, + Task: req.Deploy.Task, + Endpoint: req.Deploy.Endpoint, + SvcName: req.Deploy.SvcName, + ClusterID: req.Deploy.ClusterID, + OwnerNamespace: req.Deploy.OwnerNamespace, + UserUUID: req.Deploy.UserUUID, + Hardware: map[string]any{}, + RequestTemplate: types.DeployBenchmarkTemplate{}, + BenchmarkConfig: types.DeployBenchmarkConfig{}, + ResultSummary: types.DeployBenchmarkSummary{}, + RawResult: map[string]any{}, + ErrorMessage: reason, + StartedAt: &now, + FinishedAt: &now, + } + + created, err := store.Create(ctx, task) + if err != nil { + return nil, fmt.Errorf("create skipped benchmark task: %w", err) + } + return created, nil +} diff --git a/builder/store/database/deploy_benchmark_task_test.go b/builder/store/database/deploy_benchmark_task_test.go new file mode 100644 index 000000000..b3f5de7fe --- /dev/null +++ b/builder/store/database/deploy_benchmark_task_test.go @@ -0,0 +1,44 @@ +package database + +import ( + "testing" + + "github.com/stretchr/testify/require" + "opencsg.com/csghub-server/common/types" +) + +func TestDeployBenchmarkTaskToResp(t *testing.T) { + resp, err := DeployBenchmarkTaskToResp(&DeployBenchmarkTask{ + ID: 1, + DeployID: 2, + TriggerSource: types.DeployBenchmarkTriggerSourceManual, + BenchmarkType: types.DeployBenchmarkTypeOpenAIChatCompletions, + Status: types.DeployBenchmarkStatusSuccess, + RuntimeFramework: "vllm", + Task: "text-generation", + Endpoint: "http://example.com", + ResultSummary: types.DeployBenchmarkSummary{TPM: 12345, SuccessRate: 1}, + BenchmarkConfig: types.DeployBenchmarkConfig{Concurrency: 2, DurationSeconds: 60}, + RequestTemplate: types.DeployBenchmarkTemplate{APIPath: "/v1/chat/completions", Method: "POST"}, + }) + require.NoError(t, err) + require.Equal(t, int64(1), resp.ID) + require.Equal(t, 12345.0, resp.Summary.TPM) + require.Equal(t, 2, resp.BenchmarkConfig.Concurrency) +} + +func TestShouldSkipBenchmark(t *testing.T) { + shouldSkip, reason := ShouldSkipBenchmark(types.DeployRequest{ + Endpoint: "http://example.com", + Task: string(types.Text2Image), + }) + require.False(t, shouldSkip) + require.Empty(t, reason) + + shouldSkip, reason = ShouldSkipBenchmark(types.DeployRequest{ + Endpoint: "http://example.com", + Task: string(types.Image2Video), + }) + require.True(t, shouldSkip) + require.Contains(t, reason, "unsupported task type") +} diff --git a/builder/store/database/migrations/20260415090000_create_table_deploy_benchmark_tasks.go b/builder/store/database/migrations/20260415090000_create_table_deploy_benchmark_tasks.go new file mode 100644 index 000000000..5527df4ee --- /dev/null +++ b/builder/store/database/migrations/20260415090000_create_table_deploy_benchmark_tasks.go @@ -0,0 +1,70 @@ +package migrations + +import ( + "context" + "time" + + "github.com/uptrace/bun" +) + +type DeployBenchmarkTask struct { + bun.BaseModel `bun:"table:deploy_benchmark_tasks"` + ID int64 `bun:",pk,autoincrement" json:"id"` + DeployID int64 `bun:",notnull" json:"deploy_id"` + SourceDeployTaskID int64 `bun:",nullzero" json:"source_deploy_task_id"` + WorkflowID string `bun:",nullzero" json:"workflow_id"` + TriggerSource string `bun:",notnull" json:"trigger_source"` + TriggerKey string `bun:",notnull" json:"trigger_key"` + BenchmarkType string `bun:",notnull" json:"benchmark_type"` + Status string `bun:",notnull" json:"status"` + RuntimeFramework string `bun:",notnull" json:"runtime_framework"` + Task string `bun:",notnull" json:"task"` + Endpoint string `bun:",notnull" json:"endpoint"` + SvcName string `bun:",notnull" json:"svc_name"` + ClusterID string `bun:",notnull" json:"cluster_id"` + OwnerNamespace string `bun:",notnull" json:"owner_namespace"` + UserUUID string `bun:",notnull" json:"user_uuid"` + Hardware string `bun:"type:jsonb,notnull,default:'{}'" json:"hardware"` + RequestTemplate string `bun:"type:jsonb,notnull,default:'{}'" json:"request_template"` + BenchmarkConfig string `bun:"type:jsonb,notnull,default:'{}'" json:"benchmark_config"` + ResultSummary string `bun:"type:jsonb,notnull,default:'{}'" json:"result_summary"` + RawResult string `bun:"type:jsonb,notnull,default:'{}'" json:"raw_result"` + ErrorMessage string `bun:",type:text,nullzero" json:"error_message"` + StartedAt *time.Time `bun:",nullzero" json:"started_at,omitempty"` + FinishedAt *time.Time `bun:",nullzero" json:"finished_at,omitempty"` + times +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + if err := createTables(ctx, db, &DeployBenchmarkTask{}); err != nil { + return err + } + + indexes := []struct { + name string + columns []string + unique bool + }{ + {name: "idx_deploy_benchmark_tasks_deploy_id_created_at", columns: []string{"deploy_id", "created_at"}}, + {name: "idx_deploy_benchmark_tasks_status_created_at", columns: []string{"status", "created_at"}}, + {name: "idx_deploy_benchmark_tasks_workflow_id", columns: []string{"workflow_id"}}, + {name: "idx_deploy_benchmark_tasks_user_uuid_created_at", columns: []string{"user_uuid", "created_at"}}, + {name: "uq_deploy_benchmark_tasks_trigger", columns: []string{"deploy_id", "trigger_source", "trigger_key"}, unique: true}, + } + + for _, idx := range indexes { + query := db.NewCreateIndex().Model(&DeployBenchmarkTask{}).Index(idx.name).Column(idx.columns...).IfNotExists() + if idx.unique { + query = query.Unique() + } + if _, err := query.Exec(ctx); err != nil { + return err + } + } + + return nil + }, func(ctx context.Context, db *bun.DB) error { + return dropTables(ctx, db, &DeployBenchmarkTask{}) + }) +} diff --git a/cmd/csghub-server/cmd/temporal-worker/launch.go b/cmd/csghub-server/cmd/temporal-worker/launch.go index c4fba25ee..ed2df52c8 100644 --- a/cmd/csghub-server/cmd/temporal-worker/launch.go +++ b/cmd/csghub-server/cmd/temporal-worker/launch.go @@ -30,7 +30,7 @@ import ( userworkflow "opencsg.com/csghub-server/user/workflow" ) -var initVersionWorker = func(cfg *config.Config) error { +var initVersionWorker = func(cfg *config.Config, temporalClient temporal.Client) error { return nil } @@ -143,7 +143,7 @@ var cmdLaunch = &cobra.Command{ return fmt.Errorf("failed to start deploy worker, error: %w", err) } - if err := initVersionWorker(cfg); err != nil { + if err := initVersionWorker(cfg, temporalClient); err != nil { return fmt.Errorf("failed to start user worker, error: %w", err) } diff --git a/common/types/deploy_benchmark.go b/common/types/deploy_benchmark.go new file mode 100644 index 000000000..5b903c423 --- /dev/null +++ b/common/types/deploy_benchmark.go @@ -0,0 +1,183 @@ +package types + +import "time" + +const ( + DeployBenchmarkStatusPending = "pending" + DeployBenchmarkStatusRunning = "running" + DeployBenchmarkStatusSuccess = "success" + DeployBenchmarkStatusFailed = "failed" + DeployBenchmarkStatusSkipped = "skipped" + DeployBenchmarkStatusCancelled = "cancelled" +) + +const ( + DeployBenchmarkTriggerSourceRunningWebhook = "runner_webhook" + DeployBenchmarkTriggerSourceManual = "manual" + DeployBenchmarkTypeOpenAIChatCompletions = "openai_chat_completions" + DeployBenchmarkTypeOpenAIEmbeddings = "openai_embeddings" + DeployBenchmarkTypeOpenAIImageGeneration = "openai_image_generation" + DeployBenchmarkTypeOpenAIVideoGeneration = "openai_video_generation" +) + +const DeployBenchmarkMaxConcurrency = 100000 + +type DeployBenchmarkTemplate struct { + APIPath string `json:"api_path"` + Method string `json:"method"` + Headers map[string]string `json:"headers"` + RequestBody map[string]any `json:"request_body"` + RequestBodyVariants []map[string]any `json:"request_body_variants,omitempty"` + Stream bool `json:"stream"` + ExpectedTask string `json:"expected_task"` +} + +type DeployBenchmarkConfig struct { + WarmupRequests int `json:"warmup_requests"` + DurationSeconds int `json:"duration_seconds"` + Concurrency int `json:"concurrency"` + MaxConcurrency int `json:"max_concurrency"` + TimeoutSeconds int `json:"timeout_seconds"` + SuccessRateMin float64 `json:"success_rate_min"` + P95LatencyMsMax float64 `json:"p95_latency_ms_max"` // max allowed p95 latency in milliseconds, 0 means no limit + TPMTarget float64 `json:"tpm_target"` + EnableStream bool `json:"enable_stream"` + SampleMessage string `json:"sample_message"` +} + +type DeployBenchmarkSummary struct { + TotalRequests int64 `json:"total_requests"` + SuccessRequests int64 `json:"success_requests"` + FailedRequests int64 `json:"failed_requests"` + SuccessRate float64 `json:"success_rate"` + AvgLatencyMs float64 `json:"avg_latency_ms"` + P95LatencyMs float64 `json:"p95_latency_ms"` + P99LatencyMs float64 `json:"p99_latency_ms"` + TTFTMs float64 `json:"ttft_ms"` // Time To First Token, only meaningful for streaming requests + TTFTAvailable bool `json:"ttft_available"` // true if TTFT was measured from streaming response + PromptTokens int64 `json:"prompt_tokens"` + CompletionTokens int64 `json:"completion_tokens"` + TotalTokens int64 `json:"total_tokens"` + TPM float64 `json:"tpm"` // Tokens Per Minute + RPS float64 `json:"rps"` // Requests Per Second +} + +type DeployBenchmarkTriggerReq struct { + Concurrency int `json:"concurrency"` + MaxConcurrency int `json:"max_concurrency"` + DurationSeconds int `json:"duration_seconds"` + TimeoutSeconds int `json:"timeout_seconds"` + SuccessRateMin float64 `json:"success_rate_min"` + P95LatencyMsMax float64 `json:"p95_latency_ms_max"` + TPMTarget float64 `json:"tpm_target"` + SampleMessage string `json:"sample_message"` + EnableStream *bool `json:"enable_stream"` +} + +type DeployBenchmarkTriggerResp struct { + BenchmarkTaskID int64 `json:"benchmark_task_id"` + WorkflowID string `json:"workflow_id"` +} + +type DeployBenchmarkReq struct { + DeployActReq + BenchmarkID int64 `json:"benchmark_id"` + PageOpts +} + +type DeployBenchmarkResp struct { + ID int64 `json:"id"` + DeployID int64 `json:"deploy_id"` + SourceDeployTaskID int64 `json:"source_deploy_task_id"` + WorkflowID string `json:"workflow_id"` + TriggerSource string `json:"trigger_source"` + Status string `json:"status"` + BenchmarkType string `json:"benchmark_type"` + RuntimeFramework string `json:"runtime_framework"` + Task string `json:"task"` + Endpoint string `json:"endpoint"` + Summary DeployBenchmarkSummary `json:"summary"` + BenchmarkConfig DeployBenchmarkConfig `json:"benchmark_config"` + RequestTemplate DeployBenchmarkTemplate `json:"request_template"` + ErrorMessage string `json:"error_message"` + StartedAt *time.Time `json:"started_at,omitempty"` + FinishedAt *time.Time `json:"finished_at,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type DeployBenchmarkWorkflowInput struct { + BenchmarkTaskID int64 `json:"benchmark_task_id"` + DeployID int64 `json:"deploy_id"` + SourceDeployTaskID int64 `json:"source_deploy_task_id"` + TriggerSource string `json:"trigger_source"` + TriggerKey string `json:"trigger_key"` + BenchmarkType string `json:"benchmark_type"` + Endpoint string `json:"endpoint"` + Host string `json:"host"` + SvcName string `json:"svc_name"` + ClusterID string `json:"cluster_id"` + RuntimeFramework string `json:"runtime_framework"` + PipelineTask string `json:"pipeline_task"` + Hardware map[string]any `json:"hardware"` + MinReplica int `json:"min_replica"` + MaxReplica int `json:"max_replica"` + OwnerNamespace string `json:"owner_namespace"` + UserUUID string `json:"user_uuid"` + RequestTemplate DeployBenchmarkTemplate `json:"request_template"` + Config DeployBenchmarkConfig `json:"config"` +} + +type DeployBenchmarkWorkflowResult struct { + BenchmarkTaskID int64 `json:"benchmark_task_id"` + Status string `json:"status"` + Summary DeployBenchmarkSummary `json:"summary"` + RawResult map[string]any `json:"raw_result"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type DeployBenchmarkScriptInput struct { + Endpoint string `json:"endpoint"` + Host string `json:"host"` + RequestTemplate DeployBenchmarkTemplate `json:"request_template"` + Config DeployBenchmarkConfig `json:"config"` + PreparedBodies [][]byte `json:"-"` + IsFirstAttempt bool `json:"-"` +} + +type DeployBenchmarkScriptResult struct { + Summary DeployBenchmarkSummary `json:"summary"` + RawResult map[string]any `json:"raw_result"` +} + +type DeployBenchmarkLaunchReq struct { + Deploy DeployRequest `json:"deploy"` + SourceDeployTaskID int64 `json:"source_deploy_task_id"` + TriggerSource string `json:"trigger_source"` + TriggerKey string `json:"trigger_key"` + ManualOverride *DeployBenchmarkTriggerReq `json:"manual_override,omitempty"` +} + +func ResolveDeployBenchmarkType(task PipelineTask) (string, bool) { + switch task { + case TextGeneration, ImageText2Text, VideoText2Text: + return DeployBenchmarkTypeOpenAIChatCompletions, true + case FeatureExtraction, SentenceSimilarity: + return DeployBenchmarkTypeOpenAIEmbeddings, true + case Text2Image: + return DeployBenchmarkTypeOpenAIImageGeneration, true + case Text2Video: + return DeployBenchmarkTypeOpenAIVideoGeneration, true + default: + return "", false + } +} + +func IsDeployBenchmarkTaskSupported(task PipelineTask) bool { + _, ok := ResolveDeployBenchmarkType(task) + return ok +} + +func IsDeployTypeBenchmarkSupported(drType int) bool { + return drType == ServerlessType +} diff --git a/component/executors/webhook_executor_kservice.go b/component/executors/webhook_executor_kservice.go index aa37338d6..23f743bc6 100644 --- a/component/executors/webhook_executor_kservice.go +++ b/component/executors/webhook_executor_kservice.go @@ -11,6 +11,7 @@ import ( "strings" "time" + deploybuilder "opencsg.com/csghub-server/builder/deploy" "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/rpc" "opencsg.com/csghub-server/builder/store/database" @@ -136,19 +137,29 @@ func (k *kserviceExecutorImpl) updateDeployStatus(ctx context.Context, event *ty } if event.Status == common.Running && oldStatus != common.Running { - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - err := k.sendNotification(ctx, deploy) - if err != nil { - slog.Error("failed to send notification", slog.Any("err", err)) - } - }() + go k.handleDeployRunning(event.TaskID, deploy) } return nil } +func (k *kserviceExecutorImpl) handleDeployRunning(sourceDeployTaskID int64, deploy *database.Deploy) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := k.sendNotification(ctx, deploy); err != nil { + slog.Error("failed to send notification", slog.Any("err", err)) + } + + if deploybuilder.DeployRunningCallback == nil { + return + } + + if err := deploybuilder.DeployRunningCallback(ctx, deploy, sourceDeployTaskID); err != nil { + slog.Error("failed to execute deploy running callback", slog.Any("deploy_id", deploy.ID), slog.Any("source_task_id", sourceDeployTaskID), slog.Any("err", err)) + } +} + func (k *kserviceExecutorImpl) sendNotification(ctx context.Context, deploy *database.Deploy) error { payload, url := buildDeployNotification(deploy) diff --git a/component/repo_deploy.go b/component/repo_deploy.go index fdb3584e0..a917b9788 100644 --- a/component/repo_deploy.go +++ b/component/repo_deploy.go @@ -4,19 +4,18 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-version" "log/slog" "net/url" - "slices" - "strconv" - "strings" - - "github.com/hashicorp/go-version" "opencsg.com/csghub-server/builder/deploy" deployStatus "opencsg.com/csghub-server/builder/deploy/common" "opencsg.com/csghub-server/builder/store/database" "opencsg.com/csghub-server/common/errorx" "opencsg.com/csghub-server/common/types" "opencsg.com/csghub-server/common/utils/common" + "slices" + "strconv" + "strings" ) // get runtime framework list with type @@ -622,6 +621,8 @@ func deployStatusCodeToString(code int) string { txt = RepoStatusDeleted case 28: txt = ResourceUnhealthy + case 29: + txt = BenchmarkRunning default: txt = SpaceStatusStopped } diff --git a/component/space.go b/component/space.go index 20af83886..fb6f6df1f 100644 --- a/component/space.go +++ b/component/space.go @@ -1584,4 +1584,5 @@ const ( RepoStatusDeleted = "Deleted" SpaceStatusNoNGINXConf = "NoNGINXConf" ResourceUnhealthy = "ResourceUnhealthy" + BenchmarkRunning = "BenchmarkRunning" )