diff --git a/.github/workflows/docker_image_publish.yml b/.github/workflows/docker_image_publish.yml index ef8732c3..665a4fb2 100644 --- a/.github/workflows/docker_image_publish.yml +++ b/.github/workflows/docker_image_publish.yml @@ -15,11 +15,11 @@ jobs: uses: Azure/docker-login@v2 with: # Container registry username - username: ${{ secrets.MAIN_ACR_USERNAME }} + username: ${{ secrets.ACR_USERNAME }} # Container registry password - password: ${{ secrets.MAIN_ACR_PASSWORD }} + password: ${{ secrets.ACR_PASSWORD }} # Container registry server url - login-server: ${{ secrets.MAIN_ACR_LOGIN_SERVER }} + login-server: ${{ secrets.ACR_LOGIN_SERVER }} - name: Normalize branch name for tag run: | REF="${GITHUB_REF_NAME}" diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 78f54203..51823af9 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -40,6 +40,11 @@ from semantic_kernel_plugins.plugin_loader import discover_plugins from semantic_kernel_plugins.openapi_plugin_factory import OpenApiPluginFactory import app_settings_cache +try: + from azure.identity import DefaultAzureCredential, get_bearer_token_provider +except ImportError: + DefaultAzureCredential = None + get_bearer_token_provider = None @@ -295,26 +300,32 @@ def merge_fields(primary, fallback): debug_print(f"[SK Loader] Using user APIM with global fallback") merged = merge_fields(u_apim, g_apim if global_apim_enabled and any_filled(*g_apim) else (None, None, None, None)) endpoint, key, deployment, api_version = merged + endpoint_is_user_supplied = True # 2. User APIM enabled but no user APIM values, and global APIM enabled and present: use global APIM elif user_apim_enabled and global_apim_enabled and any_filled(*g_apim): debug_print(f"[SK Loader] Using global APIM (user APIM enabled but not present)") endpoint, key, deployment, api_version = g_apim + endpoint_is_user_supplied = False # 3. User GPT config is FULLY filled: use user GPT (all fields filled) elif all_filled(*u_gpt) and can_use_agent_endpoints: debug_print(f"[SK Loader] Using agent GPT config (all fields filled)") endpoint, key, deployment, api_version = u_gpt + endpoint_is_user_supplied = True # 4. User GPT config is PARTIALLY filled, global APIM is NOT enabled: merge user GPT with global GPT elif any_filled(*u_gpt) and not global_apim_enabled and can_use_agent_endpoints: debug_print(f"[SK Loader] Using agent GPT config (partially filled, merging with global GPT, global APIM not enabled)") endpoint, key, deployment, api_version = merge_fields(u_gpt, g_gpt) + endpoint_is_user_supplied = True # 5. Global APIM enabled and present: use global APIM elif global_apim_enabled and any_filled(*g_apim): debug_print(f"[SK Loader] Using global APIM (fallback)") endpoint, key, deployment, api_version = g_apim + endpoint_is_user_supplied = False # 6. Fallback to global GPT config else: debug_print(f"[SK Loader] Using global GPT config (fallback)") endpoint, key, deployment, api_version = g_gpt + endpoint_is_user_supplied = False result = { "endpoint": endpoint, @@ -337,6 +348,9 @@ def merge_fields(primary, fallback): "max_completion_tokens": agent.get("max_completion_tokens", -1), # -1 meant use model default determined by the service, 35-trubo is 4096, 4o is 16384, 4.1 is at least 32768 "agent_type": agent_type or "local", "other_settings": other_settings, + # Security: track whether the endpoint was user/agent-supplied vs system-controlled. + # Managed identity must NOT be used with user-supplied endpoints to prevent token theft. + "endpoint_is_user_supplied": endpoint_is_user_supplied, } print(f"[SK Loader] Final resolved config for {agent.get('name')}: endpoint={bool(endpoint)}, key={bool(key)}, deployment={deployment}") @@ -721,6 +735,20 @@ def normalize(s): print(f"[SK Loader] Error loading agent-specific plugins: {e}") log_event(f"[SK Loader] Error loading agent-specific plugins: {e}", level=logging.ERROR, exceptionTraceback=True) +def _build_mi_token_provider(endpoint): + """Build a bearer token provider for managed identity auth. + + Selects the correct Azure Cognitive Services scope based on whether the + endpoint is in the US Government cloud (.azure.us) or the commercial cloud. + """ + scope = ( + "https://cognitiveservices.azure.us/.default" + if ".azure.us" in (endpoint or "") + else "https://cognitiveservices.azure.com/.default" + ) + return get_bearer_token_provider(DefaultAzureCredential(), scope) + + def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis_client=None, mode_label="global"): """ DRY helper to load a single agent (default agent) for the kernel. @@ -758,7 +786,15 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis log_event(f"[SK Loader] Agent config resolved for {agent_cfg.get('name')} - endpoint: {bool(agent_config.get('endpoint'))}, key: {bool(agent_config.get('key'))}, deployment: {agent_config.get('deployment')}, max_completion_tokens: {agent_config.get('max_completion_tokens')}", level=logging.INFO) - if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: + auth_type = settings.get('azure_openai_gpt_authentication_type', '') + use_managed_identity = ( + auth_type == 'managed_identity' + and not apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + and not agent_config.get("endpoint_is_user_supplied", False) + ) + if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: print(f"[SK Loader] Azure config valid for {agent_config['name']}, creating chat service...") if apim_enabled: log_event( @@ -779,6 +815,24 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis api_version=agent_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": agent_config["key"]} ) + elif use_managed_identity: + log_event( + f"[SK Loader] Initializing Managed Identity AzureChatCompletion for agent: {agent_config['name']} ({mode_label})", + { + "aoai_endpoint": agent_config["endpoint"], + "aoai_deployment": agent_config["deployment"], + "agent_name": agent_config["name"] + }, + level=logging.INFO + ) + _token_provider = _build_mi_token_provider(agent_config.get("endpoint")) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, + api_version=agent_config["api_version"], + ) else: log_event( f"[SK Loader] Initializing GPT Direct AzureChatCompletion for agent: {agent_config['name']} ({mode_label})", @@ -1521,7 +1575,16 @@ def load_semantic_kernel(kernel: Kernel, settings): agent_config = resolve_agent_config(agent_cfg, settings) chat_service = None service_id = f"aoai-chat-{agent_config['name'].replace(' ', '').lower()}" - if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: + _ma_auth_type = settings.get('azure_openai_gpt_authentication_type', '') + _ma_apim_enabled = settings.get("enable_gpt_apim", False) + _ma_use_mi = ( + _ma_auth_type == 'managed_identity' + and not _ma_apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + and not agent_config.get("endpoint_is_user_supplied", False) + ) + if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or _ma_use_mi) and agent_config["deployment"]: try: try: chat_service = kernel.get_service(service_id=service_id) @@ -1548,6 +1611,15 @@ def load_semantic_kernel(kernel: Kernel, settings): api_version=agent_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": agent_config["key"]} ) + elif _ma_use_mi: + _token_provider = _build_mi_token_provider(agent_config.get("endpoint")) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, + api_version=agent_config["api_version"], + ) else: chat_service = AzureChatCompletion( service_id=service_id, @@ -1631,7 +1703,16 @@ def load_semantic_kernel(kernel: Kernel, settings): orchestrator_config = resolve_agent_config(orchestrator_cfg, settings) service_id = f"aoai-chat-{orchestrator_config['name']}" chat_service = None - if AzureChatCompletion and orchestrator_config["endpoint"] and orchestrator_config["key"] and orchestrator_config["deployment"]: + _orch_auth_type = settings.get('azure_openai_gpt_authentication_type', '') + _orch_apim_enabled = settings.get("enable_gpt_apim", False) + _orch_use_mi = ( + _orch_auth_type == 'managed_identity' + and not _orch_apim_enabled + and not orchestrator_config.get("key") + and bool(DefaultAzureCredential) + and not orchestrator_config.get("endpoint_is_user_supplied", False) + ) + if AzureChatCompletion and orchestrator_config["endpoint"] and (orchestrator_config["key"] or _orch_use_mi) and orchestrator_config["deployment"]: try: chat_service = kernel.get_service(service_id=service_id) except Exception: @@ -1657,6 +1738,15 @@ def load_semantic_kernel(kernel: Kernel, settings): api_version=orchestrator_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": orchestrator_config["key"]} ) + elif _orch_use_mi: + _token_provider = _build_mi_token_provider(orchestrator_config.get("endpoint")) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=orchestrator_config["deployment"], + endpoint=orchestrator_config["endpoint"], + ad_token_provider=_token_provider, + api_version=orchestrator_config["api_version"], + ) else: chat_service = AzureChatCompletion( service_id=service_id, diff --git a/docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md b/docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md new file mode 100644 index 00000000..d14736e4 --- /dev/null +++ b/docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md @@ -0,0 +1,212 @@ +# Agent Managed Identity SK Loader Fix + +**Fixed/Implemented in version:** **0.239.002** +**GitHub Issue:** [#769 — Agents fail silently when using Managed Identity authentication](https://github.com/microsoft/simplechat/issues/769) + +## Issue Description + +When using **Azure Managed Identity (MI)** for Azure OpenAI authentication, agents configured through +the **Model & Connection** page (Step 2 of the agent wizard) failed silently — the agent never loaded, +fell back to plain GPT-4.1 with no tools or instructions, and fabricated responses instead of calling +real APIs (e.g., ServiceNow). + +## Root Cause Analysis + +### How Agent Config Is Resolved + +`resolve_agent_config()` in `semantic_kernel_loader.py` (~line 107) figures out which endpoint/key/ +deployment to use for an agent by running through a **decision tree** (~line 291): + +``` +# 1. User APIM enabled and any user APIM values set → use user APIM +# 2. User APIM enabled but empty, global APIM enabled → use global APIM +# 3. Agent GPT config is FULLY filled → use agent GPT config +# 4. Agent GPT config is PARTIALLY filled, global APIM off → merge agent GPT with global GPT +# 5. Global APIM enabled → use global APIM +# 6. Fallback → use global GPT config entirely +``` + +### The Failure + +When an agent is configured with only the deployment name set (endpoint and key left blank), the +decision tree hits **case 4** — it merges the agent's partial config with global settings: + +``` +Agent-level: endpoint='', key='', deployment='gpt-4.1', api_version='' +``` + +After merge with global settings: +- `endpoint` = global endpoint ✓ +- `deployment` = `'gpt-4.1'` ✓ +- `key` = global key = **`None`** ✗ (MI auth — no API key is stored in settings) + +The gate condition at ~line 768 then fails: + +```python +if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: +``` + +`agent_config["key"]` is `None` → **condition is False** → falls into the `else` block: + +``` +[SK Loader] Azure config INVALID for servicenow_support_agent: + - AzureChatCompletion available: True + - endpoint: True + - key: False ← THIS IS THE FAILURE + - deployment: True +``` + +Returns `None, None` → no agent loaded → chat uses plain GPT-4.1 with no tools/instructions +→ GPT fabricates responses instead of calling the actual API. + +## Security Vulnerability (Identified by GitHub Copilot PR Review) + +Once the silent-failure fix was submitted as a pull request, a GitHub Copilot automated review +identified a **credential-theft vulnerability** in the initial implementation. + +### The Risk + +The initial `use_managed_identity` expression had only four guards: + +```python +use_managed_identity = ( + auth_type == 'managed_identity' + and not apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + # ← MISSING: no check on whether the endpoint is user/agent-supplied +) +``` + +With `allow_group_custom_agent_endpoints = True` (a legitimate admin configuration), a group +workspace admin could configure an agent with a **custom Azure OpenAI endpoint** pointing to an +attacker-controlled server. Because `use_managed_identity` had no endpoint check, the app would +obtain a real MI bearer token (scoped to Azure Cognitive Services) and send it in the +`Authorization: Bearer ...` header to that attacker-controlled endpoint — **leaking the app's +managed identity credentials** to a third party. + +### How `resolve_agent_config()` Flags Endpoint Ownership + +`resolve_agent_config()` already tagged every branch of its decision tree with an +`endpoint_is_user_supplied` flag indicating whether the resolved endpoint is under system control +or was provided by a user/agent config: + +| Case | Condition | `endpoint_is_user_supplied` | +|------|-----------|-----------------------------| +| 1 | User APIM values set and allowed | `True` — agent-supplied | +| 2 | User APIM on but empty; fall to global APIM | `False` — system-controlled | +| 3 | Agent GPT config fully filled and allowed | `True` — agent-supplied | +| 4 | Agent GPT config partially filled, no global APIM | `True` — agent-supplied | +| 5 | Global APIM enabled | `False` — system-controlled | +| 6 | Global GPT fallback (most common MI scenario) | `False` — system-controlled | + +MI tokens should only ever be sent to Cases 2, 5, and 6 (system-controlled). The missing guard +meant MI tokens could also reach Cases 1, 3, and 4. + +### The Security Fix + +A fifth guard was added to `use_managed_identity`: + +```python +and not agent_config.get("endpoint_is_user_supplied", False) +``` + +This single condition closes the token-theft path: even if all other guards pass, if the +resolved endpoint is agent/user-supplied, `use_managed_identity` evaluates to `False`, the gate +condition fails (no key + no MI), and the agent fails to load rather than leaking the MI token. + +**Intended behaviour for affected agents:** An agent that uses a custom endpoint must supply its +own API key. Relying on the app's managed identity to authenticate against a third-party or +operator-controlled endpoint is by design disallowed. + +## Files Modified + +| File | Lines Changed | +|------|--------------| +| `application/single_app/semantic_kernel_loader.py` | ~43-47, ~767-768, ~810-829, ~1530-1532, ~1548-1558, ~1636-1638, ~1655-1665 | +| `application/single_app/config.py` | VERSION bump | + +## Fix + +### 1. Added Azure Identity imports (~line 43) + +```python +try: + from azure.identity import DefaultAzureCredential, get_bearer_token_provider +except ImportError: + DefaultAzureCredential = None + get_bearer_token_provider = None +``` + +### 2. Added MI detection before each gate (~line 767) + +At each of the 3 `AzureChatCompletion` creation sites (single agent, multi-agent specialist, +multi-agent orchestrator): + +```python +auth_type = settings.get('azure_openai_gpt_authentication_type', '') +use_managed_identity = ( + auth_type == 'managed_identity' # guard 1 + and not apim_enabled # guard 2 + and not agent_config.get("key") # guard 3 + and bool(DefaultAzureCredential) # guard 4 + and not agent_config.get("endpoint_is_user_supplied", False) # guard 5 — security +) +``` + +`use_managed_identity` is `True` only when ALL five guards hold: +1. Global auth type is `managed_identity` +2. APIM is not enabled (APIM uses subscription keys, not MI) +3. No API key is present (if a key exists, use it directly) +4. `azure-identity` imported successfully (`DefaultAzureCredential` is not `None`) +5. **Endpoint is system-controlled** — `endpoint_is_user_supplied` is `False` (Cases 2, 5, 6 only) + +Guard 5 was added in response to the Copilot security finding — see the **Security Vulnerability** +section above for the full explanation. + +### 3. Updated gate condition to accept MI (~line 768) + +Before: +```python +if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: +``` + +After: +```python +if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: +``` + +### 4. Added MI branch for AzureChatCompletion creation (~line 789) + +Between the existing APIM branch and direct-key branch, a new `elif use_managed_identity:` block: + +```python +elif use_managed_identity: + # Detect gov vs commercial cloud from endpoint URL + _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" + _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, # ← MI token, not api_key + api_version=agent_config["api_version"], + ) +``` + +The scope is auto-detected: endpoints containing `.azure.us` use the Azure Government scope; +all others use the commercial Azure scope. + +## Auth Flow After Fix + +``` +User sends message + → SK Loader resolves agent config (case 6: global GPT fallback, allow_user_custom_agent_endpoints=False) + → endpoint = global endpoint, key = None (MI), deployment = 'gpt-4.1', endpoint_is_user_supplied = False + → use_managed_identity = True (auth_type='managed_identity', key=None, APIM=off, DefaultAzureCredential ok, endpoint_is_user_supplied=False) + → Gate passes: (agent_config["key"] or use_managed_identity) = True + → AzureChatCompletion created with ad_token_provider (DefaultAzureCredential) + → Agent loads with full instructions + ServiceNow tools (OpenAPI plugin) + → Agent calls queryAssets → OpenAPI plugin injects Bearer token → ServiceNow returns real data + → Real results displayed (no fabrication) +``` \ No newline at end of file diff --git a/functional_tests/test_agent_managed_identity_endpoint_flag.py b/functional_tests/test_agent_managed_identity_endpoint_flag.py new file mode 100644 index 00000000..893297f1 --- /dev/null +++ b/functional_tests/test_agent_managed_identity_endpoint_flag.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python3 +# test_agent_managed_identity_endpoint_flag.py +""" +Functional test for managed identity endpoint_is_user_supplied flag and +use_managed_identity guard logic in semantic_kernel_loader.py. + +Version: 0.239.002 +Implemented in: 0.238.025 + +This test ensures that: +1. resolve_agent_config() sets endpoint_is_user_supplied=False for Cases 5 and 6 + (system-controlled endpoints) and True for Cases 1, 3, and 4 (user/agent-supplied). +2. use_managed_identity evaluates to True only when all five guards hold: + auth_type == 'managed_identity', no APIM, no key, DefaultAzureCredential + available, and endpoint_is_user_supplied == False. +3. The AzureChatCompletion gate condition admits MI auth when appropriate and + blocks it when endpoint_is_user_supplied=True (which forces use_managed_identity=False). + +These tests mirror the decision tree in resolve_agent_config() and the +use_managed_identity expression in load_single_agent_for_kernel() from +application/single_app/semantic_kernel_loader.py. +""" + +import sys + + +# --------------------------------------------------------------------------- +# Inline mirror of the resolve_agent_config() decision tree. +# Only the endpoint_is_user_supplied assignment is exercised here. +# Logic must stay in sync with semantic_kernel_loader.py. +# --------------------------------------------------------------------------- + +def _resolve_endpoint_is_user_supplied(agent, settings): + """ + Mirror of the 'PATCHED DECISION TREE' in resolve_agent_config(). + Returns (endpoint_is_user_supplied, case_number). + """ + def any_filled(*fields): + return any(bool(f) for f in fields) + + def all_filled(*fields): + return all(bool(f) for f in fields) + + user_apim_enabled = agent.get("enable_agent_gpt_apim") in [True, 1, "true", "True"] + global_apim_enabled = settings.get("enable_gpt_apim", False) + allow_user_custom = settings.get("allow_user_custom_agent_endpoints", False) + allow_group_custom = settings.get("allow_group_custom_agent_endpoints", False) + is_group_agent = agent.get("is_group", False) + is_global_agent = agent.get("is_global", False) + + if is_group_agent: + can_use_agent_endpoints = allow_group_custom + elif is_global_agent: + can_use_agent_endpoints = False + else: + can_use_agent_endpoints = allow_user_custom + + user_apim_allowed = user_apim_enabled and can_use_agent_endpoints + + u_apim = ( + agent.get("azure_apim_gpt_endpoint"), + agent.get("azure_apim_gpt_subscription_key"), + agent.get("azure_apim_gpt_deployment"), + agent.get("azure_apim_gpt_api_version"), + ) + g_apim = ( + settings.get("azure_apim_gpt_endpoint"), + settings.get("azure_apim_gpt_subscription_key"), + settings.get("azure_apim_gpt_deployment"), + settings.get("azure_apim_gpt_api_version"), + ) + u_gpt = ( + agent.get("azure_openai_gpt_endpoint"), + agent.get("azure_openai_gpt_key"), + agent.get("azure_openai_gpt_deployment"), + agent.get("azure_openai_gpt_api_version"), + ) + g_gpt = ( + settings.get("azure_openai_gpt_endpoint"), + settings.get("azure_openai_gpt_key"), + settings.get("azure_openai_gpt_deployment"), + settings.get("azure_openai_gpt_api_version"), + ) + + # Case 1 – user APIM values present and allowed + if user_apim_allowed and any_filled(*u_apim): + return True, 1 + # Case 2 – user APIM enabled but no user values; fall to global APIM + elif user_apim_enabled and global_apim_enabled and any_filled(*g_apim): + return False, 2 + # Case 3 – agent GPT config fully filled and allowed + elif all_filled(*u_gpt) and can_use_agent_endpoints: + return True, 3 + # Case 4 – agent GPT config partially filled, no global APIM + elif any_filled(*u_gpt) and not global_apim_enabled and can_use_agent_endpoints: + return True, 4 + # Case 5 – global APIM enabled and present + elif global_apim_enabled and any_filled(*g_apim): + return False, 5 + # Case 6 – global GPT fallback + else: + return False, 6 + + +# --------------------------------------------------------------------------- +# Mirror of the use_managed_identity expression in load_single_agent_for_kernel +# --------------------------------------------------------------------------- + +def _compute_use_managed_identity(auth_type, apim_enabled, agent_key, + credential_available, endpoint_is_user_supplied): + """Mirror of the inline use_managed_identity expression.""" + DefaultAzureCredential = object() if credential_available else None + return ( + auth_type == "managed_identity" + and not apim_enabled + and not agent_key + and bool(DefaultAzureCredential) + and not endpoint_is_user_supplied + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +GLOBAL_ENDPOINT = "https://global.openai.azure.com" +GLOBAL_KEY = "global-key-abc" +GLOBAL_DEPLOYMENT = "gpt-4.1" +GLOBAL_API_VER = "2024-08-01" + +AGENT_ENDPOINT = "https://agent.openai.azure.com" +AGENT_KEY = "agent-key-xyz" +AGENT_DEPLOYMENT = "gpt-4o" +AGENT_API_VER = "2024-05-13" + +APIM_ENDPOINT = "https://apim.azure-api.net" +APIM_KEY = "apim-sub-key" +APIM_DEPL = "gpt-4.1" +APIM_VER = "2024-08-01" + + +def _settings(**overrides): + s = { + "azure_openai_gpt_endpoint": GLOBAL_ENDPOINT, + "azure_openai_gpt_key": GLOBAL_KEY, + "azure_openai_gpt_deployment": GLOBAL_DEPLOYMENT, + "azure_openai_gpt_api_version": GLOBAL_API_VER, + "enable_gpt_apim": False, + "per_user_semantic_kernel": True, + "allow_user_custom_agent_endpoints": False, + "allow_group_custom_agent_endpoints": False, + } + s.update(overrides) + return s + + +def _agent(**overrides): + a = { + "name": "test-agent", + "agent_type": "local", + "enable_agent_gpt_apim": False, + "is_global": False, + "is_group": False, + } + a.update(overrides) + return a + + +# --------------------------------------------------------------------------- +# Test 1 – endpoint_is_user_supplied across all 6 decision-tree cases +# --------------------------------------------------------------------------- + +def test_endpoint_is_user_supplied_all_cases(): + """endpoint_is_user_supplied must be False for Cases 2, 5, 6 and True for 1, 3, 4.""" + print("Testing endpoint_is_user_supplied flag for all 6 cases...") + errors = [] + + # Case 1 – user APIM with values present, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent( + enable_agent_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + _settings(allow_user_custom_agent_endpoints=True), + ) + _check(errors, "Case 1 (user APIM)", expected=True, got=val, case=case) + + # Case 2 – user APIM on but no user values, global APIM present → False + val, case = _resolve_endpoint_is_user_supplied( + _agent(enable_agent_gpt_apim=True), + _settings( + allow_user_custom_agent_endpoints=True, + enable_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + ) + _check(errors, "Case 2 (global APIM fallback)", expected=False, got=val, case=case) + + # Case 3 – agent GPT fully filled, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent( + azure_openai_gpt_endpoint=AGENT_ENDPOINT, + azure_openai_gpt_key=AGENT_KEY, + azure_openai_gpt_deployment=AGENT_DEPLOYMENT, + azure_openai_gpt_api_version=AGENT_API_VER, + ), + _settings(allow_user_custom_agent_endpoints=True), + ) + _check(errors, "Case 3 (full agent GPT)", expected=True, got=val, case=case) + + # Case 4 – agent GPT partially filled, no global APIM, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent(azure_openai_gpt_deployment=AGENT_DEPLOYMENT), # only deployment + _settings(allow_user_custom_agent_endpoints=True, enable_gpt_apim=False), + ) + _check(errors, "Case 4 (partial agent GPT, merged)", expected=True, got=val, case=case) + + # Case 5 – global APIM enabled and present, no agent override → False + val, case = _resolve_endpoint_is_user_supplied( + _agent(), + _settings( + enable_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + ) + _check(errors, "Case 5 (global APIM)", expected=False, got=val, case=case) + + # Case 6 – pure global GPT fallback (most common MI scenario) → False + val, case = _resolve_endpoint_is_user_supplied(_agent(), _settings()) + _check(errors, "Case 6 (global GPT fallback)", expected=False, got=val, case=case) + + # --- Group-agent scenarios matching: Allow Group Custom Agent Endpoints=ON --- + # Group agent with NO custom fields + allow_group_custom=True + # → no u_gpt/u_apim fields filled → falls to Case 6 → endpoint_is_user_supplied=False + # → MI is permitted (this is the user's deployment scenario) + val, case = _resolve_endpoint_is_user_supplied( + _agent(is_group=True), + _settings(allow_group_custom_agent_endpoints=True), + ) + _check(errors, "Group agent, no custom fields, allow_group_custom=True (MI permitted)", + expected=False, got=val, case=case) + + # Group agent WITH a custom endpoint + allow_group_custom=True + # → hits Case 3 (fully filled u_gpt) → endpoint_is_user_supplied=True + # → MI is BLOCKED (group admin could point at attacker endpoint) + val, case = _resolve_endpoint_is_user_supplied( + _agent( + is_group=True, + azure_openai_gpt_endpoint=AGENT_ENDPOINT, + azure_openai_gpt_key=AGENT_KEY, + azure_openai_gpt_deployment=AGENT_DEPLOYMENT, + azure_openai_gpt_api_version=AGENT_API_VER, + ), + _settings(allow_group_custom_agent_endpoints=True), + ) + _check(errors, "Group agent, custom endpoint set, allow_group_custom=True (MI blocked)", + expected=True, got=val, case=case) + + return _summarise(errors, "endpoint_is_user_supplied") + + +def _check(errors, label, expected, got, case=None): + status = "PASS" if got == expected else "FAIL" + suffix = f" (case #{case})" if case else "" + print(f" [{status}] {label}{suffix}: endpoint_is_user_supplied={got}") + if got != expected: + errors.append(f"{label}: expected {expected}, got {got}") + + +# --------------------------------------------------------------------------- +# Test 2 – use_managed_identity guard logic +# --------------------------------------------------------------------------- + +def test_use_managed_identity_logic(): + """use_managed_identity must be True only when every guard passes.""" + print("\nTesting use_managed_identity boolean logic...") + + cases = [ + # (description, auth_type, apim, key, cred_avail, user_supplied, expected) + ("all guards pass → True", + "managed_identity", False, None, True, False, True), + ("wrong auth_type → False", + "api_key", False, None, True, False, False), + ("APIM enabled → False", + "managed_identity", True, None, True, False, False), + ("key present → False", + "managed_identity", False, "abc123", True, False, False), + ("no DefaultAzureCredential → False", + "managed_identity", False, None, False, False, False), + ("endpoint_is_user_supplied=True → False", + "managed_identity", False, None, True, True, False), + ] + + errors = [] + for desc, auth, apim, key, cred, user_sup, expected in cases: + result = _compute_use_managed_identity(auth, apim, key, cred, user_sup) + status = "PASS" if result == expected else "FAIL" + print(f" [{status}] {desc}: {result}") + if result != expected: + errors.append(f"{desc}: expected {expected}, got {result}") + + return _summarise(errors, "use_managed_identity") + + +# --------------------------------------------------------------------------- +# Test 3 – AzureChatCompletion gate condition +# --------------------------------------------------------------------------- + +def test_gate_condition(): + """Gate must admit MI auth and block it when endpoint_is_user_supplied=True + (which sets use_mi=False) and no key is present.""" + print("\nTesting AzureChatCompletion gate condition...") + + def gate(endpoint, key, deployment, use_mi): + """Mirrors: if AzureChatCompletion and endpoint and (key or use_mi) and deployment""" + return bool(endpoint) and bool(key or use_mi) and bool(deployment) + + cases = [ + # (desc, endpoint, key, deployment, use_mi, expected) + ("MI auth, no key, user_supplied=False → admitted", + GLOBAL_ENDPOINT, None, GLOBAL_DEPLOYMENT, True, True), + ("key auth, no MI → admitted", + GLOBAL_ENDPOINT, GLOBAL_KEY, GLOBAL_DEPLOYMENT, False, True), + ("user_supplied=True → use_mi=False, no key → blocked", + AGENT_ENDPOINT, None, AGENT_DEPLOYMENT, False, False), + ("no endpoint → blocked", + None, None, GLOBAL_DEPLOYMENT, True, False), + ("no deployment → blocked", + GLOBAL_ENDPOINT, None, None, True, False), + ] + + errors = [] + for desc, ep, key, depl, use_mi, expected in cases: + result = gate(ep, key, depl, use_mi) + status = "PASS" if result == expected else "FAIL" + print(f" [{status}] {desc}: {result}") + if result != expected: + errors.append(f"{desc}: expected {expected}, got {result}") + + return _summarise(errors, "gate condition") + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _summarise(errors, label): + if errors: + for e in errors: + print(f" FAIL: {e}") + return False + print(f"All {label} cases passed!") + return True + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + tests = [ + test_endpoint_is_user_supplied_all_cases, + test_use_managed_identity_logic, + test_gate_condition, + ] + results = [] + for t in tests: + print(f"\n{'='*60}") + print(f"Running {t.__name__}...") + print("="*60) + try: + results.append(t()) + except Exception as exc: + import traceback + print(f"ERROR: {exc}") + traceback.print_exc() + results.append(False) + + passed = sum(1 for r in results if r) + total = len(results) + print(f"\n{'='*60}") + print(f"Results: {passed}/{total} tests passed") + print("="*60) + sys.exit(0 if all(results) else 1)