-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcsvbase.py
More file actions
executable file
·153 lines (113 loc) · 3.14 KB
/
csvbase.py
File metadata and controls
executable file
·153 lines (113 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/python3
import os
import sys
import csv
import time
import json
import shutil
import hashlib
from hash import TRUNCATE
from sd.rotate import rotate
from sd.format_number import fmt_time
BAK_NUM = 8 # Number of database backups
VERSION = 2.0
# Database version number
# 2.0 Switched to csv for increased robustness and speed
class Csvbase:
"Create a csv database that rotates and has a checksum."
def __init__(self, basedir, name, headers=None):
self.basedir = basedir
self.name = name
self.version = VERSION
self.last_save = time.time() # Last save time
self.meta = dict() # First line contains dict
if headers:
self.headers = headers
else:
self.headers = []
def load(self,):
'''Load the database'''
# Load the database if possible
baks = rotate(os.path.join(self.basedir, self.name), move=False, limit=BAK_NUM)
good = None # Name of file successfully loaded
rows = []
for path in baks:
if os.path.exists(path):
print("Loading:", path)
rows = self.load_csv(path)
if rows != False:
good = path
break
else:
# If no good database detected:
if not good and os.path.exists(baks[0]):
print("Corrupted database moved to", baks[1])
rotate(baks[0], limit=BAK_NUM)
return []
if rows:
print("\tDatabase was last saved", fmt_time(time.time() - self.last_save), 'ago')
return rows
def load_csv(self, path):
"Load the database from CSV format"
rows = []
m = hashlib.sha512()
with open(path, mode='r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
meta_line = next(reader, None)
if not meta_line:
return
meta = json.loads(meta_line[0])
self.version = meta.get('version', 1.0)
self.last_save = meta.get('mtime', 0)
self.headers = next(reader, [])
for row in reader:
if row[0].startswith('#CHECKSUM'):
if '#CHECKSUM:' + m.hexdigest()[:TRUNCATE] == row[0]:
return rows
else:
print("Checksum mismatch!")
return False
else:
m.update(','.join(row).encode())
rows.append(row)
return False
def save(self, rows):
# Rotate any old backup files
baks = rotate(os.path.join(self.basedir, self.name), limit=BAK_NUM)
meta = dict(
mtime=time.time(),
encoding='hex',
version=self.version,
)
self.last_save = meta['mtime']
m = hashlib.sha512()
# Save main CSV
out = baks[0]
if not os.path.exists(baks[1]):
print("Creating database:", out)
with open(out, mode='w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([json.dumps(meta)])
writer.writerow(self.headers)
for row in rows:
writer.writerow(row)
m.update(','.join(row).encode())
writer.writerow([f'#CHECKSUM:{m.hexdigest()[:TRUNCATE]}'])
f.flush()
os.fsync(f.fileno())
print("Wrote:", out)
# If only copy, make a backup
if not os.path.exists(baks[1]):
shutil.copy(baks[0], baks[1])
return True
def tester():
print("\nTesting load:")
base = sys.argv[1]
name = sys.argv[2]
c = Csvbase(base, name)
rows = c.load()
print("\nRows:")
for row in rows:
print(row)
if __name__ == "__main__":
tester()