diff --git a/bitnet_tools/web.py b/bitnet_tools/web.py
index 23d70b0..9f7c5f0 100644
--- a/bitnet_tools/web.py
+++ b/bitnet_tools/web.py
@@ -3,6 +3,9 @@
from http import HTTPStatus
from concurrent.futures import Future, ThreadPoolExecutor
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+import base64
+import csv
+import io
import json
from pathlib import Path
import subprocess
@@ -10,6 +13,10 @@
import threading
import uuid
from typing import Any
+
+import re
+import xml.etree.ElementTree as ET
+import zipfile
from urllib.parse import urlparse
from .analysis import build_analysis_payload_from_request
@@ -26,6 +33,158 @@
_CHART_LOCK = threading.Lock()
+
+
+def _coerce_csv_text_from_file_payload(file_payload: dict[str, Any]) -> tuple[str, str, dict[str, Any]]:
+ input_type = str(file_payload.get('input_type', 'csv') or 'csv').strip().lower()
+ source_name = str(file_payload.get('name', ''))
+ meta: dict[str, Any] = {'source_name': source_name, 'input_type': input_type}
+
+ if input_type == 'excel':
+ raw_b64 = str(file_payload.get('file_base64', '')).strip()
+ if not raw_b64:
+ raise ValueError('excel file_base64 is required')
+ sheet_name = str(file_payload.get('sheet_name', '')).strip() or None
+ normalized_text = _normalize_excel_base64_to_csv_text(raw_b64, sheet_name)
+ meta['sheet_name'] = sheet_name or ''
+ return source_name, normalized_text, meta
+
+ normalized = str(file_payload.get('normalized_csv_text', '')).strip()
+ if not normalized:
+ normalized = str(file_payload.get('csv_text', '')).strip()
+ if not normalized:
+ raise ValueError('normalized_csv_text is required')
+ return source_name, normalized, meta
+
+
+def _xlsx_col_to_index(cell_ref: str) -> int:
+ letters = ''.join(ch for ch in cell_ref if ch.isalpha()).upper()
+ idx = 0
+ for ch in letters:
+ idx = idx * 26 + (ord(ch) - ord('A') + 1)
+ return max(idx - 1, 0)
+
+
+def _load_xlsx_from_base64(file_base64: str) -> tuple[zipfile.ZipFile, str]:
+ try:
+ raw = base64.b64decode(file_base64)
+ except Exception as exc:
+ raise ValueError(f'invalid excel base64: {exc}') from exc
+
+ try:
+ zf = zipfile.ZipFile(io.BytesIO(raw))
+ except Exception as exc:
+ raise ValueError(f'failed to read excel file: {exc}') from exc
+
+ if 'xl/workbook.xml' not in zf.namelist():
+ raise ValueError('지원하지 않는 Excel 형식입니다. .xlsx 파일을 사용하세요. | detail: only xlsx(OOXML) is supported')
+ return zf, 'xl/workbook.xml'
+
+
+def _get_xlsx_sheet_entries(zf: zipfile.ZipFile) -> list[tuple[str, str]]:
+ ns = {'x': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main', 'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'}
+ wb_root = ET.fromstring(zf.read('xl/workbook.xml'))
+ rel_root = ET.fromstring(zf.read('xl/_rels/workbook.xml.rels'))
+ rel_map: dict[str, str] = {}
+ for rel in rel_root.findall('{http://schemas.openxmlformats.org/package/2006/relationships}Relationship'):
+ rel_map[rel.attrib.get('Id', '')] = rel.attrib.get('Target', '')
+
+ sheets: list[tuple[str, str]] = []
+ for sheet in wb_root.findall('x:sheets/x:sheet', ns):
+ name = sheet.attrib.get('name', '')
+ rid = sheet.attrib.get('{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id', '')
+ target = rel_map.get(rid, '')
+ if target and not target.startswith('xl/'):
+ target = f"xl/{target.lstrip('/')}"
+ if name and target:
+ sheets.append((name, target))
+ return sheets
+
+
+def _get_xlsx_shared_strings(zf: zipfile.ZipFile) -> list[str]:
+ if 'xl/sharedStrings.xml' not in zf.namelist():
+ return []
+ root = ET.fromstring(zf.read('xl/sharedStrings.xml'))
+ values: list[str] = []
+ for si in root.findall('{http://schemas.openxmlformats.org/spreadsheetml/2006/main}si'):
+ text = ''.join(t.text or '' for t in si.iter('{http://schemas.openxmlformats.org/spreadsheetml/2006/main}t'))
+ values.append(text)
+ return values
+
+
+def _read_xlsx_sheet_rows(zf: zipfile.ZipFile, sheet_path: str, shared_strings: list[str]) -> list[list[str]]:
+ ns = {'x': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
+ root = ET.fromstring(zf.read(sheet_path))
+ rows: list[list[str]] = []
+ for row in root.findall('x:sheetData/x:row', ns):
+ cells: list[str] = []
+ for cell in row.findall('x:c', ns):
+ ref = cell.attrib.get('r', '')
+ cell_idx = _xlsx_col_to_index(ref)
+ while len(cells) <= cell_idx:
+ cells.append('')
+ cell_type = cell.attrib.get('t', '')
+ value = ''
+ if cell_type == 'inlineStr':
+ value = ''.join(t.text or '' for t in cell.iter('{http://schemas.openxmlformats.org/spreadsheetml/2006/main}t'))
+ else:
+ v = cell.find('x:v', ns)
+ raw_v = v.text if v is not None and v.text is not None else ''
+ if cell_type == 's' and raw_v.isdigit():
+ idx = int(raw_v)
+ value = shared_strings[idx] if 0 <= idx < len(shared_strings) else ''
+ else:
+ value = raw_v
+ cells[cell_idx] = value
+ rows.append(cells)
+ return rows
+
+
+def _normalize_excel_base64_to_csv_text(file_base64: str, sheet_name: str | None = None) -> str:
+ zf, _ = _load_xlsx_from_base64(file_base64)
+ sheets = _get_xlsx_sheet_entries(zf)
+ if not sheets:
+ raise ValueError('시트가 비어 있습니다. 데이터를 포함한 시트를 선택하세요. | detail: workbook has no sheets')
+
+ target_sheet = sheets[0]
+ if sheet_name:
+ matches = [s for s in sheets if s[0] == sheet_name]
+ if not matches:
+ raise ValueError(f'sheet not found: {sheet_name}')
+ target_sheet = matches[0]
+
+ shared_strings = _get_xlsx_shared_strings(zf)
+ rows = _read_xlsx_sheet_rows(zf, target_sheet[1], shared_strings)
+ non_empty_rows = [r for r in rows if any(str(c).strip() for c in r)]
+ if not non_empty_rows:
+ raise ValueError('시트가 비어 있습니다. 데이터를 포함한 시트를 선택하세요. | detail: selected sheet has no non-empty rows')
+
+ header = non_empty_rows[0]
+ if not any(str(c).strip() for c in header):
+ raise ValueError('헤더를 확인해주세요. 첫 행에 컬럼명이 필요합니다. | detail: header row is empty')
+
+ seen: set[str] = set()
+ for idx, name in enumerate(header):
+ n = str(name).strip()
+ if not n:
+ raise ValueError(f'헤더를 확인해주세요. 빈 컬럼명이 있습니다. | detail: empty header at index {idx}')
+ if n in seen:
+ raise ValueError(f'헤더를 확인해주세요. 중복 컬럼명이 있습니다. | detail: duplicated header "{n}"')
+ seen.add(n)
+
+ output = io.StringIO()
+ writer = csv.writer(output)
+ max_len = max(len(r) for r in non_empty_rows)
+ for row in non_empty_rows:
+ padded = row + [''] * (max_len - len(row))
+ writer.writerow(padded)
+ return output.getvalue()
+
+
+def _extract_sheet_names(file_base64: str) -> list[str]:
+ zf, _ = _load_xlsx_from_base64(file_base64)
+ return [name for name, _ in _get_xlsx_sheet_entries(zf)]
+
def _run_chart_job(job_id: str, files: list[dict[str, str]]) -> dict[str, Any]:
CHART_JOB_DIR.mkdir(parents=True, exist_ok=True)
job_input_dir = CHART_JOB_DIR / f"{job_id}_input"
@@ -34,12 +193,10 @@ def _run_chart_job(job_id: str, files: list[dict[str, str]]) -> dict[str, Any]:
csv_paths: list[Path] = []
for i, item in enumerate(files):
- name = str(item.get('name', f'file_{i}.csv'))
- text = str(item.get('csv_text', ''))
+ source_name, text, _ = _coerce_csv_text_from_file_payload(item)
if not text.strip():
continue
- if not name.endswith('.csv'):
- name = f"{name}.csv"
+ name = source_name if source_name.endswith('.csv') else f"{source_name}.csv"
path = job_input_dir / name
path.write_text(text, encoding='utf-8')
csv_paths.append(path)
@@ -154,16 +311,37 @@ def do_POST(self) -> None:
return self._send_json(self._error_payload('invalid json'), HTTPStatus.BAD_REQUEST)
try:
+ if route == '/api/sheets':
+ input_type = str(payload.get('input_type', 'auto') or 'auto').strip().lower()
+ if input_type != 'excel':
+ return self._send_json({'sheet_names': []})
+ file_base64 = str(payload.get('file_base64', '')).strip()
+ if not file_base64:
+ return self._send_json(self._error_payload('excel file is required', 'file_base64 is empty', input_type='excel', preprocessing_stage='input_validation'), HTTPStatus.BAD_REQUEST)
+ sheet_names = _extract_sheet_names(file_base64)
+ return self._send_json({'sheet_names': sheet_names})
+
if route == "/api/analyze":
question = str(payload.get("question", "")).strip()
if not question:
question = "이 데이터의 핵심 인사이트를 알려줘"
+ input_type = str(payload.get("input_type", "csv") or "csv").strip().lower()
+ normalized_csv_text = str(payload.get("normalized_csv_text", "") or "")
+ source_name = str(payload.get("source_name", "") or "")
+ meta = payload.get("meta", {}) if isinstance(payload.get("meta", {}), dict) else {}
+ if input_type == "excel":
+ normalized_csv_text = _normalize_excel_base64_to_csv_text(
+ str(payload.get("file_base64", "") or ""),
+ str(payload.get("sheet_name", "") or "").strip() or None,
+ )
+ meta = {**meta, "sheet_name": str(payload.get("sheet_name", "") or "").strip() or ""}
+
request_payload = {
- "input_type": payload.get("input_type", "csv"),
- "source_name": payload.get("source_name", ""),
- "normalized_csv_text": payload.get("normalized_csv_text", ""),
- "meta": payload.get("meta", {}),
+ "input_type": input_type,
+ "source_name": source_name,
+ "normalized_csv_text": normalized_csv_text,
+ "meta": meta,
"csv_text": payload.get("csv_text", ""),
}
try:
@@ -198,18 +376,16 @@ def do_POST(self) -> None:
for i, f in enumerate(files):
if not isinstance(f, dict):
continue
- name = str(f.get("name", f"file_{i}.csv"))
- text = str(f.get("csv_text", ""))
+ name, text, _ = _coerce_csv_text_from_file_payload(f)
if not text.strip():
continue
- if not name.endswith('.csv'):
- name = f"{name}.csv"
- path = Path(td) / name
+ out_name = name if name.endswith('.csv') else f"{name}.csv"
+ path = Path(td) / out_name
path.write_text(text, encoding="utf-8")
tmp_paths.append(path)
if not tmp_paths:
- return self._send_json(self._error_payload('valid csv_text files are required'), HTTPStatus.BAD_REQUEST)
+ return self._send_json(self._error_payload('valid normalized_csv_text files are required'), HTTPStatus.BAD_REQUEST)
result = analyze_multiple_csv(
tmp_paths,