diff --git a/CLAUDE.md b/CLAUDE.md index 4e9c2830..2ad07a8a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -58,6 +58,7 @@ return render_template('page.html', settings=public_settings) ## Version Management +- Its important to update the version at the end of every plan - Version is stored in `config.py`: `VERSION = "X.XXX.XXX"` - When incrementing, only change the third segment (e.g., `0.238.024` -> `0.238.025`) - Include the current version in functional test file headers and documentation files @@ -83,7 +84,7 @@ return render_template('page.html', settings=public_settings) ## Release Notes -After completing code changes, offer to update `docs/explanation/release_notes.md`. +After completing plans and code changes, offer to update `docs/explanation/release_notes.md`. - Add entries under the current version from `config.py` - If the version was bumped, create a new section at the top: `### **(vX.XXX.XXX)**` diff --git a/application/single_app/config.py b/application/single_app/config.py index 91288225..487763eb 100644 --- a/application/single_app/config.py +++ b/application/single_app/config.py @@ -94,7 +94,7 @@ EXECUTOR_TYPE = 'thread' EXECUTOR_MAX_WORKERS = 30 SESSION_TYPE = 'filesystem' -VERSION = "0.239.002" +VERSION = "0.239.007" SECRET_KEY = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') @@ -257,6 +257,8 @@ def get_redis_cache_infrastructure_endpoint(redis_hostname: str) -> str: storage_account_user_documents_container_name = "user-documents" storage_account_group_documents_container_name = "group-documents" storage_account_public_documents_container_name = "public-documents" +storage_account_personal_chat_container_name = "personal-chat" +storage_account_group_chat_container_name = "group-chat" # Initialize Azure Cosmos DB client cosmos_endpoint = os.getenv("AZURE_COSMOS_ENDPOINT") @@ -745,9 +747,11 @@ def initialize_clients(settings): # This addresses the issue where the application assumes containers exist if blob_service_client: for container_name in [ - storage_account_user_documents_container_name, - storage_account_group_documents_container_name, - storage_account_public_documents_container_name + storage_account_user_documents_container_name, + storage_account_group_documents_container_name, + storage_account_public_documents_container_name, + storage_account_personal_chat_container_name, + storage_account_group_chat_container_name ]: try: container_client = blob_service_client.get_container_client(container_name) diff --git a/application/single_app/functions_content.py b/application/single_app/functions_content.py index 376d23f4..3116ed82 100644 --- a/application/single_app/functions_content.py +++ b/application/single_app/functions_content.py @@ -352,7 +352,7 @@ def generate_embedding( embedding_model = selected_embedding_model['deploymentName'] while True: - random_delay = random.uniform(0.5, 2.0) + random_delay = random.uniform(0.05, 0.2) time.sleep(random_delay) try: @@ -385,3 +385,102 @@ def generate_embedding( except Exception as e: raise + +def generate_embeddings_batch( + texts, + batch_size=16, + max_retries=5, + initial_delay=1.0, + delay_multiplier=2.0 +): + """Generate embeddings for multiple texts in batches. + + Azure OpenAI embeddings API accepts a list of strings as input. + This reduces per-call overhead and delay significantly. + + Args: + texts: List of text strings to embed. + batch_size: Number of texts per API call (default 16). + max_retries: Max retries on rate limit errors. + initial_delay: Initial retry delay in seconds. + delay_multiplier: Multiplier for exponential backoff. + + Returns: + list of (embedding, token_usage) tuples, one per input text. + """ + settings = get_settings() + + enable_embedding_apim = settings.get('enable_embedding_apim', False) + + if enable_embedding_apim: + embedding_model = settings.get('azure_apim_embedding_deployment') + embedding_client = AzureOpenAI( + api_version=settings.get('azure_apim_embedding_api_version'), + azure_endpoint=settings.get('azure_apim_embedding_endpoint'), + api_key=settings.get('azure_apim_embedding_subscription_key')) + else: + if (settings.get('azure_openai_embedding_authentication_type') == 'managed_identity'): + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + azure_ad_token_provider=token_provider + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + else: + embedding_client = AzureOpenAI( + api_version=settings.get('azure_openai_embedding_api_version'), + azure_endpoint=settings.get('azure_openai_embedding_endpoint'), + api_key=settings.get('azure_openai_embedding_key') + ) + + embedding_model_obj = settings.get('embedding_model', {}) + if embedding_model_obj and embedding_model_obj.get('selected'): + selected_embedding_model = embedding_model_obj['selected'][0] + embedding_model = selected_embedding_model['deploymentName'] + + results = [] + for i in range(0, len(texts), batch_size): + batch = texts[i:i + batch_size] + retries = 0 + current_delay = initial_delay + + while True: + random_delay = random.uniform(0.05, 0.2) + time.sleep(random_delay) + + try: + response = embedding_client.embeddings.create( + model=embedding_model, + input=batch + ) + + for item in response.data: + token_usage = None + if hasattr(response, 'usage') and response.usage: + token_usage = { + 'prompt_tokens': response.usage.prompt_tokens // len(batch), + 'total_tokens': response.usage.total_tokens // len(batch), + 'model_deployment_name': embedding_model + } + results.append((item.embedding, token_usage)) + break + + except RateLimitError as e: + retries += 1 + if retries > max_retries: + raise + + wait_time = current_delay * random.uniform(1.0, 1.5) + time.sleep(wait_time) + current_delay *= delay_multiplier + + except Exception as e: + raise + + return results diff --git a/application/single_app/functions_documents.py b/application/single_app/functions_documents.py index ce08066d..110afbd2 100644 --- a/application/single_app/functions_documents.py +++ b/application/single_app/functions_documents.py @@ -1646,6 +1646,191 @@ def save_chunks(page_text_content, page_number, file_name, user_id, document_id, # Return token usage information for accumulation return token_usage +def save_chunks_batch(chunks_data, user_id, document_id, group_id=None, public_workspace_id=None): + """ + Save multiple chunks at once using batch embedding and batch AI Search upload. + Significantly faster than calling save_chunks() per chunk. + + Args: + chunks_data: list of dicts with keys: page_text_content, page_number, file_name + user_id: The user ID + document_id: The document ID + group_id: Optional group ID for group documents + public_workspace_id: Optional public workspace ID for public documents + + Returns: + dict with 'total_tokens', 'prompt_tokens', 'model_deployment_name' + """ + from functions_content import generate_embeddings_batch + + current_time = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') + is_group = group_id is not None + is_public_workspace = public_workspace_id is not None + + # Retrieve metadata once for all chunks + try: + if is_public_workspace: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + public_workspace_id=public_workspace_id + ) + elif is_group: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id, + group_id=group_id + ) + else: + metadata = get_document_metadata( + document_id=document_id, + user_id=user_id + ) + + if not metadata: + raise ValueError(f"No metadata found for document {document_id}") + + version = metadata.get("version") if metadata.get("version") else 1 + except Exception as e: + log_event(f"[save_chunks_batch] Error retrieving metadata for document {document_id}: {repr(e)}", level=logging.ERROR) + raise + + # Generate all embeddings in batches + texts = [c['page_text_content'] for c in chunks_data] + try: + embedding_results = generate_embeddings_batch(texts) + except Exception as e: + log_event(f"[save_chunks_batch] Error generating batch embeddings for document {document_id}: {e}", level=logging.ERROR) + raise + + # Check for vision analysis once + vision_analysis = metadata.get('vision_analysis') + vision_text = "" + if vision_analysis: + vision_text_parts = [] + vision_text_parts.append("\n\n=== AI Vision Analysis ===") + vision_text_parts.append(f"Model: {vision_analysis.get('model', 'unknown')}") + if vision_analysis.get('description'): + vision_text_parts.append(f"\nDescription: {vision_analysis['description']}") + if vision_analysis.get('objects'): + objects_list = vision_analysis['objects'] + if isinstance(objects_list, list): + vision_text_parts.append(f"\nObjects Detected: {', '.join(objects_list)}") + else: + vision_text_parts.append(f"\nObjects Detected: {objects_list}") + if vision_analysis.get('text'): + vision_text_parts.append(f"\nVisible Text: {vision_analysis['text']}") + if vision_analysis.get('analysis'): + vision_text_parts.append(f"\nContextual Analysis: {vision_analysis['analysis']}") + vision_text = "\n".join(vision_text_parts) + + # Build all chunk documents + chunk_documents = [] + total_token_usage = {'total_tokens': 0, 'prompt_tokens': 0, 'model_deployment_name': None} + + for idx, chunk_info in enumerate(chunks_data): + embedding, token_usage = embedding_results[idx] + page_number = chunk_info['page_number'] + file_name = chunk_info['file_name'] + page_text_content = chunk_info['page_text_content'] + + if token_usage: + total_token_usage['total_tokens'] += token_usage.get('total_tokens', 0) + total_token_usage['prompt_tokens'] += token_usage.get('prompt_tokens', 0) + if not total_token_usage['model_deployment_name']: + total_token_usage['model_deployment_name'] = token_usage.get('model_deployment_name') + + chunk_id = f"{document_id}_{page_number}" + enhanced_chunk_text = page_text_content + vision_text if vision_text else page_text_content + + if is_public_workspace: + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "public_workspace_id": public_workspace_id + } + elif is_group: + shared_group_ids = metadata.get('shared_group_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "group_id": group_id, + "shared_group_ids": shared_group_ids + } + else: + shared_user_ids = metadata.get('shared_user_ids', []) if metadata else [] + chunk_document = { + "id": chunk_id, + "document_id": document_id, + "chunk_id": str(page_number), + "chunk_text": enhanced_chunk_text, + "embedding": embedding, + "file_name": file_name, + "chunk_keywords": [], + "chunk_summary": "", + "page_number": page_number, + "author": [], + "title": "", + "document_classification": "None", + "document_tags": metadata.get('tags', []), + "chunk_sequence": page_number, + "upload_date": current_time, + "version": version, + "user_id": user_id, + "shared_user_ids": shared_user_ids + } + + chunk_documents.append(chunk_document) + + # Batch upload to AI Search + try: + if is_public_workspace: + search_client = CLIENTS["search_client_public"] + elif is_group: + search_client = CLIENTS["search_client_group"] + else: + search_client = CLIENTS["search_client_user"] + + # Upload in sub-batches of 32 to avoid request size limits + upload_batch_size = 32 + for i in range(0, len(chunk_documents), upload_batch_size): + sub_batch = chunk_documents[i:i + upload_batch_size] + search_client.upload_documents(documents=sub_batch) + + except Exception as e: + log_event(f"[save_chunks_batch] Error uploading batch to AI Search for document {document_id}: {e}", level=logging.ERROR) + raise + + return total_token_usage + def get_document_metadata_for_citations(document_id, user_id=None, group_id=None, public_workspace_id=None): """ Retrieve keywords and abstract from a document for creating metadata citations. @@ -4669,37 +4854,30 @@ def process_single_tabular_sheet(df, document_id, user_id, file_name, update_cal # Consider accumulating page count in the caller if needed. update_callback(number_of_pages=num_chunks_final) - # Save chunks, prepending the header to each + # Save chunks, prepending the header to each — use batch processing for speed + all_chunks = [] for idx, chunk_rows_content in enumerate(final_chunks_content, start=1): - # Prepend header - header length does not count towards chunk size limit chunk_with_header = header_string + chunk_rows_content - - update_callback( - current_file_chunk=idx, - status=f"Saving chunk {idx}/{num_chunks_final} from {file_name}..." - ) - - args = { + all_chunks.append({ "page_text_content": chunk_with_header, "page_number": idx, - "file_name": file_name, - "user_id": user_id, - "document_id": document_id - } + "file_name": file_name + }) - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id - elif is_group: - args["group_id"] = group_id + if all_chunks: + update_callback( + current_file_chunk=1, + status=f"Batch processing {num_chunks_final} chunks from {file_name}..." + ) - token_usage = save_chunks(**args) - total_chunks_saved += 1 - - # Accumulate embedding tokens - if token_usage: - total_embedding_tokens += token_usage.get('total_tokens', 0) - if not embedding_model_name: - embedding_model_name = token_usage.get('model_deployment_name') + batch_token_usage = save_chunks_batch( + all_chunks, user_id, document_id, + group_id=group_id, public_workspace_id=public_workspace_id + ) + total_chunks_saved = len(all_chunks) + if batch_token_usage: + total_embedding_tokens = batch_token_usage.get('total_tokens', 0) + embedding_model_name = batch_token_usage.get('model_deployment_name') return total_chunks_saved, total_embedding_tokens, embedding_model_name @@ -4729,63 +4907,75 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil args["group_id"] = group_id upload_to_blob(**args) + update_callback(enhanced_citations=True, status=f"Enhanced citations enabled for {file_ext}") - try: - if file_ext == '.csv': - # Process CSV - # Read CSV, attempt to infer header, keep data as string initially - df = pandas.read_csv( - temp_file_path, - keep_default_na=False, - dtype=str + # When enhanced citations is on, index a single schema summary chunk + # instead of row-by-row chunking. The tabular processing plugin handles analysis. + if enable_enhanced_citations: + try: + if file_ext == '.csv': + df_preview = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_csv(temp_file_path, keep_default_na=False, dtype=str) + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + engine = 'openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + df_preview = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str, nrows=5) + full_df = pandas.read_excel(temp_file_path, engine=engine, keep_default_na=False, dtype=str) + else: + raise ValueError(f"Unsupported tabular file type: {file_ext}") + + row_count = len(full_df) + columns = list(df_preview.columns) + preview_rows = df_preview.head(5).to_string(index=False) + + schema_summary = ( + f"Tabular data file: {original_filename}\n" + f"Columns ({len(columns)}): {', '.join(columns)}\n" + f"Total rows: {row_count}\n" + f"Preview (first 5 rows):\n{preview_rows}\n\n" + f"This file is available for detailed analysis via the Tabular Processing plugin." ) - args = { - "df": df, - "document_id": document_id, - "user_id": user_id, + + update_callback(number_of_pages=1, status=f"Indexing schema summary for {original_filename}...") + + save_args = { + "page_text_content": schema_summary, + "page_number": 1, "file_name": original_filename, - "update_callback": update_callback + "user_id": user_id, + "document_id": document_id } - if is_public_workspace: - args["public_workspace_id"] = public_workspace_id + save_args["public_workspace_id"] = public_workspace_id elif is_group: - args["group_id"] = group_id + save_args["group_id"] = group_id - result = process_single_tabular_sheet(**args) - if isinstance(result, tuple) and len(result) == 3: - chunks, tokens, model = result - total_chunks_saved = chunks - total_embedding_tokens += tokens - if not embedding_model_name: - embedding_model_name = model - else: - total_chunks_saved = result - - elif file_ext in ('.xlsx', '.xls', '.xlsm'): - # Process Excel (potentially multiple sheets) - excel_file = pandas.ExcelFile( - temp_file_path, - engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' - ) - sheet_names = excel_file.sheet_names - base_name, ext = os.path.splitext(original_filename) - - accumulated_total_chunks = 0 - for sheet_name in sheet_names: - update_callback(status=f"Processing sheet '{sheet_name}'...") - # Read specific sheet, get values (not formulas), keep data as string - # Note: pandas typically reads values, not formulas by default. - df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) + token_usage = save_chunks(**save_args) + total_chunks_saved = 1 + if token_usage: + total_embedding_tokens = token_usage.get('total_tokens', 0) + embedding_model_name = token_usage.get('model_deployment_name') - # Create effective filename for this sheet - effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + # Don't return here — fall through to metadata extraction below + except Exception as e: + log_event(f"[process_tabular] Error creating schema summary, falling back to row-by-row: {e}", level=logging.WARNING) + # Fall through to existing row-by-row processing + # Only do row-by-row chunking if schema-only didn't produce chunks + if total_chunks_saved == 0: + try: + if file_ext == '.csv': + # Process CSV + # Read CSV, attempt to infer header, keep data as string initially + df = pandas.read_csv( + temp_file_path, + keep_default_na=False, + dtype=str + ) args = { "df": df, "document_id": document_id, "user_id": user_id, - "file_name": effective_filename, + "file_name": original_filename, "update_callback": update_callback } @@ -4797,21 +4987,62 @@ def process_tabular(document_id, user_id, temp_file_path, original_filename, fil result = process_single_tabular_sheet(**args) if isinstance(result, tuple) and len(result) == 3: chunks, tokens, model = result - accumulated_total_chunks += chunks + total_chunks_saved = chunks total_embedding_tokens += tokens if not embedding_model_name: embedding_model_name = model else: - accumulated_total_chunks += result + total_chunks_saved = result - total_chunks_saved = accumulated_total_chunks # Total across all sheets + elif file_ext in ('.xlsx', '.xls', '.xlsm'): + # Process Excel (potentially multiple sheets) + excel_file = pandas.ExcelFile( + temp_file_path, + engine='openpyxl' if file_ext in ('.xlsx', '.xlsm') else 'xlrd' + ) + sheet_names = excel_file.sheet_names + base_name, ext = os.path.splitext(original_filename) + accumulated_total_chunks = 0 + for sheet_name in sheet_names: + update_callback(status=f"Processing sheet '{sheet_name}'...") + # Read specific sheet, get values (not formulas), keep data as string + # Note: pandas typically reads values, not formulas by default. + df = excel_file.parse(sheet_name, keep_default_na=False, dtype=str) - except pandas.errors.EmptyDataError: - print(f"Warning: Tabular file or sheet is empty: {original_filename}") - update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) - except Exception as e: - raise Exception(f"Failed processing Tabular file {original_filename}: {e}") + # Create effective filename for this sheet + effective_filename = f"{base_name}-{sheet_name}{ext}" if len(sheet_names) > 1 else original_filename + + args = { + "df": df, + "document_id": document_id, + "user_id": user_id, + "file_name": effective_filename, + "update_callback": update_callback + } + + if is_public_workspace: + args["public_workspace_id"] = public_workspace_id + elif is_group: + args["group_id"] = group_id + + result = process_single_tabular_sheet(**args) + if isinstance(result, tuple) and len(result) == 3: + chunks, tokens, model = result + accumulated_total_chunks += chunks + total_embedding_tokens += tokens + if not embedding_model_name: + embedding_model_name = model + else: + accumulated_total_chunks += result + + total_chunks_saved = accumulated_total_chunks # Total across all sheets + + except pandas.errors.EmptyDataError: + log_event(f"[process_tabular] Warning: Tabular file or sheet is empty: {original_filename}", level=logging.WARNING) + update_callback(status=f"Warning: File/sheet is empty - {original_filename}", number_of_pages=0) + except Exception as e: + raise Exception(f"Failed processing Tabular file {original_filename}: {e}") # Extract metadata if enabled and chunks were processed settings = get_settings() diff --git a/application/single_app/functions_settings.py b/application/single_app/functions_settings.py index 8176939d..ddd8e82c 100644 --- a/application/single_app/functions_settings.py +++ b/application/single_app/functions_settings.py @@ -25,6 +25,7 @@ def get_settings(use_cosmos=False): 'enable_text_plugin': True, 'enable_default_embedding_model_plugin': False, 'enable_fact_memory_plugin': True, + 'enable_tabular_processing_plugin': False, 'enable_multi_agent_orchestration': False, 'max_rounds_per_agent': 1, 'enable_semantic_kernel': False, @@ -391,6 +392,9 @@ def update_settings(new_settings): # always fetch the latest settings doc, which includes your merges settings_item = get_settings() settings_item.update(new_settings) + # Dependency enforcement: tabular processing requires enhanced citations + if not settings_item.get('enable_enhanced_citations', False): + settings_item['enable_tabular_processing_plugin'] = False cosmos_settings_container.upsert_item(settings_item) cache_updater = getattr(app_settings_cache, "update_settings_cache", None) if callable(cache_updater): diff --git a/application/single_app/route_backend_chats.py b/application/single_app/route_backend_chats.py index e452fed4..a609a7db 100644 --- a/application/single_app/route_backend_chats.py +++ b/application/single_app/route_backend_chats.py @@ -39,6 +39,185 @@ def get_kernel_agents(): log_event(f"[SKChat] get_kernel_agents - g.kernel_agents: {type(g_agents)} ({len(g_agents) if g_agents else 0} agents), builtins.kernel_agents: {type(builtins_agents)} ({len(builtins_agents) if builtins_agents else 0} agents)", level=logging.INFO) return g_agents or builtins_agents +async def run_tabular_sk_analysis(user_question, tabular_filenames, user_id, + conversation_id, gpt_model, settings, + source_hint="workspace", group_id=None, + public_workspace_id=None): + """Run lightweight SK with TabularProcessingPlugin to analyze tabular data. + + Creates a temporary Kernel with only the TabularProcessingPlugin, uses the + same chat model as the user's session, and returns computed analysis results. + Returns None on failure for graceful degradation. + """ + from semantic_kernel import Kernel as SKKernel + from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion + from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior + from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import AzureChatPromptExecutionSettings + from semantic_kernel.contents.chat_history import ChatHistory as SKChatHistory + from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin + + try: + log_event(f"[Tabular SK Analysis] Starting analysis for files: {tabular_filenames}", level=logging.INFO) + + # 1. Create lightweight kernel with only tabular plugin + kernel = SKKernel() + tabular_plugin = TabularProcessingPlugin() + kernel.add_plugin(tabular_plugin, plugin_name="tabular_processing") + + # 2. Create chat service using same config as main chat + enable_gpt_apim = settings.get('enable_gpt_apim', False) + if enable_gpt_apim: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_apim_gpt_endpoint'), + api_key=settings.get('azure_apim_gpt_subscription_key'), + api_version=settings.get('azure_apim_gpt_api_version'), + ) + else: + auth_type = settings.get('azure_openai_gpt_authentication_type') + if auth_type == 'managed_identity': + token_provider = get_bearer_token_provider(DefaultAzureCredential(), cognitive_services_scope) + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_version=settings.get('azure_openai_gpt_api_version'), + ad_token_provider=token_provider, + ) + else: + chat_service = AzureChatCompletion( + service_id="tabular-analysis", + deployment_name=gpt_model, + endpoint=settings.get('azure_openai_gpt_endpoint'), + api_key=settings.get('azure_openai_gpt_key'), + api_version=settings.get('azure_openai_gpt_api_version'), + ) + kernel.add_service(chat_service) + + # 3. Pre-dispatch: load file schemas to eliminate discovery LLM rounds + source_context = f"source='{source_hint}'" + if group_id: + source_context += f", group_id='{group_id}'" + if public_workspace_id: + source_context += f", public_workspace_id='{public_workspace_id}'" + + schema_parts = [] + for fname in tabular_filenames: + try: + container, blob_path = tabular_plugin._resolve_blob_location_with_fallback( + user_id, conversation_id, fname, source_hint, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = tabular_plugin._read_tabular_blob_to_dataframe(container, blob_path) + df_numeric = tabular_plugin._try_numeric_conversion(df.copy()) + schema_info = { + "filename": fname, + "row_count": len(df), + "columns": list(df.columns), + "dtypes": {col: str(dtype) for col, dtype in df_numeric.dtypes.items()}, + "preview": df.head(3).to_dict(orient='records') + } + schema_parts.append(json.dumps(schema_info, indent=2, default=str)) + log_event(f"[Tabular SK Analysis] Pre-loaded schema for {fname} ({len(df)} rows)", level=logging.DEBUG) + except Exception as e: + log_event(f"[Tabular SK Analysis] Failed to pre-load schema for {fname}: {e}", level=logging.WARNING) + schema_parts.append(json.dumps({"filename": fname, "error": f"Could not pre-load: {str(e)}"})) + + schema_context = "\n".join(schema_parts) + + # 4. Build chat history with pre-loaded schemas + chat_history = SKChatHistory() + chat_history.add_system_message( + "You are a data analyst. Use the tabular_processing plugin functions to " + "analyze the data and answer the user's question.\n\n" + f"FILE SCHEMAS (pre-loaded — do NOT call list_tabular_files or describe_tabular_file):\n" + f"{schema_context}\n\n" + "IMPORTANT: Batch multiple independent function calls in a SINGLE response. " + "For example, call multiple aggregate_column or group_by_aggregate functions " + "at once rather than one at a time.\n\n" + "Return the computed results clearly." + ) + + chat_history.add_user_message( + f"Analyze the tabular data to answer: {user_question}\n" + f"Use user_id='{user_id}', conversation_id='{conversation_id}', {source_context}." + ) + + # 5. Execute with auto function calling + execution_settings = AzureChatPromptExecutionSettings( + service_id="tabular-analysis", + function_choice_behavior=FunctionChoiceBehavior.Auto( + maximum_auto_invoke_attempts=5 + ), + ) + + result = await chat_service.get_chat_message_contents( + chat_history, execution_settings, kernel=kernel + ) + + if result and result[0].content: + analysis = result[0].content + # Cap at 20k characters to stay within token budget + if len(analysis) > 20000: + analysis = analysis[:20000] + "\n[Analysis truncated]" + log_event(f"[Tabular SK Analysis] Analysis complete, {len(analysis)} chars", level=logging.INFO) + return analysis + log_event("[Tabular SK Analysis] No content in SK response", level=logging.WARNING) + return None + + except Exception as e: + log_event(f"[Tabular SK Analysis] Error: {e}", level=logging.WARNING, exceptionTraceback=True) + return None + +def collect_tabular_sk_citations(user_id, conversation_id): + """Collect plugin invocations from the tabular SK analysis and convert to citation format.""" + from semantic_kernel_plugins.plugin_invocation_logger import get_plugin_logger + + plugin_logger = get_plugin_logger() + plugin_invocations = plugin_logger.get_invocations_for_conversation(user_id, conversation_id) + + if not plugin_invocations: + return [] + + def make_json_serializable(obj): + if obj is None: + return None + elif isinstance(obj, (str, int, float, bool)): + return obj + elif isinstance(obj, dict): + return {str(k): make_json_serializable(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [make_json_serializable(item) for item in obj] + else: + return str(obj) + + citations = [] + for inv in plugin_invocations: + timestamp_str = None + if inv.timestamp: + if hasattr(inv.timestamp, 'isoformat'): + timestamp_str = inv.timestamp.isoformat() + else: + timestamp_str = str(inv.timestamp) + + citation = { + 'tool_name': f"{inv.plugin_name}.{inv.function_name}", + 'function_name': inv.function_name, + 'plugin_name': inv.plugin_name, + 'function_arguments': make_json_serializable(inv.parameters), + 'function_result': make_json_serializable(inv.result), + 'duration_ms': inv.duration_ms, + 'timestamp': timestamp_str, + 'success': inv.success, + 'error_message': make_json_serializable(inv.error_message), + 'user_id': inv.user_id + } + citations.append(citation) + + log_event(f"[Tabular SK Citations] Collected {len(citations)} tool execution citations", level=logging.INFO) + return citations + def register_route_backend_chats(app): @app.route('/api/chat', methods=['POST']) @swagger_route(security=get_auth_security()) @@ -953,6 +1132,70 @@ def result_requires_message_reload(result: Any) -> bool: 'documents': combined_documents # Keep track of docs used }) + # Auto-detect tabular files in search results and prompt the LLM to use the plugin + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and active_public_workspace_id: + tabular_source_hint = "public" + else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + # Inject pre-computed analysis results as context + tabular_system_msg = ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + else: + # Fallback: instruct LLM to use plugin functions (for agent mode) + tabular_system_msg = ( + f"IMPORTANT: The search results include data from tabular file(s): {tabular_filenames_str}. " + f"The search results contain only a schema summary (column names and a few sample rows), NOT the full data. " + f"You MUST use the tabular_processing plugin functions to answer ANY question about these files. " + f"Do NOT attempt to answer using the schema summary alone — it is incomplete. " + f"Available functions: describe_tabular_file, aggregate_column, filter_rows, query_tabular_data, group_by_aggregate. " + f"Use source='{tabular_source_hint}'" + + (f" and group_id='{active_group_id}'" if tabular_source_hint == "group" else "") + + (f" and public_workspace_id='{active_public_workspace_id}'" if tabular_source_hint == "public" else "") + + "." + ) + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': tabular_system_msg + }) + + # Collect tool execution citations from SK tabular analysis + if tabular_analysis: + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Loop through each source document/chunk used for this message for source_doc in combined_documents: # 4. Create a citation dictionary, selecting the desired fields @@ -1138,8 +1381,8 @@ def result_requires_message_reload(result: Any) -> bool: """ # Update the system message with enhanced content and updated documents array if system_messages_for_augmentation: - system_messages_for_augmentation[-1]['content'] = system_prompt_search - system_messages_for_augmentation[-1]['documents'] = combined_documents + system_messages_for_augmentation[0]['content'] = system_prompt_search + system_messages_for_augmentation[0]['documents'] = combined_documents # --- END NEW METADATA CITATIONS --- # Update conversation classifications if new ones were found @@ -1685,25 +1928,37 @@ def result_requires_message_reload(result: Any) -> bool: filename = message.get('filename', 'uploaded_file') file_content = message.get('file_content', '') # Assuming file content is stored is_table = message.get('is_table', False) - - # Use higher limit for tabular data that needs complete analysis - content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history - - display_content = file_content[:content_limit] - if len(file_content) > content_limit: - display_content += "..." - - # Enhanced message for tabular data - if is_table: + file_content_source = message.get('file_content_source', '') + + # Tabular files stored in blob (enhanced citations enabled) - reference plugin + if is_table and file_content_source == 'blob': conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + 'role': 'system', + 'content': f"[User uploaded a tabular data file named '{filename}'. " + f"The file is stored in blob storage and available for analysis. " + f"Use the tabular_processing plugin functions (list_tabular_files, describe_tabular_file, " + f"aggregate_column, filter_rows, query_tabular_data, group_by_aggregate) to analyze this data. " + f"The file source is 'chat'.]" }) else: - conversation_history_for_api.append({ - 'role': 'system', # Represent file as system info - 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." - }) + # Use higher limit for tabular data that needs complete analysis + content_limit = max_tabular_content_length_in_history if is_table else max_file_content_length_in_history + + display_content = file_content[:content_limit] + if len(file_content) > content_limit: + display_content += "..." + + # Enhanced message for tabular data + if is_table: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a tabular data file named '{filename}'. This is CSV format data for analysis:\n{display_content}]\nThis is complete tabular data in CSV format. You can perform calculations, analysis, and data operations on this dataset." + }) + else: + conversation_history_for_api.append({ + 'role': 'system', # Represent file as system info + 'content': f"[User uploaded a file named '{filename}'. Content preview:\n{display_content}]\nUse this file context if relevant." + }) elif role == 'image': # Handle image uploads with extracted text and vision analysis filename = message.get('filename', 'uploaded_image') is_user_upload = message.get('metadata', {}).get('is_user_upload', False) @@ -3319,7 +3574,55 @@ def generate(): 'content': system_prompt_search, 'documents': combined_documents }) - + + # Auto-detect tabular files in search results and run SK analysis + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + tabular_files_in_results = set() + for source_doc in combined_documents: + fname = source_doc.get('file_name', '') + if fname and any(fname.lower().endswith(ext) for ext in TABULAR_EXTENSIONS): + tabular_files_in_results.add(fname) + + if tabular_files_in_results: + # Determine source based on document_scope, not just active IDs + if document_scope == 'group' and active_group_id: + tabular_source_hint = "group" + elif document_scope == 'public' and active_public_workspace_id: + tabular_source_hint = "public" + else: + tabular_source_hint = "workspace" + + tabular_filenames_str = ", ".join(tabular_files_in_results) + + # Run SK tabular analysis to pre-compute results + tabular_analysis = asyncio.run(run_tabular_sk_analysis( + user_question=user_message, + tabular_filenames=tabular_files_in_results, + user_id=user_id, + conversation_id=conversation_id, + gpt_model=gpt_model, + settings=settings, + source_hint=tabular_source_hint, + group_id=active_group_id if tabular_source_hint == "group" else None, + public_workspace_id=active_public_workspace_id if tabular_source_hint == "public" else None, + )) + + if tabular_analysis: + system_messages_for_augmentation.append({ + 'role': 'system', + 'content': ( + f"The following analysis was computed from the tabular file(s) " + f"{tabular_filenames_str} using data analysis functions:\n\n" + f"{tabular_analysis}\n\n" + f"Use these computed results to answer the user's question accurately." + ) + }) + + # Collect tool execution citations from SK tabular analysis + tabular_sk_citations = collect_tabular_sk_citations(user_id, conversation_id) + if tabular_sk_citations: + agent_citations_list.extend(tabular_sk_citations) + # Reorder hybrid citations list in descending order based on page_number hybrid_citations_list.sort(key=lambda x: x.get('page_number', 0), reverse=True) diff --git a/application/single_app/route_backend_documents.py b/application/single_app/route_backend_documents.py index fb4eb19b..14423335 100644 --- a/application/single_app/route_backend_documents.py +++ b/application/single_app/route_backend_documents.py @@ -7,6 +7,7 @@ from utils_cache import invalidate_personal_search_cache from functions_debug import * from functions_activity_logging import log_document_upload, log_document_metadata_update_transaction +import io import os import requests from flask import current_app @@ -72,7 +73,58 @@ def get_file_content(): filename = items_sorted[0].get('filename', 'Untitled') is_table = items_sorted[0].get('is_table', False) - debug_print(f"[GET_FILE_CONTENT] Filename: {filename}, is_table: {is_table}") + file_content_source = items_sorted[0].get('file_content_source', '') + debug_print(f"[GET_FILE_CONTENT] Filename: {filename}, is_table: {is_table}, source: {file_content_source}") + + # Handle blob-stored tabular files (enhanced citations enabled) + if file_content_source == 'blob': + blob_container = items_sorted[0].get('blob_container', '') + blob_path = items_sorted[0].get('blob_path', '') + debug_print(f"[GET_FILE_CONTENT] Blob-stored file: container={blob_container}, path={blob_path}") + + if not blob_container or not blob_path: + return jsonify({'error': 'Blob storage reference is incomplete'}), 500 + + try: + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({'error': 'Blob storage client not available'}), 500 + + blob_client = blob_service_client.get_blob_client( + container=blob_container, + blob=blob_path + ) + stream = blob_client.download_blob() + blob_data = stream.readall() + + # Convert to CSV using pandas for display + file_ext = os.path.splitext(filename)[1].lower() + if file_ext == '.csv': + import pandas + df = pandas.read_csv(io.BytesIO(blob_data)) + combined_content = df.to_csv(index=False) + elif file_ext in ['.xlsx', '.xlsm']: + import pandas + df = pandas.read_excel(io.BytesIO(blob_data), engine='openpyxl') + combined_content = df.to_csv(index=False) + elif file_ext == '.xls': + import pandas + df = pandas.read_excel(io.BytesIO(blob_data), engine='xlrd') + combined_content = df.to_csv(index=False) + else: + combined_content = blob_data.decode('utf-8', errors='replace') + + debug_print(f"[GET_FILE_CONTENT] Successfully read blob content, length: {len(combined_content)}") + return jsonify({ + 'file_content': combined_content, + 'filename': filename, + 'is_table': is_table, + 'file_content_source': 'blob' + }), 200 + + except Exception as blob_err: + debug_print(f"[GET_FILE_CONTENT] Error reading from blob: {blob_err}") + return jsonify({'error': f'Error reading file from storage: {str(blob_err)}'}), 500 add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Combining file content from chunks, filename: " + filename + ", is_table: " + str(is_table)) combined_parts = [] diff --git a/application/single_app/route_backend_plugins.py b/application/single_app/route_backend_plugins.py index 77aab866..2e66cbab 100644 --- a/application/single_app/route_backend_plugins.py +++ b/application/single_app/route_backend_plugins.py @@ -588,6 +588,8 @@ def get_core_plugin_settings(): 'enable_text_plugin': bool(settings.get('enable_text_plugin', True)), 'enable_default_embedding_model_plugin': bool(settings.get('enable_default_embedding_model_plugin', True)), 'enable_fact_memory_plugin': bool(settings.get('enable_fact_memory_plugin', True)), + 'enable_tabular_processing_plugin': bool(settings.get('enable_tabular_processing_plugin', False)), + 'enable_enhanced_citations': bool(settings.get('enable_enhanced_citations', False)), 'enable_semantic_kernel': bool(settings.get('enable_semantic_kernel', False)), 'allow_user_plugins': bool(settings.get('allow_user_plugins', True)), 'allow_group_plugins': bool(settings.get('allow_group_plugins', True)), @@ -610,6 +612,7 @@ def update_core_plugin_settings(): 'enable_text_plugin', 'enable_default_embedding_model_plugin', 'enable_fact_memory_plugin', + 'enable_tabular_processing_plugin', 'allow_user_plugins', 'allow_group_plugins' ] @@ -627,6 +630,11 @@ def update_core_plugin_settings(): return jsonify({'error': f"Field '{key}' must be a boolean."}), 400 updates[key] = data[key] logging.info("Validated plugin settings: %s", updates) + # Dependency: tabular processing requires enhanced citations + if updates.get('enable_tabular_processing_plugin', False): + full_settings = get_settings() + if not full_settings.get('enable_enhanced_citations', False): + return jsonify({'error': 'Tabular Processing requires Enhanced Citations to be enabled.'}), 400 # Update settings success = update_settings(updates) if success: diff --git a/application/single_app/route_enhanced_citations.py b/application/single_app/route_enhanced_citations.py index c81ef225..44a35223 100644 --- a/application/single_app/route_enhanced_citations.py +++ b/application/single_app/route_enhanced_citations.py @@ -8,6 +8,7 @@ import requests import mimetypes import io +import pandas from functions_authentication import login_required, user_required, get_current_user_id from functions_settings import get_settings, enabled_required @@ -15,7 +16,7 @@ from functions_group import get_user_groups from functions_public_workspaces import get_user_visible_public_workspace_ids_from_settings from swagger_wrapper import swagger_route, get_auth_security -from config import CLIENTS, storage_account_user_documents_container_name, storage_account_group_documents_container_name, storage_account_public_documents_container_name, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS, AUDIO_EXTENSIONS +from config import CLIENTS, storage_account_user_documents_container_name, storage_account_group_documents_container_name, storage_account_public_documents_container_name, storage_account_personal_chat_container_name, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS, AUDIO_EXTENSIONS, TABULAR_EXTENSIONS, cosmos_messages_container from functions_debug import debug_print def register_enhanced_citations_routes(app): @@ -183,6 +184,189 @@ def get_enhanced_citation_pdf(): except Exception as e: return jsonify({"error": str(e)}), 500 + @app.route("/api/enhanced_citations/tabular", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular(): + """ + Serve original tabular file (CSV, XLSX, etc.) from blob storage for download. + Used for chat-uploaded tabular files stored in blob storage. + """ + conversation_id = request.args.get("conversation_id") + file_id = request.args.get("file_id") + + if not conversation_id or not file_id: + return jsonify({"error": "conversation_id and file_id are required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + # Look up the file message in Cosmos to get blob reference + query_str = """ + SELECT * FROM c + WHERE c.conversation_id = @conversation_id + AND c.id = @file_id + """ + items = list(cosmos_messages_container.query_items( + query=query_str, + parameters=[ + {'name': '@conversation_id', 'value': conversation_id}, + {'name': '@file_id', 'value': file_id} + ], + partition_key=conversation_id + )) + + if not items: + return jsonify({"error": "File not found"}), 404 + + file_msg = items[0] + file_content_source = file_msg.get('file_content_source', '') + + if file_content_source != 'blob': + return jsonify({"error": "File is not stored in blob storage"}), 400 + + blob_container = file_msg.get('blob_container', '') + blob_path = file_msg.get('blob_path', '') + filename = file_msg.get('filename', 'download') + + if not blob_container or not blob_path: + return jsonify({"error": "Blob reference is incomplete"}), 500 + + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({"error": "Storage not available"}), 500 + + blob_client = blob_service_client.get_blob_client( + container=blob_container, + blob=blob_path + ) + stream = blob_client.download_blob() + content = stream.readall() + + # Determine content type + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + return Response( + content, + content_type=content_type, + headers={ + 'Content-Length': str(len(content)), + 'Content-Disposition': f'attachment; filename="{filename}"', + 'Cache-Control': 'private, max-age=300', + } + ) + + except Exception as e: + debug_print(f"Error serving tabular citation: {e}") + return jsonify({"error": str(e)}), 500 + + @app.route("/api/enhanced_citations/tabular_workspace", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular_workspace(): + """ + Serve tabular file (CSV, XLSX, etc.) from blob storage for workspace documents. + Uses doc_id to look up the document across personal, group, and public workspaces. + """ + doc_id = request.args.get("doc_id") + if not doc_id: + return jsonify({"error": "doc_id is required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + doc_response, status_code = get_document(user_id, doc_id) + if status_code != 200: + return doc_response, status_code + + raw_doc = doc_response.get_json() + file_name = raw_doc.get('file_name', '') + ext = file_name.lower().split('.')[-1] if '.' in file_name else '' + + if ext not in ('csv', 'xlsx', 'xls', 'xlsm'): + return jsonify({"error": "File is not a tabular file"}), 400 + + return serve_enhanced_citation_content(raw_doc, force_download=True) + + except Exception as e: + debug_print(f"Error serving tabular workspace citation: {e}") + return jsonify({"error": str(e)}), 500 + + @app.route("/api/enhanced_citations/tabular_preview", methods=["GET"]) + @swagger_route(security=get_auth_security()) + @login_required + @user_required + @enabled_required("enable_enhanced_citations") + def get_enhanced_citation_tabular_preview(): + """ + Return JSON preview of a tabular file for rendering as an HTML table. + Reads the file into a pandas DataFrame and returns columns + rows as JSON. + """ + doc_id = request.args.get("doc_id") + max_rows = min(int(request.args.get("max_rows", 200)), 500) + if not doc_id: + return jsonify({"error": "doc_id is required"}), 400 + + user_id = get_current_user_id() + if not user_id: + return jsonify({"error": "User not authenticated"}), 401 + + try: + doc_response, status_code = get_document(user_id, doc_id) + if status_code != 200: + return doc_response, status_code + + raw_doc = doc_response.get_json() + file_name = raw_doc.get('file_name', '') + ext = file_name.lower().rsplit('.', 1)[-1] if '.' in file_name else '' + if ext not in ('csv', 'xlsx', 'xls', 'xlsm'): + return jsonify({"error": "File is not a tabular file"}), 400 + + # Download blob + workspace_type, container_name = determine_workspace_type_and_container(raw_doc) + blob_name = get_blob_name(raw_doc, workspace_type) + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if not blob_service_client: + return jsonify({"error": "Blob storage client not available"}), 500 + blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name) + data = blob_client.download_blob().readall() + + # Read into DataFrame + if ext == 'csv': + df = pandas.read_csv(io.BytesIO(data), keep_default_na=False, dtype=str) + elif ext in ('xlsx', 'xlsm'): + df = pandas.read_excel(io.BytesIO(data), engine='openpyxl', keep_default_na=False, dtype=str) + elif ext == 'xls': + df = pandas.read_excel(io.BytesIO(data), engine='xlrd', keep_default_na=False, dtype=str) + else: + return jsonify({"error": f"Unsupported file type: {ext}"}), 400 + + total_rows = len(df) + preview = df.head(max_rows) + + return jsonify({ + "filename": file_name, + "total_rows": total_rows, + "total_columns": len(df.columns), + "columns": list(df.columns), + "rows": preview.values.tolist(), + "truncated": total_rows > max_rows + }) + + except Exception as e: + debug_print(f"Error generating tabular preview: {e}") + return jsonify({"error": str(e)}), 500 + def get_document(user_id, doc_id): """ Get document metadata - searches across all enabled workspace types diff --git a/application/single_app/route_frontend_admin_settings.py b/application/single_app/route_frontend_admin_settings.py index 578e1545..4a971781 100644 --- a/application/single_app/route_frontend_admin_settings.py +++ b/application/single_app/route_frontend_admin_settings.py @@ -98,6 +98,8 @@ def admin_settings(): settings['enable_text_plugin'] = False if 'enable_fact_memory_plugin' not in settings: settings['enable_fact_memory_plugin'] = False + if 'enable_tabular_processing_plugin' not in settings: + settings['enable_tabular_processing_plugin'] = False if 'enable_default_embedding_model_plugin' not in settings: settings['enable_default_embedding_model_plugin'] = False if 'enable_multi_agent_orchestration' not in settings: diff --git a/application/single_app/route_frontend_chats.py b/application/single_app/route_frontend_chats.py index ca0feb1a..67a41879 100644 --- a/application/single_app/route_frontend_chats.py +++ b/application/single_app/route_frontend_chats.py @@ -237,8 +237,33 @@ def upload_file(): # Handle XML, YAML, and LOG files as text for inline chat extracted_content = extract_text_file(temp_file_path) elif file_ext_nodot in TABULAR_EXTENSIONS: - extracted_content = extract_table_file(temp_file_path, file_ext) is_table = True + + # Upload tabular file to blob storage for tabular processing plugin access + if settings.get('enable_enhanced_citations', False): + try: + blob_service_client = CLIENTS.get("storage_account_office_docs_client") + if blob_service_client: + blob_path = f"{user_id}/{conversation_id}/{filename}" + blob_client = blob_service_client.get_blob_client( + container=storage_account_personal_chat_container_name, + blob=blob_path + ) + metadata = { + "conversation_id": str(conversation_id), + "user_id": str(user_id) + } + with open(temp_file_path, "rb") as blob_f: + blob_client.upload_blob(blob_f, overwrite=True, metadata=metadata) + log_event(f"Uploaded chat tabular file to blob storage: {blob_path}") + except Exception as blob_err: + log_event( + f"Warning: Failed to upload chat tabular file to blob storage: {blob_err}", + level=logging.WARNING + ) + else: + # Only extract content for Cosmos storage when enhanced citations is disabled + extracted_content = extract_table_file(temp_file_path, file_ext) else: return jsonify({'error': 'Unsupported file type'}), 400 @@ -395,25 +420,50 @@ def upload_file(): current_thread_id = str(uuid.uuid4()) - file_message = { - 'id': file_message_id, - 'conversation_id': conversation_id, - 'role': 'file', - 'filename': filename, - 'file_content': extracted_content, - 'is_table': is_table, - 'timestamp': datetime.utcnow().isoformat(), - 'model_deployment_name': None, - 'metadata': { - 'thread_info': { - 'thread_id': current_thread_id, - 'previous_thread_id': previous_thread_id, - 'active_thread': True, - 'thread_attempt': 1 + # When enhanced citations is enabled and file is tabular, store a lightweight + # reference without file_content to avoid Cosmos DB size limits. + # The tabular data lives in blob storage and is served from there. + if is_table and settings.get('enable_enhanced_citations', False): + file_message = { + 'id': file_message_id, + 'conversation_id': conversation_id, + 'role': 'file', + 'filename': filename, + 'is_table': is_table, + 'file_content_source': 'blob', + 'blob_container': storage_account_personal_chat_container_name, + 'blob_path': f"{user_id}/{conversation_id}/{filename}", + 'timestamp': datetime.utcnow().isoformat(), + 'model_deployment_name': None, + 'metadata': { + 'thread_info': { + 'thread_id': current_thread_id, + 'previous_thread_id': previous_thread_id, + 'active_thread': True, + 'thread_attempt': 1 + } } } - } - + else: + file_message = { + 'id': file_message_id, + 'conversation_id': conversation_id, + 'role': 'file', + 'filename': filename, + 'file_content': extracted_content, + 'is_table': is_table, + 'timestamp': datetime.utcnow().isoformat(), + 'model_deployment_name': None, + 'metadata': { + 'thread_info': { + 'thread_id': current_thread_id, + 'previous_thread_id': previous_thread_id, + 'active_thread': True, + 'thread_attempt': 1 + } + } + } + # Add vision analysis if available if vision_analysis: file_message['vision_analysis'] = vision_analysis diff --git a/application/single_app/semantic_kernel_loader.py b/application/single_app/semantic_kernel_loader.py index 78f54203..eb9685bc 100644 --- a/application/single_app/semantic_kernel_loader.py +++ b/application/single_app/semantic_kernel_loader.py @@ -19,6 +19,7 @@ from semantic_kernel.functions.kernel_plugin import KernelPlugin from semantic_kernel_plugins.embedding_model_plugin import EmbeddingModelPlugin from semantic_kernel_plugins.fact_memory_plugin import FactMemoryPlugin +from semantic_kernel_plugins.tabular_processing_plugin import TabularProcessingPlugin from functions_settings import get_settings, get_user_settings from foundry_agent_runtime import AzureAIFoundryChatCompletionAgent from functions_appinsights import log_event, get_appinsights_logger @@ -408,6 +409,13 @@ def load_embedding_model_plugin(kernel: Kernel, settings): description="Provides text embedding functions using the configured embedding model." ) +def load_tabular_processing_plugin(kernel: Kernel): + kernel.add_plugin( + TabularProcessingPlugin(), + plugin_name="tabular_processing", + description="Provides data analysis on tabular files (CSV, XLSX) stored in blob storage. Can list files, describe schemas, aggregate columns, filter rows, run queries, and perform group-by operations." + ) + def load_core_plugins_only(kernel: Kernel, settings): """Load only core plugins for model-only conversations without agents.""" debug_print(f"[SK Loader] Loading core plugins only for model-only mode...") @@ -429,6 +437,10 @@ def load_core_plugins_only(kernel: Kernel, settings): load_text_plugin(kernel) log_event("[SK Loader] Loaded Text plugin.", level=logging.INFO) + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + # =================== Semantic Kernel Initialization =================== def initialize_semantic_kernel(user_id: str=None, redis_client=None): debug_print(f"[SK Loader] Initializing Semantic Kernel and plugins...") @@ -1013,6 +1025,14 @@ def load_plugins_for_kernel(kernel, plugin_manifests, settings, mode_label="glob except Exception as e: log_event(f"[SK Loader] Failed to load Fact Memory Plugin: {e}", level=logging.WARNING) + # Register Tabular Processing Plugin if enabled (requires enhanced citations) + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + try: + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + except Exception as e: + log_event(f"[SK Loader] Failed to load Tabular Processing plugin: {e}", level=logging.WARNING) + # Conditionally load static embedding model plugin if settings.get('enable_default_embedding_model_plugin', True): try: @@ -1357,7 +1377,11 @@ def load_user_semantic_kernel(kernel: Kernel, settings, user_id: str, redis_clie load_embedding_model_plugin(kernel, settings) print(f"[SK Loader] Loaded Default Embedding Model plugin.") log_event("[SK Loader] Loaded Default Embedding Model plugin.", level=logging.INFO) - + + if settings.get('enable_tabular_processing_plugin', False) and settings.get('enable_enhanced_citations', False): + load_tabular_processing_plugin(kernel) + log_event("[SK Loader] Loaded Tabular Processing plugin.", level=logging.INFO) + # Get selected agent from user settings (this still needs to be in user settings for UI state) user_settings = get_user_settings(user_id).get('settings', {}) selected_agent = user_settings.get('selected_agent') diff --git a/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py new file mode 100644 index 00000000..a525250b --- /dev/null +++ b/application/single_app/semantic_kernel_plugins/tabular_processing_plugin.py @@ -0,0 +1,515 @@ +# tabular_processing_plugin.py +""" +TabularProcessingPlugin for Semantic Kernel: provides data analysis operations +on tabular files (CSV, XLSX, XLS, XLSM) stored in Azure Blob Storage. + +Works with workspace documents (user-documents, group-documents, public-documents) +and chat-uploaded documents (personal-chat container). +""" +import asyncio +import io +import json +import logging +import pandas +from typing import Annotated, Optional, List +from semantic_kernel.functions import kernel_function +from semantic_kernel_plugins.plugin_invocation_logger import plugin_function_logger +from functions_appinsights import log_event +from config import ( + CLIENTS, + TABULAR_EXTENSIONS, + storage_account_user_documents_container_name, + storage_account_personal_chat_container_name, + storage_account_group_documents_container_name, + storage_account_public_documents_container_name, +) + + +class TabularProcessingPlugin: + """Provides data analysis functions on tabular files stored in blob storage.""" + + SUPPORTED_EXTENSIONS = {'.csv', '.xlsx', '.xls', '.xlsm'} + + def __init__(self): + self._df_cache = {} # Per-instance cache: (container, blob_name) -> DataFrame + + def _get_blob_service_client(self): + """Get the blob service client from CLIENTS cache.""" + client = CLIENTS.get("storage_account_office_docs_client") + if not client: + raise RuntimeError("Blob storage client not available. Enhanced citations must be enabled.") + return client + + def _list_tabular_blobs(self, container_name: str, prefix: str) -> List[str]: + """List all tabular file blobs under a given prefix.""" + client = self._get_blob_service_client() + container_client = client.get_container_client(container_name) + blobs = [] + for blob in container_client.list_blobs(name_starts_with=prefix): + name_lower = blob['name'].lower() + if any(name_lower.endswith(ext) for ext in self.SUPPORTED_EXTENSIONS): + blobs.append(blob['name']) + return blobs + + def _read_tabular_blob_to_dataframe(self, container_name: str, blob_name: str) -> pandas.DataFrame: + """Download a blob and read it into a pandas DataFrame. Uses per-instance cache.""" + cache_key = (container_name, blob_name) + if cache_key in self._df_cache: + log_event(f"[TabularProcessingPlugin] Cache hit for {blob_name}", level=logging.DEBUG) + return self._df_cache[cache_key].copy() + + client = self._get_blob_service_client() + blob_client = client.get_blob_client(container=container_name, blob=blob_name) + stream = blob_client.download_blob() + data = stream.readall() + + name_lower = blob_name.lower() + if name_lower.endswith('.csv'): + df = pandas.read_csv(io.BytesIO(data), keep_default_na=False, dtype=str) + elif name_lower.endswith('.xlsx') or name_lower.endswith('.xlsm'): + df = pandas.read_excel(io.BytesIO(data), engine='openpyxl', keep_default_na=False, dtype=str) + elif name_lower.endswith('.xls'): + df = pandas.read_excel(io.BytesIO(data), engine='xlrd', keep_default_na=False, dtype=str) + else: + raise ValueError(f"Unsupported tabular file type: {blob_name}") + + self._df_cache[cache_key] = df + log_event(f"[TabularProcessingPlugin] Cached DataFrame for {blob_name} ({len(df)} rows)", level=logging.DEBUG) + return df.copy() + + def _try_numeric_conversion(self, df: pandas.DataFrame) -> pandas.DataFrame: + """Attempt to convert string columns to numeric where possible.""" + for col in df.columns: + try: + df[col] = pandas.to_numeric(df[col]) + except (ValueError, TypeError): + pass + return df + + def _resolve_blob_location(self, user_id: str, conversation_id: str, filename: str, source: str, + group_id: str = None, public_workspace_id: str = None) -> tuple: + """Resolve container name and blob path from source type.""" + source = source.lower().strip() + if source == 'chat': + container = storage_account_personal_chat_container_name + blob_path = f"{user_id}/{conversation_id}/{filename}" + elif source == 'workspace': + container = storage_account_user_documents_container_name + blob_path = f"{user_id}/{filename}" + elif source == 'group': + if not group_id: + raise ValueError("group_id is required for source='group'") + container = storage_account_group_documents_container_name + blob_path = f"{group_id}/{filename}" + elif source == 'public': + if not public_workspace_id: + raise ValueError("public_workspace_id is required for source='public'") + container = storage_account_public_documents_container_name + blob_path = f"{public_workspace_id}/{filename}" + else: + raise ValueError(f"Unknown source '{source}'. Use 'workspace', 'chat', 'group', or 'public'.") + return container, blob_path + + def _resolve_blob_location_with_fallback(self, user_id: str, conversation_id: str, filename: str, source: str, + group_id: str = None, public_workspace_id: str = None) -> tuple: + """Try primary source first, then fall back to other containers if blob not found.""" + source = source.lower().strip() + attempts = [] + + # Primary attempt based on specified source + try: + primary = self._resolve_blob_location(user_id, conversation_id, filename, source, group_id, public_workspace_id) + attempts.append(primary) + except ValueError: + pass + + # Fallback attempts in priority order (skip the primary source) + if source != 'workspace': + attempts.append((storage_account_user_documents_container_name, f"{user_id}/{filename}")) + if source != 'group' and group_id: + attempts.append((storage_account_group_documents_container_name, f"{group_id}/{filename}")) + if source != 'public' and public_workspace_id: + attempts.append((storage_account_public_documents_container_name, f"{public_workspace_id}/{filename}")) + if source != 'chat': + attempts.append((storage_account_personal_chat_container_name, f"{user_id}/{conversation_id}/{filename}")) + + client = self._get_blob_service_client() + for container, blob_path in attempts: + try: + blob_client = client.get_blob_client(container=container, blob=blob_path) + if blob_client.exists(): + log_event(f"[TabularProcessingPlugin] Found blob at {container}/{blob_path}", level=logging.DEBUG) + return container, blob_path + except Exception: + continue + + # If nothing found, return primary for the original error message + if attempts: + return attempts[0] + raise ValueError(f"Could not resolve blob location for {filename}") + + @kernel_function( + description=( + "List all tabular data files available for a user. Checks workspace documents " + "(user-documents container), chat-uploaded documents (personal-chat container), " + "and optionally group or public workspace documents. " + "Returns a JSON list of available files with their source." + ), + name="list_tabular_files" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def list_tabular_files( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON list of available tabular files"]: + """List all tabular files available for the user across all accessible containers.""" + def _sync_work(): + results = [] + try: + workspace_prefix = f"{user_id}/" + workspace_blobs = self._list_tabular_blobs( + storage_account_user_documents_container_name, workspace_prefix + ) + for blob in workspace_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "workspace", + "container": storage_account_user_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing workspace blobs: {e}", level=logging.WARNING) + + try: + chat_prefix = f"{user_id}/{conversation_id}/" + chat_blobs = self._list_tabular_blobs( + storage_account_personal_chat_container_name, chat_prefix + ) + for blob in chat_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "chat", + "container": storage_account_personal_chat_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing chat blobs: {e}", level=logging.WARNING) + + if group_id: + try: + group_prefix = f"{group_id}/" + group_blobs = self._list_tabular_blobs( + storage_account_group_documents_container_name, group_prefix + ) + for blob in group_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "group", + "container": storage_account_group_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing group blobs: {e}", level=logging.WARNING) + + if public_workspace_id: + try: + public_prefix = f"{public_workspace_id}/" + public_blobs = self._list_tabular_blobs( + storage_account_public_documents_container_name, public_prefix + ) + for blob in public_blobs: + filename = blob.split('/')[-1] + results.append({ + "filename": filename, + "blob_path": blob, + "source": "public", + "container": storage_account_public_documents_container_name + }) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error listing public blobs: {e}", level=logging.WARNING) + + return json.dumps(results, indent=2) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Get a summary of a tabular file including column names, row count, data types, " + "and a preview of the first few rows." + ), + name="describe_tabular_file" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def describe_tabular_file( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON summary of the tabular file"]: + """Get schema and preview of a tabular file.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df_numeric = self._try_numeric_conversion(df.copy()) + + summary = { + "filename": filename, + "row_count": len(df), + "column_count": len(df.columns), + "columns": list(df.columns), + "dtypes": {col: str(dtype) for col, dtype in df_numeric.dtypes.items()}, + "preview": df.head(5).to_dict(orient='records'), + "null_counts": df.isnull().sum().to_dict() + } + return json.dumps(summary, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error describing file: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Execute an aggregation operation on a column of a tabular file. " + "Supported operations: sum, mean, count, min, max, median, std, nunique, value_counts." + ), + name="aggregate_column" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def aggregate_column( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + column: Annotated[str, "The column name to aggregate"], + operation: Annotated[str, "Aggregation: sum, mean, count, min, max, median, std, nunique, value_counts"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the aggregation"]: + """Execute an aggregation operation on a column.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + if column not in df.columns: + return json.dumps({"error": f"Column '{column}' not found. Available: {list(df.columns)}"}) + + series = df[column] + op = operation.lower().strip() + + if op == 'sum': + result = series.sum() + elif op == 'mean': + result = series.mean() + elif op == 'count': + result = series.count() + elif op == 'min': + result = series.min() + elif op == 'max': + result = series.max() + elif op == 'median': + result = series.median() + elif op == 'std': + result = series.std() + elif op == 'nunique': + result = series.nunique() + elif op == 'value_counts': + result = series.value_counts().to_dict() + else: + return json.dumps({"error": f"Unsupported operation: {operation}. Use sum, mean, count, min, max, median, std, nunique, value_counts."}) + + return json.dumps({"column": column, "operation": op, "result": result}, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error aggregating column: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Filter rows in a tabular file based on conditions and return matching rows. " + "Supports operators: ==, !=, >, <, >=, <=, contains, startswith, endswith." + ), + name="filter_rows" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def filter_rows( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + column: Annotated[str, "The column to filter on"], + operator: Annotated[str, "Operator: ==, !=, >, <, >=, <=, contains, startswith, endswith"], + value: Annotated[str, "The value to compare against"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_rows: Annotated[str, "Maximum rows to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON list of matching rows"]: + """Filter rows based on a condition.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + if column not in df.columns: + return json.dumps({"error": f"Column '{column}' not found. Available: {list(df.columns)}"}) + + series = df[column] + op = operator.strip().lower() + + numeric_value = None + try: + numeric_value = float(value) + except (ValueError, TypeError): + pass + + if op == '==' or op == 'equals': + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + mask = series == numeric_value + else: + mask = series.astype(str).str.lower() == value.lower() + elif op == '!=': + if numeric_value is not None and pandas.api.types.is_numeric_dtype(series): + mask = series != numeric_value + else: + mask = series.astype(str).str.lower() != value.lower() + elif op == '>': + mask = series > numeric_value + elif op == '<': + mask = series < numeric_value + elif op == '>=': + mask = series >= numeric_value + elif op == '<=': + mask = series <= numeric_value + elif op == 'contains': + mask = series.astype(str).str.contains(value, case=False, na=False) + elif op == 'startswith': + mask = series.astype(str).str.lower().str.startswith(value.lower()) + elif op == 'endswith': + mask = series.astype(str).str.lower().str.endswith(value.lower()) + else: + return json.dumps({"error": f"Unsupported operator: {operator}"}) + + limit = int(max_rows) + filtered = df[mask].head(limit) + return json.dumps({ + "total_matches": int(mask.sum()), + "returned_rows": len(filtered), + "data": filtered.to_dict(orient='records') + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error filtering rows: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Execute a pandas query expression against a tabular file for advanced analysis. " + "The query string uses pandas DataFrame.query() syntax. " + "Examples: 'Age > 30 and State == \"CA\"', 'Price < 100'" + ), + name="query_tabular_data" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def query_tabular_data( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + query_expression: Annotated[str, "Pandas query expression (e.g. 'Age > 30 and State == \"CA\"')"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + max_rows: Annotated[str, "Maximum rows to return"] = "100", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the query"]: + """Execute a pandas query expression against a tabular file.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + result_df = df.query(query_expression) + limit = int(max_rows) + return json.dumps({ + "total_matches": len(result_df), + "returned_rows": min(len(result_df), limit), + "data": result_df.head(limit).to_dict(orient='records') + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error querying data: {e}", level=logging.WARNING) + return json.dumps({"error": f"Query error: {str(e)}. Ensure column names and values are correct."}) + return await asyncio.to_thread(_sync_work) + + @kernel_function( + description=( + "Perform a group-by aggregation on a tabular file. " + "Groups data by one column and aggregates another column. " + "Supported operations: sum, mean, count, min, max." + ), + name="group_by_aggregate" + ) + @plugin_function_logger("TabularProcessingPlugin") + async def group_by_aggregate( + self, + user_id: Annotated[str, "The user ID (from Scope ID in Conversation Metadata)"], + conversation_id: Annotated[str, "The conversation ID (from Conversation Metadata)"], + filename: Annotated[str, "The filename of the tabular file"], + group_by_column: Annotated[str, "The column to group by"], + aggregate_column: Annotated[str, "The column to aggregate"], + operation: Annotated[str, "Aggregation operation: sum, mean, count, min, max"], + source: Annotated[str, "Source: 'workspace', 'chat', 'group', or 'public'"] = "chat", + group_id: Annotated[Optional[str], "Group ID (for group workspace documents)"] = None, + public_workspace_id: Annotated[Optional[str], "Public workspace ID (for public workspace documents)"] = None, + ) -> Annotated[str, "JSON result of the group-by aggregation"]: + """Group by one column and aggregate another.""" + def _sync_work(): + try: + container, blob_path = self._resolve_blob_location( + user_id, conversation_id, filename, source, + group_id=group_id, public_workspace_id=public_workspace_id + ) + df = self._read_tabular_blob_to_dataframe(container, blob_path) + df = self._try_numeric_conversion(df) + + for col in [group_by_column, aggregate_column]: + if col not in df.columns: + return json.dumps({"error": f"Column '{col}' not found. Available: {list(df.columns)}"}) + + op = operation.lower().strip() + grouped = df.groupby(group_by_column)[aggregate_column].agg(op) + return json.dumps({ + "group_by": group_by_column, + "aggregate_column": aggregate_column, + "operation": op, + "groups": len(grouped), + "result": grouped.to_dict() + }, indent=2, default=str) + except Exception as e: + log_event(f"[TabularProcessingPlugin] Error in group-by: {e}", level=logging.WARNING) + return json.dumps({"error": str(e)}) + return await asyncio.to_thread(_sync_work) diff --git a/application/single_app/static/images/custom_logo.png b/application/single_app/static/images/custom_logo.png new file mode 100644 index 00000000..ecf6e652 Binary files /dev/null and b/application/single_app/static/images/custom_logo.png differ diff --git a/application/single_app/static/images/custom_logo_dark.png b/application/single_app/static/images/custom_logo_dark.png new file mode 100644 index 00000000..4f281945 Binary files /dev/null and b/application/single_app/static/images/custom_logo_dark.png differ diff --git a/application/single_app/static/js/admin/admin_settings.js b/application/single_app/static/js/admin/admin_settings.js index 85719128..8a425247 100644 --- a/application/single_app/static/js/admin/admin_settings.js +++ b/application/single_app/static/js/admin/admin_settings.js @@ -1237,10 +1237,11 @@ function setupToggles() { const mathToggle = document.getElementById('toggle-math-plugin'); const textToggle = document.getElementById('toggle-text-plugin'); const factMemoryToggle = document.getElementById('toggle-fact-memory-plugin'); + const tabularProcessingToggle = document.getElementById('toggle-tabular-processing-plugin'); const embeddingToggle = document.getElementById('toggle-default-embedding-model-plugin'); const allowUserPluginsToggle = document.getElementById('toggle-allow-user-plugins'); const allowGroupPluginsToggle = document.getElementById('toggle-allow-group-plugins'); - const toggles = [timeToggle, httpToggle, waitToggle, mathToggle, textToggle, factMemoryToggle, embeddingToggle, allowUserPluginsToggle, allowGroupPluginsToggle]; + const toggles = [timeToggle, httpToggle, waitToggle, mathToggle, textToggle, factMemoryToggle, tabularProcessingToggle, embeddingToggle, allowUserPluginsToggle, allowGroupPluginsToggle]; // Feedback area let feedbackDiv = document.getElementById('core-plugin-toggles-feedback'); if (!feedbackDiv) { @@ -1270,6 +1271,16 @@ function setupToggles() { if (textToggle) textToggle.checked = !!settings.enable_text_plugin; if (embeddingToggle) embeddingToggle.checked = !!settings.enable_default_embedding_model_plugin; if (factMemoryToggle) factMemoryToggle.checked = !!settings.enable_fact_memory_plugin; + if (tabularProcessingToggle) { + tabularProcessingToggle.checked = !!settings.enable_tabular_processing_plugin; + const ecEnabled = !!settings.enable_enhanced_citations; + tabularProcessingToggle.disabled = !ecEnabled; + const depNote = document.getElementById('tabular-processing-dependency-note'); + if (depNote) { + depNote.textContent = ecEnabled ? 'Requires Enhanced Citations' : 'Requires Enhanced Citations (currently disabled)'; + depNote.className = ecEnabled ? 'text-muted d-block ms-4' : 'text-danger d-block ms-4'; + } + } if (allowUserPluginsToggle) allowUserPluginsToggle.checked = !!settings.allow_user_plugins; if (allowGroupPluginsToggle) allowGroupPluginsToggle.checked = !!settings.allow_group_plugins; } catch (err) { @@ -1291,6 +1302,7 @@ function setupToggles() { enable_text_plugin: textToggle ? textToggle.checked : false, enable_default_embedding_model_plugin: embeddingToggle ? embeddingToggle.checked : false, enable_fact_memory_plugin: factMemoryToggle ? factMemoryToggle.checked : false, + enable_tabular_processing_plugin: tabularProcessingToggle ? tabularProcessingToggle.checked : false, allow_user_plugins: allowUserPluginsToggle ? allowUserPluginsToggle.checked : false, allow_group_plugins: allowGroupPluginsToggle ? allowGroupPluginsToggle.checked : false }; diff --git a/application/single_app/static/js/chat/chat-enhanced-citations.js b/application/single_app/static/js/chat/chat-enhanced-citations.js index dcda708b..9d4344bb 100644 --- a/application/single_app/static/js/chat/chat-enhanced-citations.js +++ b/application/single_app/static/js/chat/chat-enhanced-citations.js @@ -18,11 +18,13 @@ export function getFileType(fileName) { const imageExtensions = ['jpg', 'jpeg', 'png', 'bmp', 'tiff', 'tif']; const videoExtensions = ['mp4', 'mov', 'avi', 'mkv', 'flv', 'webm', 'wmv', 'm4v', '3gp']; const audioExtensions = ['mp3', 'wav', 'ogg', 'aac', 'flac', 'm4a']; - + const tabularExtensions = ['csv', 'xlsx', 'xls', 'xlsm']; + if (imageExtensions.includes(ext)) return 'image'; if (ext === 'pdf') return 'pdf'; if (videoExtensions.includes(ext)) return 'video'; if (audioExtensions.includes(ext)) return 'audio'; + if (tabularExtensions.includes(ext)) return 'tabular'; return 'other'; } @@ -66,6 +68,9 @@ export function showEnhancedCitationModal(docId, pageNumberOrTimestamp, citation const audioTimestamp = convertTimestampToSeconds(pageNumberOrTimestamp); showAudioModal(docId, audioTimestamp, docMetadata.file_name); break; + case 'tabular': + showTabularDownloadModal(docId, docMetadata.file_name); + break; default: // Fall back to text citation for unsupported types import('./chat-citations.js').then(module => { @@ -291,6 +296,119 @@ export function showAudioModal(docId, timestamp, fileName) { modalInstance.show(); } +/** + * Show tabular file preview modal with data table + * @param {string} docId - Document ID + * @param {string} fileName - File name + */ +export function showTabularDownloadModal(docId, fileName) { + console.log(`Showing tabular preview modal for docId: ${docId}, fileName: ${fileName}`); + showLoadingIndicator(); + + // Create or get tabular modal + let tabularModal = document.getElementById("enhanced-tabular-modal"); + if (!tabularModal) { + tabularModal = createTabularModal(); + } + + const title = tabularModal.querySelector(".modal-title"); + const tableContainer = tabularModal.querySelector("#enhanced-tabular-table-container"); + const rowInfo = tabularModal.querySelector("#enhanced-tabular-row-info"); + const downloadBtn = tabularModal.querySelector("#enhanced-tabular-download"); + const errorContainer = tabularModal.querySelector("#enhanced-tabular-error"); + + title.textContent = `Tabular Data: ${fileName}`; + tableContainer.innerHTML = '
Loading...

Loading data preview...

'; + rowInfo.textContent = ''; + errorContainer.classList.add('d-none'); + + const downloadUrl = `/api/enhanced_citations/tabular_workspace?doc_id=${encodeURIComponent(docId)}`; + downloadBtn.href = downloadUrl; + downloadBtn.download = fileName; + + // Show modal immediately with loading state + const modalInstance = new bootstrap.Modal(tabularModal); + modalInstance.show(); + + // Fetch preview data + const previewUrl = `/api/enhanced_citations/tabular_preview?doc_id=${encodeURIComponent(docId)}`; + fetch(previewUrl) + .then(response => { + if (!response.ok) throw new Error(`HTTP ${response.status}`); + return response.json(); + }) + .then(data => { + hideLoadingIndicator(); + if (data.error) { + showTabularError(tableContainer, errorContainer, data.error); + return; + } + renderTabularPreview(tableContainer, rowInfo, data); + }) + .catch(error => { + hideLoadingIndicator(); + console.error('Error loading tabular preview:', error); + showTabularError(tableContainer, errorContainer, 'Could not load data preview.'); + }); +} + +/** + * Render tabular data as an HTML table + * @param {HTMLElement} container - Table container element + * @param {HTMLElement} rowInfo - Row info display element + * @param {Object} data - Preview data from API + */ +function renderTabularPreview(container, rowInfo, data) { + const { columns, rows, total_rows, truncated } = data; + + // Build table HTML + let html = ''; + + // Header + html += ''; + for (const col of columns) { + const escaped = col.replace(/&/g, '&').replace(//g, '>'); + html += ``; + } + html += ''; + + // Body + html += ''; + for (const row of rows) { + html += ''; + for (const cell of row) { + const val = cell === null || cell === undefined ? '' : String(cell); + const escaped = val.replace(/&/g, '&').replace(//g, '>'); + html += ``; + } + html += ''; + } + html += '
${escaped}
${escaped}
'; + + container.innerHTML = html; + + // Row info + const displayedRows = rows.length; + const totalFormatted = total_rows.toLocaleString(); + if (truncated) { + rowInfo.textContent = `Showing ${displayedRows.toLocaleString()} of ${totalFormatted} rows`; + } else { + rowInfo.textContent = `${totalFormatted} rows, ${columns.length} columns`; + } +} + +/** + * Show error state in tabular modal with download fallback + * @param {HTMLElement} tableContainer - Table container element + * @param {HTMLElement} errorContainer - Error display element + * @param {string} message - Error message + */ +function showTabularError(tableContainer, errorContainer, message) { + tableContainer.innerHTML = '
'; + errorContainer.textContent = message + ' You can still download the file below.'; + errorContainer.classList.remove('d-none'); +} + /** * Convert timestamp string to seconds * @param {string|number} timestamp - Timestamp in various formats @@ -445,3 +563,36 @@ function createPdfModal() { document.body.appendChild(modal); return modal; } + +/** + * Create tabular file preview modal HTML structure + * @returns {HTMLElement} - Modal element + */ +function createTabularModal() { + const modal = document.createElement("div"); + modal.id = "enhanced-tabular-modal"; + modal.classList.add("modal", "fade"); + modal.tabIndex = -1; + modal.innerHTML = ` + + `; + document.body.appendChild(modal); + return modal; +} diff --git a/application/single_app/static/js/chat/chat-input-actions.js b/application/single_app/static/js/chat/chat-input-actions.js index 77851319..c0c7832b 100644 --- a/application/single_app/static/js/chat/chat-input-actions.js +++ b/application/single_app/static/js/chat/chat-input-actions.js @@ -127,11 +127,11 @@ export function fetchFileContent(conversationId, fileId) { hideLoadingIndicator(); if (data.file_content && data.filename) { - showFileContentPopup(data.file_content, data.filename, data.is_table); + showFileContentPopup(data.file_content, data.filename, data.is_table, data.file_content_source, conversationId, fileId); } else if (data.error) { showToast(data.error, "danger"); } else { - ashowToastlert("Unexpected response from server.", "danger"); + showToast("Unexpected response from server.", "danger"); } }) .catch((error) => { @@ -141,7 +141,7 @@ export function fetchFileContent(conversationId, fileId) { }); } -export function showFileContentPopup(fileContent, filename, isTable) { +export function showFileContentPopup(fileContent, filename, isTable, fileContentSource, conversationId, fileId) { let modalContainer = document.getElementById("file-modal"); if (!modalContainer) { modalContainer = document.createElement("div"); @@ -155,6 +155,7 @@ export function showFileContentPopup(fileContent, filename, isTable) { diff --git a/application/single_app/tmp2xmw9c75.xlsx b/application/single_app/tmp2xmw9c75.xlsx new file mode 100644 index 00000000..e2603075 Binary files /dev/null and b/application/single_app/tmp2xmw9c75.xlsx differ diff --git a/docs/explanation/release_notes.md b/docs/explanation/release_notes.md index 8077ff3f..07b23e6d 100644 --- a/docs/explanation/release_notes.md +++ b/docs/explanation/release_notes.md @@ -2,6 +2,41 @@ # Feature Release +### **(v0.239.007)** + +#### New Features + +* **Tabular Data Analysis — SK Mini-Agent for Normal Chat** + * Tabular files (CSV, XLSX, XLS, XLSM) detected in search results now trigger a lightweight Semantic Kernel mini-agent that pre-computes data analysis before the main LLM response. This brings the same analytical depth previously only available in full agent mode to every normal chat conversation. + * **Automatic Detection**: When AI Search results include tabular files from any workspace (personal, group, or public) or chat-uploaded documents, the system automatically identifies them via the `TABULAR_EXTENSIONS` configuration and routes the query through the SK mini-agent pipeline. + * **Unified Workspace and Chat Handling**: Tabular files are processed identically regardless of their storage location. The plugin resolves blob paths across all four container types (`user-documents`, `group-documents`, `public-documents`, `personal-chat`) with automatic fallback resolution if the primary source lookup fails. A user asking about an Excel file in their personal workspace gets the same analytical treatment as one asking about a CSV uploaded directly to a chat. + * **Six Data Analysis Functions**: The `TabularProcessingPlugin` exposes `describe_tabular_file`, `aggregate_column` (sum, mean, count, min, max, median, std, nunique, value_counts), `filter_rows` (==, !=, >, <, >=, <=, contains, startswith, endswith), `query_tabular_data` (pandas query syntax), `group_by_aggregate`, and `list_tabular_files` — all registered as Semantic Kernel functions that the mini-agent orchestrates autonomously. + * **Pre-Computed Results Injected as Context**: The mini-agent's computed analysis (exact numerical results, aggregations, filtered data) is injected into the main LLM's system context so it can present accurate, citation-backed answers without hallucinating numbers. + * **Graceful Degradation**: If the mini-agent analysis fails for any reason, the system falls back to instructing the main LLM to use the tabular processing plugin functions directly, preserving full functionality. + * **Non-Streaming and Streaming Support**: Both chat modes are supported. The mini-agent runs synchronously before the main LLM call in both paths. + * **Requires Enhanced Citations**: The tabular processing plugin depends on the blob storage client initialized by the enhanced citations system. The `enable_enhanced_citations` admin setting must be enabled for tabular data analysis to activate. + * **Files Modified**: `route_backend_chats.py`, `semantic_kernel_plugins/tabular_processing_plugin.py`, `config.py`. + * (Ref: `run_tabular_sk_analysis()`, `TabularProcessingPlugin`, `collect_tabular_sk_citations()`, `TABULAR_EXTENSIONS`) + +* **Tabular Tool Execution Citations** + * Every tool call made by the SK mini-agent during tabular analysis is captured and surfaced as an agent citation, providing full transparency into the data analysis pipeline. + * **Automatic Capture**: The existing `@plugin_function_logger` decorator on all `TabularProcessingPlugin` functions records each invocation including function name, input parameters, returned results, execution duration, and success/failure status. + * **Citation Format**: Tool execution citations appear in the same "Agent Tool Execution" modal used by full agent mode, showing `tool_name` (e.g., `TabularProcessingPlugin.aggregate_column`), `function_arguments` (the exact parameters passed), and `function_result` (the computed data returned). + * **End-to-End Auditability**: Users can verify exactly which aggregations, filters, or queries were run against their data, what parameters were used, and what raw results were returned — before the LLM summarized them into the final response. + * **Files Modified**: `route_backend_chats.py`. + * (Ref: `collect_tabular_sk_citations()`, `plugin_invocation_logger.py`) + +* **SK Mini-Agent Performance Optimization** + * Reduced typical tabular analysis time from ~74 seconds to an estimated ~30-33 seconds (55-60% reduction) through three complementary optimizations. + * **DataFrame Caching**: Per-request in-memory cache eliminates redundant blob downloads. Previously, each of the ~8 tool calls in a typical analysis downloaded and parsed the same file independently. Now the file is downloaded once and subsequent calls read from cache. Cache is automatically scoped to the request (new plugin instance per analysis) and garbage-collected afterward. + * **Pre-Dispatch Schema Injection**: File schemas (columns, data types, row counts, and a 3-row preview) are pre-loaded and injected into the SK mini-agent's system prompt before execution begins. This eliminates 2 LLM round-trips that were previously spent on file discovery (`list_tabular_files`) and schema inspection (`describe_tabular_file`), allowing the model to jump directly to analysis tool calls. + * **Async Plugin Functions**: All six `@kernel_function` methods converted to `async def` using `asyncio.to_thread()`. This enables Semantic Kernel's built-in `asyncio.gather()` to truly parallelize batched tool calls (e.g., 3 simultaneous `aggregate_column` calls) instead of executing them serially on the event loop. + * **Batching Instructions**: The system prompt now instructs the model to batch multiple independent function calls in a single response, reducing LLM round-trips further. + * **Files Modified**: `tabular_processing_plugin.py`, `route_backend_chats.py`, `config.py`. + * (Ref: `_df_cache`, `asyncio.to_thread`, pre-dispatch schema injection in `run_tabular_sk_analysis()`) + +--- + ### **(v0.239.001)** #### New Features