diff --git a/README.md b/README.md index 4ae706f..db6fca6 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## 0) 현재 완성도 빠른 진단 -현 시점 기준 기능 완성도(실사용 관점): **약 93%** +현 시점 기준 기능 완성도(실사용 관점): **약 94%** - 완료 - CSV 기초 요약(행/열/결측/숫자 통계) @@ -219,7 +219,7 @@ bitnet-analyze doctor --model bitnet:latest bitnet-analyze report sample.csv --question "핵심 요약" --out analysis_report.md # 8) 다중 CSV 통합 분석(JSON+MD+코드가이드) -bitnet-analyze multi-analyze a.csv b.csv c.csv --question "컬럼별 비율과 지역별 차이 분석" --out-json multi.json --out-report multi.md +bitnet-analyze multi-analyze a.csv b.csv c.csv --question "컬럼별 비율과 지역별 차이 분석" --group-column 시도명 --target-column 세차유형 --out-json multi.json --out-report multi.md ``` --- diff --git a/bitnet_tools/cli.py b/bitnet_tools/cli.py index dcd8a71..c14e299 100644 --- a/bitnet_tools/cli.py +++ b/bitnet_tools/cli.py @@ -58,6 +58,8 @@ def _build_parser() -> argparse.ArgumentParser: multi_parser = subparsers.add_parser("multi-analyze", help="Analyze multiple CSV files together") multi_parser.add_argument("csv", nargs="+", type=Path, help="Input CSV paths") multi_parser.add_argument("--question", required=True, help="Analysis question") + multi_parser.add_argument("--group-column", default=None, help="Optional group column for ratio table") + multi_parser.add_argument("--target-column", default=None, help="Optional target column for ratio table") multi_parser.add_argument( "--out-json", type=Path, @@ -110,7 +112,12 @@ def main(argv: list[str] | None = None) -> int: if args.command == "multi-analyze": - result = analyze_multiple_csv(args.csv, args.question) + result = analyze_multiple_csv( + args.csv, + args.question, + group_column=args.group_column, + target_column=args.target_column, + ) args.out_json.write_text(result_to_json(result), encoding="utf-8") args.out_report.write_text(build_multi_csv_markdown(result), encoding="utf-8") print(f"multi analysis json saved: {args.out_json}") diff --git a/bitnet_tools/multi_csv.py b/bitnet_tools/multi_csv.py index adb80f4..69c9467 100644 --- a/bitnet_tools/multi_csv.py +++ b/bitnet_tools/multi_csv.py @@ -2,14 +2,75 @@ import csv import json -from collections import Counter +import math +from collections import Counter, defaultdict from pathlib import Path from typing import Any from .analysis import _to_float, summarize_reader -def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, Any]: +def _quantile(sorted_values: list[float], q: float) -> float: + if not sorted_values: + return 0.0 + if len(sorted_values) == 1: + return sorted_values[0] + pos = (len(sorted_values) - 1) * q + low = int(math.floor(pos)) + high = int(math.ceil(pos)) + if low == high: + return sorted_values[low] + weight = pos - low + return sorted_values[low] * (1 - weight) + sorted_values[high] * weight + + +def _outlier_ratio(values: list[float]) -> float: + if len(values) < 4: + return 0.0 + sorted_values = sorted(values) + q1 = _quantile(sorted_values, 0.25) + q3 = _quantile(sorted_values, 0.75) + iqr = q3 - q1 + if iqr == 0: + return 0.0 + low = q1 - 1.5 * iqr + high = q3 + 1.5 * iqr + outliers = sum(1 for v in sorted_values if v < low or v > high) + return round(outliers / len(sorted_values), 6) + + +def _group_ratio_table(rows: list[dict[str, str]], group_col: str, target_col: str) -> dict[str, Any]: + table: dict[str, Counter[str]] = defaultdict(Counter) + for row in rows: + g = (row.get(group_col) or "").strip() + t = (row.get(target_col) or "").strip() + if g and t: + table[g][t] += 1 + + ratio_table: dict[str, Any] = {} + for g, counter in table.items(): + total = sum(counter.values()) + ratio_table[g] = { + k: { + "count": v, + "ratio": round(v / total, 6) if total else 0.0, + } + for k, v in counter.items() + } + + return { + "group_column": group_col, + "target_column": target_col, + "groups": ratio_table, + } + + +def _profile_rows( + rows: list[dict[str, str]], + columns: list[str], + group_column: str | None = None, + target_column: str | None = None, +) -> dict[str, Any]: row_count = len(rows) missing = {c: 0 for c in columns} non_missing = {c: 0 for c in columns} @@ -19,6 +80,7 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A numeric_positive = {c: 0 for c in columns} numeric_zero = {c: 0 for c in columns} numeric_negative = {c: 0 for c in columns} + numeric_values: dict[str, list[float]] = {c: [] for c in columns} for row in rows: for col in columns: @@ -32,6 +94,7 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A num = _to_float(raw) if num is not None: + numeric_values[col].append(num) if num > 0: numeric_positive[col] += 1 elif num < 0: @@ -60,26 +123,61 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A "positive_ratio": round(numeric_positive[col] / numeric_total, 6), "zero_ratio": round(numeric_zero[col] / numeric_total, 6), "negative_ratio": round(numeric_negative[col] / numeric_total, 6), + "outlier_ratio": _outlier_ratio(numeric_values[col]), } + dominant_value_ratio = top_values[0]["ratio"] if top_values else 0.0 profiles[col] = { "missing_count": missing[col], "missing_ratio": round(missing[col] / row_count, 6) if row_count else 0.0, "non_missing_count": nn, "unique_count": len(uniques[col]), "unique_ratio": round(len(uniques[col]) / nn, 6) if nn else 0.0, + "dominant_value_ratio": dominant_value_ratio, "top_values": top_values, "numeric_distribution": numeric_distribution, "dtype": summary.dtypes[col], } + group_target_ratio: dict[str, Any] | None = None + if group_column and target_column and group_column in columns and target_column in columns: + group_target_ratio = _group_ratio_table(rows, group_column, target_column) + return { "summary": summary.to_dict(), "column_profiles": profiles, + "group_target_ratio": group_target_ratio, } -def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any]: +def _schema_drift(files: list[dict[str, Any]], shared_columns: list[str]) -> dict[str, Any]: + drift: dict[str, Any] = {} + for col in shared_columns: + dtypes = [f["column_profiles"][col]["dtype"] for f in files if col in f["column_profiles"]] + missing_ratios = [f["column_profiles"][col]["missing_ratio"] for f in files if col in f["column_profiles"]] + dominant_ratios = [f["column_profiles"][col]["dominant_value_ratio"] for f in files if col in f["column_profiles"]] + + means = [] + for f in files: + stats = f["summary"]["numeric_stats"].get(col) + if stats: + means.append(stats["mean"]) + + drift[col] = { + "dtype_changed": len(set(dtypes)) > 1, + "missing_ratio_range": round(max(missing_ratios) - min(missing_ratios), 6) if missing_ratios else 0.0, + "dominant_value_ratio_range": round(max(dominant_ratios) - min(dominant_ratios), 6) if dominant_ratios else 0.0, + "mean_range": round(max(means) - min(means), 6) if means else 0.0, + } + return drift + + +def analyze_multiple_csv( + csv_paths: list[Path], + question: str, + group_column: str | None = None, + target_column: str | None = None, +) -> dict[str, Any]: if not csv_paths: raise ValueError("at least one CSV path is required") @@ -98,7 +196,7 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any] columns = [str(c) for c in reader.fieldnames] rows = list(reader) - profiled = _profile_rows(rows, columns) + profiled = _profile_rows(rows, columns, group_column=group_column, target_column=target_column) total_rows += profiled["summary"]["row_count"] all_columns.append(set(columns)) @@ -108,6 +206,7 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any] "question": question, "summary": profiled["summary"], "column_profiles": profiled["column_profiles"], + "group_target_ratio": profiled["group_target_ratio"], } ) @@ -121,13 +220,26 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any] "shared_columns": shared_columns, "union_columns": union_columns, "files": files, - "code_guidance": build_code_guidance(shared_columns), + "schema_drift": _schema_drift(files, shared_columns), + "code_guidance": build_code_guidance(shared_columns, group_column, target_column), } -def build_code_guidance(shared_columns: list[str]) -> dict[str, str]: +def build_code_guidance( + shared_columns: list[str], + group_column: str | None = None, + target_column: str | None = None, +) -> dict[str, str]: join_key = shared_columns[0] if shared_columns else "공통키컬럼" + group_block = "" + if group_column and target_column: + group_block = ( + f"ratio_tbl = (merged.groupby('{group_column}')['{target_column}'].value_counts(normalize=True)" + ".rename('ratio').reset_index())\n" + "print('그룹-타깃 비율표:\n', ratio_tbl.head(20))\n\n" + ) + pandas_code = ( "import pandas as pd\n" "import matplotlib.pyplot as plt\n\n" @@ -145,15 +257,17 @@ def build_code_guidance(shared_columns: list[str]) -> dict[str, str]: " ratio = (merged[numeric_cols] > 0).mean().sort_values(ascending=False)\n" " print('양수 비율 상위:\n', ratio.head(10))\n" " ratio.head(10).plot(kind='bar', title='양수 비율 상위 10개 컬럼')\n" - " plt.tight_layout(); plt.show()\n" + " plt.tight_layout(); plt.show()\n\n" + f"{group_block}" ) return { "recommended_steps": ( "1) 공통 키 컬럼 확인 후 병합\n" "2) 컬럼별 결측/고유값/상위값 비율 확인\n" - "3) 수치형 컬럼 비율(양수/0/음수)과 분포 시각화\n" - "4) 지역/유형 컬럼과 수치형 컬럼 교차 집계로 인사이트 도출" + "3) 수치형 컬럼 비율(양수/0/음수), 이상치 비율, 분포 확인\n" + "4) 그룹 컬럼 기준 타깃 비율 분석(예: 시도명-세차유형)\n" + "5) 파일 간 스키마 변화/평균 변화 범위 확인" ), "pandas_example": pandas_code, } @@ -178,19 +292,29 @@ def build_multi_csv_markdown(result: dict[str, Any]) -> str: f"- 행 수: {file_info['summary']['row_count']}", f"- 열 수: {file_info['summary']['column_count']}", "", - "| 컬럼 | 타입 | 결측비율 | 고유비율 |", - "|---|---|---:|---:|", + "| 컬럼 | 타입 | 결측비율 | 고유비율 | 대표값비율 |", + "|---|---|---:|---:|---:|", ] ) for col in file_info["summary"]["columns"]: prof = file_info["column_profiles"][col] lines.append( - f"| {col} | {prof['dtype']} | {prof['missing_ratio']:.4f} | {prof['unique_ratio']:.4f} |" + f"| {col} | {prof['dtype']} | {prof['missing_ratio']:.4f} | {prof['unique_ratio']:.4f} | {prof['dominant_value_ratio']:.4f} |" ) + if file_info.get("group_target_ratio"): + gtr = file_info["group_target_ratio"] + lines.extend(["", f"- 그룹비율: {gtr['group_column']} x {gtr['target_column']}"]) lines.append("") + lines.extend(["## 파일 간 스키마/분포 변화", "", "| 컬럼 | 타입변화 | 결측비율범위 | 대표값비율범위 | 평균범위 |", "|---|---|---:|---:|---:|"]) + for col, drift in result["schema_drift"].items(): + lines.append( + f"| {col} | {drift['dtype_changed']} | {drift['missing_ratio_range']:.4f} | {drift['dominant_value_ratio_range']:.4f} | {drift['mean_range']:.4f} |" + ) + lines.extend( [ + "", "## 코드 가이드", "", "```text", diff --git a/tests/test_analysis.py b/tests/test_analysis.py index cc2efdb..41de94b 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -75,3 +75,17 @@ def test_multi_csv_report_builder(tmp_path): assert result["file_count"] == 2 assert "city" in result["shared_columns"] assert "다중 CSV 분석 리포트" in report + + +def test_multi_csv_schema_drift_and_group_ratio(tmp_path): + p1 = tmp_path / "a.csv" + p2 = tmp_path / "b.csv" + p1.write_text("city,type,val\nseoul,A,1\nseoul,B,2\n", encoding="utf-8") + p2.write_text("city,type,val\nseoul,A,100\nbusan,A,200\n", encoding="utf-8") + + result = analyze_multiple_csv([p1, p2], "드리프트", group_column="city", target_column="type") + + assert "schema_drift" in result + assert "val" in result["schema_drift"] + assert result["schema_drift"]["val"]["mean_range"] > 0 + assert result["files"][0]["group_target_ratio"] is not None diff --git a/tests/test_cli.py b/tests/test_cli.py index 7ecded8..ea1caeb 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -81,3 +81,33 @@ def test_cli_multi_analyze_mode(tmp_path): assert out_json.exists() assert out_md.exists() assert "다중 CSV 분석 리포트" in out_md.read_text(encoding="utf-8") + + +def test_cli_multi_analyze_with_group_target(tmp_path): + p1 = tmp_path / "a.csv" + p2 = tmp_path / "b.csv" + out_json = tmp_path / "out2.json" + out_md = tmp_path / "out2.md" + + p1.write_text("city,type,val\nseoul,A,1\nseoul,B,2\n", encoding="utf-8") + p2.write_text("city,type,val\nseoul,A,10\nbusan,A,20\n", encoding="utf-8") + + code = cli.main([ + "multi-analyze", + str(p1), + str(p2), + "--question", + "그룹비율", + "--group-column", + "city", + "--target-column", + "type", + "--out-json", + str(out_json), + "--out-report", + str(out_md), + ]) + + assert code == 0 + body = out_json.read_text(encoding="utf-8") + assert "group_target_ratio" in body