-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
144 lines (121 loc) · 5.32 KB
/
database.py
File metadata and controls
144 lines (121 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import sqlite3
from utils import read_files_from_dir, get_doc_content, EXCLUDE, b64d, b64e
import os
import pickle
def create_connection(database_path: str = "documents.db", first_time: bool = False) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
    """
    Opens the SQLite database at database_path, attaches 'cache.db' under the
    schema name 'cache' and makes sure the cache tables exist.
    Args:
        database_path: The path to the SQLite database
        first_time: When True, the existing cache tables are dropped through
            clear_cache(cu, clear_all=True) before being recreated
    Returns:
        A tuple of (connection, cursor)
    Raises:
        sqlite3.Error: If attaching or preparing the cache fails; the
            connection is closed before the error propagates
    """
    cx = sqlite3.connect(database_path)
    try:
        cu = cx.cursor()
        cu.execute("ATTACH DATABASE 'cache.db' AS cache")
        if first_time:
            clear_cache(cu, clear_all=True)
        # CREATE TABLE IF NOT EXISTS makes this safe to run on every connect.
        create_cache(cu)
    except sqlite3.Error:
        # Do not hand back (or leak) a half-initialised connection.
        cx.close()
        raise
    return cx, cu
def prepare_database(database_path: str = "documents.db", dir_path: str = None, generate: bool = False, transformer = None) -> tuple[sqlite3.Connection, sqlite3.Cursor]:
    """
    Opens the documents database and, when requested, (re)builds the
    'documents' table from the files found under dir_path.
    Each stored row holds the file path, its text, its vectors and a hash
    (columns: file_path, text, vectors, hash).
    Args:
        database_path: The path to the SQLite database
        dir_path: The root directory to walk
        generate: Whether the documents table should be generated
        transformer: Object forwarded to get_doc_content() for every file
    Returns:
        A tuple of (connection, cursor)
    """
    # create_connection() already attaches 'cache.db' as schema "cache".
    # Attaching it a second time under the same name makes SQLite raise
    # sqlite3.OperationalError, so the ATTACH must not be repeated here.
    cx, cu = create_connection(database_path)
    # cache
    create_cache(cu)
    # Generation only runs when all three of generate, dir_path and
    # transformer are truthy.
    if generate and dir_path and transformer:
        #? too slow on large directories
        cu.execute("CREATE TABLE IF NOT EXISTS documents (id INTEGER PRIMARY KEY, file_path TEXT, text TEXT, vectors TEXT, hash TEXT)")
        _generate_database(dir_path, cx, cu, transformer)
    else:
        print("documents database is not generated")
    return cx, cu
def clear_cache(cu: sqlite3.Cursor, clear_all: bool = False) -> None:
    """
    Drops cache tables and recreates them empty through create_cache().
    Args:
        cu: SQLite cursor with the 'cache' schema attached
        clear_all: When True, cache.directories is dropped as well;
            cache.files is dropped either way
    """
    # NOTE(review): the nesting of the second DROP was reconstructed from a
    # copy of the file that had lost its indentation - confirm that
    # cache.files is meant to be dropped even when clear_all is False.
    pending_drops = ["DROP TABLE IF EXISTS cache.files"]
    if clear_all:
        pending_drops.insert(0, "DROP TABLE IF EXISTS cache.directories")
    for drop_statement in pending_drops:
        cu.execute(drop_statement)
    create_cache(cu)
def create_cache(cu: sqlite3.Cursor) -> None:
    """
    Creates the cache.directories and cache.files tables if they are missing.
    Both tables share the same layout: an integer primary key 'id' and a
    text column 'batch'.
    Args:
        cu: SQLite cursor with the 'cache' schema attached
    """
    for cache_table in ("directories", "files"):
        cu.execute(f"""
            CREATE TABLE IF NOT EXISTS cache.{cache_table} (
                id INTEGER PRIMARY KEY,
                batch TEXT
            )
        """)
def _generate_database(dir_path: str, cx: sqlite3.Connection, cu: sqlite3.Cursor, transformer) -> None:
    """
    Fills the 'documents' table from the files yielded by
    read_files_from_dir(dir_path).
    Files whose path contains any EXCLUDE entry (case-insensitive) are
    skipped, as are files for which get_doc_content() returns None.
    Existing rows (matched on file_path) are updated, new ones are inserted.
    Args:
        dir_path: The root directory to walk
        cx: SQLite connection used to commit the changes
        cu: SQLite cursor used for the queries and passed to get_doc_content()
        transformer: Object forwarded to get_doc_content()
    """
    for file in read_files_from_dir(dir_path):
        # Case-insensitive substring match against every exclusion pattern.
        if any(exclude.lower() in file.lower() for exclude in EXCLUDE):
            continue

        result = get_doc_content(file, cu, transformer)
        if result is None:
            continue

        text, shasum, raw_vectors = result
        # Vectors are pickled and then passed through b64e() before storage.
        encoded_vectors = b64e(pickle.dumps(raw_vectors))
        print(f"Adding '{file}' to database with size: '{os.path.getsize(file)} bytes'")

        existing_row = cu.execute("SELECT id FROM documents WHERE file_path = ?", (file,)).fetchone()
        if existing_row is None:
            cu.execute("INSERT INTO documents (file_path, text, vectors, hash) VALUES (?, ?, ?, ?)", (file, text, encoded_vectors, shasum))
        else:
            cu.execute("UPDATE documents SET text = ?, hash = ?, vectors = ? WHERE file_path = ?", (text, shasum, encoded_vectors, file))
        # NOTE(review): commit placement was reconstructed from a copy of the
        # file that had lost its indentation - confirm it is meant to run once
        # per stored file rather than once after the loop.
        cx.commit()
def insert_cache(cx: sqlite3.Connection, cu: sqlite3.Cursor, batch: str, table: str) -> None:
    """
    Appends one row holding `batch` to the given table and commits.
    Args:
        cx: SQLite connection used to commit the insert
        cu: SQLite cursor to execute the statement
        batch: The value stored in the table's 'batch' column
        table: Name of the target table (interpolated into the SQL as-is)
    """
    #! sql injection - not relevant to this project (just do not pass it to user-control)
    statement = f"INSERT INTO {table} (batch) VALUES (?)"
    bound_values = (batch,)
    cu.execute(statement, bound_values)
    cx.commit()
def get_docs(cu: sqlite3.Cursor, n: int, offset: int, table: str = "documents") -> list:
    """
    Fetches up to n rows from the given table, skipping the first `offset`
    rows, and formats each one as "<file_path>: <decoded text>".
    Args:
        cu: SQLite cursor to execute the query
        n: The maximum number of rows to fetch
        offset: The starting row index (0-based)
        table: Name of the table to read; it needs file_path and text columns
    Returns:
        A list of strings, one per row, combining the file path with the
        result of b64d() applied to the stored text
    """
    #! sql injection if table is not sanitized, cant use table as parameter
    #! (in this project it is not issue)
    # The table name is interpolated into the SQL text, so only LIMIT and
    # OFFSET are placeholders and exactly two values may be bound.
    cu.execute(f"SELECT file_path, text FROM {table} LIMIT ? OFFSET ?", (n, offset))
    rows = cu.fetchall()
    # Single quotes inside the replacement field keep this valid before
    # Python 3.12 as well.
    return [f"{file_path}: {b64d(text.encode('utf-8'))}" for file_path, text in rows]
def get_docs_vectors(cu: sqlite3.Cursor, ids: list[int], table: str = "documents") -> list:
    """
    Fetches the stored vectors of the rows whose id is in `ids`.
    Args:
        cu: SQLite cursor to execute the query
        ids: List of IDs to fetch
        table: Name of the table to read; it needs id and vectors columns
    Returns:
        A list with one unpickled vectors object per matching row. The query
        has no ORDER BY, so the result is not guaranteed to follow the order
        of `ids`.
    """
    #! sql injection if table is not sanitized, cant use table as parameter
    #! (in this project it is not issue)
    ids = list(ids)
    # One "?" per id. Building the whole statement with a single f-string
    # (instead of f-string plus %-formatting) keeps a "%" in the table name
    # from being treated as a format specifier.
    placeholders = ",".join("?" * len(ids))
    cu.execute(f"SELECT vectors FROM {table} WHERE id IN ({placeholders})", ids)
    rows = cu.fetchall()
    # NOTE: pickle.loads can execute arbitrary code from its payload; this is
    # only acceptable while the database contents are trusted.
    return [pickle.loads(b64d(row[0].encode("utf-8"))) for row in rows]
def get_docs_by_ids(cu: sqlite3.Cursor, ids: list[int], table: str = "documents") -> list:
    """
    Fetches one column value for every row whose id is in `ids`.
    The 'batch' column is read when the table name contains "cache",
    otherwise the 'file_path' column is read.
    Args:
        cu: SQLite cursor to execute the query
        ids: List of IDs to fetch
        table: Name of the table to read
    Returns:
        A list of the selected column values, one per matching row. The query
        has no ORDER BY, so the result is not guaranteed to follow the order
        of `ids`.
    """
    #! sql injection if table is not sanitized, cant use table as parameter
    #! (in this project it is not issue)
    ids = list(ids)
    column = "batch" if "cache" in table else "file_path"
    # One "?" per id. A single f-string (rather than f-string plus
    # %-formatting) keeps a "%" in the table name from breaking the query text.
    placeholders = ",".join("?" * len(ids))
    cu.execute(f"SELECT {column} FROM {table} WHERE id IN ({placeholders})", ids)
    return [row[0] for row in cu.fetchall()]