capsul-flask/capsulflask/db.py

146 lines
4.6 KiB
Python
Raw Permalink Normal View History

2023-12-03 20:04:54 +00:00
import psycopg
2023-12-03 20:07:52 +00:00
import psycopg_pool
import re
import sys
from os import listdir
from os.path import isfile, join
from flask import current_app
from flask import g
from capsulflask.db_model import DBModel
from capsulflask.shared import my_exec_info_message
def init_app(app, is_running_server):
    """Create the postgres connection pool and, for the server, migrate the schema.

    Args:
        app: the Flask application. ``POSTGRES_CONNECTION_PARAMETERS`` and
            ``DATABASE_SCHEMA`` are read from ``app.config``;
            ``PSYCOPG_CONNECTION_POOL`` and (server only)
            ``BROADCAST_BANNER_MESSAGE`` are written to it.
        is_running_server: when falsy (e.g. a cli command used to fix a broken
            migration), only the pool and the teardown hook are set up and the
            schema migrations are skipped.

    Raises:
        Exception: whatever ``psycopg_pool.ConnectionPool`` raised, re-raised
            after being logged.
        SystemExit: with status 1 when a migration script is misnamed or
            missing, a migration fails, the database schema is newer than
            ``desiredSchemaVersion``, or the version stored in the database
            does not match the migration that was just applied.
    """
    try:
        app.config['PSYCOPG_CONNECTION_POOL'] = psycopg_pool.ConnectionPool(
            conninfo=app.config['POSTGRES_CONNECTION_PARAMETERS'],
            min_size=1,
            max_size=20
        )
    except Exception:
        app.logger.error(
            "\nerror was thrown when connecting to postgres database:\n"
            f"{my_exec_info_message(sys.exc_info())}"
        )
        raise

    # register close_db as an app-context teardown callback so the pooled
    # connection held by the DBModel is handed back to the pool.
    app.teardown_appcontext(close_db)

    # only run the migrations if we are running the server.
    # If we are just running a cli command (e.g. to fix a broken migration), skip it
    if not is_running_server:
        return

    schemaMigrations = _load_schema_migrations(app)

    actionWasTaken = False
    desiredSchemaVersion = 25

    pool = app.config['PSYCOPG_CONNECTION_POOL']
    connection = pool.getconn()
    # try/finally: every exit path below (including sys.exit) must hand the
    # connection back to the pool instead of leaking it.
    try:
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT table_name, table_schema FROM information_schema.tables WHERE table_schema = %s
            """, (app.config['DATABASE_SCHEMA'], ))
            hasSchemaVersionTable = any(row[0] == "schemaversion" for row in cursor.fetchall())

            if not hasSchemaVersionTable:
                app.logger.info("no table named schemaversion found in the {} schema. running migration 01_up".format(app.config['DATABASE_SCHEMA']))
                try:
                    cursor.execute(schemaMigrations["01_up"])
                    connection.commit()
                except Exception:
                    app.logger.error("unable to create the schemaversion table because: {}".format(my_exec_info_message(sys.exc_info())))
                    sys.exit(1)
                actionWasTaken = True

            cursor.execute("SELECT Version FROM schemaversion")
            schemaVersion = cursor.fetchall()[0][0]

            if schemaVersion > desiredSchemaVersion:
                app.logger.critical("schemaVersion ({}) > desiredSchemaVersion ({}). schema downgrades are not supported yet. exiting.".format(
                    schemaVersion, desiredSchemaVersion
                ))
                sys.exit(1)

            # apply the "NN_up" scripts one at a time, committing after each,
            # until the database reports the desired version.
            while schemaVersion < desiredSchemaVersion:
                migrationKey = "%02d_up" % (schemaVersion+1)
                app.logger.info("schemaVersion ({}) < desiredSchemaVersion ({}). running migration {}".format(
                    schemaVersion, desiredSchemaVersion, migrationKey
                ))
                try:
                    cursor.execute(schemaMigrations[migrationKey])
                    connection.commit()
                except KeyError:
                    app.logger.critical("missing schema migration script: {}_xyz.sql".format(migrationKey))
                    sys.exit(1)
                except Exception:
                    app.logger.critical("unable to execute the schema migration {} because: {}".format(migrationKey, my_exec_info_message(sys.exc_info())))
                    sys.exit(1)
                actionWasTaken = True

                # each migration script is expected to bump the stored version
                # itself; verify that it did before moving on to the next one.
                schemaVersion += 1
                cursor.execute("SELECT Version FROM schemaversion")
                versionFromDatabase = cursor.fetchall()[0][0]

                if schemaVersion != versionFromDatabase:
                    app.logger.critical("incorrect schema version value \"{}\" after running migration {}, expected \"{}\". exiting.".format(
                        versionFromDatabase,
                        migrationKey,
                        schemaVersion
                    ))
                    sys.exit(1)

            # cache the first broadcast message (if any) on the app config.
            cursor.execute("SELECT message FROM broadcast_message")
            rows = cursor.fetchall()
            app.config['BROADCAST_BANNER_MESSAGE'] = rows[0][0] if rows else None
    finally:
        pool.putconn(connection)

    app.logger.info("{} current schemaVersion: \"{}\"".format(
        ("schema migration completed." if actionWasTaken else "schema is already up to date. "), schemaVersion
    ))


def _load_schema_migrations(app):
    """Read every script in <app.root_path>/schema_migrations into a dict.

    Keys are the leading ``<digits>_up`` / ``<digits>_down`` part of each file
    name; values are the file contents decoded as utf8. Logs an error and
    exits with status 1 if a file name does not start with that pattern.
    """
    schemaMigrations = {}
    schemaMigrationsPath = join(app.root_path, 'schema_migrations')
    app.logger.info("loading schema migration scripts from {}".format(schemaMigrationsPath))
    for filename in listdir(schemaMigrationsPath):
        result = re.search(r"^\d+_(up|down)", filename)
        if not result:
            app.logger.error(f"schemaVersion {filename} must match ^\\d+_(up|down). exiting.")
            sys.exit(1)
        with open(join(schemaMigrationsPath, filename), 'rb') as file:
            schemaMigrations[result.group()] = file.read().decode("utf8")
    return schemaMigrations
def get_model() -> DBModel:
    """Return the DBModel stored on ``flask.g``, creating it on first use.

    On first use a connection is taken from the pool held in
    ``current_app.config['PSYCOPG_CONNECTION_POOL']``, a cursor is opened on
    it, and both are wrapped in a DBModel that is stored as ``g.db_model``.
    """
    if 'db_model' in g:
        return g.db_model

    pool = current_app.config['PSYCOPG_CONNECTION_POOL']
    conn = pool.getconn()
    g.db_model = DBModel(conn, conn.cursor())
    return g.db_model
def close_db(e=None):
    """Close the DBModel's cursor and hand its connection back to the pool.

    Does nothing when no ``db_model`` was stored on ``flask.g``.

    Args:
        e: not used by this function; accepted so it can be registered as a
            teardown callback.
    """
    db_model = g.pop("db_model", None)
    if db_model is None:
        return
    try:
        db_model.cursor.close()
    finally:
        # return the connection even if closing the cursor failed, otherwise
        # the pool permanently loses one of its connections.
        current_app.config['PSYCOPG_CONNECTION_POOL'].putconn(db_model.connection)