-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdb.py
More file actions
135 lines (107 loc) · 4.25 KB
/
db.py
File metadata and controls
135 lines (107 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import psycopg2
import re
import sys
from os import listdir
from os.path import isfile, join
from psycopg2 import pool
from flask import current_app
from flask import g
from db_model import DBModel
from shared import my_exec_info_message
# Schema version this code expects the database to be at. init_app runs the
# "<version>_up" scripts from the schema_migrations folder, one at a time, until
# the version stored in the schemaversion table reaches this value, and exits
# if the stored version is already higher than this value.
# Increment this to enable new database changes written to sql files in the
# schema_migrations folder.
desiredSchemaVersion = 5
def _load_schema_migrations(app):
    """Return the migration scripts found in the app's schema_migrations folder.

    The result maps the leading ``<digits>_up`` / ``<digits>_down`` prefix of
    each file name to that file's contents decoded as UTF-8. Logs an error and
    exits the process with status 1 if any file name in the folder does not
    start with such a prefix.
    """
    schemaMigrations = {}
    schemaMigrationsPath = join(app.root_path, 'schema_migrations')
    app.logger.info("loading schema migration scripts from {}".format(schemaMigrationsPath))
    for filename in listdir(schemaMigrationsPath):
        result = re.search(r"^\d+_(up|down)", filename)
        if not result:
            app.logger.error(f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.")
            sys.exit(1)
        key = result.group()
        with open(join(schemaMigrationsPath, filename), 'rb') as file:
            schemaMigrations[key] = file.read().decode("utf8")
    return schemaMigrations


def _migrate_schema(app, connection, cursor, schemaMigrations):
    """Run pending "up" migrations and return ``(schemaVersion, actionWasTaken)``.

    Creates the schemaversion table with migration ``01_up`` when the
    configured schema has no such table, then executes and commits one
    ``%02d_up`` script per version step until the stored version equals
    ``desiredSchemaVersion``. Any failure is logged and ends the process with
    status 1.
    """
    actionWasTaken = False

    cursor.execute("""
        SELECT table_name, table_schema FROM information_schema.tables WHERE table_schema = %s
    """, (app.config['DATABASE_SCHEMA'], ))
    hasSchemaVersionTable = any(row[0] == "schemaversion" for row in cursor.fetchall())

    if not hasSchemaVersionTable:
        app.logger.info("no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA']))
        try:
            cursor.execute(schemaMigrations["01_up"])
            connection.commit()
        except Exception:
            app.logger.error("unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info())))
            sys.exit(1)
        actionWasTaken = True

    cursor.execute("SELECT Version FROM schemaversion")
    schemaVersion = cursor.fetchall()[0][0]

    if schemaVersion > desiredSchemaVersion:
        app.logger.critical("schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format(
            schemaVersion, desiredSchemaVersion
        ))
        sys.exit(1)

    while schemaVersion < desiredSchemaVersion:
        migrationKey = "%02d_up" % (schemaVersion+1)
        app.logger.info("schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format(
            schemaVersion, desiredSchemaVersion, migrationKey
        ))

        # Check for the script before executing anything, so that only a
        # genuinely missing file is reported as "missing" and database errors
        # are never mistaken for it.
        if migrationKey not in schemaMigrations:
            app.logger.critical("missing schema migration script: {}_xyz.sql".format(migrationKey))
            sys.exit(1)

        try:
            cursor.execute(schemaMigrations[migrationKey])
            connection.commit()
        except Exception:
            app.logger.critical("unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info())))
            sys.exit(1)
        actionWasTaken = True

        # Each migration script is expected to bump the stored version itself;
        # verify that it did before moving on to the next one.
        schemaVersion += 1
        cursor.execute("SELECT Version FROM schemaversion")
        versionFromDatabase = cursor.fetchall()[0][0]

        if schemaVersion != versionFromDatabase:
            app.logger.critical("incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format(
                versionFromDatabase,
                migrationKey,
                schemaVersion
            ))
            sys.exit(1)

    return schemaVersion, actionWasTaken


def init_app(app):
    """Create the Postgres connection pool for ``app`` and migrate the schema.

    Steps, in order:

    1. Build a ``psycopg2.pool.SimpleConnectionPool`` (arguments ``1, 20`` plus
       ``app.config['POSTGRES_CONNECTION_PARAMETERS']``) and store it in
       ``app.config['PSYCOPG2_CONNECTION_POOL']``.
    2. Register ``close_db`` as an app-context teardown handler.
    3. Load the scripts in ``<app.root_path>/schema_migrations``.
    4. Borrow one pooled connection and run every pending "up" migration until
       the stored schema version equals ``desiredSchemaVersion``.

    The borrowed connection and its cursor are always released, including when
    a migration step fails.

    Raises:
        Exception: whatever the pool constructor raised, re-raised after it
            has been logged.
        SystemExit: with status 1 when a migration file is misnamed or
            missing, a migration fails, the stored version is newer than
            ``desiredSchemaVersion``, or a migration leaves an unexpected
            version behind.
    """
    try:
        app.config['PSYCOPG2_CONNECTION_POOL'] = psycopg2.pool.SimpleConnectionPool(
            1,
            20,
            app.config['POSTGRES_CONNECTION_PARAMETERS']
        )
    except Exception:
        app.logger.error(
            "\nerror was thrown when connecting to postgres database:\n"
            f"{my_exec_info_message(sys.exc_info())}"
        )
        raise

    # tell the app to clean up the DB connection when shutting down.
    app.teardown_appcontext(close_db)

    schemaMigrations = _load_schema_migrations(app)

    connectionPool = app.config['PSYCOPG2_CONNECTION_POOL']
    connection = connectionPool.getconn()
    try:
        cursor = connection.cursor()
        try:
            schemaVersion, actionWasTaken = _migrate_schema(app, connection, cursor, schemaMigrations)
        finally:
            cursor.close()
    finally:
        # Runs on SystemExit too, so a caller that intercepts the exit does not
        # end up with a connection permanently checked out of the pool.
        connectionPool.putconn(connection)

    app.logger.info("{} current schemaVersion: \"{}\"".format(
        ("schema migration completed." if actionWasTaken else "schema is already up to date. "), schemaVersion
    ))
def get_model() -> DBModel:
    """Return the DBModel stored on ``g``, creating it on first use.

    When ``g`` has no ``db_model`` yet, a connection is taken from the pool in
    ``current_app.config['PSYCOPG2_CONNECTION_POOL']``, a cursor is opened on
    it, and both are wrapped in a new DBModel that is stored on ``g``.
    """
    if 'db_model' in g:
        return g.db_model

    pool = current_app.config['PSYCOPG2_CONNECTION_POOL']
    conn = pool.getconn()
    cur = conn.cursor()
    g.db_model = DBModel(conn, cur)
    return g.db_model
def close_db(e=None):
    """Release the DBModel stored on ``g``, if there is one.

    Removes ``db_model`` from ``g``; when one was present, closes its cursor
    and returns its connection to the pool held in
    ``current_app.config['PSYCOPG2_CONNECTION_POOL']``. The ``e`` argument is
    accepted but not used.
    """
    model = g.pop("db_model", None)
    if model is None:
        # Nothing was opened, so there is nothing to release.
        return

    model.cursor.close()
    pool = current_app.config['PSYCOPG2_CONNECTION_POOL']
    pool.putconn(model.connection)