diff --git a/.github/workflows/pr-deploy-and-test.yml b/.github/workflows/pr-deploy-and-test.yml index a76be32703..dd98995461 100644 --- a/.github/workflows/pr-deploy-and-test.yml +++ b/.github/workflows/pr-deploy-and-test.yml @@ -20,6 +20,10 @@ jobs: uses: ./.github/workflows/deploy-backend.yml with: apigee_environment: internal-dev + lambda_build_flags: >- + ${{ (github.event.action == 'opened' || github.event.action == 'reopened') + && '{"recordprocessor":true,"ack-backend":true}' + || '{}' }} diff_base_sha: ${{ github.event.action == 'synchronize' && github.event.before || github.event.pull_request.base.sha }} diff_head_sha: ${{ github.event.pull_request.head.sha }} run_diff_check: ${{ github.event.action == 'synchronize' }} @@ -35,8 +39,10 @@ jobs: include: - apigee_environment_name: internal-dev required_test_suite: smoke + require_matching_commit_id: true - apigee_environment_name: internal-dev-sandbox required_test_suite: sandbox + require_matching_commit_id: false uses: ./.github/workflows/run-e2e-automation-tests.yml with: apigee_environment: ${{ matrix.apigee_environment_name }} @@ -44,6 +50,8 @@ jobs: sub_environment: pr-${{github.event.pull_request.number}} service_under_test: all suite_to_run: ${{ matrix.required_test_suite }} + expected_commit_id: ${{ github.sha }} + require_matching_commit_id: ${{ matrix.require_matching_commit_id }} secrets: APIGEE_PASSWORD: ${{ secrets.APIGEE_PASSWORD }} APIGEE_BASIC_AUTH_TOKEN: ${{ secrets.APIGEE_BASIC_AUTH_TOKEN }} diff --git a/.github/workflows/quality-checks.yml b/.github/workflows/quality-checks.yml index 5566bb2b32..084ba1fe1a 100644 --- a/.github/workflows/quality-checks.yml +++ b/.github/workflows/quality-checks.yml @@ -9,6 +9,7 @@ on: env: SHARED_PATH: ${{ github.workspace }}/lambdas/shared LAMBDA_PATH: ${{ github.workspace }}/lambdas + POETRY_INSTALLER_ONLY_BINARY: ":all:" jobs: lint-specification: @@ -156,6 +157,7 @@ jobs: id: id_sync env: PYTHONPATH: ${{ env.LAMBDA_PATH }}/id_sync/src:${{ 
env.LAMBDA_PATH }}/id_sync/tests:${{ env.SHARED_PATH }}/src + POETRY_INSTALLER_ONLY_BINARY: "" continue-on-error: true run: | poetry install @@ -178,6 +180,7 @@ jobs: id: mnspublisher env: PYTHONPATH: ${{ env.LAMBDA_PATH }}/mns_publisher/src:${{ env.LAMBDA_PATH }}/mns_publisher/tests:${{ env.SHARED_PATH }}/src + POETRY_INSTALLER_ONLY_BINARY: "" continue-on-error: true run: | poetry install @@ -234,11 +237,24 @@ jobs: id: shared env: PYTHONPATH: ${{ env.SHARED_PATH }}/src + POETRY_INSTALLER_ONLY_BINARY: "" + GITHUB_WORKSPACE: ${{ github.workspace }} continue-on-error: true run: | poetry install poetry run coverage run --rcfile=.coveragerc --source=src -m unittest discover -s tests -p "test_*.py" -v || echo "shared tests failed" >> ../../failed_tests.txt poetry run coverage xml -o ../../shared-coverage.xml + python - <<'PY' + import os + import re + from pathlib import Path + + coverage_file = Path("../../shared-coverage.xml") + xml = coverage_file.read_text() + xml = re.sub(r"<source>.*?</source>", f"<source>{os.environ['GITHUB_WORKSPACE']}</source>", xml, count=1) # Rebase the first <source> root onto the repository workspace so the repo-relative filenames written below resolve. + xml = xml.replace('filename="src/', 'filename="lambdas/shared/src/') + coverage_file.write_text(xml) + PY - name: Run Test Failure Summary id: check_failure diff --git a/.github/workflows/run-e2e-automation-tests.yml b/.github/workflows/run-e2e-automation-tests.yml index ad367771b9..4f8daf56f5 100644 --- a/.github/workflows/run-e2e-automation-tests.yml +++ b/.github/workflows/run-e2e-automation-tests.yml @@ -18,6 +18,14 @@ on: suite_to_run: required: true type: string + expected_commit_id: + required: false + type: string + default: "" + require_matching_commit_id: + required: false + type: boolean + default: true secrets: APIGEE_PASSWORD: required: true @@ -72,6 +80,16 @@ on: description: Set to true if you want the MNS validation to be performed as part of the tests. please keep in mind it will increase execution time.
default: false type: boolean + expected_commit_id: + description: Optional commit SHA expected from the deployed _status endpoint. + required: false + type: string + default: "" + require_matching_commit_id: + description: Whether the _status endpoint commitId must match the expected commit SHA. + required: false + type: boolean + default: true env: APIGEE_AUTH_ENV: ${{ inputs.apigee_environment == 'int' && inputs.apigee_environment || 'internal-dev' }} @@ -81,7 +99,8 @@ env: SERVICE_BASE_PATH: ${{ startsWith(inputs.sub_environment, 'pr-') && format('immunisation-fhir-api/FHIR/R4-{0}', inputs.sub_environment) || 'immunisation-fhir-api/FHIR/R4' }} PROXY_NAME: ${{ startsWith(inputs.sub_environment, 'pr-') && format('immunisation-fhir-api-{0}', inputs.sub_environment) || format('immunisation-fhir-api-{0}', inputs.apigee_environment) }} STATUS_API_KEY: ${{ secrets.STATUS_API_KEY }} - SOURCE_COMMIT_ID: ${{ github.sha }} + SOURCE_COMMIT_ID: ${{ inputs.expected_commit_id || github.sha }} + REQUIRE_MATCHING_COMMIT_ID: ${{ inputs.require_matching_commit_id }} MNS_VALIDATION_REQUIRED: ${{ inputs.mns_validation_required || startsWith(inputs.sub_environment, 'pr-') || inputs.apigee_environment == 'internal-dev' }} jobs: @@ -94,7 +113,6 @@ jobs: - name: Wait for API to be available if: github.event_name != 'workflow_dispatch' run: | - endpoint="" if [[ ${APIGEE_ENVIRONMENT} =~ "prod" ]]; then endpoint="https://api.service.nhs.uk/${SERVICE_BASE_PATH}/_status" else @@ -112,17 +130,21 @@ jobs: if [[ "${response_code}" -eq 200 ]] && [[ "${response_body}" == "OK" ]] && [[ "${status}" == "pass" ]]; then echo "Status test successful" + if [[ "${REQUIRE_MATCHING_COMMIT_ID}" != "true" ]]; then + echo "Skipping commit hash validation for ${APIGEE_ENVIRONMENT}" + break + fi if [[ "${commitId}" == "${SOURCE_COMMIT_ID}" ]]; then echo "Commit hash test successful" break else - echo "Waiting for ${endpoint} to return the correct commit hash..." 
+ echo "Waiting for ${endpoint} to return the correct commit hash... expected ${SOURCE_COMMIT_ID}, got ${commitId}" fi else echo "Waiting for ${endpoint} to return a 200 response with 'OK' body..." fi - ((counter=counter+1)) # Increment counter by 1 + counter=$((counter + 1)) # Always exits 0, unlike ((counter++)) when counter is 0. echo "Attempt ${counter}" sleep 30 done diff --git a/infrastructure/instance/environments/dev/ref/variables.tfvars b/infrastructure/instance/environments/dev/ref/variables.tfvars index e6256cc114..53741cac7a 100644 --- a/infrastructure/instance/environments/dev/ref/variables.tfvars +++ b/infrastructure/instance/environments/dev/ref/variables.tfvars @@ -2,7 +2,7 @@ environment = "dev" immunisation_account_id = "345594581768" dspp_core_account_id = "603871901111" pds_environment = "ref" -mns_environment = "dev" +mns_environment = "int" error_alarm_notifications_enabled = true create_mesh_processor = false has_sub_environment_scope = true diff --git a/infrastructure/instance/id_sync_lambda.tf b/infrastructure/instance/id_sync_lambda.tf index 9e70f37f53..c3cf4cb41d 100644 --- a/infrastructure/instance/id_sync_lambda.tf +++ b/infrastructure/instance/id_sync_lambda.tf @@ -213,6 +213,7 @@ resource "aws_lambda_function" "id_sync_lambda" { variables = { IEDS_TABLE_NAME = aws_dynamodb_table.events-dynamodb-table.name PDS_ENV = var.pds_environment + PDS_BASE_URL = "" SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name } } diff --git a/infrastructure/instance/mns_publisher.tf b/infrastructure/instance/mns_publisher.tf index 9971a3bef6..b990e3f277 100644 --- a/infrastructure/instance/mns_publisher.tf +++ b/infrastructure/instance/mns_publisher.tf @@ -15,6 +15,7 @@ module "mns_publisher" { secrets_manager_policy_path = "${local.policy_path}/secret_manager.json" account_id = data.aws_caller_identity.current.account_id pds_environment = var.pds_environment + pds_base_url = "" mns_environment = var.mns_environment private_subnet_ids = local.private_subnet_ids diff --git
a/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf b/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf index 720f1ead02..acb745899d 100644 --- a/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf +++ b/infrastructure/instance/modules/mns_publisher/mns_publisher_lambda.tf @@ -122,6 +122,7 @@ resource "aws_lambda_function" "mns_publisher_lambda" { IMMUNIZATION_ENV = var.resource_scope, IMMUNIZATION_BASE_PATH = var.imms_base_path PDS_ENV = var.pds_environment + PDS_BASE_URL = var.pds_base_url MNS_ENV = var.mns_environment } } diff --git a/infrastructure/instance/modules/mns_publisher/variables.tf b/infrastructure/instance/modules/mns_publisher/variables.tf index aca660ed94..fe3cc0a141 100644 --- a/infrastructure/instance/modules/mns_publisher/variables.tf +++ b/infrastructure/instance/modules/mns_publisher/variables.tf @@ -94,6 +94,12 @@ variable "pds_environment" { type = string } +variable "pds_base_url" { + type = string + default = "" + description = "Optional override for the PDS base URL, used by ref to route to the mock PDS endpoint." +} + variable "account_id" { type = string description = "AWS account ID used for IAM policy templating (e.g., Secrets Manager ARNs)." 
diff --git a/lambdas/mns_publisher/tests/test_lambda_handler.py b/lambdas/mns_publisher/tests/test_lambda_handler.py index 0ca5d7ff9f..5a520a033c 100644 --- a/lambdas/mns_publisher/tests/test_lambda_handler.py +++ b/lambdas/mns_publisher/tests/test_lambda_handler.py @@ -6,6 +6,7 @@ import responses from moto import mock_aws +import common.api_clients.get_pds_details as get_pds_details_module from lambda_handler import lambda_handler from process_records import extract_trace_ids, process_record, process_records from test_utils import generate_private_key_b64, load_sample_sqs_event @@ -246,6 +247,7 @@ class TestLambdaHandlerIntegration(unittest.TestCase): def setUp(self): """Set up mocked AWS services and test data.""" self.sample_sqs_record = load_sample_sqs_event() + get_pds_details_module._pds_service = None self.secrets_client = boto3.client("secretsmanager", region_name="eu-west-2") self.secrets_client.create_secret( Name="imms/pds/int/jwt-secrets", @@ -254,6 +256,9 @@ def setUp(self): ), ) + def tearDown(self): + get_pds_details_module._pds_service = None + @responses.activate @patch("common.api_clients.authentication.AppRestrictedAuth.get_access_token") @patch("process_records.logger") @@ -378,3 +383,49 @@ def test_successful_notification_creation_with_expired_gp(self, mock_logger, moc self.assertEqual(mns_payload["filtering"]["subjectage"], 21) mock_logger.info.assert_any_call("Successfully processed all 1 messages") + + @responses.activate + @patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False) + @patch("process_records._get_runtime_mns_service") + @patch("process_records.logger") + def test_successful_notification_creation_with_mock_pds_base_url(self, mock_logger, mock_get_mns): + responses.add( + responses.GET, + "https://mock-pds.example/Patient/9481152782", + json={"generalPractitioner": [{"identifier": {"value": "Y12345", "period": {"start": "2024-01-01"}}}]}, + status=200, + ) + + mock_mns_service = Mock() + 
mock_get_mns.return_value = mock_mns_service + + sqs_event = {"Records": [self.sample_sqs_record]} + result = lambda_handler(sqs_event, Mock()) + + self.assertEqual(result, {"batchItemFailures": []}) + mock_mns_service.publish_notification.assert_called_once() + mns_payload = mock_mns_service.publish_notification.call_args.args[0] + self.assertEqual(mns_payload["filtering"]["generalpractitioner"], "Y12345") + mock_logger.info.assert_any_call("Successfully processed all 1 messages") + + @responses.activate + @patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False) + @patch("process_records._get_runtime_mns_service") + @patch("process_records.logger") + def test_mock_pds_rate_limit_results_in_batch_failure(self, mock_logger, mock_get_mns): + responses.add( + responses.GET, + "https://mock-pds.example/Patient/9481152782", + json={"code": 429, "message": "Mock PDS rate limit has been exceeded"}, + status=429, + ) + + mock_mns_service = Mock() + mock_get_mns.return_value = mock_mns_service + + sqs_event = {"Records": [self.sample_sqs_record]} + result = lambda_handler(sqs_event, Mock()) + + self.assertEqual(len(result["batchItemFailures"]), 1) + mock_mns_service.publish_notification.assert_not_called() + mock_logger.warning.assert_called_with("Batch completed with 1 failures") diff --git a/lambdas/shared/src/common/api_clients/get_pds_details.py b/lambdas/shared/src/common/api_clients/get_pds_details.py index 7f728c81e8..437d6d6875 100644 --- a/lambdas/shared/src/common/api_clients/get_pds_details.py +++ b/lambdas/shared/src/common/api_clients/get_pds_details.py @@ -9,28 +9,31 @@ from common.api_clients.pds_service import PdsService from common.clients import get_secrets_manager_client, logger -PDS_ENV = os.getenv("PDS_ENV", "int") - _pds_service: PdsService | None = None +_pds_service_config: tuple[str, str | None] | None = None def get_pds_service() -> PdsService: - global _pds_service - if _pds_service is None: - authenticator = 
AppRestrictedAuth( - secret_manager_client=get_secrets_manager_client(), - environment=PDS_ENV, + global _pds_service, _pds_service_config + environment = os.getenv("PDS_ENV", "int") + base_url = os.getenv("PDS_BASE_URL", "").strip() or None + config = (environment, base_url) + + if _pds_service is None or _pds_service_config != config: + authenticator = ( + None + if base_url + else AppRestrictedAuth(secret_manager_client=get_secrets_manager_client(), environment=environment) ) - _pds_service = PdsService(authenticator, PDS_ENV) - + _pds_service = PdsService(authenticator, environment, base_url=base_url) + _pds_service_config = config return _pds_service # Get Patient details from external service PDS using NHS number from MNS notification def pds_get_patient_details(nhs_number: str) -> dict: try: - patient = get_pds_service().get_patient_details(nhs_number) - return patient + return get_pds_service().get_patient_details(nhs_number) except Exception as e: msg = "Error retrieving patient details from PDS" logger.exception(msg) diff --git a/lambdas/shared/src/common/api_clients/pds_service.py b/lambdas/shared/src/common/api_clients/pds_service.py index 90db22f957..42f952b4ec 100644 --- a/lambdas/shared/src/common/api_clients/pds_service.py +++ b/lambdas/shared/src/common/api_clients/pds_service.py @@ -7,31 +7,28 @@ class PdsService: - def __init__(self, authenticator: AppRestrictedAuth, environment): + def __init__( + self, + authenticator: AppRestrictedAuth | None, + environment: str, + base_url: str | None = None, + ): logger.info(f"PdsService init: {environment}") self.authenticator = authenticator - - self.base_url = ( - f"https://{environment}.api.service.nhs.uk/personal-demographics/FHIR/R4/Patient" - if environment != "prod" - else "https://api.service.nhs.uk/personal-demographics/FHIR/R4/Patient" - ) - + host = "api.service.nhs.uk" if environment == "prod" else f"{environment}.api.service.nhs.uk" + self.base_url = base_url.rstrip("/") if base_url else 
f"https://{host}/personal-demographics/FHIR/R4/Patient" logger.info(f"PDS Service URL: {self.base_url}") def get_patient_details(self, patient_id: str) -> dict | None: - access_token = self.authenticator.get_access_token() - request_headers = { - "Authorization": f"Bearer {access_token}", - "X-Request-ID": str(uuid.uuid4()), - "X-Correlation-ID": str(uuid.uuid4()), - } - response = request_with_retry_backoff("GET", f"{self.base_url}/{patient_id}", headers=request_headers) + headers = {"X-Request-ID": str(uuid.uuid4()), "X-Correlation-ID": str(uuid.uuid4())} + if self.authenticator is not None: + headers["Authorization"] = f"Bearer {self.authenticator.get_access_token()}" + + response = request_with_retry_backoff("GET", f"{self.base_url}/{patient_id}", headers=headers) if response.status_code == 200: return response.json() - elif response.status_code == 404: + if response.status_code == 404: logger.info("Patient not found") return None - else: - raise_error_response(response) + raise_error_response(response) diff --git a/lambdas/shared/tests/test_common/api_clients/test_pds_details.py b/lambdas/shared/tests/test_common/api_clients/test_pds_details.py index e58b430ee8..8021a1e952 100644 --- a/lambdas/shared/tests/test_common/api_clients/test_pds_details.py +++ b/lambdas/shared/tests/test_common/api_clients/test_pds_details.py @@ -9,6 +9,7 @@ class TestGetPdsPatientDetails(unittest.TestCase): def setUp(self): self.test_patient_id = "9912003888" get_pds_service.__globals__["_pds_service"] = None + get_pds_service.__globals__["_pds_service_config"] = None self.logger_patcher = patch("common.api_clients.get_pds_details.logger") self.mock_logger = self.logger_patcher.start() @@ -25,6 +26,7 @@ def setUp(self): def tearDown(self): get_pds_service.__globals__["_pds_service"] = None + get_pds_service.__globals__["_pds_service_config"] = None patch.stopall() def test_pds_get_patient_details_success(self): @@ -88,3 +90,70 @@ def test_reuses_same_pds_service_instance(self): 
self.mock_auth_class.assert_called_once() self.mock_pds_service_class.assert_called_once() self.assertEqual(self.mock_pds_service_instance.get_patient_details.call_count, 2) + + @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": "https://mock-pds.example/Patient"}, clear=False) + def test_uses_base_url_override_without_authenticator(self): + pds_get_patient_details(self.test_patient_id) + + self.mock_auth_class.assert_not_called() + self.mock_pds_service_class.assert_called_once_with( + None, + "ref", + base_url="https://mock-pds.example/Patient", + ) + + self.mock_pds_service_instance.get_patient_details.assert_called_once_with(self.test_patient_id) + + @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": " "}, clear=False) + def test_whitespace_only_base_url_uses_authenticator(self): + pds_get_patient_details(self.test_patient_id) + + self.mock_auth_class.assert_called_once() + self.mock_pds_service_class.assert_called_once_with( + self.mock_auth_instance, + "ref", + base_url=None, + ) + + @patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": "https://mock-pds-v1.example/Patient"}, clear=False) + def test_rebuilds_cached_service_when_base_url_changes(self): + pds_get_patient_details(self.test_patient_id) + + self.mock_pds_service_class.reset_mock() + self.mock_pds_service_instance.get_patient_details.reset_mock() + new_instance = MagicMock() + self.mock_pds_service_class.return_value = new_instance + + with patch.dict("os.environ", {"PDS_BASE_URL": "https://mock-pds-v2.example/Patient"}, clear=False): + pds_get_patient_details("1234567890") + + self.mock_auth_class.assert_not_called() + self.mock_pds_service_class.assert_called_once_with( + None, + "ref", + base_url="https://mock-pds-v2.example/Patient", + ) + new_instance.get_patient_details.assert_called_once_with("1234567890") + + @patch.dict("os.environ", {"PDS_ENV": "int", "PDS_BASE_URL": ""}, clear=False) + def test_rebuilds_cached_service_when_environment_changes(self): + 
pds_get_patient_details(self.test_patient_id) + + self.mock_pds_service_class.reset_mock() + self.mock_auth_class.reset_mock() + self.mock_pds_service_instance.get_patient_details.reset_mock() + new_auth = MagicMock() + new_service = MagicMock() + self.mock_auth_class.return_value = new_auth + self.mock_pds_service_class.return_value = new_service + + with patch.dict("os.environ", {"PDS_ENV": "ref", "PDS_BASE_URL": ""}, clear=False): + pds_get_patient_details("1234567890") + + self.mock_auth_class.assert_called_once_with(secret_manager_client=unittest.mock.ANY, environment="ref") + self.mock_pds_service_class.assert_called_once_with( + new_auth, + "ref", + base_url=None, + ) + new_service.get_patient_details.assert_called_once_with("1234567890") diff --git a/lambdas/shared/tests/test_common/api_clients/test_pds_service.py b/lambdas/shared/tests/test_common/api_clients/test_pds_service.py index 48f7a8e250..2f58aeb875 100644 --- a/lambdas/shared/tests/test_common/api_clients/test_pds_service.py +++ b/lambdas/shared/tests/test_common/api_clients/test_pds_service.py @@ -77,3 +77,20 @@ def test_env_mapping(self): env = "prod" service = PdsService(None, env) self.assertTrue(env not in service.base_url) + + def test_custom_base_url_override(self): + service = PdsService(None, "ref", base_url="https://mock-pds.example/Patient/") + + self.assertEqual(service.base_url, "https://mock-pds.example/Patient") + + @responses.activate + def test_get_patient_details_without_authenticator(self): + patient_id = "900000009" + pds_url = f"https://mock-pds.example/Patient/{patient_id}" + responses.add(responses.GET, pds_url, json={"id": patient_id}, status=200) + pds_service = PdsService(None, "ref", base_url="https://mock-pds.example/Patient") + + patient = pds_service.get_patient_details(patient_id) + + self.assertEqual(patient, {"id": patient_id}) + self.assertNotIn("Authorization", responses.calls[0].request.headers) diff --git a/tests/perf_tests/Makefile b/tests/perf_tests/Makefile 
index bf8fd833fd..ee1efecfa5 100644 --- a/tests/perf_tests/Makefile +++ b/tests/perf_tests/Makefile @@ -1,7 +1,46 @@ +LOCUST_FILE ?= src/locustfile.py +LOCUST ?= poetry run locust -f $(LOCUST_FILE) +MOCK_LOCUST_FILE ?= src/locustfile_pds_rate_limit.py +MOCK_LOCUST ?= poetry run locust -f $(MOCK_LOCUST_FILE) + test: - poetry run locust -f src/locustfile.py + $(LOCUST) test-read-only: - poetry run locust -f src/locustfile.py SearchUser + $(LOCUST) SearchUser + +baseline: + PERF_LOAD_PROFILE=baseline $(LOCUST) CreateUser + +spike: + PERF_LOAD_PROFILE=spike $(LOCUST) CreateUser + +ramp: + PERF_LOAD_PROFILE=ramp $(LOCUST) CreateUser + +mockserver: + poetry run python src/pds_rate_limit_stub_server.py + +mockpds: + $(MOCK_LOCUST) MockPdsUser + +mockpdstest-average: + PERF_LOAD_PROFILE=average PERF_MOCK_PDS_AVERAGE_RPS=125 $(MOCK_LOCUST) MockPdsUser + +mockpdstest-boundary: + PERF_LOAD_PROFILE=average PERF_MOCK_PDS_AVERAGE_RPS=130 $(MOCK_LOCUST) MockPdsUser + +mockpdstest-spike: + PERF_LOAD_PROFILE=spike $(MOCK_LOCUST) MockPdsUser + +mockpdstest-ui: + @if [ "$(PERF_LOAD_PROFILE)" != "average" ] && [ "$(PERF_LOAD_PROFILE)" != "spike" ]; then \ + echo "PERF_LOAD_PROFILE must be set to average or spike"; \ + exit 1; \ + fi + @poetry run python src/pds_rate_limit_stub_server.py >/tmp/pds_rate_limit_stub_server.log 2>&1 & \ + server_pid=$$!; \ + trap 'kill $$server_pid >/dev/null 2>&1 || true' EXIT INT TERM; \ + PERF_LOAD_PROFILE=$(PERF_LOAD_PROFILE) $(MOCK_LOCUST) MockPdsUser -.PHONY: test test-read-only +.PHONY: test test-read-only baseline spike ramp mockserver mockpds mockpdstest-average mockpdstest-boundary mockpdstest-spike mockpdstest-ui diff --git a/tests/perf_tests/README.md b/tests/perf_tests/README.md index 5ddede2dd8..465945cb94 100644 --- a/tests/perf_tests/README.md +++ b/tests/perf_tests/README.md @@ -4,6 +4,57 @@ This project contains Locust performance tests for the Immunisation FHIR API. 
To run them, ensure you have the `APIGEE_ENVIRONMENT` : Currently, only the ref environment is supported. +`PERF_SUPPLIER_SYSTEM` : `EMIS` or `TPP` `PERF_CREATE_RPS_PER_USER` : numeric env vars set, and call `make test`. + +For read-only search load, use `make test-read-only` (runs the `SearchUser` Locust profile). + +For MNS-with-mocked-PDS capacity work, use the `CreateUser` profile so downstream publishing and PDS lookup activity is exercised. + +For direct mock-PDS rate limit testing, use the local test-only mock server in this folder. + +1. Start the local mock server in one terminal: + `make mockserver` +2. Run mock rate tests in another terminal: + `make mockpdstest-average`, `make mockpdstest-boundary`, or `make mockpdstest-spike` + +The rate presets are baked in: + +- `make mockpdstest-average` runs at `125 rps` +- `make mockpdstest-boundary` runs at `130 rps` + +Or run both in one command (starts local mock server and opens Locust UI): +`PERF_LOAD_PROFILE=average make mockpdstest-ui` +or +`PERF_LOAD_PROFILE=spike make mockpdstest-ui` + +`src/locustfile_pds_rate_limit.py` defaults to `http://127.0.0.1:18080`. +Set `MOCK_PDS_BASE_URL` explicitly only if you intentionally want to target a non-local endpoint. + +Local mock profile defaults are tuned for parity with earlier ref checks: + +- Average profile duration default: `180s` +- Spike profile stages default: `10s warmup + 20s spike + 10s recovery` + +Available load profiles: + +- `make baseline`: holds traffic around the average acceptance threshold. Defaults to `125 rps` for `300s`. +- `make spike`: warms up at the average threshold, bursts above the spike threshold, then recovers. Defaults to `125 rps`, then `460 rps`, then back to `125 rps`. +- `make ramp`: increases traffic in fixed steps to identify the knee point and error envelope. Defaults to `50 rps` start, `25 rps` increments, `60s` per step, stopping after `500 rps`. 
+ +Supported environment variables: + +- `PERF_LOAD_PROFILE`: `baseline`, `spike`, or `ramp`. +- `PERF_BASELINE_RPS`, `PERF_BASELINE_DURATION_SECONDS` +- `PERF_SPIKE_WARMUP_RPS`, `PERF_SPIKE_RPS`, `PERF_SPIKE_WARMUP_SECONDS`, `PERF_SPIKE_DURATION_SECONDS`, `PERF_SPIKE_RECOVERY_SECONDS` +- `PERF_RAMP_START_RPS`, `PERF_RAMP_STEP_RPS`, `PERF_RAMP_MAX_RPS`, `PERF_RAMP_STEP_DURATION_SECONDS` + UI mode is used for perf runs in this folder. + +Suggested ref runbook: + +1. Run `make baseline` and confirm downstream create flow is stable at the average threshold. +2. Run `make spike` and check whether 429 responses are isolated to the burst window and whether MNS publish failures remain within expected limits. +3. Run `make ramp` to find the first step where latency, failures, or 429 volume becomes operationally unacceptable. +4. Record success rate, 429 rate, and p95/p99 latency from the generated CSV files for campaign-capacity decisions. diff --git a/tests/perf_tests/src/locustfile.py b/tests/perf_tests/src/locustfile.py index 5a4fd09864..57e9cee7cd 100644 --- a/tests/perf_tests/src/locustfile.py +++ b/tests/perf_tests/src/locustfile.py @@ -1,4 +1,5 @@ import json +import math import os import random import uuid @@ -6,7 +7,7 @@ from urllib.parse import urlencode import pandas as pd -from locust import HttpUser, constant_throughput, task +from locust import HttpUser, LoadTestShape, constant_throughput, task from common.api_clients.authentication import AppRestrictedAuth from common.clients import get_secrets_manager_client @@ -22,6 +23,18 @@ raise ValueError("APIGEE_ENVIRONMENT must be set") PERF_CREATE_TASK_RPS_PER_USER = float(os.getenv("PERF_CREATE_RPS_PER_USER", "1")) +PERF_LOAD_PROFILE = os.getenv("PERF_LOAD_PROFILE", "").strip().lower() +PERF_BASELINE_RPS = int(os.getenv("PERF_BASELINE_RPS", "125")) +PERF_BASELINE_DURATION_SECONDS = int(os.getenv("PERF_BASELINE_DURATION_SECONDS", "300")) +PERF_SPIKE_WARMUP_RPS = int(os.getenv("PERF_SPIKE_WARMUP_RPS", "125")) 
+PERF_SPIKE_RPS = int(os.getenv("PERF_SPIKE_RPS", "460")) +PERF_SPIKE_WARMUP_SECONDS = int(os.getenv("PERF_SPIKE_WARMUP_SECONDS", "120")) +PERF_SPIKE_DURATION_SECONDS = int(os.getenv("PERF_SPIKE_DURATION_SECONDS", "60")) +PERF_SPIKE_RECOVERY_SECONDS = int(os.getenv("PERF_SPIKE_RECOVERY_SECONDS", "120")) +PERF_RAMP_START_RPS = int(os.getenv("PERF_RAMP_START_RPS", "50")) +PERF_RAMP_STEP_RPS = int(os.getenv("PERF_RAMP_STEP_RPS", "25")) +PERF_RAMP_MAX_RPS = int(os.getenv("PERF_RAMP_MAX_RPS", "500")) +PERF_RAMP_STEP_DURATION_SECONDS = int(os.getenv("PERF_RAMP_STEP_DURATION_SECONDS", "60")) IMMUNIZATION_TARGETS = [ "3IN1", @@ -54,6 +67,40 @@ def _load_valid_patients(): VALID_PATIENT_IDS = _load_valid_patients() +def _users_for_target_rps(target_rps: int) -> int: + per_user_rps = PERF_CREATE_TASK_RPS_PER_USER if PERF_CREATE_TASK_RPS_PER_USER > 0 else 1 + return max(1, math.ceil(target_rps / per_user_rps)) + + +class CampaignCapacityShape(LoadTestShape): + abstract = PERF_LOAD_PROFILE not in {"baseline", "spike", "ramp"} + + def tick(self): + run_time = self.get_run_time() + + if PERF_LOAD_PROFILE == "baseline": + if run_time >= PERF_BASELINE_DURATION_SECONDS: + return None + target_rps = PERF_BASELINE_RPS + elif PERF_LOAD_PROFILE == "spike": + if run_time < PERF_SPIKE_WARMUP_SECONDS: + target_rps = PERF_SPIKE_WARMUP_RPS + elif run_time < PERF_SPIKE_WARMUP_SECONDS + PERF_SPIKE_DURATION_SECONDS: + target_rps = PERF_SPIKE_RPS + elif run_time < PERF_SPIKE_WARMUP_SECONDS + PERF_SPIKE_DURATION_SECONDS + PERF_SPIKE_RECOVERY_SECONDS: + target_rps = PERF_SPIKE_WARMUP_RPS + else: + return None + else: + current_step = int(run_time // PERF_RAMP_STEP_DURATION_SECONDS) + target_rps = PERF_RAMP_START_RPS + (current_step * PERF_RAMP_STEP_RPS) + if target_rps > PERF_RAMP_MAX_RPS: + return None + + user_count = _users_for_target_rps(target_rps) + return user_count, user_count + + class BaseImmunizationUser(HttpUser): abstract = True diff --git 
a/tests/perf_tests/src/locustfile_pds_rate_limit.py b/tests/perf_tests/src/locustfile_pds_rate_limit.py new file mode 100644 index 0000000000..af3d6857db --- /dev/null +++ b/tests/perf_tests/src/locustfile_pds_rate_limit.py @@ -0,0 +1,143 @@ +"""Locust load generator for validating local mock PDS rate-limit behavior. + +This module drives two profiles against the local stub server: +- average: sustained load around the threshold +- spike: warmup, burst, and recovery phases +""" + +import math +import os +import random +from urllib.parse import urlparse + +from locust import HttpUser, LoadTestShape, constant_throughput, task + +PERF_LOAD_PROFILE = os.getenv("PERF_LOAD_PROFILE", "").strip().lower() +MOCK_PDS_BASE_URL = os.getenv("MOCK_PDS_BASE_URL", "http://127.0.0.1:18080").strip().rstrip("/") + + +def _validate_mock_pds_base_url(base_url: str) -> str: + """Validate that the target URL is an absolute HTTP(S) endpoint.""" + if "<" in base_url or ">" in base_url: + raise ValueError( + "MOCK_PDS_BASE_URL still contains a placeholder. 
Set it to the real Lambda Function URL, " + "for example https://abc123.lambda-url.eu-west-2.on.aws" + ) + + parsed = urlparse(base_url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError( + "MOCK_PDS_BASE_URL must be a valid absolute URL, for example https://abc123.lambda-url.eu-west-2.on.aws" + ) + + return base_url + + +MOCK_PDS_BASE_URL = _validate_mock_pds_base_url(MOCK_PDS_BASE_URL) + +PERF_MOCK_PDS_RPS_PER_USER = float(os.getenv("PERF_MOCK_PDS_RPS_PER_USER", "1")) +MOCK_PDS_VERIFY_TLS = os.getenv("MOCK_PDS_VERIFY_TLS", "false").strip().lower() in {"1", "true", "yes"} + +PERF_MOCK_PDS_AVERAGE_RPS = int(os.getenv("PERF_MOCK_PDS_AVERAGE_RPS", "140")) +PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS = int(os.getenv("PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS", "180")) + +PERF_MOCK_PDS_SPIKE_WARMUP_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_WARMUP_RPS", "125")) +PERF_MOCK_PDS_SPIKE_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RPS", "460")) +PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS", "10")) +PERF_MOCK_PDS_SPIKE_DURATION_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_DURATION_SECONDS", "20")) +PERF_MOCK_PDS_SPIKE_RECOVERY_RPS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RECOVERY_RPS", "125")) +PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS = int(os.getenv("PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS", "10")) + +RATE_LIMIT_MESSAGE = "Mock PDS rate limit has been exceeded" + + +def _users_for_target_rps(target_rps: int) -> int: + """Convert an RPS target into the number of Locust users to spawn.""" + per_user_rps = PERF_MOCK_PDS_RPS_PER_USER if PERF_MOCK_PDS_RPS_PER_USER > 0 else 1 + return max(1, math.ceil(target_rps / per_user_rps)) + + +class MockPdsRateLimitShape(LoadTestShape): + """Dynamic load shape used for average and spike rate-limit scenarios.""" + + abstract = PERF_LOAD_PROFILE not in {"average", "spike"} + + def tick(self): + """Return `(user_count, spawn_rate)` for the current run time stage.""" + run_time = 
self.get_run_time() + + if PERF_LOAD_PROFILE == "average": + if run_time >= PERF_MOCK_PDS_AVERAGE_DURATION_SECONDS: + return None + target_rps = PERF_MOCK_PDS_AVERAGE_RPS + elif PERF_LOAD_PROFILE == "spike": + spike_end = PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS + PERF_MOCK_PDS_SPIKE_DURATION_SECONDS + recovery_end = spike_end + PERF_MOCK_PDS_SPIKE_RECOVERY_SECONDS + + if run_time < PERF_MOCK_PDS_SPIKE_WARMUP_SECONDS: + target_rps = PERF_MOCK_PDS_SPIKE_WARMUP_RPS + elif run_time < spike_end: + target_rps = PERF_MOCK_PDS_SPIKE_RPS + elif run_time < recovery_end: + target_rps = PERF_MOCK_PDS_SPIKE_RECOVERY_RPS + else: + return None + else: + return None + + user_count = _users_for_target_rps(target_rps) + return user_count, user_count + + +class MockPdsUser(HttpUser): + """Locust user that repeatedly calls mock PDS Patient lookup endpoints.""" + + wait_time = constant_throughput(PERF_MOCK_PDS_RPS_PER_USER) + host = MOCK_PDS_BASE_URL + + def on_start(self): + """Apply configured TLS verification behavior to the HTTP client.""" + self.client.verify = MOCK_PDS_VERIFY_TLS + + @staticmethod + def _random_nhs_number() -> str: + """Generate a pseudo NHS number for request variation during load tests.""" + return f"99{random.randint(10_000_000, 99_999_999)}" + + @task + def get_patient(self): + """Execute one lookup request and classify outcomes for test reporting.""" + with self.client.get( + f"/Patient/{self._random_nhs_number()}", + headers={"Accept": "application/fhir+json"}, + name="Mock PDS Patient Lookup", + catch_response=True, + ) as response: + if response.status_code == 200: + response.success() + return + + if response.status_code == 429: + try: + payload = response.json() + except ValueError: + response.failure(f"429 response was not valid JSON: {response.text}") + return + + if payload.get("code") == 429 and payload.get("message") == RATE_LIMIT_MESSAGE: + response.failure(f"HTTP {response.status_code}: {RATE_LIMIT_MESSAGE}") + else: + response.failure(f"Unexpected 
429 payload: {response.text}") + return + + if response.status_code == 0: + error_detail = getattr(response, "error", None) + response.failure( + "Connection failed before reaching mock PDS. " + f"Check MOCK_PDS_BASE_URL={self.host}. " + f"TLS verification enabled={MOCK_PDS_VERIFY_TLS}. " + f"Underlying error: {error_detail!r}" + ) + return + + response.failure(f"Unexpected response: {response.status_code} {response.text}") diff --git a/tests/perf_tests/src/pds_rate_limit_stub_server.py b/tests/perf_tests/src/pds_rate_limit_stub_server.py new file mode 100644 index 0000000000..ad86646fcf --- /dev/null +++ b/tests/perf_tests/src/pds_rate_limit_stub_server.py @@ -0,0 +1,143 @@ +"""Local HTTP stub that emulates PDS rate-limit behavior for perf tests. + +The server exposes `GET /Patient/` and applies fixed-window +average and spike limits, returning: +- 200 with empty body when allowed +- 429 JSON error when throttled +""" + +import json +import os +import threading +import time +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +RATE_LIMIT_MESSAGE = "Mock PDS rate limit has been exceeded" + + +class LoadFriendlyThreadingHTTPServer(ThreadingHTTPServer): + """Threaded HTTP server configured for high local request concurrency.""" + + daemon_threads = True + allow_reuse_address = True + request_queue_size = 256 + + +class FixedWindowRateLimiter: + """In-memory fixed-window limiter for average and spike traffic windows.""" + + def __init__( + self, + average_limit: int, + average_window_seconds: int, + spike_limit: int, + spike_window_seconds: int, + ): + self._started_at = time.time() + self._windows = ( + ("average", average_limit * average_window_seconds, average_window_seconds), + ("spike", spike_limit, spike_window_seconds), + ) + self._lock = threading.Lock() + self._counts: dict[str, int] = {} + + def check(self, scope: str) -> tuple[bool, str, int, int, int]: + """Evaluate current request count for each window and 
return decision metadata.""" + with self._lock: + decision = None + for name, limit, seconds in self._windows: + # Anchor windows to server start so threshold behavior is stable across test runs. + bucket = int((time.time() - self._started_at) // seconds) + key = f"{scope}:{name}:{bucket}" + count = self._counts.get(key, 0) + 1 + self._counts[key] = count + decision = (count <= limit, name, count, limit, seconds) + if not decision[0]: + break + self._cleanup_old_buckets() + return decision + + def _cleanup_old_buckets(self) -> None: + """Prune old buckets to keep memory usage bounded during long runs.""" + now = int(time.time()) + keys_to_delete = [] + for key in self._counts: + parts = key.rsplit(":", 2) + if len(parts) != 3: + continue + _, window_name, bucket_str = parts + seconds = next((w[2] for w in self._windows if w[0] == window_name), 1) + bucket_start = int(bucket_str) * seconds + if now - bucket_start > (seconds * 2): + keys_to_delete.append(key) + + for key in keys_to_delete: + self._counts.pop(key, None) + + +class MockPdsHandler(BaseHTTPRequestHandler): + """Request handler for mock PDS endpoints used in local load tests.""" + + rate_limiter: FixedWindowRateLimiter + protocol_version = "HTTP/1.1" + + def _write_json(self, status: HTTPStatus, payload: dict) -> None: + """Send a JSON response with explicit content length and connection close.""" + body = json.dumps(payload).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.send_header("Connection", "close") + self.end_headers() + self.wfile.write(body) + + def _write_empty_200(self) -> None: + """Send an empty 200 response body to mirror acceptance criteria.""" + self.send_response(HTTPStatus.OK) + self.send_header("Content-Length", "0") + self.send_header("Connection", "close") + self.end_headers() + + def do_GET(self): + """Handle patient lookup requests and apply rate-limit responses.""" + if 
not self.path.startswith("/Patient/"): + self._write_json(HTTPStatus.BAD_REQUEST, {"code": 400, "message": "Patient id is required"}) + return + + allowed, _, _, _, _ = self.rate_limiter.check("patient-lookup") + if not allowed: + self._write_json(HTTPStatus.TOO_MANY_REQUESTS, {"code": 429, "message": RATE_LIMIT_MESSAGE}) + return + + # Acceptance criteria: 200 with no response body below threshold. + self._write_empty_200() + + def log_message(self, format: str, *args): + """Suppress default HTTP request logs for cleaner perf test output.""" + return + + +def main() -> None: + """Load config from env, initialize limiter, and start the stub server.""" + host = os.getenv("MOCK_PDS_BIND_HOST", "127.0.0.1") + port = int(os.getenv("MOCK_PDS_BIND_PORT", "18080")) + average_limit = int(os.getenv("MOCK_PDS_AVERAGE_LIMIT", "125")) + average_window_seconds = int(os.getenv("MOCK_PDS_AVERAGE_WINDOW_SECONDS", "60")) + spike_limit = int(os.getenv("MOCK_PDS_SPIKE_LIMIT", "450")) + spike_window_seconds = int(os.getenv("MOCK_PDS_SPIKE_WINDOW_SECONDS", "1")) + + MockPdsHandler.rate_limiter = FixedWindowRateLimiter( + average_limit=average_limit, + average_window_seconds=average_window_seconds, + spike_limit=spike_limit, + spike_window_seconds=spike_window_seconds, + ) + + server = LoadFriendlyThreadingHTTPServer((host, port), MockPdsHandler) + print(f"Mock PDS test server listening on http://{host}:{port}") + server.serve_forever() + + +if __name__ == "__main__": + main()