-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcodec_memory_upgrade.py
More file actions
282 lines (230 loc) · 10.2 KB
/
codec_memory_upgrade.py
File metadata and controls
282 lines (230 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""CODEC Memory Upgrade — tiered boot, temporal facts, CCF compression.
Three layers shipped together:
L0/L1 identity.txt → always-loaded boot payload (<200 tok)
L2 recent rooms → last N sessions from conversations
L3 deep FTS search → on-demand query over full history
facts table → temporal key/value store with
valid_from / valid_until / superseded_by
CCF rule-based → entity abbreviation + filler stripping
for memory writes that need shrinking
"""
from __future__ import annotations
import json, os, re, sqlite3, logging
from datetime import datetime, timedelta
from typing import Optional
log = logging.getLogger("codec_memory_upgrade")
from codec_config import DB_PATH
MEMORY_DIR = os.path.expanduser("~/.codec/memory")
IDENTITY_PATH = os.path.join(MEMORY_DIR, "identity.txt")
ENTITY_MAP_PATH = os.path.join(MEMORY_DIR, "entity_map.json")
os.makedirs(MEMORY_DIR, exist_ok=True)
# ─────────────────────────────────────────────────────────────────────────────
# Phase 1 — Tiered Boot Loader
# ─────────────────────────────────────────────────────────────────────────────
def load_identity() -> str:
"""Return L0+L1 identity.txt contents, empty string if missing."""
try:
with open(IDENTITY_PATH) as f:
return f.read().strip()
except FileNotFoundError:
return ""
def l2_room_recall(days: int = 7, limit: int = 10) -> list[dict]:
"""Last N distinct sessions with previews."""
from codec_memory import CodecMemory
return CodecMemory().get_sessions(limit=limit)
def l3_deep_search(query: str, limit: int = 5) -> list[dict]:
"""FTS5 search over full history."""
from codec_memory import CodecMemory
return CodecMemory().search(query, limit=limit)
def get_boot_context(include_rooms: bool = True) -> str:
"""Compose the full boot payload: identity + optional recent rooms preview."""
parts = [load_identity()]
if include_rooms:
rooms = l2_room_recall(limit=5)
if rooms:
parts.append("\n## L2 — Recent sessions")
for r in rooms:
ts = (r.get("last_msg") or "")[:16].replace("T", " ")
parts.append(f"- [{ts}] {r.get('preview','')[:80]}")
return "\n".join(p for p in parts if p).strip()
# ─────────────────────────────────────────────────────────────────────────────
# Phase 2 — Temporal Fact Tracking (separate `facts` table, non-destructive)
# ─────────────────────────────────────────────────────────────────────────────
_FACTS_SCHEMA = """
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL,
fact_type TEXT DEFAULT 'generic',
confidence REAL DEFAULT 1.0,
valid_from TEXT NOT NULL,
valid_until TEXT,
superseded_by INTEGER,
user_id TEXT DEFAULT 'default',
source TEXT
);
CREATE INDEX IF NOT EXISTS idx_facts_key ON facts(key);
CREATE INDEX IF NOT EXISTS idx_facts_valid ON facts(valid_until);
CREATE INDEX IF NOT EXISTS idx_facts_user ON facts(user_id);
"""
def _conn() -> sqlite3.Connection:
c = sqlite3.connect(DB_PATH)
c.execute("PRAGMA journal_mode=WAL")
c.executescript(_FACTS_SCHEMA)
return c
def store_fact(key: str, value: str, fact_type: str = "generic",
confidence: float = 1.0, user_id: str = "default",
source: str = "", supersede: bool = True) -> int:
"""Store a fact. If supersede=True and an active fact with same key exists,
mark it valid_until=now and link superseded_by → new row."""
now = datetime.now().isoformat()
c = _conn()
try:
new_id = c.execute(
"INSERT INTO facts (key,value,fact_type,confidence,valid_from,user_id,source) "
"VALUES (?,?,?,?,?,?,?)",
(key, value, fact_type, confidence, now, user_id, source),
).lastrowid
if supersede:
c.execute(
"UPDATE facts SET valid_until=?, superseded_by=? "
"WHERE key=? AND user_id=? AND valid_until IS NULL AND id!=?",
(now, new_id, key, user_id, new_id),
)
c.commit()
return new_id
finally:
c.close()
def query_valid_facts(key: Optional[str] = None, user_id: str = "default",
limit: int = 50) -> list[dict]:
"""Facts currently active (valid_until IS NULL)."""
c = _conn()
try:
if key:
rows = c.execute(
"SELECT id,key,value,fact_type,confidence,valid_from,source "
"FROM facts WHERE key=? AND user_id=? AND valid_until IS NULL "
"ORDER BY id DESC LIMIT ?",
(key, user_id, limit),
).fetchall()
else:
rows = c.execute(
"SELECT id,key,value,fact_type,confidence,valid_from,source "
"FROM facts WHERE user_id=? AND valid_until IS NULL "
"ORDER BY id DESC LIMIT ?",
(user_id, limit),
).fetchall()
cols = ["id", "key", "value", "fact_type", "confidence", "valid_from", "source"]
return [dict(zip(cols, r)) for r in rows]
finally:
c.close()
def get_fact_history(key: str, user_id: str = "default") -> list[dict]:
"""Full timeline for a key — all versions, newest first."""
c = _conn()
try:
rows = c.execute(
"SELECT id,value,valid_from,valid_until,superseded_by,confidence,source "
"FROM facts WHERE key=? AND user_id=? ORDER BY id DESC",
(key, user_id),
).fetchall()
cols = ["id", "value", "valid_from", "valid_until", "superseded_by", "confidence", "source"]
return [dict(zip(cols, r)) for r in rows]
finally:
c.close()
def find_contradictions(user_id: str = "default") -> list[dict]:
"""Keys with >1 version still marked valid (shouldn't happen, audit tool)."""
c = _conn()
try:
rows = c.execute(
"SELECT key, COUNT(*) as n FROM facts "
"WHERE user_id=? AND valid_until IS NULL GROUP BY key HAVING n>1",
(user_id,),
).fetchall()
return [{"key": r[0], "active_versions": r[1]} for r in rows]
finally:
c.close()
def cleanup_expired(older_than_days: int = 365, user_id: str = "default") -> int:
"""Delete superseded facts older than N days."""
cutoff = (datetime.now() - timedelta(days=older_than_days)).isoformat()
c = _conn()
try:
cur = c.execute(
"DELETE FROM facts WHERE user_id=? AND valid_until IS NOT NULL AND valid_until<?",
(user_id, cutoff),
)
c.commit()
return cur.rowcount
finally:
c.close()
# ─────────────────────────────────────────────────────────────────────────────
# Phase 3 — CCF (CODEC Compressed Format) rule-based compressor
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_ENTITY_MAP = {
"Mickael Farina": "MF",
"Mickael": "MF",
"AVA Digital": "AVA",
"AVA Digital LLC": "AVA",
"Claude Desktop": "CD",
"Claude Code": "CC",
"Claude Cursor": "CCur",
"Marbella": "MRB",
"Spain": "ES",
"Mac Studio": "MS",
"localhost:8081": "L81",
"localhost:8082": "L82",
"localhost:8083": "L83",
"localhost:8084": "L84",
"localhost:8085": "L85",
}
FILLER_WORDS = {
"basically", "actually", "literally", "honestly", "sort of", "kind of",
"you know", "i mean", "like,", "um ", "uh ", "er ",
}
def _load_entity_map() -> dict:
if os.path.exists(ENTITY_MAP_PATH):
try:
with open(ENTITY_MAP_PATH) as f:
return json.load(f)
except Exception:
pass
return DEFAULT_ENTITY_MAP.copy()
def _save_entity_map(m: dict) -> None:
with open(ENTITY_MAP_PATH, "w") as f:
json.dump(m, f, indent=2, sort_keys=True)
def compress_rule_based(text: str, entity_map: Optional[dict] = None) -> str:
"""Apply entity substitutions + filler removal. Preserves FTS-friendly tokens."""
if not text:
return text
emap = entity_map or _load_entity_map()
out = text
# Sort by length desc so longer phrases match first (Mickael Farina before Mickael)
for full, abbr in sorted(emap.items(), key=lambda kv: -len(kv[0])):
out = re.sub(r'\b' + re.escape(full) + r'\b', abbr, out, flags=re.IGNORECASE)
for f in FILLER_WORDS:
out = re.sub(r'\b' + re.escape(f) + r'\b', '', out, flags=re.IGNORECASE)
out = re.sub(r'\s+', ' ', out).strip()
return out
def decompress_for_display(text: str, entity_map: Optional[dict] = None) -> str:
"""Expand abbreviations back for human readability."""
if not text:
return text
emap = entity_map or _load_entity_map()
out = text
for full, abbr in sorted(emap.items(), key=lambda kv: -len(kv[1])):
out = re.sub(r'\b' + re.escape(abbr) + r'\b', full, out)
return out
def add_entity(full: str, abbr: str) -> dict:
m = _load_entity_map()
m[full] = abbr
_save_entity_map(m)
return m
def remove_entity(full: str) -> dict:
m = _load_entity_map()
m.pop(full, None)
_save_entity_map(m)
return m
def list_entities() -> dict:
return _load_entity_map()
# Seed default entity map file on first import
if not os.path.exists(ENTITY_MAP_PATH):
_save_entity_map(DEFAULT_ENTITY_MAP)