From f2129711c82740e18bf15a269c441535f72384ee Mon Sep 17 00:00:00 2001 From: Anuj Dhawan Date: Wed, 18 Mar 2026 15:26:13 -0700 Subject: [PATCH] Add BigQuery job labels and job timeout support Parse account_id and job_timeout from DSN query parameters and apply them to BigQuery queries: - account_id is set as a job label for server-side cost attribution - job_timeout sets JobTimeout on the query for bounded execution - Fix QueryContext to use caller's context instead of context.Background() DSN format: bigquery://project/dataset?account_id=X&job_timeout=5m --- driver/driver.go | 37 +++++++++++++++++++++++++++++++++++++ driver/statement.go | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/driver/driver.go b/driver/driver.go index c92ffa1..490d809 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -6,11 +6,23 @@ import ( "fmt" "net/url" "strings" + "time" "cloud.google.com/go/bigquery" + "github.com/sirupsen/logrus" "google.golang.org/api/option" ) +const ( + // accountIDParam is the DSN query parameter + // name for the BigQuery job label account ID. + accountIDParam = "account_id" + + // defaultAccountID is used when the account_id + // parameter is not set in the DSN. + defaultAccountID = "UNSPECIFIED" +) + type BigQueryDriver struct { } @@ -21,6 +33,13 @@ type bigQueryConfig struct { scopes []string endpoint string disableAuth bool + accountID string + // jobTimeout is the server-side timeout for BQ + // jobs. It applies only to job execution time, + // not queue/pending time. Set via the + // "job_timeout" DSN query parameter (e.g., + // ?job_timeout=5m). BQ floors minimum to 1s. + jobTimeout time.Duration } func (b BigQueryDriver) Open(uri string) (driver.Conn, error) { @@ -84,12 +103,30 @@ func configFromUri(uri string) (*bigQueryConfig, error) { datasetName = fields[len(fields)-1] } + accountID := u.Query().Get(accountIDParam) + if accountID == "" { + accountID = defaultAccountID + } + config := &bigQueryConfig{ projectID: u.Hostname(), dataSet: datasetName, scopes: getScopes(u.Query()), endpoint: u.Query().Get("endpoint"), disableAuth: u.Query().Get("disable_auth") == "true", + accountID: accountID, + } + + if v := u.Query().Get("job_timeout"); v != "" { + d, err := time.ParseDuration(v) + if err != nil { + logrus.Warnf( + "bq driver: invalid job_timeout %q: %v", + v, err, + ) + } else { + config.jobTimeout = d + } } if len(fields) == 2 { diff --git a/driver/statement.go b/driver/statement.go index fe4515b..1346be6 100644 --- a/driver/statement.go +++ b/driver/statement.go @@ -42,11 +42,22 @@ func (statement *bigQueryStatement) ExecContext(ctx context.Context, args []driv return nil, err } - rowIterator, err := query.Read(ctx) + // Split into Run + Read (instead of query.Read) + // to get a job handle for server-side cancellation + // when the caller's context is done. + job, err := query.Run(ctx) if err != nil { return nil, err } + rowIterator, err := job.Read(ctx) + if err != nil { + if ctx.Err() != nil { + go job.Cancel(context.Background()) + } + return nil, err + } + return &bigQueryResult{rowIterator}, nil } @@ -88,8 +99,19 @@ func (statement *bigQueryStatement) QueryContext(ctx context.Context, args []dri return nil, err } - rowIterator, err := query.Read(context.Background()) + // Split into Run + Read (instead of query.Read) + // to get a job handle for server-side cancellation + // when the caller's context is done. + job, err := query.Run(ctx) + if err != nil { + return nil, err + } + + rowIterator, err := job.Read(ctx) if err != nil { + if ctx.Err() != nil { + go job.Cancel(context.Background()) + } return nil, err } @@ -151,6 +173,13 @@ func (statement bigQueryStatement) buildQuery(args []driver.Value) (*bigquery.Qu return nil, err } query.DefaultDatasetID = statement.connection.config.dataSet + query.Labels = map[string]string{ + accountIDParam: statement.connection.config.accountID, + } + if statement.connection.config.jobTimeout > 0 { + query.JobTimeout = statement.connection.config.jobTimeout + } + query.Parameters, err = statement.buildParameters(args) if err != nil { return nil, err