Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 1 addition & 39 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package riverinternaltest

import (
"context"
"errors"
"fmt"
"log"
"net/url"
Expand Down Expand Up @@ -219,44 +218,7 @@ func TestTx(ctx context.Context, tb testing.TB) pgx.Tx {
return dbPool
}

tx, err := getPool().Begin(ctx)
require.NoError(tb, err)

tb.Cleanup(func() {
err := tx.Rollback(ctx)

if err == nil {
return
}

// Try to look for an error on rollback because it does occasionally
// reveal a real problem in the way a test is written. However, allow
// tests to roll back their transaction early if they like, so ignore
// `ErrTxClosed`.
if errors.Is(err, pgx.ErrTxClosed) {
return
}

// In case of a cancelled context during a database operation, which
// happens in many tests, pgx seems to not only roll back the
// transaction, but closes the connection, and returns this error on
// rollback. Allow this error since it's hard to prevent it in our flows
// that use contexts heavily.
if err.Error() == "conn closed" {
return
}

// Similar to the above, but a newly appeared error that wraps the
// above. As far as I can tell, no error variables are available to use
// with `errors.Is`.
if err.Error() == "failed to deallocate cached statement(s): conn closed" {
return
}

require.NoError(tb, err)
})

return tx
return riversharedtest.TestTxPool(ctx, tb, getPool())
}

// TruncateRiverTables truncates River tables in the target database. This is
Expand Down
1 change: 1 addition & 0 deletions rivershared/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
5 changes: 5 additions & 0 deletions rivershared/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -8,10 +10,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/riverqueue/river v0.18.0 h1:sGHeTOL9MR8+pMIVHRm59fzet8Ron/xjF3Yq/PSGb78=
github.com/riverqueue/river v0.18.0/go.mod h1:oapX5xb/L2YnkE801QubDZ0COHxVxEGVY37icPzghhU=
github.com/riverqueue/river v0.19.0/go.mod h1:YJ7LA2uBdqFHQJzKyYc+X6S04KJeiwsS1yU5a1rynlk=
github.com/riverqueue/river/riverdriver v0.18.0 h1:a2haR5I0MQLHjLCSVFpUEeJALCLemRl5zCztucysm1E=
github.com/riverqueue/river/riverdriver v0.18.0/go.mod h1:Mj45PbHabEnBv/nSah0J1/tg6hrX/SNeXtcYcSqMzxQ=
github.com/riverqueue/river/riverdriver v0.19.0/go.mod h1:Soxi08hHkEvopExAp6ADG2437r4coSiB4QpuIL5E28k=
github.com/riverqueue/river/rivertype v0.18.0 h1:YsXR5NbLAzniurGO0+zcISWMKq7Y71xkIe2oi86OAsE=
github.com/riverqueue/river/rivertype v0.18.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
github.com/riverqueue/river/rivertype v0.19.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
Expand Down
109 changes: 109 additions & 0 deletions rivershared/riversharedtest/riversharedtest.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package riversharedtest

import (
"cmp"
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

Expand All @@ -27,6 +32,47 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
}
}

// A pool and mutex to protect it, lazily initialized by TestTx. Once open, this
// pool is never explicitly closed, instead closing implicitly as the package
// tests finish.
var (
dbPool *pgxpool.Pool //nolint:gochecknoglobals
dbPoolMu sync.RWMutex //nolint:gochecknoglobals
)

// DBPool gets a lazily initialized database pool for `TEST_DATABASE_URL` or
// `river_test` if the former isn't specified.
func DBPool(ctx context.Context, tb testing.TB) *pgxpool.Pool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should take an arg for DB URL rather than assuming a particular env? It makes it slightly more verbose to use but also much more flexible if other projects want to use their own schemas and DBs instead of sharing those used by the main River test suite.

It could even require.NotEmpty on the URL so the caller doesn't need to use a mustGetEnv type util when calling it.

I guess the downside is that this arg would need to cascade down through TestTx and TestTxPool, whose APIs are more likely to be called in a lot of places.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good idea. I went back and forth on these quite a lot because it's hard to find a good balance between good customizability and encouraging good default convention everywhere.

Just since this is specifically in riversharedtest (strongly implying use in testing), why don't we see if we can get everything on the convention of TEST_DATABASE_URL/river_test. In case this is turns out to be harde than expected, we can swap that out early for a more injectable env version.

tb.Helper()

tryPool := func() *pgxpool.Pool {
dbPoolMu.RLock()
defer dbPoolMu.RUnlock()
return dbPool
}

if dbPool := tryPool(); dbPool != nil {
return dbPool
}

dbPoolMu.Lock()
defer dbPoolMu.Unlock()

// Multiple goroutines may have passed the initial `nil` check on start
// up, so check once more to make sure pool hasn't been set yet.
if dbPool != nil {
return dbPool
}

dbPool, err := pgxpool.New(ctx, cmp.Or(
os.Getenv("TEST_DATABASE_URL"),
"postgres://localhost:5432/river_test",
))
require.NoError(tb, err)

return dbPool
}

// Logger returns a logger suitable for use in tests.
//
// Defaults to informational verbosity. If env is set with `RIVER_DEBUG=true`,
Expand All @@ -48,6 +94,69 @@ func LoggerWarn(tb testing.TB) *slog.Logger {
return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelWarn})
}

// TestTx starts a test transaction that's rolled back automatically as the test
// case is cleaning itself up.
//
// This variant uses the default database pool from DBPool that points to
// `TEST_DATABASE_URL` or `river_test` if the former wasn't specified.
func TestTx(ctx context.Context, tb testing.TB) pgx.Tx {
tb.Helper()
return TestTxPool(ctx, tb, DBPool(ctx, tb))
}

// TestTxPool starts a test transaction that's rolled back automatically as the
// test case is cleaning itself up.
//
// This variant starts the test transaction on the specified database pool.
func TestTxPool(ctx context.Context, tb testing.TB, dbPool *pgxpool.Pool) pgx.Tx {
tb.Helper()

tx, err := dbPool.Begin(ctx)
require.NoError(tb, err)

tb.Cleanup(func() {
// Tests may inerit context from `t.Context()` which is cancelled after
// tests run and before calling clean up. We need a non-cancelled
// context to issue rollback here, so use a bit of a bludgeon to do so
// with `context.WithoutCancel()`.
ctx := context.WithoutCancel(ctx)
Comment on lines +118 to +122
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This of course means this rollback could hang ~indefinitely since it no longer inherits cancellation from tb.Context(), but that's probably fine within a test suite. Worst case you could re-apply a moderate timeout here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I suppose it's technically possible, but I think it's another one of those things that doesn't really happen in practice. I've had something similar to this in place at work for many months now over which time we've done millions of test runs and I've never seen it happen even one time.


err := tx.Rollback(ctx)

if err == nil {
return
}

// Try to look for an error on rollback because it does occasionally
// reveal a real problem in the way a test is written. However, allow
// tests to roll back their transaction early if they like, so ignore
// `ErrTxClosed`.
if errors.Is(err, pgx.ErrTxClosed) {
return
}

// In case of a cancelled context during a database operation, which
// happens in many tests, pgx seems to not only roll back the
// transaction, but closes the connection, and returns this error on
// rollback. Allow this error since it's hard to prevent it in our flows
// that use contexts heavily.
if err.Error() == "conn closed" {
return
}

// Similar to the above, but a newly appeared error that wraps the
// above. As far as I can tell, no error variables are available to use
// with `errors.Is`.
if err.Error() == "failed to deallocate cached statement(s): conn closed" {
return
}

require.NoError(tb, err)
})

return tx
}

// TimeStub implements baseservice.TimeGeneratorWithStub to allow time to be
// stubbed in tests.
//
Expand Down
55 changes: 55 additions & 0 deletions rivershared/riversharedtest/riversharedtest_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,67 @@
package riversharedtest

import (
"context"
"testing"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/stretchr/testify/require"
)

func TestDBPool(t *testing.T) {
t.Parallel()

ctx := context.Background()

pool := DBPool(ctx, t)
_, err := pool.Exec(ctx, "SELECT 1")
require.NoError(t, err)
}

func TestTestTx(t *testing.T) {
t.Parallel()

ctx := context.Background()

type PoolOrTx interface {
Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
}

checkTestTable := func(ctx context.Context, poolOrTx PoolOrTx) error {
_, err := poolOrTx.Exec(ctx, "SELECT * FROM river_shared_test_tx_table")
return err
}

// Test cleanups are invoked in the order of last added, first called. When
// TestTx is called below it adds a cleanup, so we want to make sure that
// this cleanup, which checks that the database remains pristine, is invoked
// after the TestTx cleanup, so we add it first.
t.Cleanup(func() {
// Tests may inherit context from `t.Context()` which is cancelled after
// tests run and before calling clean up. We need a non-cancelled
// context to issue rollback here, so use a bit of a bludgeon to do so
// with `context.WithoutCancel()`.
ctx := context.WithoutCancel(ctx)

err := checkTestTable(ctx, DBPool(ctx, t))
require.Error(t, err)

var pgErr *pgconn.PgError
require.ErrorAs(t, err, &pgErr)
require.Equal(t, pgerrcode.UndefinedTable, pgErr.Code)
})

tx := TestTx(ctx, t)

_, err := tx.Exec(ctx, "CREATE TABLE river_shared_test_tx_table (id bigint)")
require.NoError(t, err)

err = checkTestTable(ctx, tx)
require.NoError(t, err)
}

func TestWaitOrTimeout(t *testing.T) {
t.Parallel()

Expand Down
Loading