diff --git a/bitnet_tools/multi_csv.py b/bitnet_tools/multi_csv.py index ee48828..667813c 100644 --- a/bitnet_tools/multi_csv.py +++ b/bitnet_tools/multi_csv.py @@ -13,6 +13,8 @@ from .analysis import _to_float CACHE_DIR = Path('.bitnet_cache') +UNIQUE_BITMAP_SIZE = 65536 +TOP_VALUE_TRACK_CAP = 5000 def _quantile(sorted_values: list[float], q: float) -> float: @@ -110,6 +112,32 @@ def _infer_semantic_type(col: str, dtype: str, samples: list[str], unique_ratio: return 'text' +def _update_unique_bitmap(bitmap: bytearray, value: str) -> None: + h = hashlib.sha1(value.encode('utf-8')).digest() + idx = int.from_bytes(h[:8], 'big') % UNIQUE_BITMAP_SIZE + bitmap[idx // 8] |= 1 << (idx % 8) + + +def _estimate_unique_count(bitmap: bytearray) -> int: + set_bits = sum(bin(b).count('1') for b in bitmap) + if set_bits <= 0: + return 0 + if set_bits >= UNIQUE_BITMAP_SIZE: + return UNIQUE_BITMAP_SIZE + zero_bits = UNIQUE_BITMAP_SIZE - set_bits + return max(1, int(round(-UNIQUE_BITMAP_SIZE * math.log(zero_bits / UNIQUE_BITMAP_SIZE)))) + + +def _update_bounded_counter(counter: Counter[str], value: str, other_holder: dict[str, int], cap: int) -> None: + if value in counter: + counter[value] += 1 + return + if len(counter) < cap: + counter[value] += 1 + return + other_holder['count'] += 1 + + def _profile_csv_stream( path: Path, group_column: str | None = None, @@ -125,8 +153,9 @@ def _profile_csv_stream( missing = {c: 0 for c in columns} non_missing = {c: 0 for c in columns} - unique_sets: dict[str, set[str]] = {c: set() for c in columns} + unique_bitmaps: dict[str, bytearray] = {c: bytearray(UNIQUE_BITMAP_SIZE // 8) for c in columns} value_counts: dict[str, Counter[str]] = {c: Counter() for c in columns} + value_overflow: dict[str, dict[str, int]] = {c: {'count': 0} for c in columns} value_samples: dict[str, list[str]] = {c: [] for c in columns} numeric_positive = {c: 0 for c in columns} @@ -156,8 +185,8 @@ def _profile_csv_stream( missing[col] += 1 continue non_missing[col] += 1 - unique_sets[col].add(raw) - value_counts[col][raw] += 1 + _update_unique_bitmap(unique_bitmaps[col], raw) + _update_bounded_counter(value_counts[col], raw, value_overflow[col], TOP_VALUE_TRACK_CAP) _reservoir_sample_str(value_samples[col], raw, non_missing[col], value_sample_cap) num = _to_float(raw) @@ -200,6 +229,9 @@ def _profile_csv_stream( nn = non_missing[col] top = value_counts[col].most_common(5) + if value_overflow[col]['count'] > 0: + top.append(('__OTHER__', value_overflow[col]['count'])) + top = sorted(top, key=lambda x: x[1], reverse=True)[:5] top_values = [ {'value': v, 'count': cnt, 'ratio': round(cnt / row_count, 6) if row_count else 0.0} for v, cnt in top @@ -215,19 +247,21 @@ def _profile_csv_stream( 'outlier_ratio': _outlier_ratio(numeric_outlier_samples[col]), } - unique_ratio = round(len(unique_sets[col]) / nn, 6) if nn else 0.0 + unique_count = _estimate_unique_count(unique_bitmaps[col]) + unique_ratio = round(min(unique_count, nn) / nn, 6) if nn else 0.0 dominant_value_ratio = top_values[0]['ratio'] if top_values else 0.0 profiles[col] = { 'missing_count': missing[col], 'missing_ratio': round(missing[col] / row_count, 6) if row_count else 0.0, 'non_missing_count': nn, - 'unique_count': len(unique_sets[col]), + 'unique_count': unique_count, 'unique_ratio': unique_ratio, 'dominant_value_ratio': dominant_value_ratio, 'top_values': top_values, 'numeric_distribution': numeric_distribution, 'dtype': dtypes[col], 'semantic_type': _infer_semantic_type(col, dtypes[col], value_samples[col], unique_ratio), + 'top_values_capped': value_overflow[col]['count'] > 0, } summary = { diff --git a/tests/test_analysis.py b/tests/test_analysis.py index a3a4517..cc88aa7 100644 --- a/tests/test_analysis.py +++ b/tests/test_analysis.py @@ -126,3 +126,17 @@ def test_multi_csv_cache_created(tmp_path, monkeypatch): result = multi.analyze_multiple_csv([p], "캐시") assert result["file_count"] == 1 assert any((tmp_path / ".cache").glob("*.json")) + + +def test_multi_csv_top_values_capped_marker(monkeypatch, tmp_path): + import bitnet_tools.multi_csv as multi + + monkeypatch.setattr(multi, "TOP_VALUE_TRACK_CAP", 3) + p = tmp_path / "cardinality.csv" + p.write_text("col\na\nb\nc\nd\na\n", encoding="utf-8") + + result = multi.analyze_multiple_csv([p], "카디널리티") + prof = result["files"][0]["column_profiles"]["col"] + + assert prof["top_values_capped"] is True + assert any(x["value"] == "__OTHER__" for x in prof["top_values"])