Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

## 0) 현재 완성도 빠른 진단

현 시점 기준 기능 완성도(실사용 관점): **약 93%**
현 시점 기준 기능 완성도(실사용 관점): **약 94%**

- 완료
- CSV 기초 요약(행/열/결측/숫자 통계)
Expand Down Expand Up @@ -219,7 +219,7 @@ bitnet-analyze doctor --model bitnet:latest
bitnet-analyze report sample.csv --question "핵심 요약" --out analysis_report.md

# 8) 다중 CSV 통합 분석(JSON+MD+코드가이드)
bitnet-analyze multi-analyze a.csv b.csv c.csv --question "컬럼별 비율과 지역별 차이 분석" --out-json multi.json --out-report multi.md
bitnet-analyze multi-analyze a.csv b.csv c.csv --question "컬럼별 비율과 지역별 차이 분석" --group-column 시도명 --target-column 세차유형 --out-json multi.json --out-report multi.md
```

---
Expand Down
9 changes: 8 additions & 1 deletion bitnet_tools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def _build_parser() -> argparse.ArgumentParser:
multi_parser = subparsers.add_parser("multi-analyze", help="Analyze multiple CSV files together")
multi_parser.add_argument("csv", nargs="+", type=Path, help="Input CSV paths")
multi_parser.add_argument("--question", required=True, help="Analysis question")
multi_parser.add_argument("--group-column", default=None, help="Optional group column for ratio table")
multi_parser.add_argument("--target-column", default=None, help="Optional target column for ratio table")
multi_parser.add_argument(
"--out-json",
type=Path,
Expand Down Expand Up @@ -110,7 +112,12 @@ def main(argv: list[str] | None = None) -> int:


if args.command == "multi-analyze":
result = analyze_multiple_csv(args.csv, args.question)
result = analyze_multiple_csv(
args.csv,
args.question,
group_column=args.group_column,
target_column=args.target_column,
)
args.out_json.write_text(result_to_json(result), encoding="utf-8")
args.out_report.write_text(build_multi_csv_markdown(result), encoding="utf-8")
print(f"multi analysis json saved: {args.out_json}")
Expand Down
148 changes: 136 additions & 12 deletions bitnet_tools/multi_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,75 @@

import csv
import json
from collections import Counter
import math
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any

from .analysis import _to_float, summarize_reader


def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, Any]:
def _quantile(sorted_values: list[float], q: float) -> float:
if not sorted_values:
return 0.0
if len(sorted_values) == 1:
return sorted_values[0]
pos = (len(sorted_values) - 1) * q
low = int(math.floor(pos))
high = int(math.ceil(pos))
if low == high:
return sorted_values[low]
weight = pos - low
return sorted_values[low] * (1 - weight) + sorted_values[high] * weight


def _outlier_ratio(values: list[float]) -> float:
    """Return the fraction of values lying outside the 1.5 * IQR fences.

    Quartiles are taken with ``_quantile`` at 0.25 and 0.75; a value counts as
    an outlier when it is below ``q1 - 1.5 * iqr`` or above ``q3 + 1.5 * iqr``.

    Args:
        values: Numeric observations in any order. NaN entries are ignored.

    Returns:
        The outlier share among the non-NaN values, rounded to 6 decimals.
        ``0.0`` when fewer than four non-NaN values remain or when the
        interquartile range is zero.
    """
    # NaN compares false against everything, so leaving it in would make the
    # sort order (and therefore both quartiles) depend on where the NaN
    # happened to sit in the input.
    ordered = sorted(v for v in values if not math.isnan(v))
    if len(ordered) < 4:
        return 0.0

    q1 = _quantile(ordered, 0.25)
    q3 = _quantile(ordered, 0.75)
    iqr = q3 - q1
    if iqr == 0:
        # Degenerate spread: the fences collapse, so nothing is flagged.
        return 0.0

    low = q1 - 1.5 * iqr
    high = q3 + 1.5 * iqr
    outliers = sum(1 for v in ordered if v < low or v > high)
    return round(outliers / len(ordered), 6)


def _group_ratio_table(rows: list[dict[str, str]], group_col: str, target_col: str) -> dict[str, Any]:
table: dict[str, Counter[str]] = defaultdict(Counter)
for row in rows:
g = (row.get(group_col) or "").strip()
t = (row.get(target_col) or "").strip()
if g and t:
table[g][t] += 1

ratio_table: dict[str, Any] = {}
for g, counter in table.items():
total = sum(counter.values())
ratio_table[g] = {
k: {
"count": v,
"ratio": round(v / total, 6) if total else 0.0,
}
for k, v in counter.items()
}

return {
"group_column": group_col,
"target_column": target_col,
"groups": ratio_table,
}


def _profile_rows(
rows: list[dict[str, str]],
columns: list[str],
group_column: str | None = None,
target_column: str | None = None,
) -> dict[str, Any]:
row_count = len(rows)
missing = {c: 0 for c in columns}
non_missing = {c: 0 for c in columns}
Expand All @@ -19,6 +80,7 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A
numeric_positive = {c: 0 for c in columns}
numeric_zero = {c: 0 for c in columns}
numeric_negative = {c: 0 for c in columns}
numeric_values: dict[str, list[float]] = {c: [] for c in columns}

for row in rows:
for col in columns:
Expand All @@ -32,6 +94,7 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A

num = _to_float(raw)
if num is not None:
numeric_values[col].append(num)
if num > 0:
numeric_positive[col] += 1
elif num < 0:
Expand Down Expand Up @@ -60,26 +123,61 @@ def _profile_rows(rows: list[dict[str, str]], columns: list[str]) -> dict[str, A
"positive_ratio": round(numeric_positive[col] / numeric_total, 6),
"zero_ratio": round(numeric_zero[col] / numeric_total, 6),
"negative_ratio": round(numeric_negative[col] / numeric_total, 6),
"outlier_ratio": _outlier_ratio(numeric_values[col]),
}

dominant_value_ratio = top_values[0]["ratio"] if top_values else 0.0
profiles[col] = {
"missing_count": missing[col],
"missing_ratio": round(missing[col] / row_count, 6) if row_count else 0.0,
"non_missing_count": nn,
"unique_count": len(uniques[col]),
"unique_ratio": round(len(uniques[col]) / nn, 6) if nn else 0.0,
"dominant_value_ratio": dominant_value_ratio,
"top_values": top_values,
"numeric_distribution": numeric_distribution,
"dtype": summary.dtypes[col],
}

group_target_ratio: dict[str, Any] | None = None
if group_column and target_column and group_column in columns and target_column in columns:
group_target_ratio = _group_ratio_table(rows, group_column, target_column)

return {
"summary": summary.to_dict(),
"column_profiles": profiles,
"group_target_ratio": group_target_ratio,
}


def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any]:
def _schema_drift(files: list[dict[str, Any]], shared_columns: list[str]) -> dict[str, Any]:
drift: dict[str, Any] = {}
for col in shared_columns:
dtypes = [f["column_profiles"][col]["dtype"] for f in files if col in f["column_profiles"]]
missing_ratios = [f["column_profiles"][col]["missing_ratio"] for f in files if col in f["column_profiles"]]
dominant_ratios = [f["column_profiles"][col]["dominant_value_ratio"] for f in files if col in f["column_profiles"]]

means = []
for f in files:
stats = f["summary"]["numeric_stats"].get(col)
if stats:
means.append(stats["mean"])

drift[col] = {
"dtype_changed": len(set(dtypes)) > 1,
"missing_ratio_range": round(max(missing_ratios) - min(missing_ratios), 6) if missing_ratios else 0.0,
"dominant_value_ratio_range": round(max(dominant_ratios) - min(dominant_ratios), 6) if dominant_ratios else 0.0,
"mean_range": round(max(means) - min(means), 6) if means else 0.0,
}
return drift


def analyze_multiple_csv(
csv_paths: list[Path],
question: str,
group_column: str | None = None,
target_column: str | None = None,
) -> dict[str, Any]:
if not csv_paths:
raise ValueError("at least one CSV path is required")

Expand All @@ -98,7 +196,7 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any]
columns = [str(c) for c in reader.fieldnames]
rows = list(reader)

profiled = _profile_rows(rows, columns)
profiled = _profile_rows(rows, columns, group_column=group_column, target_column=target_column)
total_rows += profiled["summary"]["row_count"]
all_columns.append(set(columns))

Expand All @@ -108,6 +206,7 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any]
"question": question,
"summary": profiled["summary"],
"column_profiles": profiled["column_profiles"],
"group_target_ratio": profiled["group_target_ratio"],
}
)

Expand All @@ -121,13 +220,26 @@ def analyze_multiple_csv(csv_paths: list[Path], question: str) -> dict[str, Any]
"shared_columns": shared_columns,
"union_columns": union_columns,
"files": files,
"code_guidance": build_code_guidance(shared_columns),
"schema_drift": _schema_drift(files, shared_columns),
"code_guidance": build_code_guidance(shared_columns, group_column, target_column),
}


def build_code_guidance(shared_columns: list[str]) -> dict[str, str]:
def build_code_guidance(
shared_columns: list[str],
group_column: str | None = None,
target_column: str | None = None,
) -> dict[str, str]:
join_key = shared_columns[0] if shared_columns else "공통키컬럼"

group_block = ""
if group_column and target_column:
group_block = (
f"ratio_tbl = (merged.groupby('{group_column}')['{target_column}'].value_counts(normalize=True)"
".rename('ratio').reset_index())\n"
"print('그룹-타깃 비율표:\n', ratio_tbl.head(20))\n\n"
)

pandas_code = (
"import pandas as pd\n"
"import matplotlib.pyplot as plt\n\n"
Expand All @@ -145,15 +257,17 @@ def build_code_guidance(shared_columns: list[str]) -> dict[str, str]:
" ratio = (merged[numeric_cols] > 0).mean().sort_values(ascending=False)\n"
" print('양수 비율 상위:\n', ratio.head(10))\n"
" ratio.head(10).plot(kind='bar', title='양수 비율 상위 10개 컬럼')\n"
" plt.tight_layout(); plt.show()\n"
" plt.tight_layout(); plt.show()\n\n"
f"{group_block}"
)

return {
"recommended_steps": (
"1) 공통 키 컬럼 확인 후 병합\n"
"2) 컬럼별 결측/고유값/상위값 비율 확인\n"
"3) 수치형 컬럼 비율(양수/0/음수)과 분포 시각화\n"
"4) 지역/유형 컬럼과 수치형 컬럼 교차 집계로 인사이트 도출"
"3) 수치형 컬럼 비율(양수/0/음수), 이상치 비율, 분포 확인\n"
"4) 그룹 컬럼 기준 타깃 비율 분석(예: 시도명-세차유형)\n"
"5) 파일 간 스키마 변화/평균 변화 범위 확인"
),
"pandas_example": pandas_code,
}
Expand All @@ -178,19 +292,29 @@ def build_multi_csv_markdown(result: dict[str, Any]) -> str:
f"- 행 수: {file_info['summary']['row_count']}",
f"- 열 수: {file_info['summary']['column_count']}",
"",
"| 컬럼 | 타입 | 결측비율 | 고유비율 |",
"|---|---|---:|---:|",
"| 컬럼 | 타입 | 결측비율 | 고유비율 | 대표값비율 |",
"|---|---|---:|---:|---:|",
]
)
for col in file_info["summary"]["columns"]:
prof = file_info["column_profiles"][col]
lines.append(
f"| {col} | {prof['dtype']} | {prof['missing_ratio']:.4f} | {prof['unique_ratio']:.4f} |"
f"| {col} | {prof['dtype']} | {prof['missing_ratio']:.4f} | {prof['unique_ratio']:.4f} | {prof['dominant_value_ratio']:.4f} |"
)
if file_info.get("group_target_ratio"):
gtr = file_info["group_target_ratio"]
lines.extend(["", f"- 그룹비율: {gtr['group_column']} x {gtr['target_column']}"])
lines.append("")

lines.extend(["## 파일 간 스키마/분포 변화", "", "| 컬럼 | 타입변화 | 결측비율범위 | 대표값비율범위 | 평균범위 |", "|---|---|---:|---:|---:|"])
for col, drift in result["schema_drift"].items():
lines.append(
f"| {col} | {drift['dtype_changed']} | {drift['missing_ratio_range']:.4f} | {drift['dominant_value_ratio_range']:.4f} | {drift['mean_range']:.4f} |"
)

lines.extend(
[
"",
"## 코드 가이드",
"",
"```text",
Expand Down
14 changes: 14 additions & 0 deletions tests/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ def test_multi_csv_report_builder(tmp_path):
assert result["file_count"] == 2
assert "city" in result["shared_columns"]
assert "다중 CSV 분석 리포트" in report


def test_multi_csv_schema_drift_and_group_ratio(tmp_path):
    """Two CSVs with diverging ``val`` means yield drift info and a ratio table."""
    first_csv = tmp_path / "a.csv"
    first_csv.write_text("city,type,val\nseoul,A,1\nseoul,B,2\n", encoding="utf-8")
    second_csv = tmp_path / "b.csv"
    second_csv.write_text("city,type,val\nseoul,A,100\nbusan,A,200\n", encoding="utf-8")

    result = analyze_multiple_csv(
        [first_csv, second_csv],
        "드리프트",
        group_column="city",
        target_column="type",
    )

    assert "schema_drift" in result
    drift = result["schema_drift"]
    assert "val" in drift
    # The second file's values are two orders of magnitude larger.
    assert drift["val"]["mean_range"] > 0
    first_file = result["files"][0]
    assert first_file["group_target_ratio"] is not None
30 changes: 30 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,33 @@ def test_cli_multi_analyze_mode(tmp_path):
assert out_json.exists()
assert out_md.exists()
assert "다중 CSV 분석 리포트" in out_md.read_text(encoding="utf-8")


def test_cli_multi_analyze_with_group_target(tmp_path):
    """``multi-analyze`` with group/target flags exits 0 and emits the ratio key."""
    first_csv = tmp_path / "a.csv"
    second_csv = tmp_path / "b.csv"
    json_path = tmp_path / "out2.json"
    report_path = tmp_path / "out2.md"

    first_csv.write_text("city,type,val\nseoul,A,1\nseoul,B,2\n", encoding="utf-8")
    second_csv.write_text("city,type,val\nseoul,A,10\nbusan,A,20\n", encoding="utf-8")

    argv = [
        "multi-analyze",
        str(first_csv),
        str(second_csv),
        "--question", "그룹비율",
        "--group-column", "city",
        "--target-column", "type",
        "--out-json", str(json_path),
        "--out-report", str(report_path),
    ]
    exit_code = cli.main(argv)

    assert exit_code == 0
    json_text = json_path.read_text(encoding="utf-8")
    assert "group_target_ratio" in json_text