diff --git a/.github/workflows/pr-deploy-and-test.yml b/.github/workflows/pr-deploy-and-test.yml
index a76be32703..dd98995461 100644
--- a/.github/workflows/pr-deploy-and-test.yml
+++ b/.github/workflows/pr-deploy-and-test.yml
@@ -20,6 +20,10 @@ jobs:
uses: ./.github/workflows/deploy-backend.yml
with:
apigee_environment: internal-dev
+ lambda_build_flags: >-
+ ${{ (github.event.action == 'opened' || github.event.action == 'reopened')
+ && '{"recordprocessor":true,"ack-backend":true}'
+ || '{}' }}
diff_base_sha: ${{ github.event.action == 'synchronize' && github.event.before || github.event.pull_request.base.sha }}
diff_head_sha: ${{ github.event.pull_request.head.sha }}
run_diff_check: ${{ github.event.action == 'synchronize' }}
@@ -35,8 +39,10 @@ jobs:
include:
- apigee_environment_name: internal-dev
required_test_suite: smoke
+ require_matching_commit_id: true
- apigee_environment_name: internal-dev-sandbox
required_test_suite: sandbox
+ require_matching_commit_id: false
uses: ./.github/workflows/run-e2e-automation-tests.yml
with:
apigee_environment: ${{ matrix.apigee_environment_name }}
@@ -44,6 +50,8 @@ jobs:
sub_environment: pr-${{github.event.pull_request.number}}
service_under_test: all
suite_to_run: ${{ matrix.required_test_suite }}
+ expected_commit_id: ${{ github.sha }}
+ require_matching_commit_id: ${{ matrix.require_matching_commit_id }}
secrets:
APIGEE_PASSWORD: ${{ secrets.APIGEE_PASSWORD }}
APIGEE_BASIC_AUTH_TOKEN: ${{ secrets.APIGEE_BASIC_AUTH_TOKEN }}
diff --git a/.github/workflows/quality-checks.yml b/.github/workflows/quality-checks.yml
index 5566bb2b32..084ba1fe1a 100644
--- a/.github/workflows/quality-checks.yml
+++ b/.github/workflows/quality-checks.yml
@@ -9,6 +9,7 @@ on:
env:
SHARED_PATH: ${{ github.workspace }}/lambdas/shared
LAMBDA_PATH: ${{ github.workspace }}/lambdas
+ POETRY_INSTALLER_ONLY_BINARY: ":all:"
jobs:
lint-specification:
@@ -156,6 +157,7 @@ jobs:
id: id_sync
env:
PYTHONPATH: ${{ env.LAMBDA_PATH }}/id_sync/src:${{ env.LAMBDA_PATH }}/id_sync/tests:${{ env.SHARED_PATH }}/src
+ POETRY_INSTALLER_ONLY_BINARY: ""
continue-on-error: true
run: |
poetry install
@@ -178,6 +180,7 @@ jobs:
id: mnspublisher
env:
PYTHONPATH: ${{ env.LAMBDA_PATH }}/mns_publisher/src:${{ env.LAMBDA_PATH }}/mns_publisher/tests:${{ env.SHARED_PATH }}/src
+ POETRY_INSTALLER_ONLY_BINARY: ""
continue-on-error: true
run: |
poetry install
@@ -234,11 +237,24 @@ jobs:
id: shared
env:
PYTHONPATH: ${{ env.SHARED_PATH }}/src
+ POETRY_INSTALLER_ONLY_BINARY: ""
+ GITHUB_WORKSPACE: ${{ github.workspace }}
continue-on-error: true
run: |
poetry install
poetry run coverage run --rcfile=.coveragerc --source=src -m unittest discover -s tests -p "test_*.py" -v || echo "shared tests failed" >> ../../failed_tests.txt
poetry run coverage xml -o ../../shared-coverage.xml
+ python - <<'PY'
+ import os
+ import re
+ from pathlib import Path
+
+ coverage_file = Path("../../shared-coverage.xml")
+ xml = coverage_file.read_text()
+            xml = re.sub(r"<source>.*?</source>", f"<source>{os.environ['GITHUB_WORKSPACE']}</source>", xml, count=1)
+ xml = xml.replace('filename="src/', 'filename="lambdas/shared/src/')
+ coverage_file.write_text(xml)
+ PY
- name: Run Test Failure Summary
id: check_failure
diff --git a/.github/workflows/run-e2e-automation-tests.yml b/.github/workflows/run-e2e-automation-tests.yml
index ad367771b9..4f8daf56f5 100644
--- a/.github/workflows/run-e2e-automation-tests.yml
+++ b/.github/workflows/run-e2e-automation-tests.yml
@@ -18,6 +18,14 @@ on:
suite_to_run:
required: true
type: string
+ expected_commit_id:
+ required: false
+ type: string
+ default: ""
+ require_matching_commit_id:
+ required: false
+ type: boolean
+ default: true
secrets:
APIGEE_PASSWORD:
required: true
@@ -72,6 +80,16 @@ on:
description: Set to true if you want the MNS validation to be performed as part of the tests. please keep in mind it will increase execution time.
default: false
type: boolean
+ expected_commit_id:
+ description: Optional commit SHA expected from the deployed _status endpoint.
+ required: false
+ type: string
+ default: ""
+ require_matching_commit_id:
+ description: Whether the _status endpoint commitId must match the expected commit SHA.
+ required: false
+ type: boolean
+ default: true
env:
APIGEE_AUTH_ENV: ${{ inputs.apigee_environment == 'int' && inputs.apigee_environment || 'internal-dev' }}
@@ -81,7 +99,8 @@ env:
SERVICE_BASE_PATH: ${{ startsWith(inputs.sub_environment, 'pr-') && format('immunisation-fhir-api/FHIR/R4-{0}', inputs.sub_environment) || 'immunisation-fhir-api/FHIR/R4' }}
PROXY_NAME: ${{ startsWith(inputs.sub_environment, 'pr-') && format('immunisation-fhir-api-{0}', inputs.sub_environment) || format('immunisation-fhir-api-{0}', inputs.apigee_environment) }}
STATUS_API_KEY: ${{ secrets.STATUS_API_KEY }}
- SOURCE_COMMIT_ID: ${{ github.sha }}
+ SOURCE_COMMIT_ID: ${{ inputs.expected_commit_id || github.sha }}
+ REQUIRE_MATCHING_COMMIT_ID: ${{ inputs.require_matching_commit_id }}
MNS_VALIDATION_REQUIRED: ${{ inputs.mns_validation_required || startsWith(inputs.sub_environment, 'pr-') || inputs.apigee_environment == 'internal-dev' }}
jobs:
@@ -94,7 +113,6 @@ jobs:
- name: Wait for API to be available
if: github.event_name != 'workflow_dispatch'
run: |
- endpoint=""
if [[ ${APIGEE_ENVIRONMENT} =~ "prod" ]]; then
endpoint="https://api.service.nhs.uk/${SERVICE_BASE_PATH}/_status"
else
@@ -112,17 +130,21 @@ jobs:
if [[ "${response_code}" -eq 200 ]] && [[ "${response_body}" == "OK" ]] && [[ "${status}" == "pass" ]]; then
echo "Status test successful"
+ if [[ "${REQUIRE_MATCHING_COMMIT_ID}" != "true" ]]; then
+ echo "Skipping commit hash validation for ${APIGEE_ENVIRONMENT}"
+ break
+ fi
if [[ "${commitId}" == "${SOURCE_COMMIT_ID}" ]]; then
echo "Commit hash test successful"
break
else
- echo "Waiting for ${endpoint} to return the correct commit hash..."
+ echo "Waiting for ${endpoint} to return the correct commit hash... expected ${SOURCE_COMMIT_ID}, got ${commitId}"
fi
else
echo "Waiting for ${endpoint} to return a 200 response with 'OK' body..."
fi
- ((counter=counter+1)) # Increment counter by 1
+            counter=$((counter + 1))
echo "Attempt ${counter}"
sleep 30
done
diff --git a/infrastructure/instance/environments/dev/ref/variables.tfvars b/infrastructure/instance/environments/dev/ref/variables.tfvars
index e6256cc114..53741cac7a 100644
--- a/infrastructure/instance/environments/dev/ref/variables.tfvars
+++ b/infrastructure/instance/environments/dev/ref/variables.tfvars
@@ -2,7 +2,7 @@ environment = "dev"
immunisation_account_id = "345594581768"
dspp_core_account_id = "603871901111"
pds_environment = "ref"
-mns_environment = "dev"
+mns_environment = "int"
error_alarm_notifications_enabled = true
create_mesh_processor = false
has_sub_environment_scope = true
diff --git a/infrastructure/instance/id_sync_lambda.tf b/infrastructure/instance/id_sync_lambda.tf
index 9e70f37f53..c3cf4cb41d 100644
--- a/infrastructure/instance/id_sync_lambda.tf
+++ b/infrastructure/instance/id_sync_lambda.tf
@@ -213,6 +213,7 @@ resource "aws_lambda_function" "id_sync_lambda" {
variables = {
IEDS_TABLE_NAME = aws_dynamodb_table.events-dynamodb-table.name
PDS_ENV = var.pds_environment
+ PDS_BASE_URL = ""
SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name
}
}
diff --git a/infrastructure/instance/mns_publisher.tf b/infrastructure/instance/mns_publisher.tf
index 9971a3bef6..b990e3f277 100644
--- a/infrastructure/instance/mns_publisher.tf
+++ b/infrastructure/instance/mns_publisher.tf
@@ -15,6 +15,7 @@ module "mns_publisher" {
secrets_manager_policy_path = "${local.policy_path}/secret_manager.json"
account_id = data.aws_caller_identity.current.account_id
pds_environment = var.pds_environment
+ pds_base_url = ""
mns_environment = var.mns_environment
private_subnet_ids = local.private_subnet_ids
diff --git a/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf b/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf
index 720f1ead02..acb745899d 100644
--- a/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf
+++ b/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf
@@ -122,6 +122,7 @@ resource "aws_lambda_function" "mns_publisher_lambda" {
IMMUNIZATION_ENV = var.resource_scope,
IMMUNIZATION_BASE_PATH = var.imms_base_path
PDS_ENV = var.pds_environment
+ PDS_BASE_URL = var.pds_base_url
MNS_ENV = var.mns_environment
}
}
diff --git a/infrastructure/instance/modules/mns_publisher/variables.tf b/infrastructure/instance/modules/mns_publisher/variables.tf
index aca660ed94..fe3cc0a141 100644
--- a/infrastructure/instance/modules/mns_publisher/variables.tf
+++ b/infrastructure/instance/modules/mns_publisher/variables.tf
@@ -94,6 +94,12 @@ variable "pds_environment" {
type = string
}
+variable "pds_base_url" {
+ type = string
+ default = ""
+ description = "Optional override for the PDS base URL, used by ref to route to the mock PDS endpoint."
+}
+
variable "account_id" {
type = string
description = "AWS account ID used for IAM policy templating (e.g., Secrets Manager ARNs)."
diff --git a/lambdas/mns_publisher/tests/test_lambda_handler.py b/lambdas/mns_publisher/tests/test_lambda_handler.py
index 0ca5d7ff9f..5a520a033c 100644
--- a/lambdas/mns_publisher/tests/test_lambda_handler.py
+++ b/lambdas/mns_publisher/tests/test_lambda_handler.py
@@ -6,6 +6,7 @@
import responses
from moto import mock_aws
+import common.api_clients.get_pds_details as get_pds_details_module
from lambda_handler import lambda_handler
from process_records import extract_trace_ids, process_record, process_records
from test_utils import generate_private_key_b64, load_sample_sqs_event
@@ -246,6 +247,7 @@ class TestLambdaHandlerIntegration(unittest.TestCase):
def setUp(self):
"""Set up mocked AWS services and test data."""
self.sample_sqs_record = load_sample_sqs_event()
+ get_pds_details_module._pds_service = None
self.secrets_client = boto3.client("secretsmanager", region_name="eu-west-2")
self.secrets_client.create_secret(
Name="imms/pds/int/jwt-secrets",
@@ -254,6 +256,9 @@ def setUp(self):
),
)
+ def tearDown(self):
+ get_pds_details_module._pds_service = None
+
@responses.activate
@patch("common.api_clients.authentication.AppRestrictedAuth.get_access_token")
@patch("process_records.logger")
@@ -378,3 +383,49 @@ def test_successful_notification_creation_with_expired_gp(self, mock_logger, moc
self.assertEqual(mns_payload["filtering"]["subjectage"], 21)
mock_logger.info.assert_any_call("Successfully processed all 1 messages")
+
+ @responses.activate
+ @patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False)
+ @patch("process_records._get_runtime_mns_service")
+ @patch("process_records.logger")
+ def test_successful_notification_creation_with_mock_pds_base_url(self, mock_logger, mock_get_mns):
+ responses.add(
+ responses.GET,
+ "https://mock-pds.example/Patient/9481152782",
+ json={"generalPractitioner": [{"identifier": {"value": "Y12345", "period": {"start": "2024-01-01"}}}]},
+ status=200,
+ )
+
+ mock_mns_service = Mock()
+ mock_get_mns.return_value = mock_mns_service
+
+ sqs_event = {"Records": [self.sample_sqs_record]}
+ result = lambda_handler(sqs_event, Mock())
+
+ self.assertEqual(result, {"batchItemFailures": []})
+ mock_mns_service.publish_notification.assert_called_once()
+ mns_payload = mock_mns_service.publish_notification.call_args.args[0]
+ self.assertEqual(mns_payload["filtering"]["generalpractitioner"], "Y12345")
+ mock_logger.info.assert_any_call("Successfully processed all 1 messages")
+
+ @responses.activate
+ @patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False)
+ @patch("process_records._get_runtime_mns_service")
+ @patch("process_records.logger")
+ def test_mock_pds_rate_limit_results_in_batch_failure(self, mock_logger, mock_get_mns):
+ responses.add(
+ responses.GET,
+ "https://mock-pds.example/Patient/9481152782",
+ json={"code": 429, "message": "Mock PDS rate limit has been exceeded"},
+ status=429,
+ )
+
+ mock_mns_service = Mock()
+ mock_get_mns.return_value = mock_mns_service
+
+ sqs_event = {"Records": [self.sample_sqs_record]}
+ result = lambda_handler(sqs_event, Mock())
+
+ self.assertEqual(len(result["batchItemFailures"]), 1)
+ mock_mns_service.publish_notification.assert_not_called()
+ mock_logger.warning.assert_called_with("Batch completed with 1 failures")
diff --git a/lambdas/shared/src/common/api_clients/get_pds_details.py b/lambdas/shared/src/common/api_clients/get_pds_details.py
index 7f728c81e8..437d6d6875 100644
--- a/lambdas/shared/src/common/api_clients/get_pds_details.py
+++ b/lambdas/shared/src/common/api_clients/get_pds_details.py
@@ -9,28 +9,31 @@
from common.api_clients.pds_service import PdsService
from common.clients import get_secrets_manager_client, logger
-PDS_ENV = os.getenv("PDS_ENV", "int")
-
_pds_service: PdsService | None = None
+_pds_service_config: tuple[str, str | None] | None = None
def get_pds_service() -> PdsService:
- global _pds_service
- if _pds_service is None:
- authenticator = AppRestrictedAuth(
- secret_manager_client=get_secrets_manager_client(),
- environment=PDS_ENV,
+ global _pds_service, _pds_service_config
+ environment = os.getenv("PDS_ENV", "int")
+ base_url = os.getenv("PDS_BASE_URL", "").strip() or None
+ config = (environment, base_url)
+
+ if _pds_service is None or _pds_service_config != config:
+ authenticator = (
+ None
+ if base_url
+ else AppRestrictedAuth(secret_manager_client=get_secrets_manager_client(), environment=environment)
)
- _pds_service = PdsService(authenticator, PDS_ENV)
-
+ _pds_service = PdsService(authenticator, environment, base_url=base_url)
+ _pds_service_config = config
return _pds_service
# Get Patient details from external service PDS using NHS number from MNS notification
def pds_get_patient_details(nhs_number: str) -> dict:
try:
- patient = get_pds_service().get_patient_details(nhs_number)
- return patient
+ return get_pds_service().get_patient_details(nhs_number)
except Exception as e:
msg = "Error retrieving patient details from PDS"
logger.exception(msg)
diff --git a/lambdas/shared/src/common/api_clients/pds_service.py b/lambdas/shared/src/common/api_clients/pds_service.py
index 90db22f957..42f952b4ec 100644
--- a/lambdas/shared/src/common/api_clients/pds_service.py
+++ b/lambdas/shared/src/common/api_clients/pds_service.py
@@ -7,31 +7,28 @@
class PdsService:
- def __init__(self, authenticator: AppRestrictedAuth, environment):
+ def __init__(
+ self,
+ authenticator: AppRestrictedAuth | None,
+ environment: str,
+ base_url: str | None = None,
+ ):
logger.info(f"PdsService init: {environment}")
self.authenticator = authenticator
-
- self.base_url = (
- f"https://{environment}.api.service.nhs.uk/personal-demographics/FHIR/R4/Patient"
- if environment != "prod"
- else "https://api.service.nhs.uk/personal-demographics/FHIR/R4/Patient"
- )
-
+ host = "api.service.nhs.uk" if environment == "prod" else f"{environment}.api.service.nhs.uk"
+ self.base_url = base_url.rstrip("/") if base_url else f"https://{host}/personal-demographics/FHIR/R4/Patient"
logger.info(f"PDS Service URL: {self.base_url}")
def get_patient_details(self, patient_id: str) -> dict | None:
- access_token = self.authenticator.get_access_token()
- request_headers = {
- "Authorization": f"Bearer {access_token}",
- "X-Request-ID": str(uuid.uuid4()),
- "X-Correlation-ID": str(uuid.uuid4()),
- }
- response = request_with_retry_backoff("GET", f"{self.base_url}/{patient_id}", headers=request_headers)
+ headers = {"X-Request-ID": str(uuid.uuid4()), "X-Correlation-ID": str(uuid.uuid4())}
+ if self.authenticator is not None:
+ headers["Authorization"] = f"Bearer {self.authenticator.get_access_token()}"
+
+ response = request_with_retry_backoff("GET", f"{self.base_url}/{patient_id}", headers=headers)
if response.status_code == 200:
return response.json()
- elif response.status_code == 404:
+ if response.status_code == 404:
logger.info("Patient not found")
return None
- else:
- raise_error_response(response)
+ raise_error_response(response)
diff --git a/lambdas/shared/tests/test_common/api_clients/test_pds_details.py b/lambdas/shared/tests/test_common/api_clients/test_pds_details.py
index e58b430ee8..8021a1e952 100644
--- a/lambdas/shared/tests/test_common/api_clients/test_pds_details.py
+++ b/lambdas/shared/tests/test_common/api_clients/test_pds_details.py
@@ -9,6 +9,7 @@ class TestGetPdsPatientDetails(unittest.TestCase):
def setUp(self):
self.test_patient_id = "9912003888"
get_pds_service.__globals__["_pds_service"] = None
+ get_pds_service.__globals__["_pds_service_config"] = None
self.logger_patcher = patch("common.api_clients.get_pds_details.logger")
self.mock_logger = self.logger_patcher.start()
@@ -25,6 +26,7 @@ def setUp(self):
def tearDown(self):
get_pds_service.__globals__["_pds_service"] = None
+ get_pds_service.__globals__["_pds_service_config"] = None
patch.stopall()
def test_pds_get_patient_details_success(self):
@@ -88,3 +90,70 @@ def test_reuses_same_pds_service_instance(self):
self.mock_auth_class.assert_called_once()
self.mock_pds_service_class.assert_called_once()
self.assertEqual(self.mock_pds_service_instance.get_patient_details.call_count, 2)
+
+ @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False)
+ def test_uses_base_url_override_without_authenticator(self):
+ pds_get_patient_details(self.test_patient_id)
+
+ self.mock_auth_class.assert_not_called()
+ self.mock_pds_service_class.assert_called_once_with(
+ None,
+ "ref",
+ base_url="https://mock-pds.example/Patient",
+ )
+
+ self.mock_pds_service_instance.get_patient_details.assert_called_once_with(self.test_patient_id)
+
+ @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": " "}, clear=False)
+ def test_whitespace_only_base_url_uses_authenticator(self):
+ pds_get_patient_details(self.test_patient_id)
+
+ self.mock_auth_class.assert_called_once()
+ self.mock_pds_service_class.assert_called_once_with(
+ self.mock_auth_instance,
+ "ref",
+ base_url=None,
+ )
+
+ @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": "https://mock-pds-v1.example/Patient"}, clear=False)
+ def test_rebuilds_cached_service_when_base_url_changes(self):
+ pds_get_patient_details(self.test_patient_id)
+
+ self.mock_pds_service_class.reset_mock()
+ self.mock_pds_service_instance.get_patient_details.reset_mock()
+ new_instance = MagicMock()
+ self.mock_pds_service_class.return_value = new_instance
+
+ with patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds-v2.example/Patient"}, clear=False):
+ pds_get_patient_details("1234567890")
+
+ self.mock_auth_class.assert_not_called()
+ self.mock_pds_service_class.assert_called_once_with(
+ None,
+ "ref",
+ base_url="https://mock-pds-v2.example/Patient",
+ )
+ new_instance.get_patient_details.assert_called_once_with("1234567890")
+
+ @patch.dict("os.environ", {"PDS_ENV": "int", "PDS_BASE_URL": ""}, clear=False)
+ def test_rebuilds_cached_service_when_environment_changes(self):
+ pds_get_patient_details(self.test_patient_id)
+
+ self.mock_pds_service_class.reset_mock()
+ self.mock_auth_class.reset_mock()
+ self.mock_pds_service_instance.get_patient_details.reset_mock()
+ new_auth = MagicMock()
+ new_service = MagicMock()
+ self.mock_auth_class.return_value = new_auth
+ self.mock_pds_service_class.return_value = new_service
+
+ with patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": ""}, clear=False):
+ pds_get_patient_details("1234567890")
+
+ self.mock_auth_class.assert_called_once_with(secret_manager_client=unittest.mock.ANY, environment="ref")
+ self.mock_pds_service_class.assert_called_once_with(
+ new_auth,
+ "ref",
+ base_url=None,
+ )
+ new_service.get_patient_details.assert_called_once_with("1234567890")
diff --git a/lambdas/shared/tests/test_common/api_clients/test_pds_service.py b/lambdas/shared/tests/test_common/api_clients/test_pds_service.py
index 48f7a8e250..2f58aeb875 100644
--- a/lambdas/shared/tests/test_common/api_clients/test_pds_service.py
+++ b/lambdas/shared/tests/test_common/api_clients/test_pds_service.py
@@ -77,3 +77,20 @@ def test_env_mapping(self):
env = "prod"
service = PdsService(None, env)
self.assertTrue(env not in service.base_url)
+
+ def test_custom_base_url_override(self):
+ service = PdsService(None, "ref", base_url="https://mock-pds.example/Patient/")
+
+ self.assertEqual(service.base_url, "https://mock-pds.example/Patient")
+
+ @responses.activate
+ def test_get_patient_details_without_authenticator(self):
+ patient_id = "900000009"
+ pds_url = f"https://mock-pds.example/Patient/{patient_id}"
+ responses.add(responses.GET, pds_url, json={"id": patient_id}, status=200)
+ pds_service = PdsService(None, "ref", base_url="https://mock-pds.example/Patient")
+
+ patient = pds_service.get_patient_details(patient_id)
+
+ self.assertEqual(patient, {"id": patient_id})
+ self.assertNotIn("Authorization", responses.calls[0].request.headers)
diff --git a/tests/perf_tests/Makefile b/tests/perf_tests/Makefile
index bf8fd833fd..ee1efecfa5 100644
--- a/tests/perf_tests/Makefile
+++ b/tests/perf_tests/Makefile
@@ -1,7 +1,46 @@
+LOCUST_FILE ?= src/locustfile.py
+LOCUST ?= poetry run locust -f $(LOCUST_FILE)
+MOCK_LOCUST_FILE ?= src/locustfile_pds_rate_limit.py
+MOCK_LOCUST ?= poetry run locust -f $(MOCK_LOCUST_FILE)
+
test:
- poetry run locust -f src/locustfile.py
+ $(LOCUST)
test-read-only:
- poetry run locust -f src/locustfile.py SearchUser
+ $(LOCUST) SearchUser
+
+baseline:
+ PERF_LOAD_PROFILE=baseline $(LOCUST) CreateUser
+
+spike:
+ PERF_LOAD_PROFILE=spike $(LOCUST) CreateUser
+
+ramp:
+ PERF_LOAD_PROFILE=ramp $(LOCUST) CreateUser
+
+mockserver:
+ poetry run python src/pds_rate_limit_stub_server.py
+
+mockpds:
+ $(MOCK_LOCUST) MockPdsUser
+
+mockpdstest-average:
+ PERF_LOAD_PROFILE=average PERF_MOCK_PDS_AVERAGE_RPS=125 $(MOCK_LOCUST) MockPdsUser
+
+mockpdstest-boundary:
+ PERF_LOAD_PROFILE=average PERF_MOCK_PDS_AVERAGE_RPS=130 $(MOCK_LOCUST) MockPdsUser
+
+mockpdstest-spike:
+ PERF_LOAD_PROFILE=spike $(MOCK_LOCUST) MockPdsUser
+
+mockpdstest-ui:
+ @if [ "$(PERF_LOAD_PROFILE)" != "average" ] && [ "$(PERF_LOAD_PROFILE)" != "spike" ]; then \
+ echo "PERF_LOAD_PROFILE must be set to average or spike"; \
+ exit 1; \
+ fi
+ @poetry run python src/pds_rate_limit_stub_server.py >/tmp/pds_rate_limit_stub_server.log 2>&1 & \
+ server_pid=$$!; \
+ trap 'kill $$server_pid >/dev/null 2>&1 || true' EXIT INT TERM; \
+ PERF_LOAD_PROFILE=$(PERF_LOAD_PROFILE) $(MOCK_LOCUST) MockPdsUser
-.PHONY: test test-read-only
+.PHONY: test test-read-only baseline spike ramp mockserver mockpds mockpdstest-average mockpdstest-boundary mockpdstest-spike mockpdstest-ui
diff --git a/tests/perf_tests/README.md b/tests/perf_tests/README.md
index 5ddede2dd8..465945cb94 100644
--- a/tests/perf_tests/README.md
+++ b/tests/perf_tests/README.md
@@ -4,6 +4,57 @@ This project contains Locust performance tests for the Immunisation FHIR API.
To run them, ensure you have the
`APIGEE_ENVIRONMENT` : Currently, only the ref environment is supported.
+`PERF_SUPPLIER_SYSTEM` : `EMIS` or `TPP`
`PERF_CREATE_RPS_PER_USER` : numeric
env vars set, and call `make test`.
+
+For read-only search load, use `make test-read-only` (runs the `SearchUser` Locust profile).
+
+For MNS-with-mocked-PDS capacity work, use the `CreateUser` profile so downstream publishing and PDS lookup activity is exercised.
+
+For direct mock-PDS rate limit testing, use the local test-only mock server in this folder.
+
+1. Start the local mock server in one terminal:
+ `make mockserver`
+2. Run mock rate tests in another terminal:
+ `make mockpdstest-average`, `make mockpdstest-boundary`, or `make mockpdstest-spike`
+
+The rate presets are baked in:
+
+- `make mockpdstest-average` runs at `125 rps`
+- `make mockpdstest-boundary` runs at `130 rps`
+
+Or run both in one command (starts local mock server and opens Locust UI):
+`PERF_LOAD_PROFILE=average make mockpdstest-ui`
+or
+`PERF_LOAD_PROFILE=spike make mockpdstest-ui`
+
+`src/locustfile_pds_rate_limit.py` defaults to `http://127.0.0.1:18080`.
+Set `MOCK_PDS_BASE_URL` explicitly only if you intentionally want to target a non-local endpoint.
+
+Local mock profile defaults are tuned for parity with earlier ref checks:
+
+- Average profile duration default: `180s`
+- Spike profile stages default: `10s warmup + 20s spike + 10s recovery`
+
+Available load profiles:
+
+- `make baseline`: holds traffic around the average acceptance threshold. Defaults to `125 rps` for `300s`.
+- `make spike`: warms up at the average threshold, bursts above the spike threshold, then recovers. Defaults to `125 rps`, then `460 rps`, then back to `125 rps`.
+- `make ramp`: increases traffic in fixed steps to identify the knee point and error envelope. Defaults to `50 rps` start, `25 rps` increments, `60s` per step, stopping after `500 rps`.
+
+Supported environment variables:
+
+- `PERF_LOAD_PROFILE`: `baseline`, `spike`, or `ramp`.
+- `PERF_BASELINE_RPS`, `PERF_BASELINE_DURATION_SECONDS`
+- `PERF_SPIKE_WARMUP_RPS`, `PERF_SPIKE_RPS`, `PERF_SPIKE_WARMUP_SECONDS`, `PERF_SPIKE_DURATION_SECONDS`, `PERF_SPIKE_RECOVERY_SECONDS`
+- `PERF_RAMP_START_RPS`, `PERF_RAMP_STEP_RPS`, `PERF_RAMP_MAX_RPS`, `PERF_RAMP_STEP_DURATION_SECONDS`
+ UI mode is used for perf runs in this folder.
+
+Suggested ref runbook:
+
+1. Run `make baseline` and confirm downstream create flow is stable at the average threshold.
+2. Run `make spike` and check whether 429 responses are isolated to the burst window and whether MNS publish failures remain within expected limits.
+3. Run `make ramp` to find the first step where latency, failures, or 429 volume becomes operationally unacceptable.
+4. Record success rate, 429 rate, and p95/p99 latency from the generated CSV files for campaign-capacity decisions.
diff --git a/tests/perf_tests/src/locustfile.py b/tests/perf_tests/src/locustfile.py
index 5a4fd09864..57e9cee7cd 100644
--- a/tests/perf_tests/src/locustfile.py
+++ b/tests/perf_tests/src/locustfile.py
@@ -1,4 +1,5 @@
import json
+import math
import os
import random
import uuid
@@ -6,7 +7,7 @@
from urllib.parse import urlencode
import pandas as pd
-from locust import HttpUser, constant_throughput, task
+from locust import HttpUser, LoadTestShape, constant_throughput, task
from common.api_clients.authentication import AppRestrictedAuth
from common.clients import get_secrets_manager_client
@@ -22,6 +23,18 @@
raise ValueError("APIGEE_ENVIRONMENT must be set")
PERF_CREATE_TASK_RPS_PER_USER = float(os.getenv("PERF_CREATE_RPS_PER_USER", "1"))
+PERF_LOAD_PROFILE = os.getenv("PERF_LOAD_PROFILE", "").strip().lower()
+PERF_BASELINE_RPS = int(os.getenv("PERF_BASELINE_RPS", "125"))
+PERF_BASELINE_DURATION_SECONDS = int(os.getenv("PERF_BASELINE_DURATION_SECONDS", "300"))
+PERF_SPIKE_WARMUP_RPS = int(os.getenv("PERF_SPIKE_WARMUP_RPS", "125"))
+PERF_SPIKE_RPS = int(os.getenv("PERF_SPIKE_RPS", "460"))
+PERF_SPIKE_WARMUP_SECONDS = int(os.getenv("PERF_SPIKE_WARMUP_SECONDS", "120"))
+PERF_SPIKE_DURATION_SECONDS = int(os.getenv("PERF_SPIKE_DURATION_SECONDS", "60"))
+PERF_SPIKE_RECOVERY_SECONDS = int(os.getenv("PERF_SPIKE_RECOVERY_SECONDS", "120"))
+PERF_RAMP_START_RPS = int(os.getenv("PERF_RAMP_START_RPS", "50"))
+PERF_RAMP_STEP_RPS = int(os.getenv("PERF_RAMP_STEP_RPS", "25"))
+PERF_RAMP_MAX_RPS = int(os.getenv("PERF_RAMP_MAX_RPS", "500"))
+PERF_RAMP_STEP_DURATION_SECONDS = int(os.getenv("PERF_RAMP_STEP_DURATION_SECONDS", "60"))
IMMUNIZATION_TARGETS = [
"3IN1",
@@ -54,6 +67,40 @@ def _load_valid_patients():
VALID_PATIENT_IDS = _load_valid_patients()
+def _users_for_target_rps(target_rps: int) -> int:
+ per_user_rps = PERF_CREATE_TASK_RPS_PER_USER if PERF_CREATE_TASK_RPS_PER_USER > 0 else 1
+ return max(1, math.ceil(target_rps / per_user_rps))
+
+
+class CampaignCapacityShape(LoadTestShape):
+ abstract = PERF_LOAD_PROFILE not in {"baseline", "spike", "ramp"}
+
+ def tick(self):
+ run_time = self.get_run_time()
+
+ if PERF_LOAD_PROFILE == "baseline":
+ if run_time >= PERF_BASELINE_DURATION_SECONDS:
+ return None
+ target_rps = PERF_BASELINE_RPS
+ elif PERF_LOAD_PROFILE == "spike":
+ if run_time < PERF_SPIKE_WARMUP_SECONDS:
+ target_rps = PERF_SPIKE_WARMUP_RPS
+ elif run_time < PERF_SPIKE_WARMUP_SECONDS + PERF_SPIKE_DURATION_SECONDS:
+ target_rps = PERF_SPIKE_RPS
+ elif run_time < PERF_SPIKE_WARMUP_SECONDS + PERF_SPIKE_DURATION_SECONDS + PERF_SPIKE_RECOVERY_SECONDS:
+ target_rps = PERF_SPIKE_WARMUP_RPS
+ else:
+ return None
+ else:
+ current_step = int(run_time // PERF_RAMP_STEP_DURATION_SECONDS)
+ target_rps = PERF_RAMP_START_RPS + (current_step * PERF_RAMP_STEP_RPS)
+ if target_rps > PERF_RAMP_MAX_RPS:
+ return None
+
+ user_count = _users_for_target_rps(target_rps)
+ return user_count, user_count
+
+
class BaseImmunizationUser(HttpUser):
abstract = True
diff --git a/tests/perf_tests/src/locustfile_pds_rate_limit.py b/tests/perf_tests/src/locustfile_pds_rate_limit.py
new file mode 100644
index 0000000000..af3d6857db
--- /dev/null
+++ b/tests/perf_tests/src/locustfile_pds_rate_limit.py
@@ -0,0 +1,143 @@
+"""Locust load generator for validating local mock PDS rate-limit behavior.
+
+This module drives two profiles against the local stub server:
+- average: sustained load around the threshold
+- spike: warmup, burst, and recovery phases
+"""
+
+import math
+import os
+import random
+from urllib.parse import urlparse
+
+from locust import HttpUser, LoadTestShape, constant_throughput, task
+
+PERF_LOAD_PROFILE = os.getenv("PERF_LOAD_PROFILE", "").strip().lower()
+MOCK_PDS_BASE_URL = os.getenv("MOCK_PDS_BASE_URL", "http://127.0.0.1:18080").strip().rstrip("/")
+
+
+def _validate_mock_pds_base_url(base_url: str) -> str:
+ """Validate that the target URL is an absolute HTTP(S) endpoint."""
+ if "<" in base_url or ">" in base_url:
+ raise ValueError(
+ "MOCK_PDS_BASE_URL still contains a placeholder. Set it to the real Lambda Function URL, "
+ "for example https://abc123.lambda-url.eu-west-2.on.aws"
+ )
+
+ parsed = urlparse(base_url)
+ if parsed.scheme not in {"http", "https"} or not parsed.netloc:
+ raise ValueError(
+ "MOCK_PDS_BASE_URL must be a valid absolute URL, for example https://abc123.lambda-url.eu-west-2.on.aws"
+ )
+
+ return base_url
+
+
+MOCK_PDS_BASE_URL = _validate_mock_pds_base_url(MOCK_PDS_BASE_URL)
+
+PERF_MOCK_PDS_RPS_PER_USER = float(os.getenv("PERF_MOCK_PDS_RPS_PER_USER", "1"))
+MOCK_PDS_VERIFY_TLS = os.getenv("MOCK_PDS_VERIFY_TLS", "false").strip().lower() in {"1", "true", "yes"}
+
+PERF_MOCK_PDS_AVERAGE_RPS = int(os.getenv("PERF_MOCK_PDS_AVERAGE_RPS", "140"))
+PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS = int(os.getenv("PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS", "180"))
+
+PERF_MOCK_PDS_SPIKE_WARMUP_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_WARMUP_RPS", "125"))
+PERF_MOCK_PDS_SPIKE_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RPS", "460"))
+PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS", "10"))
+PERF_MOCK_PDS_SPIKE_DURATION_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_DURATION_SECONDS", "20"))
+PERF_MOCK_PDS_SPIKE_RECOVERY_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RECOVERY_RPS", "125"))
+PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS", "10"))
+
+RATE_LIMIT_MESSAGE = "Mock PDS rate limit has been exceeded"
+
+
+def _users_for_target_rps(target_rps: int) -> int:
+ """Convert an RPS target into the number of Locust users to spawn."""
+ per_user_rps = PERF_MOCK_PDS_RPS_PER_USER if PERF_MOCK_PDS_RPS_PER_USER > 0 else 1
+ return max(1, math.ceil(target_rps / per_user_rps))
+
+
+class MockPdsRateLimitShape(LoadTestShape):
+ """Dynamic load shape used for average and spike rate-limit scenarios."""
+
+ abstract = PERF_LOAD_PROFILE not in {"average", "spike"}
+
+ def tick(self):
+ """Return `(user_count, spawn_rate)` for the current run time stage."""
+ run_time = self.get_run_time()
+
+ if PERF_LOAD_PROFILE == "average":
+ if run_time >= PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS:
+ return None
+ target_rps = PERF_MOCK_PDS_AVERAGE_RPS
+ elif PERF_LOAD_PROFILE == "spike":
+ spike_end = PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS + PERF_MOCK_PDS_SPIKE_DURATION_SECONDS
+ recovery_end = spike_end + PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS
+
+ if run_time < PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS:
+ target_rps = PERF_MOCK_PDS_SPIKE_WARMUP_RPS
+ elif run_time < spike_end:
+ target_rps = PERF_MOCK_PDS_SPIKE_RPS
+ elif run_time < recovery_end:
+ target_rps = PERF_MOCK_PDS_SPIKE_RECOVERY_RPS
+ else:
+ return None
+ else:
+ return None
+
+ user_count = _users_for_target_rps(target_rps)
+ return user_count, user_count
+
+
+class MockPdsUser(HttpUser):
+ """Locust user that repeatedly calls mock PDS Patient lookup endpoints."""
+
+ wait_time = constant_throughput(PERF_MOCK_PDS_RPS_PER_USER)
+ host = MOCK_PDS_BASE_URL
+
+ def on_start(self):
+ """Apply configured TLS verification behavior to the HTTP client."""
+ self.client.verify = MOCK_PDS_VERIFY_TLS
+
+ @staticmethod
+ def _random_nhs_number() -> str:
+ """Generate a pseudo NHS number for request variation during load tests."""
+ return f"99{random.randint(10_000_000, 99_999_999)}"
+
+ @task
+ def get_patient(self):
+ """Execute one lookup request and classify outcomes for test reporting."""
+ with self.client.get(
+ f"/Patient/{self._random_nhs_number()}",
+ headers={"Accept": "application/fhir+json"},
+ name="Mock PDS Patient Lookup",
+ catch_response=True,
+ ) as response:
+ if response.status_code == 200:
+ response.success()
+ return
+
+ if response.status_code == 429:
+ try:
+ payload = response.json()
+ except ValueError:
+ response.failure(f"429 response was not valid JSON: {response.text}")
+ return
+
+ if payload.get("code") == 429 and payload.get("message") == RATE_LIMIT_MESSAGE:
+ response.failure(f"HTTP {response.status_code}: {RATE_LIMIT_MESSAGE}")
+ else:
+ response.failure(f"Unexpected 429 payload: {response.text}")
+ return
+
+ if response.status_code == 0:
+ error_detail = getattr(response, "error", None)
+ response.failure(
+ "Connection failed before reaching mock PDS. "
+ f"Check MOCK_PDS_BASE_URL={self.host}. "
+ f"TLS verification enabled={MOCK_PDS_VERIFY_TLS}. "
+ f"Underlying error: {error_detail!r}"
+ )
+ return
+
+ response.failure(f"Unexpected response: {response.status_code} {response.text}")
diff --git a/tests/perf_tests/src/pds_rate_limit_stub_server.py b/tests/perf_tests/src/pds_rate_limit_stub_server.py
new file mode 100644
index 0000000000..ad86646fcf
--- /dev/null
+++ b/tests/perf_tests/src/pds_rate_limit_stub_server.py
@@ -0,0 +1,143 @@
+"""Local HTTP stub that emulates PDS rate-limit behavior for perf tests.
+
The server exposes `GET /Patient/{id}` and applies fixed-window
+average and spike limits, returning:
+- 200 with empty body when allowed
+- 429 JSON error when throttled
+"""
+
+import json
+import os
+import threading
+import time
+from http import HTTPStatus
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+
+RATE_LIMIT_MESSAGE = "Mock PDS rate limit has been exceeded"
+
+
+class LoadFriendlyThreadingHTTPServer(ThreadingHTTPServer):
+ """Threaded HTTP server configured for high local request concurrency."""
+
+ daemon_threads = True
+ allow_reuse_address = True
+ request_queue_size = 256
+
+
+class FixedWindowRateLimiter:
+ """In-memory fixed-window limiter for average and spike traffic windows."""
+
+ def __init__(
+ self,
+ average_limit: int,
+ average_window_seconds: int,
+ spike_limit: int,
+ spike_window_seconds: int,
+ ):
+ self._started_at = time.time()
+ self._windows = (
+ ("average", average_limit * average_window_seconds, average_window_seconds),
+ ("spike", spike_limit, spike_window_seconds),
+ )
+ self._lock = threading.Lock()
+ self._counts: dict[str, int] = {}
+
+ def check(self, scope: str) -> tuple[bool, str, int, int, int]:
+ """Evaluate current request count for each window and return decision metadata."""
+ with self._lock:
+ decision = None
+ for name, limit, seconds in self._windows:
+ # Anchor windows to server start so threshold behavior is stable across test runs.
+ bucket = int((time.time() - self._started_at) // seconds)
+ key = f"{scope}:{name}:{bucket}"
+ count = self._counts.get(key, 0) + 1
+ self._counts[key] = count
+ decision = (count <= limit, name, count, limit, seconds)
+ if not decision[0]:
+ break
+ self._cleanup_old_buckets()
+ return decision
+
+ def _cleanup_old_buckets(self) -> None:
+ """Prune old buckets to keep memory usage bounded during long runs."""
+ now = int(time.time())
+ keys_to_delete = []
+ for key in self._counts:
+ parts = key.rsplit(":", 2)
+ if len(parts) != 3:
+ continue
+ _, window_name, bucket_str = parts
+ seconds = next((w[2] for w in self._windows if w[0] == window_name), 1)
+ bucket_start = int(bucket_str) * seconds
+ if now - bucket_start > (seconds * 2):
+ keys_to_delete.append(key)
+
+ for key in keys_to_delete:
+ self._counts.pop(key, None)
+
+
+class MockPdsHandler(BaseHTTPRequestHandler):
+ """Request handler for mock PDS endpoints used in local load tests."""
+
+ rate_limiter: FixedWindowRateLimiter
+ protocol_version = "HTTP/1.1"
+
+ def _write_json(self, status: HTTPStatus, payload: dict) -> None:
+ """Send a JSON response with explicit content length and connection close."""
+ body = json.dumps(payload).encode("utf-8")
+ self.send_response(status)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Content-Length", str(len(body)))
+ self.send_header("Connection", "close")
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _write_empty_200(self) -> None:
+ """Send an empty 200 response body to mirror acceptance criteria."""
+ self.send_response(HTTPStatus.OK)
+ self.send_header("Content-Length", "0")
+ self.send_header("Connection", "close")
+ self.end_headers()
+
+ def do_GET(self):
+ """Handle patient lookup requests and apply rate-limit responses."""
+ if not self.path.startswith("/Patient/"):
+ self._write_json(HTTPStatus.BAD_REQUEST, {"code": 400, "message": "Patient id is required"})
+ return
+
+ allowed, _, _, _, _ = self.rate_limiter.check("patient-lookup")
+ if not allowed:
+ self._write_json(HTTPStatus.TOO_MANY_REQUESTS, {"code": 429, "message": RATE_LIMIT_MESSAGE})
+ return
+
+ # Acceptance criteria: 200 with no response body below threshold.
+ self._write_empty_200()
+
+ def log_message(self, format: str, *args):
+ """Suppress default HTTP request logs for cleaner perf test output."""
+ return
+
+
+def main() -> None:
+ """Load config from env, initialize limiter, and start the stub server."""
+ host = os.getenv("MOCK_PDS_BIND_HOST", "127.0.0.1")
+ port = int(os.getenv("MOCK_PDS_BIND_PORT", "18080"))
+ average_limit = int(os.getenv("MOCK_PDS_AVERAGE_LIMIT", "125"))
+ average_window_seconds = int(os.getenv("MOCK_PDS_AVERAGE_WINDOW_SECONDS", "60"))
+ spike_limit = int(os.getenv("MOCK_PDS_SPIKE_LIMIT", "450"))
+ spike_window_seconds = int(os.getenv("MOCK_PDS_SPIKE_WINDOW_SECONDS", "1"))
+
+ MockPdsHandler.rate_limiter = FixedWindowRateLimiter(
+ average_limit=average_limit,
+ average_window_seconds=average_window_seconds,
+ spike_limit=spike_limit,
+ spike_window_seconds=spike_window_seconds,
+ )
+
+ server = LoadFriendlyThreadingHTTPServer((host, port), MockPdsHandler)
+ print(f"Mock PDS test server listening on http://{host}:{port}")
+ server.serve_forever()
+
+
+if __name__ == "__main__":
+ main()