6 changes: 6 additions & 0 deletions CLAUDE.md
@@ -56,6 +56,12 @@ mise run postgres:up --extra-args "--detach --wait"
mise run postgres:setup # Install EQL and schema
```

> **macOS Note:** If you hit file descriptor limits during development (e.g. "Too many open files"), you may need to increase the limit:
> ```bash
> ulimit -n 10240
> ```
> To make this persistent, add it to your shell profile (e.g. `~/.zshrc`).

### Core Development Workflow
```bash
# Build and run Proxy as a process (development)
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions DEVELOPMENT.md
@@ -321,6 +321,9 @@ mise run proxy:down
Running Proxy in a container cross-compiles a binary for Linux and the current architecture (`amd64`, `arm64`), then copies the binary into the container.
We cross-compile the binary outside the container because it's generally faster: build dependencies are already cached on the host, and network and disk IO are slower inside Docker.

> [!IMPORTANT]
> **Proxy must always be built from source for testing.** The `proxy:up` task builds a binary from source (`build:binary`), packages it into a Docker image tagged `cipherstash/proxy:latest` (`build:docker`), then starts it via `docker compose up`. The `tests/docker-compose.yml` file uses `pull_policy: never` on the proxy services to ensure Docker never pulls the released image from Docker Hub. If you see an error like `pull access denied` or `image not found`, run `mise run build` first to build the local image.

### Building

Build a binary and Docker image:
@@ -460,6 +463,8 @@ This project uses `docker compose` to manage containers and networking.

The configuration for those containers is in `tests/docker-compose.yml`.

The proxy services in `tests/docker-compose.yml` use `pull_policy: never` to ensure Docker never pulls the released `cipherstash/proxy:latest` image from Docker Hub. The image must be built locally from source via `mise run proxy:up` (or `mise run build`). This guarantees integration tests always run against the current source code.
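
The `pull_policy: never` guard can be illustrated with a minimal compose fragment. This is a sketch, not the actual contents of `tests/docker-compose.yml`; the service name and surrounding keys are assumptions based on the description above:

```yaml
# Hypothetical excerpt illustrating the local-image-only policy.
services:
  proxy:
    image: cipherstash/proxy:latest   # tag produced locally by `mise run build`
    pull_policy: never                # fail fast instead of pulling from Docker Hub
```

With `pull_policy: never`, `docker compose up` errors out if the tagged image is absent locally rather than silently pulling the released image, which is the `pull access denied` / `image not found` failure mode mentioned earlier.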

The integration tests use the `proxy:up` and `proxy:down` commands documented above to run containers in different configurations.

#### Configuration: configuring PostgreSQL containers in integration tests
3 changes: 3 additions & 0 deletions mise.toml
@@ -663,6 +663,9 @@ cp -v {{config_root}}/target/{{ target }}/release/cipherstash-proxy {{config_roo

[tasks."build:docker"]
depends = ["eql:download"]
# Tags the image as cipherstash/proxy:latest locally.
# tests/docker-compose.yml uses pull_policy: never to ensure this local image
# is always used instead of the released image on Docker Hub.
description = "Build a Docker image for cipherstash-proxy"
run = """
{% set default_platform = "linux/" ~ arch() | replace(from="x86_64", to="amd64") %}
5 changes: 3 additions & 2 deletions packages/cipherstash-proxy-integration/src/common.rs
@@ -128,7 +128,8 @@ pub fn connection_config(port: u16) -> tokio_postgres::Config {
        .port(port)
        .user(&username)
        .password(&password)
        .dbname(&name);
        .dbname(&name)
        .connect_timeout(std::time::Duration::from_secs(10));

    db_config
}
@@ -212,7 +213,7 @@ where
}

/// Get database port from environment or use default.
fn get_database_port() -> u16 {
pub fn get_database_port() -> u16 {
std::env::var("CS_DATABASE__PORT")
.ok()
.and_then(|s| s.parse().ok())
198 changes: 198 additions & 0 deletions packages/cipherstash-proxy-integration/src/connection_resilience.rs
@@ -0,0 +1,198 @@
/// Tests that validate proxy connection isolation under load.
///
/// These tests verify that:
/// - Slow queries on one connection don't block other connections
/// - The proxy accepts new connections after client disconnect
/// - Concurrent connections under load remain responsive
/// - Blocked backend connections don't affect other proxy connections
#[cfg(test)]
mod tests {
    use crate::common::{connect_with_tls, get_database_port, PROXY};
    use std::time::Instant;
    use tokio::task::JoinSet;
    use tokio::time::{timeout, Duration};
    use tokio_postgres::SimpleQueryMessage;

    /// Advisory lock ID used in isolation tests. Arbitrary value — just needs to be
    /// unique across concurrently running test suites against the same database.
    const ADVISORY_LOCK_ID: i64 = 99_001;

    /// A slow query on one connection does not block other connections through the proxy.
    #[tokio::test]
    async fn slow_query_does_not_block_other_connections() {
        let result = timeout(Duration::from_secs(30), async {
            let client_a = connect_with_tls(PROXY).await;
            let client_b = connect_with_tls(PROXY).await;

            // Connection A: run a slow query
            let a_handle = tokio::spawn(async move {
                client_a.simple_query("SELECT pg_sleep(5)").await.unwrap();
            });

            // Brief pause to ensure A's query is in flight
            tokio::time::sleep(Duration::from_millis(200)).await;

            // Connection B: run a fast query, should complete promptly
            let start = Instant::now();
            let rows = client_b.simple_query("SELECT 1").await.unwrap();
            let elapsed = start.elapsed();

            assert!(!rows.is_empty(), "Expected result from SELECT 1");
            assert!(
                elapsed < Duration::from_secs(2),
                "Fast query took {elapsed:?}, expected < 2s — proxy may be blocking"
            );

            a_handle.await.unwrap();
        })
        .await;

        result.expect("Test timed out after 30s");
    }

    /// Proxy accepts new connections after a client disconnects.
    #[tokio::test]
    async fn proxy_accepts_new_connections_after_client_disconnect() {
        let result = timeout(Duration::from_secs(10), async {
            // First connection: query, then drop
            {
                let client = connect_with_tls(PROXY).await;
                let rows = client.simple_query("SELECT 1").await.unwrap();
                assert!(!rows.is_empty());
            }
            // Client dropped here

            // Brief pause
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Second connection: should work fine
            let client = connect_with_tls(PROXY).await;
            let rows = client.simple_query("SELECT 1").await.unwrap();
            assert!(!rows.is_empty());
        })
        .await;

        result.expect("Test timed out after 10s");
    }

    /// Concurrent slow and fast connections: fast queries complete promptly under slow load.
    #[tokio::test]
    async fn concurrent_connections_under_slow_load() {
        let result = timeout(Duration::from_secs(30), async {
            let mut join_set = JoinSet::new();

            // 5 slow connections
            for _ in 0..5 {
                join_set.spawn(async {
                    let client = connect_with_tls(PROXY).await;
                    client.simple_query("SELECT pg_sleep(3)").await.unwrap();
                });
            }

            // Brief pause to let slow queries start
            tokio::time::sleep(Duration::from_millis(300)).await;

            // 5 fast connections, each should complete promptly
            for _ in 0..5 {
                join_set.spawn(async {
                    let start = Instant::now();
                    let client = connect_with_tls(PROXY).await;
                    let rows = client.simple_query("SELECT 1").await.unwrap();
                    let elapsed = start.elapsed();

                    assert!(!rows.is_empty());
                    assert!(
                        elapsed < Duration::from_secs(5),
                        "Fast query took {elapsed:?} under slow load, expected < 5s"
                    );
                });
            }

            while let Some(result) = join_set.join_next().await {
                result.unwrap();
            }
        })
        .await;

        result.expect("Test timed out after 30s");
    }

    /// An advisory-lock-blocked connection through the proxy does not block other proxy connections.
    ///
    /// Uses pg_locks polling to deterministically wait for client_b to be blocked on the
    /// advisory lock, rather than relying on a fixed sleep.
    #[tokio::test]
    async fn advisory_lock_blocked_connection_does_not_block_proxy() {
        let lock_query = format!("SELECT pg_advisory_lock({ADVISORY_LOCK_ID})");
        let unlock_query = format!("SELECT pg_advisory_unlock({ADVISORY_LOCK_ID})");

        let result = timeout(Duration::from_secs(30), async {
            // Connection A: hold an advisory lock (connect directly to PG to avoid proxy interference)
            let pg_port = get_database_port();
            let client_a = connect_with_tls(pg_port).await;
            client_a.simple_query(&lock_query).await.unwrap();

            let b_lock_query = lock_query.clone();
            let b_unlock_query = unlock_query.clone();

            // Connection B: through proxy, attempt to acquire the same lock (will block)
            let b_handle = tokio::spawn(async move {
                let client_b = connect_with_tls(PROXY).await;
                // This will block until A releases the lock
                client_b.simple_query(&b_lock_query).await.unwrap();
> **Review comment (Contributor):** This test is failing, and I think it's because there should be a sleep between acquiring and releasing, so that the `has_waiting` check on line 160 has a chance of observing client_b holding the lock.

                // Release after acquiring
                client_b.simple_query(&b_unlock_query).await.unwrap();
            });

            // Poll pg_locks until client_b is observed waiting for the advisory lock
            let poll_query = format!(
                "SELECT 1 FROM pg_locks WHERE locktype = 'advisory' AND NOT granted AND classid = 0 AND objid = {ADVISORY_LOCK_ID}"
            );
            let deadline = Instant::now() + Duration::from_secs(10);
            loop {
                let result = client_a.simple_query(&poll_query).await.unwrap();
                let has_waiting = result.iter().any(|m| matches!(m, SimpleQueryMessage::Row(_)));
                if has_waiting {
                    break;
                }
                assert!(
                    Instant::now() < deadline,
                    "Timed out waiting for client_b to be blocked on advisory lock"
                );
                tokio::time::sleep(Duration::from_millis(50)).await;
            }

            // Connection C: through proxy, should complete immediately despite B being blocked
            let start = Instant::now();
            let client_c = connect_with_tls(PROXY).await;
            let rows = client_c.simple_query("SELECT 1").await.unwrap();
            let elapsed = start.elapsed();

            assert!(!rows.is_empty());
            assert!(
                elapsed < Duration::from_secs(2),
                "Connection C took {elapsed:?}, expected < 2s — blocked connection may be affecting proxy"
            );

            // Release the lock so B can complete
            client_a.simple_query(&unlock_query).await.unwrap();

            b_handle.await.unwrap();
        })
        .await;

        result.expect("Test timed out after 30s");
    }
}
1 change: 1 addition & 0 deletions packages/cipherstash-proxy-integration/src/lib.rs
@@ -1,4 +1,5 @@
mod common;
mod connection_resilience;
mod decrypt;
mod diagnostics;
mod disable_mapping;
2 changes: 1 addition & 1 deletion packages/cipherstash-proxy/Cargo.toml
@@ -24,7 +24,7 @@ eql-mapper = { path = "../eql-mapper" }
exitcode = "1.1.2"
hex = "0.4.3"
md-5 = "0.10.6"
metrics = "0.24.1"
metrics = "0.24.3"
metrics-exporter-prometheus = "0.17"
moka = { version = "0.12", features = ["future"] }
oid-registry = "0.8"
36 changes: 35 additions & 1 deletion packages/cipherstash-proxy/src/config/database.rs
@@ -75,8 +75,14 @@ impl DatabaseConfig {
        self.password.to_owned().risky_unwrap()
    }

    const DEFAULT_CONNECTION_TIMEOUT_MS: u64 = 120_000;

    pub fn connection_timeout(&self) -> Option<Duration> {
        self.connection_timeout.map(Duration::from_millis)
        match self.connection_timeout {
            Some(0) => None,
            Some(ms) => Some(Duration::from_millis(ms)),
            None => Some(Duration::from_millis(Self::DEFAULT_CONNECTION_TIMEOUT_MS)),
        }
    }

    pub fn server_name(&self) -> Result<ServerName<'_>, Error> {
@@ -116,3 +122,31 @@ impl Display for DatabaseConfig {
        )
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn connection_timeout_defaults_to_120_seconds() {
        let config = DatabaseConfig::for_testing();
        assert_eq!(config.connection_timeout(), Some(Duration::from_secs(120)));
    }

    #[test]
    fn connection_timeout_zero_disables_timeout() {
        let mut config = DatabaseConfig::for_testing();
        config.connection_timeout = Some(0);
        assert_eq!(config.connection_timeout(), None);
    }

    #[test]
    fn connection_timeout_custom_value_in_millis() {
        let mut config = DatabaseConfig::for_testing();
        config.connection_timeout = Some(5000);
        assert_eq!(
            config.connection_timeout(),
            Some(Duration::from_millis(5000))
        );
    }
}
10 changes: 9 additions & 1 deletion packages/cipherstash-proxy/src/error.rs
@@ -27,7 +27,7 @@ pub enum Error {
    #[error("Connection closed by client")]
    ConnectionClosed,

    #[error("Connection timed out after {} ms", duration.as_secs())]
    #[error("Connection timed out after {} ms", duration.as_millis())]
    ConnectionTimeout { duration: Duration },

    #[error("Error creating connection")]
@@ -522,4 +522,12 @@ mod tests {

        assert_eq!(format!("Statement encountered an internal error. This may be a bug in the statement mapping module of CipherStash Proxy. Please visit {ERROR_DOC_BASE_URL}#mapping-internal-error for more information."), message);
    }

    #[test]
    fn connection_timeout_message_shows_millis() {
        let error = Error::ConnectionTimeout {
            duration: Duration::from_millis(5000),
        };
        assert_eq!(error.to_string(), "Connection timed out after 5000 ms");
    }
}