diff --git a/bitnet_tools/ui/app.js b/bitnet_tools/ui/app.js index f1cc1ac..c367aee 100644 --- a/bitnet_tools/ui/app.js +++ b/bitnet_tools/ui/app.js @@ -33,6 +33,8 @@ const UI = { startChartsJobBtn: document.getElementById('startChartsJobBtn'), retryChartsJobBtn: document.getElementById('retryChartsJobBtn'), chartsJobStatus: document.getElementById('chartsJobStatus'), + preprocessStatus: document.getElementById('preprocessStatus'), + retryPreprocessBtn: document.getElementById('retryPreprocessBtn'), dashboardJson: document.getElementById('dashboardJson'), dashboardCards: document.getElementById('dashboardCards'), dashboardInsights: document.getElementById('dashboardInsights'), @@ -73,6 +75,7 @@ const appState = { latestMultiResult: null, structuredInsights: [], chartJob: { id: null, files: [], status: 'idle', pollTimer: null }, + preprocessJob: { id: null, status: 'idle', pollTimer: null, payload: null }, uploadedFile: null, detectedInputType: 'csv', candidateTables: [], @@ -646,6 +649,87 @@ function setChartsJobStatusText(text) { if (UI.chartsJobStatus) UI.chartsJobStatus.textContent = text; } +function setPreprocessStatusText(text) { + if (UI.preprocessStatus) UI.preprocessStatus.textContent = text; +} + +function stopPreprocessPolling() { + if (appState.preprocessJob.pollTimer) { + clearInterval(appState.preprocessJob.pollTimer); + appState.preprocessJob.pollTimer = null; + } +} + +function explainFailureReason(reason) { + if (reason === 'file_corruption') return '파일 손상'; + if (reason === 'memory_limit') return '메모리 제한'; + return '파서 오류'; +} + +async function runAnalyzeFromPreprocessed(result, fallbackQuestion = '') { + const body = { + input_type: result.input_type || 'csv', + source_name: result.source_name || '', + normalized_csv_text: result.normalized_csv_text || '', + meta: result.meta || {}, + question: result.question || fallbackQuestion || UI.question?.value || '', + }; + const data = await postJson('/api/analyze', body, '분석'); + appState.latestPrompt = data.prompt; + UI.summary.textContent = JSON.stringify(data.summary, null, 2); + renderAnalyzeAssist(data); + if (UI.prompt) UI.prompt.textContent = data.prompt; + if (UI.answer) UI.answer.textContent = ''; + setStatus(STATUS.analyzeDone); +} + +async function pollPreprocessJobOnce() { + if (!appState.preprocessJob.id) return; + try { + const result = await getJson(`/api/preprocess/jobs/${appState.preprocessJob.id}`, '입력 전처리 조회'); + appState.preprocessJob.status = result.status; + setPreprocessStatusText(`job=${result.job_id} status=${result.status}`); + + if (result.status === 'done') { + stopPreprocessPolling(); + if (UI.retryPreprocessBtn) UI.retryPreprocessBtn.disabled = true; + setStatus('입력 전처리 완료, 분석을 이어서 실행합니다.'); + await runAnalyzeFromPreprocessed(result, appState.preprocessJob.payload?.question || ''); + } else if (result.status === 'failed') { + stopPreprocessPolling(); + if (UI.retryPreprocessBtn) UI.retryPreprocessBtn.disabled = false; + const reason = explainFailureReason(result.failure_reason || 'parser_error'); + showError(`입력 전처리가 실패했습니다. (${reason})`, result.error || 'unknown'); + setPreprocessStatusText(`job=${result.job_id} status=failed reason=${reason}`); + setStatus('입력 전처리 실패'); + } + } catch (err) { + stopPreprocessPolling(); + if (UI.retryPreprocessBtn) UI.retryPreprocessBtn.disabled = false; + showError(err.userMessage || '입력 전처리 상태 조회 실패', err.detail || ''); + setStatus('입력 전처리 상태 조회 실패'); + } +} + +function startPreprocessPolling() { + stopPreprocessPolling(); + appState.preprocessJob.pollTimer = setInterval(() => { + pollPreprocessJobOnce(); + }, 1200); +} + +async function startPreprocessAndAnalyze(payload) { + appState.preprocessJob.payload = payload; + const queued = await postJson('/api/preprocess/jobs', payload, '입력 전처리 생성'); + appState.preprocessJob.id = queued.job_id; + appState.preprocessJob.status = queued.status; + if (UI.retryPreprocessBtn) UI.retryPreprocessBtn.disabled = true; + setPreprocessStatusText(`job=${queued.job_id} status=${queued.status}`); + setStatus('입력 전처리 큐 등록 완료'); + await pollPreprocessJobOnce(); + startPreprocessPolling(); +} + function stopChartPolling() { if (appState.chartJob.pollTimer) { clearInterval(appState.chartJob.pollTimer); @@ -752,13 +836,7 @@ async function runAnalyze() { toggleBusy(true); try { const body = await buildAnalyzeRequest(); - const data = await postJson('/api/analyze', body, '분석'); - appState.latestPrompt = data.prompt; - UI.summary.textContent = JSON.stringify(data.summary, null, 2); - renderAnalyzeAssist(data); - if (UI.prompt) UI.prompt.textContent = data.prompt; - if (UI.answer) UI.answer.textContent = ''; - setStatus(STATUS.analyzeDone); + await startPreprocessAndAnalyze(body); } catch (err) { UI.summary.textContent = err.userMessage || '오류'; showError(err.userMessage || '분석 실패', err.detail || ''); @@ -942,6 +1020,22 @@ function bindEvents() { UI.multiAnalyzeBtn?.addEventListener('click', runMultiAnalyze); UI.startChartsJobBtn?.addEventListener('click', startChartsJob); UI.retryChartsJobBtn?.addEventListener('click', retryChartsJob); + UI.retryPreprocessBtn?.addEventListener('click', async () => { + if (!appState.preprocessJob.payload) { + showError('재시도할 입력 전처리 작업이 없습니다.', 'preprocessJob.payload is empty'); + return; + } + clearError(); + try { + toggleBusy(true); + await startPreprocessAndAnalyze(appState.preprocessJob.payload); + } catch (err) { + showError(err.userMessage || '입력 전처리 재시도 실패', err.detail || ''); + setStatus('입력 전처리 재시도 실패'); + } finally { + toggleBusy(false); + } + }); UI.renderDashboardBtn?.addEventListener('click', () => { clearError(); @@ -993,7 +1087,9 @@ function init() { if (UI.filterFile) UI.filterFile.innerHTML = ''; if (UI.filterType) UI.filterType.innerHTML = ''; if (UI.retryChartsJobBtn) UI.retryChartsJobBtn.disabled = true; + if (UI.retryPreprocessBtn) UI.retryPreprocessBtn.disabled = true; setChartsJobStatusText('차트 작업 대기 중'); + setPreprocessStatusText('입력 전처리 대기 중'); } init(); diff --git a/bitnet_tools/ui/index.html b/bitnet_tools/ui/index.html index 59113ec..7ed2212 100644 --- a/bitnet_tools/ui/index.html +++ b/bitnet_tools/ui/index.html @@ -72,6 +72,10 @@

고급: 모델 실행

3) 실행 상태

대기 중
+
+ +
+
입력 전처리 대기 중
상세 오류 보기 diff --git a/bitnet_tools/web.py b/bitnet_tools/web.py index d97acb9..ed81905 100644 --- a/bitnet_tools/web.py +++ b/bitnet_tools/web.py @@ -5,6 +5,7 @@ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer import base64 import csv +from datetime import datetime, timedelta, timezone import io import json from pathlib import Path @@ -33,6 +34,12 @@ _CHART_JOBS: dict[str, Future] = {} _CHART_LOCK = threading.Lock() +PREPROCESS_JOB_DIR = Path('.bitnet_cache') / 'preprocess_jobs' +PREPROCESS_JOB_TTL_SECONDS = 60 * 60 +_PREPROCESS_EXECUTOR = ThreadPoolExecutor(max_workers=2) +_PREPROCESS_JOBS: dict[str, dict[str, Any]] = {} +_PREPROCESS_LOCK = threading.Lock() + @@ -197,6 +204,137 @@ def _extract_sheet_names(file_base64: str) -> list[str]: zf, _ = _load_xlsx_from_base64(file_base64) return [name for name, _ in _get_xlsx_sheet_entries(zf)] + +def _classify_preprocess_error(exc: Exception) -> str: + msg = str(exc).lower() + if any(token in msg for token in ['memory', '메모리', 'out of memory', 'oom']): + return 'memory_limit' + if any(token in msg for token in ['base64', 'zip', 'corrupt', '손상', 'broken', 'unsupported excel format']): + return 'file_corruption' + return 'parser_error' + + +def _cleanup_expired_preprocess_jobs() -> None: + now = datetime.now(timezone.utc) + threshold = now - timedelta(seconds=PREPROCESS_JOB_TTL_SECONDS) + + with _PREPROCESS_LOCK: + expired = [ + job_id + for job_id, rec in _PREPROCESS_JOBS.items() + if datetime.fromisoformat(rec.get('expire_at', now.isoformat())) <= now + ] + for job_id in expired: + _PREPROCESS_JOBS.pop(job_id, None) + + if PREPROCESS_JOB_DIR.exists(): + for path in PREPROCESS_JOB_DIR.iterdir(): + if not path.is_dir(): + continue + mtime = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc) + if mtime <= threshold: + for child in path.glob('**/*'): + if child.is_file(): + child.unlink(missing_ok=True) + for child_dir in sorted(path.glob('**/*'), reverse=True): + if child_dir.is_dir(): + child_dir.rmdir() + path.rmdir() + + +def _run_preprocess_job(job_id: str, request_payload: dict[str, Any]) -> dict[str, Any]: + PREPROCESS_JOB_DIR.mkdir(parents=True, exist_ok=True) + job_dir = PREPROCESS_JOB_DIR / job_id + job_dir.mkdir(parents=True, exist_ok=True) + + input_type = str(request_payload.get('input_type', 'csv') or 'csv').strip().lower() + question = str(request_payload.get('question', '') or '').strip() + source_name, normalized_csv_text, meta = _coerce_csv_text_from_file_payload(request_payload) + + artifact_csv = job_dir / 'normalized.csv' + artifact_meta = job_dir / 'meta.json' + artifact_csv.write_text(normalized_csv_text, encoding='utf-8') + artifact_meta.write_text( + json.dumps({'source_name': source_name, 'input_type': input_type, 'meta': meta}, ensure_ascii=False, indent=2), + encoding='utf-8', + ) + + return { + 'job_id': job_id, + 'status': 'done', + 'question': question, + 'source_name': source_name, + 'input_type': input_type, + 'normalized_csv_text': normalized_csv_text, + 'meta': meta, + 'artifacts': { + 'job_dir': str(job_dir), + 'normalized_csv': str(artifact_csv), + 'meta_json': str(artifact_meta), + }, + } + + +def _preprocess_job_worker(job_id: str, request_payload: dict[str, Any]) -> None: + with _PREPROCESS_LOCK: + rec = _PREPROCESS_JOBS.get(job_id) + if rec is not None: + rec['status'] = 'running' + rec['started_at'] = datetime.now(timezone.utc).isoformat() + try: + result = _run_preprocess_job(job_id, request_payload) + with _PREPROCESS_LOCK: + rec = _PREPROCESS_JOBS.get(job_id) + if rec is not None: + rec['status'] = 'done' + rec['result'] = result + rec['finished_at'] = datetime.now(timezone.utc).isoformat() + except Exception as exc: + with _PREPROCESS_LOCK: + rec = _PREPROCESS_JOBS.get(job_id) + if rec is not None: + rec['status'] = 'failed' + rec['error'] = str(exc) + rec['failure_reason'] = _classify_preprocess_error(exc) + rec['finished_at'] = datetime.now(timezone.utc).isoformat() + + +def submit_preprocess_job(payload: dict[str, Any]) -> str: + if not isinstance(payload, dict): + raise ValueError('payload is required') + _cleanup_expired_preprocess_jobs() + job_id = uuid.uuid4().hex + now = datetime.now(timezone.utc) + with _PREPROCESS_LOCK: + _PREPROCESS_JOBS[job_id] = { + 'job_id': job_id, + 'status': 'queued', + 'created_at': now.isoformat(), + 'expire_at': (now + timedelta(seconds=PREPROCESS_JOB_TTL_SECONDS)).isoformat(), + 'result': None, + } + _PREPROCESS_EXECUTOR.submit(_preprocess_job_worker, job_id, payload) + return job_id + + +def get_preprocess_job(job_id: str) -> dict[str, Any]: + _cleanup_expired_preprocess_jobs() + with _PREPROCESS_LOCK: + rec = _PREPROCESS_JOBS.get(job_id) + if rec is None: + return {'job_id': job_id, 'status': 'not_found'} + status = rec.get('status', 'queued') + if status == 'done' and isinstance(rec.get('result'), dict): + return rec['result'] + if status == 'failed': + return { + 'job_id': job_id, + 'status': 'failed', + 'error': rec.get('error', 'unknown error'), + 'failure_reason': rec.get('failure_reason', 'parser_error'), + } + return {'job_id': job_id, 'status': status} + def _run_chart_job(job_id: str, files: list[dict[str, str]]) -> dict[str, Any]: CHART_JOB_DIR.mkdir(parents=True, exist_ok=True) job_input_dir = CHART_JOB_DIR / f"{job_id}_input" @@ -311,6 +449,11 @@ def do_GET(self) -> None: if not job_id: return self._send_json(self._error_payload('job id is required'), HTTPStatus.BAD_REQUEST) return self._send_json(get_chart_job(job_id)) + if route.startswith('/api/preprocess/jobs/'): + job_id = route.split('/')[-1].strip() + if not job_id: + return self._send_json(self._error_payload('job id is required'), HTTPStatus.BAD_REQUEST) + return self._send_json(get_preprocess_job(job_id)) self.send_error(HTTPStatus.NOT_FOUND) def do_POST(self) -> None: @@ -404,6 +547,10 @@ def do_POST(self) -> None: ) return self._send_json(result) + if route == '/api/preprocess/jobs': + job_id = submit_preprocess_job(payload) + return self._send_json({'job_id': job_id, 'status': 'queued'}, HTTPStatus.ACCEPTED) + if route == "/api/multi-analyze": files = payload.get("files", []) diff --git a/tests/test_web.py b/tests/test_web.py index 6eb1f45..b2f3192 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -40,6 +40,48 @@ def test_get_chart_job_not_found(): assert result["status"] == "not_found" +def test_submit_and_get_preprocess_job_done(monkeypatch, tmp_path): + monkeypatch.setattr(web, "PREPROCESS_JOB_DIR", tmp_path / "prep") + + job_id = web.submit_preprocess_job({ + "input_type": "csv", + "name": "sample.csv", + "normalized_csv_text": "a,b\n1,2\n", + "question": "요약", + }) + + result = web.get_preprocess_job(job_id) + for _ in range(30): + if result["status"] != "queued" and result["status"] != "running": + break + time.sleep(0.01) + result = web.get_preprocess_job(job_id) + + assert result["status"] == "done" + assert result["input_type"] == "csv" + assert "normalized_csv" in result["artifacts"] + + +def test_preprocess_job_failed_reason(monkeypatch, tmp_path): + monkeypatch.setattr(web, "PREPROCESS_JOB_DIR", tmp_path / "prep") + + def broken(payload): + raise ValueError("memory allocation failed") + + monkeypatch.setattr(web, "_run_preprocess_job", lambda *_args, **_kwargs: broken(None)) + job_id = web.submit_preprocess_job({"input_type": "csv", "normalized_csv_text": "x\n1\n"}) + + result = web.get_preprocess_job(job_id) + for _ in range(30): + if result["status"] != "queued" and result["status"] != "running": + break + time.sleep(0.01) + result = web.get_preprocess_job(job_id) + + assert result["status"] == "failed" + assert result["failure_reason"] == "memory_limit" + + def _make_docx_b64() -> str: xml = """