From f78c497456b67a1b81bcdbd7e92dacada1b4c1a3 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Wed, 4 Mar 2026 22:42:53 -0500 Subject: [PATCH 1/8] fix: agent managed identity support in semantic_kernel_loader (#769) --- application/single_app/config.py | 2 +- .../single_app/semantic_kernel_loader.py | 58 ++++++- .../AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md | 141 ++++++++++++++++++ 3 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md diff --git a/application/single_app/config.py b/application/single_app/config.py index a6a3bc99..7094a3c5 100644 --- a/application/single_app/config.py +++ b/application/single_app/config.py @@ -88,7 +88,7 @@ EXECUTOR_TYPE = 'thread' EXECUTOR_MAX_WORKERS = 30 SESSION_TYPE = 'filesystem' -VERSION = "0.238.024" +VERSION = "0.238.025" SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 78f54203..16798ed9 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -40,6 +40,11 @@ from semantic_kernel_plugins.plugin_loader import discover_plugins from semantic_kernel_plugins.openapi_plugin_factory import OpenApiPluginFactory import app_settings_cache +try: + from azure.identity import DefaultAzureCredential, get_bearer_token_provider +except ImportError: + DefaultAzureCredential = None + get_bearer_token_provider = None @@ -758,7 +763,9 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis log_event(f"[SK Loader] Agent config resolved for {agent_cfg.get('name')} - endpoint: {bool(agent_config.get('endpoint'))}, key: {bool(agent_config.get('key'))}, deployment: {agent_config.get('deployment')}, max_completion_tokens: {agent_config.get('max_completion_tokens')}", level=logging.INFO) - if AzureChatCompletion and 
agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: + auth_type = settings.get('azure_openai_gpt_authentication_type', '') + use_managed_identity = (auth_type == 'managed_identity') and not apim_enabled and not agent_config.get("key") + if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: print(f"[SK Loader] Azure config valid for {agent_config['name']}, creating chat service...") if apim_enabled: log_event( @@ -779,6 +786,25 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis api_version=agent_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": agent_config["key"]} ) + elif use_managed_identity: + log_event( + f"[SK Loader] Initializing Managed Identity AzureChatCompletion for agent: {agent_config['name']} ({mode_label})", + { + "aoai_endpoint": agent_config["endpoint"], + "aoai_deployment": agent_config["deployment"], + "agent_name": agent_config["name"] + }, + level=logging.INFO + ) + _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" + _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, + api_version=agent_config["api_version"], + ) else: log_event( f"[SK Loader] Initializing GPT Direct AzureChatCompletion for agent: {agent_config['name']} ({mode_label})", @@ -1521,7 +1547,10 @@ def load_semantic_kernel(kernel: Kernel, settings): agent_config = resolve_agent_config(agent_cfg, settings) chat_service = None service_id = f"aoai-chat-{agent_config['name'].replace(' ', '').lower()}" - if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: + 
_ma_auth_type = settings.get('azure_openai_gpt_authentication_type', '') + _ma_apim_enabled = settings.get("enable_gpt_apim", False) + _ma_use_mi = (_ma_auth_type == 'managed_identity') and not _ma_apim_enabled and not agent_config.get("key") + if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or _ma_use_mi) and agent_config["deployment"]: try: try: chat_service = kernel.get_service(service_id=service_id) @@ -1548,6 +1577,16 @@ def load_semantic_kernel(kernel: Kernel, settings): api_version=agent_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": agent_config["key"]} ) + elif _ma_use_mi: + _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" + _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, + api_version=agent_config["api_version"], + ) else: chat_service = AzureChatCompletion( service_id=service_id, @@ -1631,7 +1670,10 @@ def load_semantic_kernel(kernel: Kernel, settings): orchestrator_config = resolve_agent_config(orchestrator_cfg, settings) service_id = f"aoai-chat-{orchestrator_config['name']}" chat_service = None - if AzureChatCompletion and orchestrator_config["endpoint"] and orchestrator_config["key"] and orchestrator_config["deployment"]: + _orch_auth_type = settings.get('azure_openai_gpt_authentication_type', '') + _orch_apim_enabled = settings.get("enable_gpt_apim", False) + _orch_use_mi = (_orch_auth_type == 'managed_identity') and not _orch_apim_enabled and not orchestrator_config.get("key") + if AzureChatCompletion and orchestrator_config["endpoint"] and (orchestrator_config["key"] or _orch_use_mi) and orchestrator_config["deployment"]: try: chat_service = 
kernel.get_service(service_id=service_id) except Exception: @@ -1657,6 +1699,16 @@ def load_semantic_kernel(kernel: Kernel, settings): api_version=orchestrator_config["api_version"], # default_headers={"Ocp-Apim-Subscription-Key": orchestrator_config["key"]} ) + elif _orch_use_mi: + _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (orchestrator_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" + _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=orchestrator_config["deployment"], + endpoint=orchestrator_config["endpoint"], + ad_token_provider=_token_provider, + api_version=orchestrator_config["api_version"], + ) else: chat_service = AzureChatCompletion( service_id=service_id, diff --git a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md new file mode 100644 index 00000000..fb716d59 --- /dev/null +++ b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md @@ -0,0 +1,141 @@ +# Agent Managed Identity SK Loader Fix + +**Fixed/Implemented in version:** **0.238.025** (matches `config.py` `app.config['VERSION']`) +**GitHub Issue:** [#769 — Agents fail silently when using Managed Identity authentication](https://github.com/microsoft/simplechat/issues/769) + +## Issue Description + +When using **Azure Managed Identity (MI)** for Azure OpenAI authentication, agents configured through +the **Model & Connection** page (Step 2 of the agent wizard) failed silently — the agent never loaded, +fell back to plain GPT-4.1 with no tools or instructions, and fabricated responses instead of calling +real APIs (e.g., ServiceNow). 
+ +## Root Cause Analysis + +### How Agent Config Is Resolved + +`resolve_agent_config()` in `semantic_kernel_loader.py` (~line 107) figures out which endpoint/key/ +deployment to use for an agent by running through a **decision tree** (~line 291): + +``` +# 1. User APIM enabled and any user APIM values set → use user APIM +# 2. User APIM enabled but empty, global APIM enabled → use global APIM +# 3. Agent GPT config is FULLY filled → use agent GPT config +# 4. Agent GPT config is PARTIALLY filled, global APIM off → merge agent GPT with global GPT +# 5. Global APIM enabled → use global APIM +# 6. Fallback → use global GPT config entirely +``` + +### The Failure + +When an agent is configured with only the deployment name set (endpoint and key left blank), the +decision tree hits **case 4** — it merges the agent's partial config with global settings: + +``` +Agent-level: endpoint='', key='', deployment='gpt-4.1', api_version='' +``` + +After merge with global settings: +- `endpoint` = global endpoint ✓ +- `deployment` = `'gpt-4.1'` ✓ +- `key` = global key = **`None`** ✗ (MI auth — no API key is stored in settings) + +The gate condition at ~line 768 then fails: + +```python +if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: +``` + +`agent_config["key"]` is `None` → **condition is False** → falls into the `else` block: + +``` +[SK Loader] Azure config INVALID for servicenow_support_agent: + - AzureChatCompletion available: True + - endpoint: True + - key: False ← THIS IS THE FAILURE + - deployment: True +``` + +Returns `None, None` → no agent loaded → chat uses plain GPT-4.1 with no tools/instructions +→ GPT fabricates responses instead of calling the actual API. 
+ +## Files Modified + +| File | Lines Changed | +|------|--------------| +| `application/single_app/semantic_kernel_loader.py` | ~43-47, ~767-768, ~810-829, ~1530-1532, ~1548-1558, ~1636-1638, ~1655-1665 | +| `application/single_app/config.py` | VERSION bump | + +## Fix + +### 1. Added Azure Identity imports (~line 43) + +```python +try: + from azure.identity import DefaultAzureCredential, get_bearer_token_provider +except ImportError: + DefaultAzureCredential = None + get_bearer_token_provider = None +``` + +### 2. Added MI detection before each gate (~line 767) + +At each of the 3 `AzureChatCompletion` creation sites (single agent, multi-agent specialist, +multi-agent orchestrator): + +```python +auth_type = settings.get('azure_openai_gpt_authentication_type', '') +use_managed_identity = (auth_type == 'managed_identity') and not apim_enabled and not agent_config.get("key") +``` + +`use_managed_identity` is `True` when ALL of: +- Global auth type is `managed_identity` +- APIM is not enabled (APIM uses subscription keys, not MI) +- No API key is present (if a key exists, use it directly) + +### 3. Updated gate condition to accept MI (~line 768) + +Before: +```python +if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and agent_config["deployment"]: +``` + +After: +```python +if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: +``` + +### 4. 
Added MI branch for AzureChatCompletion creation (~line 789) + +Between the existing APIM branch and direct-key branch, a new `elif use_managed_identity:` block: + +```python +elif use_managed_identity: + # Detect gov vs commercial cloud from endpoint URL + _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" + _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + chat_service = AzureChatCompletion( + service_id=service_id, + deployment_name=agent_config["deployment"], + endpoint=agent_config["endpoint"], + ad_token_provider=_token_provider, # ← MI token, not api_key + api_version=agent_config["api_version"], + ) +``` + +The scope is auto-detected: endpoints containing `.azure.us` use the Azure Government scope; +all others use the commercial Azure scope. + +## Auth Flow After Fix + +``` +User sends message + → SK Loader resolves agent config (case 4: merge agent partial + global GPT) + → endpoint = global endpoint, key = None (MI), deployment = 'gpt-4.1' + → use_managed_identity = True (auth_type='managed_identity', key=None, APIM=off) + → Gate passes: (agent_config["key"] or use_managed_identity) = True + → AzureChatCompletion created with ad_token_provider (DefaultAzureCredential) + → Agent loads with full instructions + ServiceNow tools (OpenAPI plugin) + → Agent calls queryAssets → OpenAPI plugin injects Bearer token → ServiceNow returns real data + → Real results displayed (no fabrication) +``` \ No newline at end of file From b3220b8e282b2e7e91e3ed6fdb2667e962d7f723 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 00:28:46 -0500 Subject: [PATCH 2/8] Fixed different references of secrets --- .github/workflows/docker_image_publish.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/docker_image_publish.yml b/.github/workflows/docker_image_publish.yml index 
ef8732c3..665a4fb2 100644 --- a/.github/workflows/docker_image_publish.yml +++ b/.github/workflows/docker_image_publish.yml @@ -15,11 +15,11 @@ jobs: uses: Azure/docker-login@v2 with: # Container registry username - username: ${{ secrets.MAIN_ACR_USERNAME }} + username: ${{ secrets.ACR_USERNAME }} # Container registry password - password: ${{ secrets.MAIN_ACR_PASSWORD }} + password: ${{ secrets.ACR_PASSWORD }} # Container registry server url - login-server: ${{ secrets.MAIN_ACR_LOGIN_SERVER }} + login-server: ${{ secrets.ACR_LOGIN_SERVER }} - name: Normalize branch name for tag run: | REF="${GITHUB_REF_NAME}" From 75f1d564fb58c75641eb9481880c27e5e2f4ee57 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 00:35:34 -0500 Subject: [PATCH 3/8] fix: guard use_managed_identity against None DefaultAzureCredential When azure.identity fails to import, DefaultAzureCredential and get_bearer_token_provider are set to None. Previously, use_managed_identity could still evaluate to True based solely on auth_type and absence of a key, causing the gate condition to pass and eventually calling get_bearer_token_provider(None(), scope) raising: TypeError: 'NoneType' object is not callable Fix: add 'and bool(DefaultAzureCredential)' guard to all three managed identity flag computations (lines 767, 1552, 1675) so managed identity is only attempted when the credential class is actually available. Identified by GitHub Copilot code review on PR #770. 
--- .../single_app/semantic_kernel_loader.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 16798ed9..a4673d84 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -764,7 +764,12 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis log_event(f"[SK Loader] Agent config resolved for {agent_cfg.get('name')} - endpoint: {bool(agent_config.get('endpoint'))}, key: {bool(agent_config.get('key'))}, deployment: {agent_config.get('deployment')}, max_completion_tokens: {agent_config.get('max_completion_tokens')}", level=logging.INFO) auth_type = settings.get('azure_openai_gpt_authentication_type', '') - use_managed_identity = (auth_type == 'managed_identity') and not apim_enabled and not agent_config.get("key") + use_managed_identity = ( + auth_type == 'managed_identity' + and not apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + ) if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: print(f"[SK Loader] Azure config valid for {agent_config['name']}, creating chat service...") if apim_enabled: @@ -1549,7 +1554,12 @@ def load_semantic_kernel(kernel: Kernel, settings): service_id = f"aoai-chat-{agent_config['name'].replace(' ', '').lower()}" _ma_auth_type = settings.get('azure_openai_gpt_authentication_type', '') _ma_apim_enabled = settings.get("enable_gpt_apim", False) - _ma_use_mi = (_ma_auth_type == 'managed_identity') and not _ma_apim_enabled and not agent_config.get("key") + _ma_use_mi = ( + _ma_auth_type == 'managed_identity' + and not _ma_apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + ) if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or _ma_use_mi) and 
agent_config["deployment"]: try: try: @@ -1672,7 +1682,12 @@ def load_semantic_kernel(kernel: Kernel, settings): chat_service = None _orch_auth_type = settings.get('azure_openai_gpt_authentication_type', '') _orch_apim_enabled = settings.get("enable_gpt_apim", False) - _orch_use_mi = (_orch_auth_type == 'managed_identity') and not _orch_apim_enabled and not orchestrator_config.get("key") + _orch_use_mi = ( + _orch_auth_type == 'managed_identity' + and not _orch_apim_enabled + and not orchestrator_config.get("key") + and bool(DefaultAzureCredential) + ) if AzureChatCompletion and orchestrator_config["endpoint"] and (orchestrator_config["key"] or _orch_use_mi) and orchestrator_config["deployment"]: try: chat_service = kernel.get_service(service_id=service_id) From 8d4f65e0e8ce4d2d631617737869e29540d4ee57 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 00:47:59 -0500 Subject: [PATCH 4/8] fix: block managed identity when endpoint is user-supplied (token theft) Security fix identified by GitHub Copilot code review on PR #770. When allow_user_custom_agent_endpoints / allow_group_custom_agent_endpoints is enabled, agent_config['endpoint'] can be an attacker-controlled URL. With azure_openai_gpt_authentication_type=managed_identity, the code was calling get_bearer_token_provider(DefaultAzureCredential(), scope) and passing the token to AzureChatCompletion with that attacker-supplied endpoint, leaking the managed identity bearer token to an external host. Fix: - Track endpoint_is_user_supplied in resolve_agent_config for each of the 6 endpoint resolution branches (cases 1/3/4 = user-supplied True, cases 2/5/6 = system-controlled False) - Add 'and not agent_config.get("endpoint_is_user_supplied", False)' guard to all three managed identity flag computations (use_managed_identity, _ma_use_mi, _orch_use_mi) so managed identity tokens are only ever sent to system-controlled Azure endpoints. 
--- application/single_app/semantic_kernel_loader.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index a4673d84..23fa12cb 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -300,26 +300,32 @@ def merge_fields(primary, fallback): debug_print(f"[SK Loader] Using user APIM with global fallback") merged = merge_fields(u_apim, g_apim if global_apim_enabled and any_filled(*g_apim) else (None, None, None, None)) endpoint, key, deployment, api_version = merged + endpoint_is_user_supplied = True # 2. User APIM enabled but no user APIM values, and global APIM enabled and present: use global APIM elif user_apim_enabled and global_apim_enabled and any_filled(*g_apim): debug_print(f"[SK Loader] Using global APIM (user APIM enabled but not present)") endpoint, key, deployment, api_version = g_apim + endpoint_is_user_supplied = False # 3. User GPT config is FULLY filled: use user GPT (all fields filled) elif all_filled(*u_gpt) and can_use_agent_endpoints: debug_print(f"[SK Loader] Using agent GPT config (all fields filled)") endpoint, key, deployment, api_version = u_gpt + endpoint_is_user_supplied = True # 4. User GPT config is PARTIALLY filled, global APIM is NOT enabled: merge user GPT with global GPT elif any_filled(*u_gpt) and not global_apim_enabled and can_use_agent_endpoints: debug_print(f"[SK Loader] Using agent GPT config (partially filled, merging with global GPT, global APIM not enabled)") endpoint, key, deployment, api_version = merge_fields(u_gpt, g_gpt) + endpoint_is_user_supplied = True # 5. Global APIM enabled and present: use global APIM elif global_apim_enabled and any_filled(*g_apim): debug_print(f"[SK Loader] Using global APIM (fallback)") endpoint, key, deployment, api_version = g_apim + endpoint_is_user_supplied = False # 6. 
Fallback to global GPT config else: debug_print(f"[SK Loader] Using global GPT config (fallback)") endpoint, key, deployment, api_version = g_gpt + endpoint_is_user_supplied = False result = { "endpoint": endpoint, @@ -342,6 +348,9 @@ def merge_fields(primary, fallback): "max_completion_tokens": agent.get("max_completion_tokens", -1), # -1 meant use model default determined by the service, 35-trubo is 4096, 4o is 16384, 4.1 is at least 32768 "agent_type": agent_type or "local", "other_settings": other_settings, + # Security: track whether the endpoint was user/agent-supplied vs system-controlled. + # Managed identity must NOT be used with user-supplied endpoints to prevent token theft. + "endpoint_is_user_supplied": endpoint_is_user_supplied, } print(f"[SK Loader] Final resolved config for {agent.get('name')}: endpoint={bool(endpoint)}, key={bool(key)}, deployment={deployment}") @@ -769,6 +778,7 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis and not apim_enabled and not agent_config.get("key") and bool(DefaultAzureCredential) + and not agent_config.get("endpoint_is_user_supplied", False) ) if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or use_managed_identity) and agent_config["deployment"]: print(f"[SK Loader] Azure config valid for {agent_config['name']}, creating chat service...") @@ -1559,6 +1569,7 @@ def load_semantic_kernel(kernel: Kernel, settings): and not _ma_apim_enabled and not agent_config.get("key") and bool(DefaultAzureCredential) + and not agent_config.get("endpoint_is_user_supplied", False) ) if AzureChatCompletion and agent_config["endpoint"] and (agent_config["key"] or _ma_use_mi) and agent_config["deployment"]: try: @@ -1687,6 +1698,7 @@ def load_semantic_kernel(kernel: Kernel, settings): and not _orch_apim_enabled and not orchestrator_config.get("key") and bool(DefaultAzureCredential) + and not orchestrator_config.get("endpoint_is_user_supplied", False) ) if AzureChatCompletion 
and orchestrator_config["endpoint"] and (orchestrator_config["key"] or _orch_use_mi) and orchestrator_config["deployment"]: try: From 3ed285ae0ca031a43d7ca2401b95302c67842c53 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 09:51:55 -0500 Subject: [PATCH 5/8] refactor: extract _build_mi_token_provider helper to eliminate scope detection duplication The managed identity scope detection and token-provider creation logic was duplicated verbatim at three call sites (lines ~814, ~1602, ~1730). Extract into a single _build_mi_token_provider(endpoint) helper that selects the correct Azure Cognitive Services scope based on whether the endpoint is in the US Government cloud (.azure.us) or the commercial cloud (.azure.com), then replace all three sites with a call to the helper. This makes future sovereign cloud changes (e.g. China .azure.cn) a single-line update instead of a three-place change. --- .../single_app/semantic_kernel_loader.py | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 23fa12cb..51823af9 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -735,6 +735,20 @@ def normalize(s): print(f"[SK Loader] Error loading agent-specific plugins: {e}") log_event(f"[SK Loader] Error loading agent-specific plugins: {e}", level=logging.ERROR, exceptionTraceback=True) +def _build_mi_token_provider(endpoint): + """Build a bearer token provider for managed identity auth. + + Selects the correct Azure Cognitive Services scope based on whether the + endpoint is in the US Government cloud (.azure.us) or the commercial cloud. 
+ """ + scope = ( + "https://cognitiveservices.azure.us/.default" + if ".azure.us" in (endpoint or "") + else "https://cognitiveservices.azure.com/.default" + ) + return get_bearer_token_provider(DefaultAzureCredential(), scope) + + def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis_client=None, mode_label="global"): """ DRY helper to load a single agent (default agent) for the kernel. @@ -811,8 +825,7 @@ def load_single_agent_for_kernel(kernel, agent_cfg, settings, context_obj, redis }, level=logging.INFO ) - _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" - _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + _token_provider = _build_mi_token_provider(agent_config.get("endpoint")) chat_service = AzureChatCompletion( service_id=service_id, deployment_name=agent_config["deployment"], @@ -1599,8 +1612,7 @@ def load_semantic_kernel(kernel: Kernel, settings): # default_headers={"Ocp-Apim-Subscription-Key": agent_config["key"]} ) elif _ma_use_mi: - _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (agent_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" - _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + _token_provider = _build_mi_token_provider(agent_config.get("endpoint")) chat_service = AzureChatCompletion( service_id=service_id, deployment_name=agent_config["deployment"], @@ -1727,8 +1739,7 @@ def load_semantic_kernel(kernel: Kernel, settings): # default_headers={"Ocp-Apim-Subscription-Key": orchestrator_config["key"]} ) elif _orch_use_mi: - _scope = "https://cognitiveservices.azure.us/.default" if ".azure.us" in (orchestrator_config.get("endpoint") or "") else "https://cognitiveservices.azure.com/.default" - _token_provider = get_bearer_token_provider(DefaultAzureCredential(), _scope) + _token_provider = 
_build_mi_token_provider(orchestrator_config.get("endpoint")) chat_service = AzureChatCompletion( service_id=service_id, deployment_name=orchestrator_config["deployment"], From fe2f1830b7a5b40cd3f22f91382a5b2d896bca88 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 10:03:32 -0500 Subject: [PATCH 6/8] docs: remove stale version parenthetical from MI fix doc The parenthetical '(matches config.py app.config[VERSION])' was accurate when the doc was first written at v0.238.025 but config.py has since moved to v0.239.003, making the claim misleading. Remove the parenthetical; the version number itself (0.238.025) remains correct. --- .../fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md index fb716d59..c5cca5b5 100644 --- a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md +++ b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md @@ -1,6 +1,6 @@ # Agent Managed Identity SK Loader Fix -**Fixed/Implemented in version:** **0.238.025** (matches `config.py` `app.config['VERSION']`) +**Fixed/Implemented in version:** **0.238.025** **GitHub Issue:** [#769 — Agents fail silently when using Managed Identity authentication](https://github.com/microsoft/simplechat/issues/769) ## Issue Description From c8635082520b824895bed6ec0c76795b22593ecd Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 10:12:59 -0500 Subject: [PATCH 7/8] docs: fix two inaccuracies in AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md - Expand code example to show all five use_managed_identity guards (bool(DefaultAzureCredential) and not endpoint_is_user_supplied) - Add corresponding bullet points explaining the two new guards - Correct auth-flow example from case 4 to case 6 (global GPT fallback, 
endpoint_is_user_supplied=False) which is the actual path that permits MI --- .../AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md index c5cca5b5..2f10490f 100644 --- a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md +++ b/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md @@ -85,13 +85,21 @@ multi-agent orchestrator): ```python auth_type = settings.get('azure_openai_gpt_authentication_type', '') -use_managed_identity = (auth_type == 'managed_identity') and not apim_enabled and not agent_config.get("key") +use_managed_identity = ( + auth_type == 'managed_identity' + and not apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + and not agent_config.get("endpoint_is_user_supplied", False) +) ``` `use_managed_identity` is `True` when ALL of: - Global auth type is `managed_identity` - APIM is not enabled (APIM uses subscription keys, not MI) - No API key is present (if a key exists, use it directly) +- `azure-identity` imported successfully (`DefaultAzureCredential` is not `None`) +- Endpoint is system-controlled, not user/agent-supplied (prevents MI token theft) ### 3. Updated gate condition to accept MI (~line 768) @@ -130,9 +138,9 @@ all others use the commercial Azure scope. 
``` User sends message - → SK Loader resolves agent config (case 4: merge agent partial + global GPT) - → endpoint = global endpoint, key = None (MI), deployment = 'gpt-4.1' - → use_managed_identity = True (auth_type='managed_identity', key=None, APIM=off) + → SK Loader resolves agent config (case 6: global GPT fallback, allow_user_custom_agent_endpoints=False) + → endpoint = global endpoint, key = None (MI), deployment = 'gpt-4.1', endpoint_is_user_supplied = False + → use_managed_identity = True (auth_type='managed_identity', key=None, APIM=off, DefaultAzureCredential ok, endpoint_is_user_supplied=False) → Gate passes: (agent_config["key"] or use_managed_identity) = True → AzureChatCompletion created with ad_token_provider (DefaultAzureCredential) → Agent loads with full instructions + ServiceNow tools (OpenAPI plugin) From f98d2f7a53d4097182bb985730f306c3ade54c72 Mon Sep 17 00:00:00 2001 From: "Chen, Vivien" Date: Thu, 5 Mar 2026 12:55:06 -0500 Subject: [PATCH 8/8] test: add MI endpoint flag functional test; update fix doc with security finding Functional test (functional_tests/test_agent_managed_identity_endpoint_flag.py): - Covers all three logic paths per Copilot review finding CodingGuidelineID 1000005 - endpoint_is_user_supplied=False for Cases 2/5/6 and True for Cases 1/3/4 (6 assertions) - Group agent scenarios: no custom fields -> MI permitted; custom endpoint -> MI blocked (2 assertions) - use_managed_identity evaluates correctly across all five guard conditions (6 assertions) - AzureChatCompletion gate admits MI and blocks when user_supplied=True (5 assertions) Fix documentation (docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md): - Moved from v0.238.025/ to v0.239.002/ to match current config.py VERSION - Corrected Fixed/Implemented version header to 0.239.002 - Added 'Security Vulnerability' section documenting the Copilot-identified credential-theft risk: 4-guard use_managed_identity could send MI bearer tokens to 
attacker-controlled endpoints when allow_group_custom_agent_endpoints=True - Added Case 1-6 table showing which cases are system-controlled vs agent-supplied - Numbered all five guards in Fix section 2 with cross-reference to security section --- .../AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md | 87 +++- ...st_agent_managed_identity_endpoint_flag.py | 393 ++++++++++++++++++ 2 files changed, 468 insertions(+), 12 deletions(-) rename docs/explanation/fixes/{v0.238.025 => v0.239.002}/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md (59%) create mode 100644 functional_tests/test_agent_managed_identity_endpoint_flag.py diff --git a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md b/docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md similarity index 59% rename from docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md rename to docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md index 2f10490f..d14736e4 100644 --- a/docs/explanation/fixes/v0.238.025/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md +++ b/docs/explanation/fixes/v0.239.002/AGENT_MANAGED_IDENTITY_SK_LOADER_FIX.md @@ -1,6 +1,6 @@ # Agent Managed Identity SK Loader Fix -**Fixed/Implemented in version:** **0.238.025** +**Fixed/Implemented in version:** **0.239.002** **GitHub Issue:** [#769 — Agents fail silently when using Managed Identity authentication](https://github.com/microsoft/simplechat/issues/769) ## Issue Description @@ -59,6 +59,66 @@ if AzureChatCompletion and agent_config["endpoint"] and agent_config["key"] and Returns `None, None` → no agent loaded → chat uses plain GPT-4.1 with no tools/instructions → GPT fabricates responses instead of calling the actual API. +## Security Vulnerability (Identified by GitHub Copilot PR Review) + +Once the silent-failure fix was submitted as a pull request, a GitHub Copilot automated review +identified a **credential-theft vulnerability** in the initial implementation. 
+ +### The Risk + +The initial `use_managed_identity` expression had only four guards: + +```python +use_managed_identity = ( + auth_type == 'managed_identity' + and not apim_enabled + and not agent_config.get("key") + and bool(DefaultAzureCredential) + # ← MISSING: no check on whether the endpoint is user/agent-supplied +) +``` + +With `allow_group_custom_agent_endpoints = True` (a legitimate admin configuration), a group +workspace admin could configure an agent with a **custom Azure OpenAI endpoint** pointing to an +attacker-controlled server. Because `use_managed_identity` had no endpoint check, the app would +obtain a real MI bearer token (scoped to Azure Cognitive Services) and send it in the +`Authorization: Bearer ...` header to that attacker-controlled endpoint — **leaking the app's +managed identity credentials** to a third party. + +### How `resolve_agent_config()` Flags Endpoint Ownership + +`resolve_agent_config()` already tagged every branch of its decision tree with an +`endpoint_is_user_supplied` flag indicating whether the resolved endpoint is under system control +or was provided by a user/agent config: + +| Case | Condition | `endpoint_is_user_supplied` | +|------|-----------|-----------------------------| +| 1 | User APIM values set and allowed | `True` — agent-supplied | +| 2 | User APIM on but empty; fall to global APIM | `False` — system-controlled | +| 3 | Agent GPT config fully filled and allowed | `True` — agent-supplied | +| 4 | Agent GPT config partially filled, no global APIM | `True` — agent-supplied | +| 5 | Global APIM enabled | `False` — system-controlled | +| 6 | Global GPT fallback (most common MI scenario) | `False` — system-controlled | + +MI tokens should only ever be sent to Cases 2, 5, and 6 (system-controlled). The missing guard +meant MI tokens could also reach Cases 1, 3, and 4. 
+ +### The Security Fix + +A fifth guard was added to `use_managed_identity`: + +```python +and not agent_config.get("endpoint_is_user_supplied", False) +``` + +This single condition closes the token-theft path: even if all other guards pass, if the +resolved endpoint is agent/user-supplied, `use_managed_identity` evaluates to `False`, the gate +condition fails (no key + no MI), and the agent fails to load rather than leaking the MI token. + +**Intended behaviour for affected agents:** An agent that uses a custom endpoint must supply its +own API key. Relying on the app's managed identity to authenticate against a third-party or +operator-controlled endpoint is disallowed by design. + ## Files Modified | File | Lines Changed | @@ -86,20 +146,23 @@ multi-agent orchestrator): ```python auth_type = settings.get('azure_openai_gpt_authentication_type', '') use_managed_identity = ( - auth_type == 'managed_identity' - and not apim_enabled - and not agent_config.get("key") - and bool(DefaultAzureCredential) - and not agent_config.get("endpoint_is_user_supplied", False) + auth_type == 'managed_identity' # guard 1 + and not apim_enabled # guard 2 + and not agent_config.get("key") # guard 3 + and bool(DefaultAzureCredential) # guard 4 + and not agent_config.get("endpoint_is_user_supplied", False) # guard 5 — security ) ``` -`use_managed_identity` is `True` when ALL of: -- Global auth type is `managed_identity` -- APIM is not enabled (APIM uses subscription keys, not MI) -- No API key is present (if a key exists, use it directly) -- `azure-identity` imported successfully (`DefaultAzureCredential` is not `None`) -- Endpoint is system-controlled, not user/agent-supplied (prevents MI token theft) +`use_managed_identity` is `True` only when ALL five guards hold: +1. Global auth type is `managed_identity` +2. APIM is not enabled (APIM uses subscription keys, not MI) +3. No API key is present (if a key exists, use it directly) +4. 
`azure-identity` imported successfully (`DefaultAzureCredential` is not `None`) +5. **Endpoint is system-controlled** — `endpoint_is_user_supplied` is `False` (Cases 2, 5, 6 only) + +Guard 5 was added in response to the Copilot security finding — see the **Security Vulnerability** +section above for the full explanation. ### 3. Updated gate condition to accept MI (~line 768) diff --git a/functional_tests/test_agent_managed_identity_endpoint_flag.py b/functional_tests/test_agent_managed_identity_endpoint_flag.py new file mode 100644 index 00000000..893297f1 --- /dev/null +++ b/functional_tests/test_agent_managed_identity_endpoint_flag.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python3 +# test_agent_managed_identity_endpoint_flag.py +""" +Functional test for managed identity endpoint_is_user_supplied flag and +use_managed_identity guard logic in semantic_kernel_loader.py. + +Version: 0.239.002 +Implemented in: 0.238.025 + +This test ensures that: +1. resolve_agent_config() sets endpoint_is_user_supplied=False for Cases 2, 5, and 6 + (system-controlled endpoints) and True for Cases 1, 3, and 4 (user/agent-supplied). +2. use_managed_identity evaluates to True only when all five guards hold: + auth_type == 'managed_identity', no APIM, no key, DefaultAzureCredential + available, and endpoint_is_user_supplied == False. +3. The AzureChatCompletion gate condition admits MI auth when appropriate and + blocks it when endpoint_is_user_supplied=True (which forces use_managed_identity=False). + +These tests mirror the decision tree in resolve_agent_config() and the +use_managed_identity expression in load_single_agent_for_kernel() from +application/single_app/semantic_kernel_loader.py. +""" + +import sys + + +# --------------------------------------------------------------------------- +# Inline mirror of the resolve_agent_config() decision tree. +# Only the endpoint_is_user_supplied assignment is exercised here. +# Logic must stay in sync with semantic_kernel_loader.py. 
+# --------------------------------------------------------------------------- + +def _resolve_endpoint_is_user_supplied(agent, settings): + """ + Mirror of the 'PATCHED DECISION TREE' in resolve_agent_config(). + Returns (endpoint_is_user_supplied, case_number). + """ + def any_filled(*fields): + return any(bool(f) for f in fields) + + def all_filled(*fields): + return all(bool(f) for f in fields) + + user_apim_enabled = agent.get("enable_agent_gpt_apim") in [True, 1, "true", "True"] + global_apim_enabled = settings.get("enable_gpt_apim", False) + allow_user_custom = settings.get("allow_user_custom_agent_endpoints", False) + allow_group_custom = settings.get("allow_group_custom_agent_endpoints", False) + is_group_agent = agent.get("is_group", False) + is_global_agent = agent.get("is_global", False) + + if is_group_agent: + can_use_agent_endpoints = allow_group_custom + elif is_global_agent: + can_use_agent_endpoints = False + else: + can_use_agent_endpoints = allow_user_custom + + user_apim_allowed = user_apim_enabled and can_use_agent_endpoints + + u_apim = ( + agent.get("azure_apim_gpt_endpoint"), + agent.get("azure_apim_gpt_subscription_key"), + agent.get("azure_apim_gpt_deployment"), + agent.get("azure_apim_gpt_api_version"), + ) + g_apim = ( + settings.get("azure_apim_gpt_endpoint"), + settings.get("azure_apim_gpt_subscription_key"), + settings.get("azure_apim_gpt_deployment"), + settings.get("azure_apim_gpt_api_version"), + ) + u_gpt = ( + agent.get("azure_openai_gpt_endpoint"), + agent.get("azure_openai_gpt_key"), + agent.get("azure_openai_gpt_deployment"), + agent.get("azure_openai_gpt_api_version"), + ) + g_gpt = ( + settings.get("azure_openai_gpt_endpoint"), + settings.get("azure_openai_gpt_key"), + settings.get("azure_openai_gpt_deployment"), + settings.get("azure_openai_gpt_api_version"), + ) + + # Case 1 – user APIM values present and allowed + if user_apim_allowed and any_filled(*u_apim): + return True, 1 + # Case 2 – user APIM enabled but no user 
values; fall to global APIM + elif user_apim_enabled and global_apim_enabled and any_filled(*g_apim): + return False, 2 + # Case 3 – agent GPT config fully filled and allowed + elif all_filled(*u_gpt) and can_use_agent_endpoints: + return True, 3 + # Case 4 – agent GPT config partially filled, no global APIM + elif any_filled(*u_gpt) and not global_apim_enabled and can_use_agent_endpoints: + return True, 4 + # Case 5 – global APIM enabled and present + elif global_apim_enabled and any_filled(*g_apim): + return False, 5 + # Case 6 – global GPT fallback + else: + return False, 6 + + +# --------------------------------------------------------------------------- +# Mirror of the use_managed_identity expression in load_single_agent_for_kernel +# --------------------------------------------------------------------------- + +def _compute_use_managed_identity(auth_type, apim_enabled, agent_key, + credential_available, endpoint_is_user_supplied): + """Mirror of the inline use_managed_identity expression.""" + DefaultAzureCredential = object() if credential_available else None + return ( + auth_type == "managed_identity" + and not apim_enabled + and not agent_key + and bool(DefaultAzureCredential) + and not endpoint_is_user_supplied + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +GLOBAL_ENDPOINT = "https://global.openai.azure.com" +GLOBAL_KEY = "global-key-abc" +GLOBAL_DEPLOYMENT = "gpt-4.1" +GLOBAL_API_VER = "2024-08-01" + +AGENT_ENDPOINT = "https://agent.openai.azure.com" +AGENT_KEY = "agent-key-xyz" +AGENT_DEPLOYMENT = "gpt-4o" +AGENT_API_VER = "2024-05-13" + +APIM_ENDPOINT = "https://apim.azure-api.net" +APIM_KEY = "apim-sub-key" +APIM_DEPL = "gpt-4.1" +APIM_VER = "2024-08-01" + + +def _settings(**overrides): + s = { + "azure_openai_gpt_endpoint": GLOBAL_ENDPOINT, + "azure_openai_gpt_key": GLOBAL_KEY, + "azure_openai_gpt_deployment": 
GLOBAL_DEPLOYMENT, + "azure_openai_gpt_api_version": GLOBAL_API_VER, + "enable_gpt_apim": False, + "per_user_semantic_kernel": True, + "allow_user_custom_agent_endpoints": False, + "allow_group_custom_agent_endpoints": False, + } + s.update(overrides) + return s + + +def _agent(**overrides): + a = { + "name": "test-agent", + "agent_type": "local", + "enable_agent_gpt_apim": False, + "is_global": False, + "is_group": False, + } + a.update(overrides) + return a + + +# --------------------------------------------------------------------------- +# Test 1 – endpoint_is_user_supplied across all 6 decision-tree cases +# --------------------------------------------------------------------------- + +def test_endpoint_is_user_supplied_all_cases(): + """endpoint_is_user_supplied must be False for Cases 2, 5, 6 and True for 1, 3, 4.""" + print("Testing endpoint_is_user_supplied flag for all 6 cases...") + errors = [] + + # Case 1 – user APIM with values present, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent( + enable_agent_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + _settings(allow_user_custom_agent_endpoints=True), + ) + _check(errors, "Case 1 (user APIM)", expected=True, got=val, case=case) + + # Case 2 – user APIM on but no user values, global APIM present → False + val, case = _resolve_endpoint_is_user_supplied( + _agent(enable_agent_gpt_apim=True), + _settings( + allow_user_custom_agent_endpoints=True, + enable_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + ) + _check(errors, "Case 2 (global APIM fallback)", expected=False, got=val, case=case) + + # Case 3 – agent GPT fully filled, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent( + 
azure_openai_gpt_endpoint=AGENT_ENDPOINT, + azure_openai_gpt_key=AGENT_KEY, + azure_openai_gpt_deployment=AGENT_DEPLOYMENT, + azure_openai_gpt_api_version=AGENT_API_VER, + ), + _settings(allow_user_custom_agent_endpoints=True), + ) + _check(errors, "Case 3 (full agent GPT)", expected=True, got=val, case=case) + + # Case 4 – agent GPT partially filled, no global APIM, can_use=True → True + val, case = _resolve_endpoint_is_user_supplied( + _agent(azure_openai_gpt_deployment=AGENT_DEPLOYMENT), # only deployment + _settings(allow_user_custom_agent_endpoints=True, enable_gpt_apim=False), + ) + _check(errors, "Case 4 (partial agent GPT, merged)", expected=True, got=val, case=case) + + # Case 5 – global APIM enabled and present, no agent override → False + val, case = _resolve_endpoint_is_user_supplied( + _agent(), + _settings( + enable_gpt_apim=True, + azure_apim_gpt_endpoint=APIM_ENDPOINT, + azure_apim_gpt_subscription_key=APIM_KEY, + azure_apim_gpt_deployment=APIM_DEPL, + azure_apim_gpt_api_version=APIM_VER, + ), + ) + _check(errors, "Case 5 (global APIM)", expected=False, got=val, case=case) + + # Case 6 – pure global GPT fallback (most common MI scenario) → False + val, case = _resolve_endpoint_is_user_supplied(_agent(), _settings()) + _check(errors, "Case 6 (global GPT fallback)", expected=False, got=val, case=case) + + # --- Group-agent scenarios matching: Allow Group Custom Agent Endpoints=ON --- + # Group agent with NO custom fields + allow_group_custom=True + # → no u_gpt/u_apim fields filled → falls to Case 6 → endpoint_is_user_supplied=False + # → MI is permitted (this is the user's deployment scenario) + val, case = _resolve_endpoint_is_user_supplied( + _agent(is_group=True), + _settings(allow_group_custom_agent_endpoints=True), + ) + _check(errors, "Group agent, no custom fields, allow_group_custom=True (MI permitted)", + expected=False, got=val, case=case) + + # Group agent WITH a custom endpoint + allow_group_custom=True + # → hits Case 3 (fully filled 
u_gpt) → endpoint_is_user_supplied=True + # → MI is BLOCKED (group admin could point at attacker endpoint) + val, case = _resolve_endpoint_is_user_supplied( + _agent( + is_group=True, + azure_openai_gpt_endpoint=AGENT_ENDPOINT, + azure_openai_gpt_key=AGENT_KEY, + azure_openai_gpt_deployment=AGENT_DEPLOYMENT, + azure_openai_gpt_api_version=AGENT_API_VER, + ), + _settings(allow_group_custom_agent_endpoints=True), + ) + _check(errors, "Group agent, custom endpoint set, allow_group_custom=True (MI blocked)", + expected=True, got=val, case=case) + + return _summarise(errors, "endpoint_is_user_supplied") + + +def _check(errors, label, expected, got, case=None): + status = "PASS" if got == expected else "FAIL" + suffix = f" (case #{case})" if case else "" + print(f" [{status}] {label}{suffix}: endpoint_is_user_supplied={got}") + if got != expected: + errors.append(f"{label}: expected {expected}, got {got}") + + +# --------------------------------------------------------------------------- +# Test 2 – use_managed_identity guard logic +# --------------------------------------------------------------------------- + +def test_use_managed_identity_logic(): + """use_managed_identity must be True only when every guard passes.""" + print("\nTesting use_managed_identity boolean logic...") + + cases = [ + # (description, auth_type, apim, key, cred_avail, user_supplied, expected) + ("all guards pass → True", + "managed_identity", False, None, True, False, True), + ("wrong auth_type → False", + "api_key", False, None, True, False, False), + ("APIM enabled → False", + "managed_identity", True, None, True, False, False), + ("key present → False", + "managed_identity", False, "abc123", True, False, False), + ("no DefaultAzureCredential → False", + "managed_identity", False, None, False, False, False), + ("endpoint_is_user_supplied=True → False", + "managed_identity", False, None, True, True, False), + ] + + errors = [] + for desc, auth, apim, key, cred, user_sup, expected in cases: + 
result = _compute_use_managed_identity(auth, apim, key, cred, user_sup) + status = "PASS" if result == expected else "FAIL" + print(f" [{status}] {desc}: {result}") + if result != expected: + errors.append(f"{desc}: expected {expected}, got {result}") + + return _summarise(errors, "use_managed_identity") + + +# --------------------------------------------------------------------------- +# Test 3 – AzureChatCompletion gate condition +# --------------------------------------------------------------------------- + +def test_gate_condition(): + """Gate must admit MI auth and block it when endpoint_is_user_supplied=True + (which sets use_mi=False) and no key is present.""" + print("\nTesting AzureChatCompletion gate condition...") + + def gate(endpoint, key, deployment, use_mi): + """Mirrors: if AzureChatCompletion and endpoint and (key or use_mi) and deployment""" + return bool(endpoint) and bool(key or use_mi) and bool(deployment) + + cases = [ + # (desc, endpoint, key, deployment, use_mi, expected) + ("MI auth, no key, user_supplied=False → admitted", + GLOBAL_ENDPOINT, None, GLOBAL_DEPLOYMENT, True, True), + ("key auth, no MI → admitted", + GLOBAL_ENDPOINT, GLOBAL_KEY, GLOBAL_DEPLOYMENT, False, True), + ("user_supplied=True → use_mi=False, no key → blocked", + AGENT_ENDPOINT, None, AGENT_DEPLOYMENT, False, False), + ("no endpoint → blocked", + None, None, GLOBAL_DEPLOYMENT, True, False), + ("no deployment → blocked", + GLOBAL_ENDPOINT, None, None, True, False), + ] + + errors = [] + for desc, ep, key, depl, use_mi, expected in cases: + result = gate(ep, key, depl, use_mi) + status = "PASS" if result == expected else "FAIL" + print(f" [{status}] {desc}: {result}") + if result != expected: + errors.append(f"{desc}: expected {expected}, got {result}") + + return _summarise(errors, "gate condition") + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + 
+def _summarise(errors, label): + if errors: + for e in errors: + print(f" FAIL: {e}") + return False + print(f"All {label} cases passed!") + return True + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + tests = [ + test_endpoint_is_user_supplied_all_cases, + test_use_managed_identity_logic, + test_gate_condition, + ] + results = [] + for t in tests: + print(f"\n{'='*60}") + print(f"Running {t.__name__}...") + print("="*60) + try: + results.append(t()) + except Exception as exc: + import traceback + print(f"ERROR: {exc}") + traceback.print_exc() + results.append(False) + + passed = sum(1 for r in results if r) + total = len(results) + print(f"\n{'='*60}") + print(f"Results: {passed}/{total} tests passed") + print("="*60) + sys.exit(0 if all(results) else 1)