diff --git a/backend/pytest.ini b/backend/pytest.ini new file mode 100644 index 0000000..8375b90 --- /dev/null +++ b/backend/pytest.ini @@ -0,0 +1,14 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +asyncio_mode = auto +addopts = + -v + --tb=short + --strict-markers + --disable-warnings + +markers = + asyncio: mark test as async diff --git a/backend/tests/README.md b/backend/tests/README.md new file mode 100644 index 0000000..efdd20f --- /dev/null +++ b/backend/tests/README.md @@ -0,0 +1,117 @@ +# Backend Unit Tests + +Comprehensive unit tests for the Insurance Management Platform backend API. + +## πŸ€– Generated by TestKraft + +These tests were automatically generated by **TestKraft** - an AI-powered test generation platform that creates comprehensive, production-ready unit tests for backend applications. + +## Test Coverage + +### Core Module Tests +- **test_config.py** - Configuration settings, environment variables, validation +- **test_security.py** - JWT token creation, password hashing, verification + +### Database Tests +- **test_session.py** - Database session management, async engine configuration + +### API Tests +- **test_main.py** - FastAPI application, endpoints, CORS, middleware + +### Model Tests +- **test_user.py** - User model, fields, relationships, validation +- **test_policy.py** - Policy model, PolicyType enum, constraints +- **test_claim.py** - Claim model, status handling, relationships +- **test_payment.py** - Payment model, transaction handling, status +- **test_models_init.py** - Model package exports and integration + +## Test Strategy + +All tests follow comprehensive testing principles: + +βœ… **Boundary Value Analysis** - Testing edge cases, min/max values, empty/null inputs +βœ… **Equivalence Class Partitioning** - Representative test cases for each behavior class +βœ… **Actual Behavior Testing** - Tests verify what the code actually does +βœ… **Mocked Dependencies** - Database, external services properly mocked +βœ… **Error Conditions** - Exception handling and validation tested + +## Running the Tests + +### Run all tests +```bash +cd backend +pytest +``` + +### Run specific test file +```bash +pytest tests/app/core/test_security.py +``` + +### Run with coverage +```bash +pytest --cov=app --cov-report=html +``` + +### Run tests matching a pattern +```bash +pytest -k "test_user" +``` + +### Run verbose output +```bash +pytest -v +``` + +## Test Structure + +``` +tests/ +β”œβ”€β”€ conftest.py # Shared fixtures and configuration +β”œβ”€β”€ app/ +β”‚ β”œβ”€β”€ core/ +β”‚ β”‚ β”œβ”€β”€ test_config.py +β”‚ β”‚ └── test_security.py +β”‚ β”œβ”€β”€ db/ +β”‚ β”‚ └── test_session.py +β”‚ β”œβ”€β”€ models/ +β”‚ β”‚ β”œβ”€β”€ test_user.py +β”‚ β”‚ β”œβ”€β”€ test_policy.py +β”‚ β”‚ β”œβ”€β”€ test_claim.py +β”‚ β”‚ β”œβ”€β”€ test_payment.py +β”‚ β”‚ └── test_models_init.py +β”‚ └── test_main.py +└── README.md +``` + +## Dependencies + +Required packages (already in requirements.txt): +- pytest >= 8.1.1 +- pytest-asyncio >= 0.23.5 +- httpx >= 0.27.0 + +## Fixtures + +**conftest.py** provides shared fixtures: +- `mock_settings` - Mock configuration settings +- `async_db_engine` - In-memory async database engine +- `async_db_session` - Async database session for testing + +## Test Statistics + +- **Total Test Files:** 10 +- **Framework:** pytest with async support +- **Backend Files Tested:** 9 Python modules +- **Test Categories:** Config, Security, Database, API, Models + +## Notes + +- Tests use in-memory SQLite database for speed +- All async operations properly handled with pytest-asyncio +- Mocked external dependencies for isolation +- Comprehensive boundary value and edge case testing + +--- + +πŸ€– **Generated by TestKraft** - AI-Powered Backend Test Generation diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..7b330fc --- /dev/null +++ b/backend/tests/__init__.py @@ -0,0 +1 @@ +# Tests package initialization diff --git a/backend/tests/app/__init__.py b/backend/tests/app/__init__.py new file mode 100644 index 0000000..16cd941 --- /dev/null +++ b/backend/tests/app/__init__.py @@ -0,0 +1 @@ +# App tests package diff --git a/backend/tests/app/core/__init__.py b/backend/tests/app/core/__init__.py new file mode 100644 index 0000000..d62ca33 --- /dev/null +++ b/backend/tests/app/core/__init__.py @@ -0,0 +1 @@ +# Core tests package diff --git a/backend/tests/app/core/test_config.py b/backend/tests/app/core/test_config.py new file mode 100644 index 0000000..abae196 --- /dev/null +++ b/backend/tests/app/core/test_config.py @@ -0,0 +1,189 @@ +""" +Unit tests for app.core.config module. + +Tests the Settings configuration class with various scenarios including: +- Default values +- Environment variable loading +- Required fields validation +- Boundary value testing +""" +import pytest +from unittest.mock import patch, MagicMock +from pydantic import ValidationError + + +def test_settings_default_values(): + """Test Settings class with default values where applicable.""" + from app.core.config import Settings + + # Mock environment variables for required fields + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret_key_123456789', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db' + }): + settings = Settings() + + # Test default values + assert settings.PROJECT_NAME == "Insurance Management Platform" + assert settings.API_V1_STR == "/api/v1" + assert settings.ACCESS_TOKEN_EXPIRE_MINUTES == 60 * 24 * 7 # 7 days + assert settings.CORS_ORIGINS == "http://localhost:3000" + + # Test required fields are loaded + assert settings.SECRET_KEY == 'test_secret_key_123456789' + assert settings.DATABASE_URL == 'postgresql://user:pass@localhost/db' + + +def test_settings_custom_values(): + """Test Settings class with custom environment values.""" + from app.core.config import Settings + + custom_env = { + 'PROJECT_NAME': 'Custom Insurance Platform', + 'API_V1_STR': '/api/v2', + 'SECRET_KEY': 'custom_secret_key', + 'ACCESS_TOKEN_EXPIRE_MINUTES': '120', + 'CORS_ORIGINS': 'http://example.com,http://another.com', + 'DATABASE_URL': 'postgresql://custom:pass@db.example.com/insurance' + } + + with patch.dict('os.environ', custom_env, clear=True): + settings = Settings() + + assert settings.PROJECT_NAME == 'Custom Insurance Platform' + assert settings.API_V1_STR == '/api/v2' + assert settings.SECRET_KEY == 'custom_secret_key' + assert settings.ACCESS_TOKEN_EXPIRE_MINUTES == 120 + assert settings.CORS_ORIGINS == 'http://example.com,http://another.com' + assert settings.DATABASE_URL == 'postgresql://custom:pass@db.example.com/insurance' + + +def test_settings_missing_required_secret_key(): + """Test Settings validation fails when SECRET_KEY is missing.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'DATABASE_URL': 'postgresql://user:pass@localhost/db' + }, clear=True): + with pytest.raises(ValidationError) as exc_info: + Settings() + + # Verify SECRET_KEY is mentioned in the error + assert 'SECRET_KEY' in str(exc_info.value) + + +def test_settings_missing_required_database_url(): + """Test Settings validation fails when DATABASE_URL is missing.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret_key' + }, clear=True): + with pytest.raises(ValidationError) as exc_info: + Settings() + + # Verify DATABASE_URL is mentioned in the error + assert 'DATABASE_URL' in str(exc_info.value) + + +def test_settings_boundary_access_token_expire_minutes_zero(): + """Test ACCESS_TOKEN_EXPIRE_MINUTES with boundary value of zero.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'ACCESS_TOKEN_EXPIRE_MINUTES': '0' + }): + settings = Settings() + assert settings.ACCESS_TOKEN_EXPIRE_MINUTES == 0 + + +def test_settings_boundary_access_token_expire_minutes_large(): + """Test ACCESS_TOKEN_EXPIRE_MINUTES with large value.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'ACCESS_TOKEN_EXPIRE_MINUTES': '525600' # 1 year in minutes + }): + settings = Settings() + assert settings.ACCESS_TOKEN_EXPIRE_MINUTES == 525600 + + +def test_settings_empty_cors_origins(): + """Test CORS_ORIGINS with empty string.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'CORS_ORIGINS': '' + }): + settings = Settings() + assert settings.CORS_ORIGINS == '' + + +def test_settings_multiple_cors_origins(): + """Test CORS_ORIGINS with multiple comma-separated values.""" + from app.core.config import Settings + + cors_value = 'http://localhost:3000,http://localhost:3001,https://example.com' + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'CORS_ORIGINS': cors_value + }): + settings = Settings() + assert settings.CORS_ORIGINS == cors_value + + +def test_settings_singleton_instance(): + """Test that the settings singleton instance is accessible.""" + from app.core.config import settings + + # Verify the singleton exists and has expected attributes + assert hasattr(settings, 'PROJECT_NAME') + assert hasattr(settings, 'SECRET_KEY') + assert hasattr(settings, 'DATABASE_URL') + assert hasattr(settings, 'ACCESS_TOKEN_EXPIRE_MINUTES') + + +def test_settings_extra_fields_ignored(): + """Test that extra fields in environment are ignored due to extra='ignore'.""" + from app.core.config import Settings + + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'UNKNOWN_FIELD': 'should_be_ignored', + 'ANOTHER_EXTRA': 'also_ignored' + }): + settings = Settings() + + # Should not raise an error, extra fields are ignored + assert not hasattr(settings, 'UNKNOWN_FIELD') + assert not hasattr(settings, 'ANOTHER_EXTRA') + + +def test_settings_api_v1_str_variations(): + """Test API_V1_STR with different valid path formats.""" + from app.core.config import Settings + + test_cases = [ + '/api/v1', + '/api/v2', + '/v1', + '', + '/custom/api/path' + ] + + for api_str in test_cases: + with patch.dict('os.environ', { + 'SECRET_KEY': 'test_secret', + 'DATABASE_URL': 'postgresql://user:pass@localhost/db', + 'API_V1_STR': api_str + }): + settings = Settings() + assert settings.API_V1_STR == api_str diff --git a/backend/tests/app/core/test_security.py b/backend/tests/app/core/test_security.py new file mode 100644 index 0000000..e89aac0 --- /dev/null +++ b/backend/tests/app/core/test_security.py @@ -0,0 +1,305 @@ +""" +Unit tests for app.core.security module. + +Tests JWT token creation, password hashing, and verification with: +- Token creation with various subjects +- Token expiration handling +- Password hashing and verification +- Boundary value testing +- Error conditions +""" +import pytest +from datetime import datetime, timedelta +from unittest.mock import patch, MagicMock +from jose import jwt, JWTError + + +def test_create_access_token_with_string_subject(mock_settings): + """Test create_access_token with a string subject.""" + from app.core.security import create_access_token, ALGORITHM + + subject = "user@example.com" + token = create_access_token(subject=subject) + + # Decode token to verify contents + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + assert decoded["sub"] == subject + assert "exp" in decoded + + +def test_create_access_token_with_integer_subject(mock_settings): + """Test create_access_token with an integer subject (user ID).""" + from app.core.security import create_access_token, ALGORITHM + + subject = 12345 + token = create_access_token(subject=subject) + + # Decode token to verify contents + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + # Subject should be converted to string + assert decoded["sub"] == "12345" + assert "exp" in decoded + + +def test_create_access_token_with_custom_expiration(mock_settings): + """Test create_access_token with custom expiration delta.""" + from app.core.security import create_access_token, ALGORITHM + + subject = "user@example.com" + custom_delta = timedelta(minutes=30) + + # Mock datetime to control time + with patch('app.core.security.datetime') as mock_datetime: + fixed_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.utcnow.return_value = fixed_now + + token = create_access_token(subject=subject, expires_delta=custom_delta) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + # Calculate expected expiration + expected_exp = fixed_now + custom_delta + actual_exp = datetime.fromtimestamp(decoded["exp"]) + + assert decoded["sub"] == subject + assert abs((actual_exp - expected_exp).total_seconds()) < 1 # Within 1 second + + +def test_create_access_token_default_expiration(mock_settings): + """Test create_access_token uses default expiration from settings.""" + from app.core.security import create_access_token, ALGORITHM + + subject = "user@example.com" + + with patch('app.core.security.datetime') as mock_datetime: + fixed_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.utcnow.return_value = fixed_now + + token = create_access_token(subject=subject) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + # Expected expiration based on settings + expected_exp = fixed_now + timedelta(minutes=mock_settings.ACCESS_TOKEN_EXPIRE_MINUTES) + actual_exp = datetime.fromtimestamp(decoded["exp"]) + + # Verify expiration matches settings default + assert abs((actual_exp - expected_exp).total_seconds()) < 1 + + +def test_create_access_token_zero_expiration(mock_settings): + """Test create_access_token with zero expiration delta (boundary value).""" + from app.core.security import create_access_token, ALGORITHM + + subject = "user@example.com" + zero_delta = timedelta(minutes=0) + + with patch('app.core.security.datetime') as mock_datetime: + fixed_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.utcnow.return_value = fixed_now + + token = create_access_token(subject=subject, expires_delta=zero_delta) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + # Token should expire immediately + expected_exp = fixed_now + actual_exp = datetime.fromtimestamp(decoded["exp"]) + + assert abs((actual_exp - expected_exp).total_seconds()) < 1 + + +def test_create_access_token_negative_expiration(mock_settings): + """Test create_access_token with negative expiration delta.""" + from app.core.security import create_access_token, ALGORITHM + + subject = "user@example.com" + negative_delta = timedelta(minutes=-30) + + with patch('app.core.security.datetime') as mock_datetime: + fixed_now = datetime(2024, 1, 1, 12, 0, 0) + mock_datetime.utcnow.return_value = fixed_now + + token = create_access_token(subject=subject, expires_delta=negative_delta) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + + # Token should be expired (exp in the past) + expected_exp = fixed_now + negative_delta + actual_exp = datetime.fromtimestamp(decoded["exp"]) + + assert actual_exp < fixed_now + + +def test_create_access_token_empty_string_subject(mock_settings): + """Test create_access_token with empty string subject.""" + from app.core.security import create_access_token, ALGORITHM + + subject = "" + token = create_access_token(subject=subject) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + assert decoded["sub"] == "" + + +def test_create_access_token_none_subject(mock_settings): + """Test create_access_token with None subject.""" + from app.core.security import create_access_token, ALGORITHM + + subject = None + token = create_access_token(subject=subject) + + decoded = jwt.decode(token, mock_settings.SECRET_KEY, algorithms=[ALGORITHM]) + assert decoded["sub"] == "None" + + +def test_verify_password_correct_password(): + """Test verify_password returns True for correct password.""" + from app.core.security import verify_password, get_password_hash + + plain_password = "SecurePassword123!" + hashed_password = get_password_hash(plain_password) + + result = verify_password(plain_password, hashed_password) + assert result is True + + +def test_verify_password_incorrect_password(): + """Test verify_password returns False for incorrect password.""" + from app.core.security import verify_password, get_password_hash + + plain_password = "SecurePassword123!" + wrong_password = "WrongPassword456!" + hashed_password = get_password_hash(plain_password) + + result = verify_password(wrong_password, hashed_password) + assert result is False + + +def test_verify_password_empty_plain_password(): + """Test verify_password with empty plain password.""" + from app.core.security import verify_password, get_password_hash + + plain_password = "" + hashed_password = get_password_hash("SomePassword") + + result = verify_password(plain_password, hashed_password) + assert result is False + + +def test_verify_password_empty_hashed_password(): + """Test verify_password with empty hashed password.""" + from app.core.security import verify_password + + plain_password = "SomePassword" + hashed_password = "" + + result = verify_password(plain_password, hashed_password) + assert result is False + + +def test_get_password_hash_generates_different_hashes(): + """Test get_password_hash generates different hashes for same password (salt).""" + from app.core.security import get_password_hash + + password = "TestPassword123" + + hash1 = get_password_hash(password) + hash2 = get_password_hash(password) + + # Hashes should be different due to random salt + assert hash1 != hash2 + + # But both should be valid hashes + assert len(hash1) > 0 + assert len(hash2) > 0 + assert hash1.startswith("$2b$") + assert hash2.startswith("$2b$") + + +def test_get_password_hash_empty_password(): + """Test get_password_hash with empty password.""" + from app.core.security import get_password_hash + + password = "" + hashed = get_password_hash(password) + + # Should still generate a hash + assert len(hashed) > 0 + assert hashed.startswith("$2b$") + + +def test_get_password_hash_special_characters(): + """Test get_password_hash with special characters.""" + from app.core.security import get_password_hash, verify_password + + password = "P@ssw0rd!#$%^&*()_+-=[]{}|;':\",./<>?" + hashed = get_password_hash(password) + + # Verify the hash works correctly + assert verify_password(password, hashed) is True + + +def test_get_password_hash_unicode_characters(): + """Test get_password_hash with unicode characters.""" + from app.core.security import get_password_hash, verify_password + + password = "密码TestπŸ”’Password" + hashed = get_password_hash(password) + + # Verify the hash works with unicode + assert verify_password(password, hashed) is True + + +def test_get_password_hash_very_long_password(): + """Test get_password_hash with very long password (boundary value).""" + from app.core.security import get_password_hash, verify_password + + password = "a" * 1000 # 1000 character password + hashed = get_password_hash(password) + + # Verify the hash works for long passwords + assert verify_password(password, hashed) is True + + +def test_password_hash_format(): + """Test that password hashes follow bcrypt format.""" + from app.core.security import get_password_hash + + password = "TestPassword" + hashed = get_password_hash(password) + + # Bcrypt hashes start with $2b$ and have specific length + assert hashed.startswith("$2b$") + assert len(hashed) == 60 # Standard bcrypt hash length + + +def test_verify_password_case_sensitive(): + """Test that password verification is case-sensitive.""" + from app.core.security import get_password_hash, verify_password + + password = "TestPassword" + hashed = get_password_hash(password) + + # Different case should not match + assert verify_password("testpassword", hashed) is False + assert verify_password("TESTPASSWORD", hashed) is False + assert verify_password(password, hashed) is True + + +def test_pwd_context_configuration(): + """Test that pwd_context is configured correctly.""" + from app.core.security import pwd_context + + # Verify bcrypt is the scheme + assert "bcrypt" in pwd_context.schemes() + assert pwd_context.deprecated == "auto" + + +def test_algorithm_constant(): + """Test that ALGORITHM constant is set correctly.""" + from app.core.security import ALGORITHM + + assert ALGORITHM == "HS256" diff --git a/backend/tests/app/db/__init__.py b/backend/tests/app/db/__init__.py new file mode 100644 index 0000000..7ace235 --- /dev/null +++ b/backend/tests/app/db/__init__.py @@ -0,0 +1 @@ +# DB tests package diff --git a/backend/tests/app/db/test_session.py b/backend/tests/app/db/test_session.py new file mode 100644 index 0000000..fe6c30b --- /dev/null +++ b/backend/tests/app/db/test_session.py @@ -0,0 +1,197 @@ +""" +Unit tests for app.db.session module. + +Tests database session management including: +- Engine creation +- Session maker configuration +- get_db generator function +- Base model declaration +""" +import pytest +from unittest.mock import patch, MagicMock, AsyncMock +from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine +from sqlalchemy.orm import DeclarativeMeta + + +def test_engine_creation(mock_settings): + """Test that async engine is created with correct configuration.""" + from app.db.session import engine + + assert engine is not None + assert isinstance(engine, AsyncEngine) + + +def test_async_session_local_configuration(mock_settings): + """Test AsyncSessionLocal is configured correctly.""" + from app.db.session import AsyncSessionLocal + + assert AsyncSessionLocal is not None + # Verify session factory configuration + assert AsyncSessionLocal.kw.get('expire_on_commit') is False + assert AsyncSessionLocal.kw.get('autoflush') is False + + +def test_base_declarative_base(): + """Test that Base is a valid declarative base.""" + from app.db.session import Base + + assert Base is not None + # Verify Base is a DeclarativeMeta + assert isinstance(Base, DeclarativeMeta) + + +@pytest.mark.asyncio +async def test_get_db_yields_session(): + """Test that get_db yields an AsyncSession.""" + from app.db.session import get_db + + # Mock AsyncSessionLocal context manager + with patch('app.db.session.AsyncSessionLocal') as mock_session_maker: + mock_session = AsyncMock(spec=AsyncSession) + mock_session_maker.return_value.__aenter__.return_value = mock_session + + # Call get_db generator + async for session in get_db(): + assert session is mock_session + # Verify it yields an AsyncSession + break + + +@pytest.mark.asyncio +async def test_get_db_closes_session_after_use(): + """Test that get_db properly closes session after use.""" + from app.db.session import get_db + + mock_session = AsyncMock(spec=AsyncSession) + + with patch('app.db.session.AsyncSessionLocal') as mock_session_maker: + mock_session_maker.return_value.__aenter__.return_value = mock_session + mock_session_maker.return_value.__aexit__ = AsyncMock() + + # Use get_db in async context + async for session in get_db(): + pass + + # Verify session context was entered and exited + mock_session_maker.return_value.__aenter__.assert_called_once() + mock_session_maker.return_value.__aexit__.assert_called_once() + + +@pytest.mark.asyncio +async def test_get_db_exception_handling(): + """Test that get_db properly handles exceptions and closes session.""" + from app.db.session import get_db + + mock_session = AsyncMock(spec=AsyncSession) + + with patch('app.db.session.AsyncSessionLocal') as mock_session_maker: + mock_session_maker.return_value.__aenter__.return_value = mock_session + mock_exit = AsyncMock() + mock_session_maker.return_value.__aexit__ = mock_exit + + try: + async for session in get_db(): + # Simulate an exception during session use + raise ValueError("Test exception") + except ValueError: + pass + + # Verify session cleanup was called even after exception + mock_exit.assert_called_once() + + +def test_engine_echo_disabled(mock_settings): + """Test that database engine echo is disabled.""" + from app.db.session import engine + + # Echo should be False for production + assert engine.echo is False + + +def test_base_metadata_exists(): + """Test that Base has metadata attribute.""" + from app.db.session import Base + + assert hasattr(Base, 'metadata') + assert Base.metadata is not None + + +def test_base_can_be_used_for_table_definition(): + """Test that Base can be used to define a simple table.""" + from app.db.session import Base + from sqlalchemy import Column, Integer, String + + # Create a test model class + class TestModel(Base): + __tablename__ = "test_table" + id = Column(Integer, primary_key=True) + name = Column(String(50)) + + # Verify model was created successfully + assert TestModel.__tablename__ == "test_table" + assert hasattr(TestModel, 'id') + assert hasattr(TestModel, 'name') + + +def test_session_maker_class_is_async_session(): + """Test that AsyncSessionLocal creates AsyncSession instances.""" + from app.db.session import AsyncSessionLocal + + # Verify the class_ parameter is AsyncSession + assert AsyncSessionLocal.kw.get('class_') == AsyncSession + + +@pytest.mark.asyncio +async def test_get_db_single_use(): + """Test that get_db is a generator that yields exactly once.""" + from app.db.session import get_db + + mock_session = AsyncMock(spec=AsyncSession) + + with patch('app.db.session.AsyncSessionLocal') as mock_session_maker: + mock_session_maker.return_value.__aenter__.return_value = mock_session + + yield_count = 0 + async for session in get_db(): + yield_count += 1 + + # Should yield exactly once + assert yield_count == 1 + + +@pytest.mark.asyncio +async def test_get_db_returns_new_session_each_call(): + """Test that each call to get_db creates a new session.""" + from app.db.session import get_db + + with patch('app.db.session.AsyncSessionLocal') as mock_session_maker: + mock_session1 = AsyncMock(spec=AsyncSession) + mock_session2 = AsyncMock(spec=AsyncSession) + + # Configure mock to return different sessions on subsequent calls + mock_session_maker.return_value.__aenter__.side_effect = [mock_session1, mock_session2] + + # First call + async for session1 in get_db(): + pass + + # Second call + async for session2 in get_db(): + pass + + # AsyncSessionLocal should be called twice + assert mock_session_maker.call_count == 2 + + +def test_database_url_used_from_settings(mock_settings): + """Test that engine is created with DATABASE_URL from settings.""" + with patch('app.db.session.create_async_engine') as mock_create_engine: + # Re-import to trigger engine creation with mock + import importlib + import app.db.session + importlib.reload(app.db.session) + + # Verify create_async_engine was called with settings.DATABASE_URL + mock_create_engine.assert_called_once() + call_args = mock_create_engine.call_args + assert mock_settings.DATABASE_URL in str(call_args) diff --git a/backend/tests/app/models/__init__.py b/backend/tests/app/models/__init__.py new file mode 100644 index 0000000..a595122 --- /dev/null +++ b/backend/tests/app/models/__init__.py @@ -0,0 +1 @@ +# Models tests package diff --git a/backend/tests/app/models/test_claim.py b/backend/tests/app/models/test_claim.py new file mode 100644 index 0000000..e1603af --- /dev/null +++ b/backend/tests/app/models/test_claim.py @@ -0,0 +1,648 @@ +""" +Unit tests for app.models.claim module. + +Tests the Claim model including: +- Model structure and fields +- Default values +- Field constraints +- Relationships +- Boundary value testing +""" +import pytest +from datetime import datetime +from sqlalchemy import inspect + + +def test_claim_model_exists(): + """Test that Claim model can be imported.""" + from app.models.claim import Claim + + assert Claim is not None + + +def test_claim_model_tablename(): + """Test that Claim model has correct table name.""" + from app.models.claim import Claim + + assert Claim.__tablename__ == "claims" + + +def test_claim_model_fields(): + """Test that Claim model has all expected fields.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + column_names = [column.key for column in mapper.columns] + + expected_fields = [ + 'id', 'user_id', 'policy_id', 'amount_requested', + 'description', 'status', 'document_url', + 'created_at', 'updated_at' + ] + + for field in expected_fields: + assert field in column_names, f"Field {field} not found in Claim model" + + +def test_claim_id_field_properties(): + """Test Claim.id field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + id_column = mapper.columns['id'] + + assert id_column.primary_key is True + assert id_column.index is True + + +def test_claim_user_id_field_properties(): + """Test Claim.user_id field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + user_id_column = mapper.columns['user_id'] + + assert user_id_column.nullable is False + assert len(user_id_column.foreign_keys) > 0 + + +def test_claim_policy_id_field_properties(): + """Test Claim.policy_id field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + policy_id_column = mapper.columns['policy_id'] + + assert policy_id_column.nullable is False + assert len(policy_id_column.foreign_keys) > 0 + + +def test_claim_amount_requested_field_properties(): + """Test Claim.amount_requested field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + amount_column = mapper.columns['amount_requested'] + + assert amount_column.nullable is False + assert amount_column.type.python_type == float + + +def test_claim_description_field_properties(): + """Test Claim.description field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + description_column = mapper.columns['description'] + + assert description_column.nullable is False + # Text type doesn't have a length limit + + +def test_claim_status_field_properties(): + """Test Claim.status field properties and default value.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + status_column = mapper.columns['status'] + + assert status_column.default.arg == "pending" + assert status_column.type.length == 50 + + +def test_claim_document_url_field_properties(): + """Test Claim.document_url field properties.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + doc_url_column = mapper.columns['document_url'] + + assert doc_url_column.nullable is True + assert doc_url_column.type.length == 500 + + +def test_claim_relationships(): + """Test that Claim model has expected relationships.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + relationships = [rel.key for rel in mapper.relationships] + + assert 'user' in relationships + assert 'policy' in relationships + + +def test_claim_user_relationship_properties(): + """Test Claim.user relationship configuration.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + user_rel = mapper.relationships['user'] + + assert user_rel.back_populates == 'claims' + + +def test_claim_policy_relationship_properties(): + """Test Claim.policy relationship configuration.""" + from app.models.claim import Claim + + mapper = inspect(Claim) + policy_rel = mapper.relationships['policy'] + + assert policy_rel.back_populates == 'claims' + + +@pytest.mark.asyncio +async def test_create_claim_instance(async_db_session): + """Test creating a Claim instance.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + # Create user and policy + user = User(email="claimant@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + # Create claim + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=5000.0, + description="Medical expenses for surgery" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.id is not None + assert claim.user_id == user.id + assert claim.policy_id == policy.id + assert claim.amount_requested == 5000.0 + assert claim.description == "Medical expenses for surgery" + assert claim.status == "pending" + assert claim.document_url is None + + +@pytest.mark.asyncio +async def test_claim_default_status(async_db_session): + """Test Claim gets default status when not specified.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=1000.0, + description="Test claim" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.status == "pending" + + +@pytest.mark.asyncio +async def test_claim_status_pending(async_db_session): + """Test Claim with pending status.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=2000.0, + description="Test", + status="pending" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.status == "pending" + + +@pytest.mark.asyncio +async def test_claim_status_approved(async_db_session): + """Test Claim with approved status.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=3000.0, + description="Approved claim", + status="approved" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.status == "approved" + + +@pytest.mark.asyncio +async def test_claim_status_rejected(async_db_session): + """Test Claim with rejected status.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=100000.0, + description="Rejected claim - exceeds coverage", + status="rejected" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.status == "rejected" + + +@pytest.mark.asyncio +async def test_claim_with_document_url(async_db_session): + """Test Claim with document URL.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + document_url = "https://example.com/documents/claim_12345.pdf" + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=2500.0, + description="Claim with document", + document_url=document_url + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.document_url == document_url + + +@pytest.mark.asyncio +async def test_claim_without_document_url(async_db_session): + """Test Claim without document URL (None).""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=1500.0, + description="Claim without document" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.document_url is None + + +@pytest.mark.asyncio +async def test_claim_amount_boundary_zero(async_db_session): + """Test Claim with zero amount requested (boundary value).""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=0.0, + description="Zero amount claim" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.amount_requested == 0.0 + + +@pytest.mark.asyncio +async def test_claim_amount_boundary_large(async_db_session): + """Test Claim with very large amount requested (boundary value).""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="life", + premium=5000.0, + coverage_amount=1000000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + large_amount = 999999.99 + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=large_amount, + description="Large claim amount" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.amount_requested == large_amount + + +@pytest.mark.asyncio +async def test_claim_description_short(async_db_session): + """Test Claim with short description (boundary value).""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=1000.0, + description="X" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.description == "X" + + +@pytest.mark.asyncio +async def test_claim_description_long(async_db_session): + """Test Claim with very long description.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + long_description = "A" * 5000 # Very long description + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=1000.0, + description=long_description + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.description == long_description + assert len(claim.description) == 5000 + + +@pytest.mark.asyncio +async def test_claim_created_at_auto_set(async_db_session): + """Test Claim.created_at is automatically set.""" + from app.models.claim import Claim + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + claim = Claim( + user_id=user.id, + policy_id=policy.id, + amount_requested=1000.0, + description="Test" + ) + + async_db_session.add(claim) + await async_db_session.commit() + await async_db_session.refresh(claim) + + assert claim.created_at is not None + assert isinstance(claim.created_at, datetime) + + +def test_claim_model_inherits_from_base(): + """Test that Claim model inherits from Base.""" + from app.models.claim import Claim + from app.db.session import Base + + assert issubclass(Claim, Base.__class__) diff --git a/backend/tests/app/models/test_models_init.py b/backend/tests/app/models/test_models_init.py new file mode 100644 index 0000000..f0c2021 --- /dev/null +++ b/backend/tests/app/models/test_models_init.py @@ -0,0 +1,113 @@ +""" +Unit tests for app.models.__init__ module. + +Tests that all models are properly exported from the models package. +""" +import pytest + + +def test_models_init_imports_base(): + """Test that Base is imported in models.__init__.""" + from app.models import Base + + assert Base is not None + + +def test_models_init_imports_user(): + """Test that User model is imported in models.__init__.""" + from app.models import User + + assert User is not None + assert User.__name__ == "User" + + +def test_models_init_imports_policy(): + """Test that Policy model is imported in models.__init__.""" + from app.models import Policy + + assert Policy is not None + assert Policy.__name__ == "Policy" + + +def test_models_init_imports_claim(): + """Test that Claim model is imported in models.__init__.""" + from app.models import Claim + + assert Claim is not None + assert Claim.__name__ == "Claim" + + +def test_models_init_imports_payment(): + """Test that Payment model is imported in models.__init__.""" + from app.models import Payment + + assert Payment is not None + assert Payment.__name__ == "Payment" + + +def test_all_models_accessible_from_package(): + """Test that all models can be imported from the models package.""" + from app.models import Base, User, Policy, Claim, Payment + + models = [Base, User, Policy, Claim, Payment] + + for model in models: + assert model is not None + + +def test_user_model_from_package(): + """Test User model has correct table name when imported from package.""" + from app.models import User + + assert User.__tablename__ == "users" + + +def test_policy_model_from_package(): + """Test Policy model has correct table name when imported from package.""" + from app.models import Policy + + assert Policy.__tablename__ == "policies" + + +def test_claim_model_from_package(): + """Test Claim model has correct table name when imported from package.""" + from app.models import Claim + + assert Claim.__tablename__ == "claims" + + +def test_payment_model_from_package(): + """Test Payment model has correct table name when imported from package.""" + from app.models import Payment + + assert Payment.__tablename__ == "payments" + + +def test_models_share_same_base(): + """Test that all models use the same Base class.""" + from app.models import Base, User, Policy, Claim, Payment + + # All models should inherit from the same Base + assert issubclass(User, Base.__class__) + assert issubclass(Policy, Base.__class__) + assert issubclass(Claim, Base.__class__) + assert issubclass(Payment, Base.__class__) + + +def test_models_have_metadata(): + """Test that models have metadata available through Base.""" + from app.models import Base + + assert hasattr(Base, 'metadata') + assert Base.metadata is not None + + +def test_import_all_models_no_errors(): + """Test that importing all models does not raise any errors.""" + try: + from app.models import Base, User, Policy, Claim, Payment + success = True + except ImportError: + success = False + + assert success is True diff --git a/backend/tests/app/models/test_payment.py b/backend/tests/app/models/test_payment.py new file mode 100644 index 0000000..b702441 --- /dev/null +++ b/backend/tests/app/models/test_payment.py @@ -0,0 +1,670 @@ +""" +Unit tests for app.models.payment module. + +Tests the Payment model including: +- Model structure and fields +- Default values +- Field constraints +- Relationships +- Boundary value testing +""" +import pytest +from datetime import datetime +from sqlalchemy import inspect + + +def test_payment_model_exists(): + """Test that Payment model can be imported.""" + from app.models.payment import Payment + + assert Payment is not None + + +def test_payment_model_tablename(): + """Test that Payment model has correct table name.""" + from app.models.payment import Payment + + assert Payment.__tablename__ == "payments" + + +def test_payment_model_fields(): + """Test that Payment model has all expected fields.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + column_names = [column.key for column in mapper.columns] + + expected_fields = [ + 'id', 'policy_id', 'amount', 'status', + 'transaction_id', 'created_at', 'updated_at' + ] + + for field in expected_fields: + assert field in column_names, f"Field {field} not found in Payment model" + + +def test_payment_id_field_properties(): + """Test Payment.id field properties.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + id_column = mapper.columns['id'] + + assert id_column.primary_key is True + assert id_column.index is True + + +def test_payment_policy_id_field_properties(): + """Test Payment.policy_id field properties.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + policy_id_column = mapper.columns['policy_id'] + + assert policy_id_column.nullable is False + assert len(policy_id_column.foreign_keys) > 0 + + +def test_payment_amount_field_properties(): + """Test Payment.amount field properties.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + amount_column = mapper.columns['amount'] + + assert amount_column.nullable is False + assert amount_column.type.python_type == float + + +def test_payment_status_field_properties(): + """Test Payment.status field properties and default value.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + status_column = mapper.columns['status'] + + assert status_column.default.arg == "pending" + assert status_column.type.length == 50 + + +def test_payment_transaction_id_field_properties(): + """Test Payment.transaction_id field properties.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + transaction_id_column = mapper.columns['transaction_id'] + + assert transaction_id_column.unique is True + assert transaction_id_column.index is True + assert transaction_id_column.nullable is True + assert transaction_id_column.type.length == 100 + + +def test_payment_relationships(): + """Test that Payment model has expected relationships.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + relationships = [rel.key for rel in mapper.relationships] + + assert 'policy' in relationships + + +def test_payment_policy_relationship_properties(): + """Test Payment.policy relationship configuration.""" + from app.models.payment import Payment + + mapper = inspect(Payment) + policy_rel = mapper.relationships['policy'] + + assert policy_rel.back_populates == 'payments' + + +@pytest.mark.asyncio +async def test_create_payment_instance(async_db_session): + """Test creating a Payment instance.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + # Create user and policy + user = User(email="payer@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + # Create payment + payment = Payment( + policy_id=policy.id, + amount=1000.0, + transaction_id="TXN123456" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.id is not None + assert payment.policy_id == policy.id + assert payment.amount == 1000.0 + assert payment.status == "pending" + assert payment.transaction_id == "TXN123456" + + +@pytest.mark.asyncio +async def test_payment_default_status(async_db_session): + """Test Payment gets default status when not specified.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=500.0, + coverage_amount=25000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=500.0 + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.status == "pending" + + +@pytest.mark.asyncio +async def test_payment_status_pending(async_db_session): + """Test Payment with pending status.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="vehicle", + premium=800.0, + coverage_amount=30000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=800.0, + status="pending" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.status == "pending" + + +@pytest.mark.asyncio +async def test_payment_status_paid(async_db_session): + """Test Payment with paid status.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="life", + premium=2000.0, + coverage_amount=500000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=2000.0, + status="paid", + transaction_id="TXN_PAID_123" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.status == "paid" + + +@pytest.mark.asyncio +async def test_payment_status_failed(async_db_session): + """Test Payment with failed status.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1200.0, + coverage_amount=60000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1200.0, + status="failed", + transaction_id="TXN_FAILED_456" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.status == "failed" + + +@pytest.mark.asyncio +async def test_payment_with_transaction_id(async_db_session): + """Test Payment with transaction ID.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + transaction_id = "PAY_2024_ABC123XYZ" + + payment = Payment( + policy_id=policy.id, + amount=1000.0, + transaction_id=transaction_id + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.transaction_id == transaction_id + + +@pytest.mark.asyncio +async def test_payment_without_transaction_id(async_db_session): + """Test Payment without transaction ID (None).""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1000.0 + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.transaction_id is None + + +@pytest.mark.asyncio +async def test_payment_amount_boundary_zero(async_db_session): + """Test Payment with zero amount (boundary value).""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=0.0, + coverage_amount=10000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=0.0 + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.amount == 0.0 + + +@pytest.mark.asyncio +async def test_payment_amount_boundary_large(async_db_session): + """Test Payment with very large amount (boundary value).""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="life", + premium=999999.99, + coverage_amount=10000000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + large_amount = 999999.99 + + payment = Payment( + policy_id=policy.id, + amount=large_amount + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.amount == large_amount + + +@pytest.mark.asyncio +async def test_payment_amount_decimal(async_db_session): + """Test Payment with decimal amount.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1234.56, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1234.56 + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.amount == 1234.56 + + +@pytest.mark.asyncio +async def test_payment_transaction_id_boundary_short(async_db_session): + """Test Payment with short transaction ID (boundary value).""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1000.0, + transaction_id="T1" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.transaction_id == "T1" + + +@pytest.mark.asyncio +async def test_payment_transaction_id_boundary_long(async_db_session): + """Test Payment with long transaction ID (up to 100 chars).""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + long_txn_id = "T" * 100 # Maximum length + + payment = Payment( + policy_id=policy.id, + amount=1000.0, + transaction_id=long_txn_id + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.transaction_id == long_txn_id + assert len(payment.transaction_id) == 100 + + +@pytest.mark.asyncio +async def test_payment_created_at_auto_set(async_db_session): + """Test Payment.created_at is automatically set.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1000.0 + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.created_at is not None + assert isinstance(payment.created_at, datetime) + + +@pytest.mark.asyncio +async def test_payment_status_boundary_empty(async_db_session): + """Test Payment with empty status string.""" + from app.models.payment import Payment + from app.models.user import User + from app.models.policy import Policy + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + payment = Payment( + policy_id=policy.id, + amount=1000.0, + status="" + ) + + async_db_session.add(payment) + await async_db_session.commit() + await async_db_session.refresh(payment) + + assert payment.status == "" + + +def test_payment_model_inherits_from_base(): + """Test that Payment model inherits from Base.""" + from app.models.payment import Payment + from app.db.session import Base + + assert issubclass(Payment, Base.__class__) diff --git a/backend/tests/app/models/test_policy.py b/backend/tests/app/models/test_policy.py new file mode 100644 index 0000000..02732b1 --- /dev/null +++ b/backend/tests/app/models/test_policy.py @@ -0,0 +1,513 @@ +""" +Unit tests for app.models.policy module. + +Tests the Policy model including: +- Model structure and fields +- PolicyType enum +- Default values +- Field constraints +- Relationships +- Boundary value testing +""" +import pytest +from datetime import datetime +from sqlalchemy import inspect +import enum + + +def test_policy_type_enum_exists(): + """Test that PolicyType enum can be imported.""" + from app.models.policy import PolicyType + + assert PolicyType is not None + assert issubclass(PolicyType, str) + assert issubclass(PolicyType, enum.Enum) + + +def test_policy_type_enum_values(): + """Test PolicyType enum has correct values.""" + from app.models.policy import PolicyType + + assert PolicyType.health.value == "health" + assert PolicyType.vehicle.value == "vehicle" + assert PolicyType.life.value == "life" + + +def test_policy_type_enum_members(): + """Test PolicyType enum has correct members.""" + from app.models.policy import PolicyType + + members = [member.name for member in PolicyType] + + assert "health" in members + assert "vehicle" in members + assert "life" in members + assert len(members) == 3 + + +def test_policy_model_exists(): + """Test that Policy model can be imported.""" + from app.models.policy import Policy + + assert Policy is not None + + +def test_policy_model_tablename(): + """Test that Policy model has correct table name.""" + from app.models.policy import Policy + + assert Policy.__tablename__ == "policies" + + +def test_policy_model_fields(): + """Test that Policy model has all expected fields.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + column_names = [column.key for column in mapper.columns] + + expected_fields = [ + 'id', 'user_id', 'policy_type', 'premium', 'coverage_amount', + 'start_date', 'end_date', 'is_active', 'created_at', 'updated_at' + ] + + for field in expected_fields: + assert field in column_names, f"Field {field} not found in Policy model" + + +def test_policy_id_field_properties(): + """Test Policy.id field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + id_column = mapper.columns['id'] + + assert id_column.primary_key is True + assert id_column.index is True + + +def test_policy_user_id_field_properties(): + """Test Policy.user_id field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + user_id_column = mapper.columns['user_id'] + + assert user_id_column.nullable is False + # Check foreign key constraint + assert len(user_id_column.foreign_keys) > 0 + + +def test_policy_type_field_properties(): + """Test Policy.policy_type field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + policy_type_column = mapper.columns['policy_type'] + + assert policy_type_column.nullable is False + assert policy_type_column.type.length == 50 + + +def test_policy_premium_field_properties(): + """Test Policy.premium field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + premium_column = mapper.columns['premium'] + + assert premium_column.nullable is False + assert premium_column.type.python_type == float + + +def test_policy_coverage_amount_field_properties(): + """Test Policy.coverage_amount field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + coverage_column = mapper.columns['coverage_amount'] + + assert coverage_column.nullable is False + assert coverage_column.type.python_type == float + + +def test_policy_start_date_field_properties(): + """Test Policy.start_date field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + start_date_column = mapper.columns['start_date'] + + assert start_date_column.nullable is False + + +def test_policy_end_date_field_properties(): + """Test Policy.end_date field properties.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + end_date_column = mapper.columns['end_date'] + + assert end_date_column.nullable is False + + +def test_policy_is_active_field_properties(): + """Test Policy.is_active field properties and default value.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + is_active_column = mapper.columns['is_active'] + + assert is_active_column.default.arg is True + + +def test_policy_relationships(): + """Test that Policy model has expected relationships.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + relationships = [rel.key for rel in mapper.relationships] + + assert 'user' in relationships + assert 'claims' in relationships + assert 'payments' in relationships + + +def test_policy_user_relationship_properties(): + """Test Policy.user relationship configuration.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + user_rel = mapper.relationships['user'] + + assert user_rel.back_populates == 'policies' + + +def test_policy_claims_relationship_properties(): + """Test Policy.claims relationship configuration.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + claims_rel = mapper.relationships['claims'] + + assert claims_rel.back_populates == 'policy' + assert 'delete-orphan' in str(claims_rel.cascade) + + +def test_policy_payments_relationship_properties(): + """Test Policy.payments relationship configuration.""" + from app.models.policy import Policy + + mapper = inspect(Policy) + payments_rel = mapper.relationships['payments'] + + assert payments_rel.back_populates == 'policy' + assert 'delete-orphan' in str(payments_rel.cascade) + + +@pytest.mark.asyncio +async def test_create_policy_instance(async_db_session): + """Test creating a Policy instance.""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + # Create a user first + user = User( + email="policyowner@example.com", + hashed_password="hashed_password" + ) + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + # Create policy + start = datetime.utcnow() + end = start + timedelta(days=365) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=1000.0, + coverage_amount=50000.0, + start_date=start, + end_date=end + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.id is not None + assert policy.user_id == user.id + assert policy.policy_type == "health" + assert policy.premium == 1000.0 + assert policy.coverage_amount == 50000.0 + assert policy.is_active is True + + +@pytest.mark.asyncio +async def test_policy_default_is_active(async_db_session): + """Test Policy gets default is_active when not specified.""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + start = datetime.utcnow() + end = start + timedelta(days=365) + + policy = Policy( + user_id=user.id, + policy_type="vehicle", + premium=500.0, + coverage_amount=25000.0, + start_date=start, + end_date=end + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.is_active is True + + +@pytest.mark.asyncio +async def test_policy_type_health(async_db_session): + """Test Policy with health type.""" + from app.models.policy import Policy, PolicyType + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type=PolicyType.health.value, + premium=1200.0, + coverage_amount=100000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.policy_type == "health" + + +@pytest.mark.asyncio +async def test_policy_type_vehicle(async_db_session): + """Test Policy with vehicle type.""" + from app.models.policy import Policy, PolicyType + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type=PolicyType.vehicle.value, + premium=800.0, + coverage_amount=30000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.policy_type == "vehicle" + + +@pytest.mark.asyncio +async def test_policy_type_life(async_db_session): + """Test Policy with life type.""" + from app.models.policy import Policy, PolicyType + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type=PolicyType.life.value, + premium=2000.0, + coverage_amount=500000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.policy_type == "life" + + +@pytest.mark.asyncio +async def test_policy_premium_boundary_zero(async_db_session): + """Test Policy with zero premium (boundary value).""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=0.0, + coverage_amount=10000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.premium == 0.0 + + +@pytest.mark.asyncio +async def test_policy_premium_boundary_large(async_db_session): + """Test Policy with very large premium (boundary value).""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + large_premium = 999999999.99 + + policy = Policy( + user_id=user.id, + policy_type="life", + premium=large_premium, + coverage_amount=1000000000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.premium == large_premium + + +@pytest.mark.asyncio +async def test_policy_coverage_boundary_zero(async_db_session): + """Test Policy with zero coverage amount (boundary value).""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=100.0, + coverage_amount=0.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.coverage_amount == 0.0 + + +@pytest.mark.asyncio +async def test_policy_is_active_false(async_db_session): + """Test Policy with is_active set to False.""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=500.0, + coverage_amount=20000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365), + is_active=False + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.is_active is False + + +@pytest.mark.asyncio +async def test_policy_created_at_auto_set(async_db_session): + """Test Policy.created_at is automatically set.""" + from app.models.policy import Policy + from app.models.user import User + from datetime import datetime, timedelta + + user = User(email="user@example.com", hashed_password="pass") + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + policy = Policy( + user_id=user.id, + policy_type="health", + premium=500.0, + coverage_amount=20000.0, + start_date=datetime.utcnow(), + end_date=datetime.utcnow() + timedelta(days=365) + ) + + async_db_session.add(policy) + await async_db_session.commit() + await async_db_session.refresh(policy) + + assert policy.created_at is not None + assert isinstance(policy.created_at, datetime) + + +def test_policy_model_inherits_from_base(): + """Test that Policy model inherits from Base.""" + from app.models.policy import Policy + from app.db.session import Base + + assert issubclass(Policy, Base.__class__) diff --git a/backend/tests/app/models/test_user.py b/backend/tests/app/models/test_user.py new file mode 100644 index 0000000..da65385 --- /dev/null +++ b/backend/tests/app/models/test_user.py @@ -0,0 +1,352 @@ +""" +Unit tests for app.models.user module. + +Tests the User model including: +- Model structure and fields +- Default values +- Field constraints +- Relationships +- Boundary value testing +""" +import pytest +from datetime import datetime +from sqlalchemy import inspect + + +def test_user_model_exists(): + """Test that User model can be imported.""" + from app.models.user import User + + assert User is not None + + +def test_user_model_tablename(): + """Test that User model has correct table name.""" + from app.models.user import User + + assert User.__tablename__ == "users" + + +def test_user_model_fields(): + """Test that User model has all expected fields.""" + from app.models.user import User + + # Get model columns + mapper = inspect(User) + column_names = [column.key for column in mapper.columns] + + expected_fields = [ + 'id', 'email', 'hashed_password', 'role', + 'is_active', 'created_at', 'updated_at' + ] + + for field in expected_fields: + assert field in column_names, f"Field {field} not found in User model" + + +def test_user_id_field_properties(): + """Test User.id field properties.""" + from app.models.user import User + + mapper = inspect(User) + id_column = mapper.columns['id'] + + assert id_column.primary_key is True + assert id_column.index is True + assert id_column.type.python_type == int + + +def test_user_email_field_properties(): + """Test User.email field properties.""" + from app.models.user import User + + mapper = inspect(User) + email_column = mapper.columns['email'] + + assert email_column.unique is True + assert email_column.index is True + assert email_column.nullable is False + assert email_column.type.length == 255 + + +def test_user_hashed_password_field_properties(): + """Test User.hashed_password field properties.""" + from app.models.user import User + + mapper = inspect(User) + password_column = mapper.columns['hashed_password'] + + assert password_column.nullable is False + assert password_column.type.length == 255 + + +def test_user_role_field_properties(): + """Test User.role field properties and default value.""" + from app.models.user import User + + mapper = inspect(User) + role_column = mapper.columns['role'] + + assert role_column.default.arg == "customer" + assert role_column.type.length == 50 + + +def test_user_is_active_field_properties(): + """Test User.is_active field properties and default value.""" + from app.models.user import User + + mapper = inspect(User) + is_active_column = mapper.columns['is_active'] + + assert is_active_column.default.arg is True + + +def test_user_created_at_field_properties(): + """Test User.created_at field has server default.""" + from app.models.user import User + + mapper = inspect(User) + created_at_column = mapper.columns['created_at'] + + assert created_at_column.server_default is not None + + +def test_user_updated_at_field_properties(): + """Test User.updated_at field has onupdate.""" + from app.models.user import User + + mapper = inspect(User) + updated_at_column = mapper.columns['updated_at'] + + assert updated_at_column.onupdate is not None + + +def test_user_relationships(): + """Test that User model has expected relationships.""" + from app.models.user import User + + mapper = inspect(User) + relationships = [rel.key for rel in mapper.relationships] + + assert 'policies' in relationships + assert 'claims' in relationships + + +def test_user_policies_relationship_properties(): + """Test User.policies relationship configuration.""" + from app.models.user import User + + mapper = inspect(User) + policies_rel = mapper.relationships['policies'] + + assert policies_rel.back_populates == 'user' + # Check cascade includes delete-orphan + assert 'delete-orphan' in str(policies_rel.cascade) + + +def test_user_claims_relationship_properties(): + """Test User.claims relationship configuration.""" + from app.models.user import User + + mapper = inspect(User) + claims_rel = mapper.relationships['claims'] + + assert claims_rel.back_populates == 'user' + assert 'delete-orphan' in str(claims_rel.cascade) + + +@pytest.mark.asyncio +async def test_create_user_instance(async_db_session): + """Test creating a User instance.""" + from app.models.user import User + + user = User( + email="test@example.com", + hashed_password="hashed_password_123", + role="customer" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.id is not None + assert user.email == "test@example.com" + assert user.hashed_password == "hashed_password_123" + assert user.role == "customer" + assert user.is_active is True + + +@pytest.mark.asyncio +async def test_user_default_role(async_db_session): + """Test User gets default role when not specified.""" + from app.models.user import User + + user = User( + email="test@example.com", + hashed_password="hashed_password_123" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.role == "customer" + + +@pytest.mark.asyncio +async def test_user_default_is_active(async_db_session): + """Test User gets default is_active when not specified.""" + from app.models.user import User + + user = User( + email="test@example.com", + hashed_password="hashed_password_123" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.is_active is True + + +@pytest.mark.asyncio +async def test_user_created_at_auto_set(async_db_session): + """Test User.created_at is automatically set.""" + from app.models.user import User + + user = User( + email="test@example.com", + hashed_password="hashed_password_123" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.created_at is not None + assert isinstance(user.created_at, datetime) + + +@pytest.mark.asyncio +async def test_user_role_variations(async_db_session): + """Test User can be created with different roles.""" + from app.models.user import User + + roles = ['admin', 'agent', 'customer'] + + for role in roles: + user = User( + email=f"{role}@example.com", + hashed_password="hashed_password", + role=role + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.role == role + + +@pytest.mark.asyncio +async def test_user_is_active_false(async_db_session): + """Test User can be created with is_active=False.""" + from app.models.user import User + + user = User( + email="inactive@example.com", + hashed_password="hashed_password", + is_active=False + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.is_active is False + + +@pytest.mark.asyncio +async def test_user_email_boundary_empty(async_db_session): + """Test User with very short email (boundary value).""" + from app.models.user import User + + user = User( + email="a@b.c", # Minimal valid email + hashed_password="hashed_password" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.email == "a@b.c" + + +@pytest.mark.asyncio +async def test_user_email_boundary_long(async_db_session): + """Test User with long email (boundary value).""" + from app.models.user import User + + # Create an email close to 255 character limit + long_email = "a" * 240 + "@example.com" # ~252 characters + + user = User( + email=long_email, + hashed_password="hashed_password" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.email == long_email + + +@pytest.mark.asyncio +async def test_user_role_boundary_empty(async_db_session): + """Test User with empty role string.""" + from app.models.user import User + + user = User( + email="test@example.com", + hashed_password="hashed_password", + role="" + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.role == "" + + +@pytest.mark.asyncio +async def test_user_role_boundary_long(async_db_session): + """Test User with long role string (up to 50 chars).""" + from app.models.user import User + + long_role = "a" * 50 # Maximum length + + user = User( + email="test@example.com", + hashed_password="hashed_password", + role=long_role + ) + + async_db_session.add(user) + await async_db_session.commit() + await async_db_session.refresh(user) + + assert user.role == long_role + assert len(user.role) == 50 + + +def test_user_model_inherits_from_base(): + """Test that User model inherits from Base.""" + from app.models.user import User + from app.db.session import Base + + assert issubclass(User, Base.__class__) diff --git a/backend/tests/app/test_main.py b/backend/tests/app/test_main.py new file mode 100644 index 0000000..eb9d0b4 --- /dev/null +++ b/backend/tests/app/test_main.py @@ -0,0 +1,258 @@ +""" +Unit tests for app.main module. + +Tests FastAPI application initialization and endpoints including: +- App configuration +- CORS middleware setup +- Root endpoint +- Health check endpoint +- Boundary value testing +""" +import pytest +from fastapi.testclient import TestClient +from unittest.mock import patch, MagicMock + + +@pytest.fixture +def client(): + """Create a test client for the FastAPI app.""" + from app.main import app + return TestClient(app) + + +def test_app_initialization(): + """Test that FastAPI app is initialized with correct configuration.""" + from app.main import app + + assert app is not None + assert app.title == "Insurance Management API" + assert app.description == "API for the Insurance Management System" + assert app.version == "1.0.0" + + +def test_root_endpoint_returns_welcome_message(client): + """Test root endpoint returns correct welcome message.""" + response = client.get("/") + + assert response.status_code == 200 + assert response.json() == {"message": "Welcome to the Insurance Management API"} + + +def test_health_check_endpoint_returns_ok(client): + """Test health check endpoint returns ok status.""" + response = client.get("/health") + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_root_endpoint_response_format(client): + """Test root endpoint response has correct format.""" + response = client.get("/") + + assert response.status_code == 200 + data = response.json() + assert isinstance(data, dict) + assert "message" in data + assert isinstance(data["message"], str) + + +def test_health_check_response_format(client): + """Test health check endpoint response has correct format.""" + response = client.get("/health") + + assert response.status_code == 200 + data = response.json() + assert isinstance(data, dict) + assert "status" in data + assert data["status"] == "ok" + + +def test_root_endpoint_with_trailing_slash(client): + """Test root endpoint works without trailing slash.""" + response = client.get("/") + assert response.status_code == 200 + + +def test_cors_middleware_configured(): + """Test that CORS middleware is configured in the app.""" + from app.main import app + + # Check that middleware is registered + middleware_found = False + for middleware in app.user_middleware: + if "CORSMiddleware" in str(middleware): + middleware_found = True + break + + assert middleware_found, "CORS middleware not found in app" + + +def test_cors_origins_from_environment(): + """Test CORS origins are loaded from environment variable.""" + with patch.dict('os.environ', {'CORS_ORIGINS': 'http://example.com,http://test.com'}): + # Re-import to apply environment changes + import importlib + import app.main + importlib.reload(app.main) + + from app.main import app + + # Verify app still works + assert app is not None + + +def test_cors_origins_default_localhost(): + """Test CORS origins default to localhost when not set.""" + with patch.dict('os.environ', {}, clear=False): + with patch('os.getenv') as mock_getenv: + mock_getenv.return_value = "http://localhost:3000" + + import importlib + import app.main + importlib.reload(app.main) + + # Verify default is used + mock_getenv.assert_called() + + +def test_root_endpoint_content_type(client): + """Test root endpoint returns JSON content type.""" + response = client.get("/") + + assert response.status_code == 200 + assert "application/json" in response.headers["content-type"] + + +def test_health_check_content_type(client): + """Test health check endpoint returns JSON content type.""" + response = client.get("/health") + + assert response.status_code == 200 + assert "application/json" in response.headers["content-type"] + + +def test_nonexistent_endpoint_returns_404(client): + """Test that accessing non-existent endpoint returns 404.""" + response = client.get("/nonexistent") + + assert response.status_code == 404 + + +def test_root_endpoint_methods(client): + """Test root endpoint only supports GET method.""" + # GET should work + response = client.get("/") + assert response.status_code == 200 + + # POST should not be allowed + response = client.post("/") + assert response.status_code == 405 # Method Not Allowed + + # PUT should not be allowed + response = client.put("/") + assert response.status_code == 405 + + # DELETE should not be allowed + response = client.delete("/") + assert response.status_code == 405 + + +def test_health_check_endpoint_methods(client): + """Test health check endpoint only supports GET method.""" + # GET should work + response = client.get("/health") + assert response.status_code == 200 + + # POST should not be allowed + response = client.post("/health") + assert response.status_code == 405 + + +def test_root_endpoint_with_query_parameters(client): + """Test root endpoint ignores query parameters.""" + response = client.get("/?foo=bar&baz=qux") + + assert response.status_code == 200 + assert response.json() == {"message": "Welcome to the Insurance Management API"} + + +def test_health_check_with_query_parameters(client): + """Test health check ignores query parameters.""" + response = client.get("/health?test=value") + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_app_openapi_schema_exists(client): + """Test that OpenAPI schema endpoint exists.""" + response = client.get("/openapi.json") + + assert response.status_code == 200 + schema = response.json() + assert "openapi" in schema + assert "info" in schema + assert schema["info"]["title"] == "Insurance Management API" + + +def test_app_docs_endpoint_exists(client): + """Test that auto-generated docs endpoint exists.""" + response = client.get("/docs") + + assert response.status_code == 200 + + +def test_app_redoc_endpoint_exists(client): + """Test that ReDoc documentation endpoint exists.""" + response = client.get("/redoc") + + assert response.status_code == 200 + + +def test_multiple_simultaneous_requests(client): + """Test app handles multiple requests correctly.""" + responses = [] + + # Make multiple requests + for _ in range(5): + response = client.get("/") + responses.append(response) + + # All should succeed + for response in responses: + assert response.status_code == 200 + assert response.json() == {"message": "Welcome to the Insurance Management API"} + + +def test_health_check_consistency(client): + """Test health check returns consistent results.""" + # Call health check multiple times + for _ in range(10): + response = client.get("/health") + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_cors_empty_origins_string(): + """Test CORS configuration with empty origins string.""" + with patch.dict('os.environ', {'CORS_ORIGINS': ''}): + import importlib + import app.main + importlib.reload(app.main) + + from app.main import app + client = TestClient(app) + + # App should still work + response = client.get("/") + assert response.status_code == 200 + + +def test_app_response_headers(client): + """Test that responses include expected headers.""" + response = client.get("/") + + assert response.status_code == 200 + assert "content-length" in response.headers + assert "content-type" in response.headers diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py new file mode 100644 index 0000000..55dde90 --- /dev/null +++ b/backend/tests/conftest.py @@ -0,0 +1,55 @@ +""" +Pytest configuration and fixtures for backend tests. +""" +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.pool import StaticPool +from app.db.session import Base + + +@pytest.fixture +def mock_settings(): + """Mock settings fixture.""" + with patch("app.core.config.settings") as mock: + mock.PROJECT_NAME = "Test Insurance Platform" + mock.API_V1_STR = "/api/v1" + mock.SECRET_KEY = "test_secret_key_for_testing_only" + mock.ACCESS_TOKEN_EXPIRE_MINUTES = 60 + mock.CORS_ORIGINS = "http://localhost:3000" + mock.DATABASE_URL = "postgresql+asyncpg://test:test@localhost/test_db" + yield mock + + +@pytest.fixture +async def async_db_engine(): + """Create an async in-memory SQLite engine for testing.""" + engine = create_async_engine( + "sqlite+aiosqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + yield engine + + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + await engine.dispose() + + +@pytest.fixture +async def async_db_session(async_db_engine): + """Create an async database session for testing.""" + async_session = async_sessionmaker( + async_db_engine, + class_=AsyncSession, + expire_on_commit=False, + autoflush=False, + ) + + async with async_session() as session: + yield session