Module xelo2.database.tables
Expand source code
from re import search
from PyQt5.QtSql import QSqlQuery
from PyQt5.QtCore import QMetaType
from PyQt5.Qt import QByteArray
LEVELS = [
'subjects',
'sessions',
'protocols',
'runs',
'recordings',
]
METATABLES = [
'allowed_values',
'intended_for',
'files',
'subject_codes',
'events',
'experimenters',
'runs_experimenters',
'runs_protocols',
]
ELEC_CHAN = [
'channel',
'electrode',
]
EXPECTED_TABLES = (
METATABLES
+ LEVELS
+ [x + '_files' for x in LEVELS]
+ [x + 's' for x in ELEC_CHAN]
+ [x + '_groups' for x in ELEC_CHAN]
)
def parse_all_tables(info_schema, db):
TABLES = {}
for table in sorted(db.tables()):
indices = lookup_indexes(info_schema, db, table)
comments = lookup_comments(info_schema, db, table)
driver = db.driver()
rec = driver.record(table)
table_d = {}
for i in range(rec.count()):
field = rec.field(i)
name = field.name()
d = {}
d['type'] = QMetaType.typeName(field.type())
d['values'] = lookup_allowed_values(db, table, name)
d['index'] = indices.get(name, False)
doc = comments.get(name, None)
if doc is None:
d['alias'] = name
d['doc'] = None
elif ': ' in doc:
d['alias'], d['doc'] = doc.split(': ')
else:
d['alias'] = doc.strip()
d['doc'] = None
table_d[name] = d
TABLES[table] = table_d
return TABLES
def lookup_allowed_values(db, table, column):
"""Look up allowed values from the table
Parameters
----------
db : instance of QSqlDatabase
table : str
name of the table
column : str
name of the column
Returns
-------
list of str
list of allowed values (it is empty when all the values are allowed)
"""
query = QSqlQuery(db)
query.prepare('SELECT allowed_value FROM allowed_values WHERE `table_name` = :table and `column_name` = :column')
query.bindValue(':table', table)
query.bindValue(':column', column)
if not query.exec():
raise SyntaxError(query.lastError().text())
values = []
while query.next():
values.append(query.value('allowed_value'))
return values
def lookup_indexes(info_schema, db, table):
"""Look up which columns are indices
Parameters
----------
info_schema : instance of QSqlDatabase
this should be the `information_schema` database
db : instance of QSqlDatabase
the database with the actual data
table : str
name of the table
Returns
-------
dict
key is the name of the column. 'PRIMARY' for primary indices and
for foreign keys, it's table_name (column_name)
"""
if not info_schema.databaseName() == 'information_schema':
raise ValueError('The first argument should be the `information_schema` database, not the database with the data')
query = QSqlQuery(info_schema)
query.prepare("""SELECT `column_name`, `referenced_table_name`, `referenced_column_name` FROM `key_column_usage`
WHERE `table_schema` = :schema AND `table_name` = :table""")
query.bindValue(':schema', db.databaseName())
query.bindValue(':table', table)
if not query.exec():
raise SyntaxError(query.lastError().text())
values = {}
while query.next():
k = query.value('column_name')
t = query.value('referenced_table_name')
c = query.value('referenced_column_name')
if t == '':
values[k] = None
else:
values[k] = f'{t} ({c})'
return values
def lookup_comments(info_schema, db, table):
"""Look up which columns have comments
Parameters
----------
info_schema : instance of QSqlDatabase
this should be the `information_schema` database
db : instance of QSqlDatabase
the database with the actual data
table : str
name of the table
Returns
-------
dict
key is the name of the column.
"""
if not info_schema.databaseName() == 'information_schema':
raise ValueError('The first argument should be the `information_schema` database, not the database with the data')
query = QSqlQuery(info_schema)
query.prepare("""SELECT `column_name`, `column_comment` FROM `columns`
WHERE `table_schema` = :schema AND `table_name` = :table""")
query.bindValue(':schema', db.databaseName())
query.bindValue(':table', table)
if not query.exec():
raise SyntaxError(query.lastError().text())
values = {}
while query.next():
k = query.value('column_name')
c = query.value('column_comment')
if len(c) > 0:
if isinstance(c, QByteArray):
c = c.data().decode()
values[k] = c
return values
def parse_subtables(info_schema, db, table):
statements = lookup_statements(info_schema, db, table)
SUBTABLES = []
for statement in statements:
sub = parse_trigger_statements(statement)
sub['parent'] = table
if sub is not None:
SUBTABLES.append(sub)
return SUBTABLES
def lookup_statements(info_schema, db, table):
if not info_schema.databaseName() == 'information_schema':
raise ValueError('The first argument should be the `information_schema` database, not the database with the data')
query = QSqlQuery(info_schema)
query.prepare("""SELECT `action_statement` FROM `triggers`
WHERE `event_object_schema` = :schema AND `event_object_table` = :table AND `event_manipulation` = 'INSERT' AND `action_timing` = 'AFTER'""")
query.bindValue(':schema', db.databaseName())
query.bindValue(':table', table)
if not query.exec():
raise SyntaxError(query.lastError().text())
statements = []
while query.next():
c = query.value('action_statement')
if isinstance(c, QByteArray):
c = c.data().decode()
statements.append(c)
return statements
def parse_trigger_statements(statement):
cond_str = search(r"IF NEW.([a-z_]+) = '(.+?)'", statement)
if cond_str is None:
cond_str = search(r"IF NEW.([a-z_]+) IN \((.+?)\)", statement)
if cond_str is None:
print('Could not parse trigger condition')
return
else:
parameter = cond_str.group(1)
values = [x.strip("' ") for x in cond_str.group(2).split(',')]
else:
parameter = cond_str.group(1)
values = [cond_str.group(2), ]
subtable_str = search(r"INSERT INTO ([a-z_]+)", statement)
if cond_str is None:
print('Could not parse subtable')
return
else:
subtable = subtable_str.group(1)
return {
'subtable': subtable,
'parameter': parameter,
'values': values,
}
Functions
def lookup_allowed_values(db, table, column)
-
Look up allowed values from the table
Parameters
db
:instance
ofQSqlDatabase
table
:str
- name of the table
column
:str
- name of the column
Returns
list
ofstr
- list of allowed values (it is empty when all the values are allowed)
Expand source code
def lookup_allowed_values(db, table, column): """Look up allowed values from the table Parameters ---------- db : instance of QSqlDatabase table : str name of the table column : str name of the column Returns ------- list of str list of allowed values (it is empty when all the values are allowed) """ query = QSqlQuery(db) query.prepare('SELECT allowed_value FROM allowed_values WHERE `table_name` = :table and `column_name` = :column') query.bindValue(':table', table) query.bindValue(':column', column) if not query.exec(): raise SyntaxError(query.lastError().text()) values = [] while query.next(): values.append(query.value('allowed_value')) return values
def lookup_comments(info_schema, db, table)
-
Look up which columns have comments
Parameters
info_schema
:instance
ofQSqlDatabase
- this should be the
information_schema
database db
:instance
ofQSqlDatabase
- the database with the actual data
table
:str
- name of the table
Returns
dict
- key is the name of the column.
Expand source code
def lookup_comments(info_schema, db, table): """Look up which columns have comments Parameters ---------- info_schema : instance of QSqlDatabase this should be the `information_schema` database db : instance of QSqlDatabase the database with the actual data table : str name of the table Returns ------- dict key is the name of the column. """ if not info_schema.databaseName() == 'information_schema': raise ValueError('The first argument should be the `information_schema` database, not the database with the data') query = QSqlQuery(info_schema) query.prepare("""SELECT `column_name`, `column_comment` FROM `columns` WHERE `table_schema` = :schema AND `table_name` = :table""") query.bindValue(':schema', db.databaseName()) query.bindValue(':table', table) if not query.exec(): raise SyntaxError(query.lastError().text()) values = {} while query.next(): k = query.value('column_name') c = query.value('column_comment') if len(c) > 0: if isinstance(c, QByteArray): c = c.data().decode() values[k] = c return values
def lookup_indexes(info_schema, db, table)
-
Look up which columns are indices
Parameters
info_schema
:instance
ofQSqlDatabase
- this should be the
information_schema
database db
:instance
ofQSqlDatabase
- the database with the actual data
table
:str
- name of the table
Returns
dict
- key is the name of the column. 'PRIMARY' for primary indices and for foreign keys, it's table_name (column_name)
Expand source code
def lookup_indexes(info_schema, db, table): """Look up which columns are indices Parameters ---------- info_schema : instance of QSqlDatabase this should be the `information_schema` database db : instance of QSqlDatabase the database with the actual data table : str name of the table Returns ------- dict key is the name of the column. 'PRIMARY' for primary indices and for foreign keys, it's table_name (column_name) """ if not info_schema.databaseName() == 'information_schema': raise ValueError('The first argument should be the `information_schema` database, not the database with the data') query = QSqlQuery(info_schema) query.prepare("""SELECT `column_name`, `referenced_table_name`, `referenced_column_name` FROM `key_column_usage` WHERE `table_schema` = :schema AND `table_name` = :table""") query.bindValue(':schema', db.databaseName()) query.bindValue(':table', table) if not query.exec(): raise SyntaxError(query.lastError().text()) values = {} while query.next(): k = query.value('column_name') t = query.value('referenced_table_name') c = query.value('referenced_column_name') if t == '': values[k] = None else: values[k] = f'{t} ({c})' return values
def lookup_statements(info_schema, db, table)
-
Expand source code
def lookup_statements(info_schema, db, table): if not info_schema.databaseName() == 'information_schema': raise ValueError('The first argument should be the `information_schema` database, not the database with the data') query = QSqlQuery(info_schema) query.prepare("""SELECT `action_statement` FROM `triggers` WHERE `event_object_schema` = :schema AND `event_object_table` = :table AND `event_manipulation` = 'INSERT' AND `action_timing` = 'AFTER'""") query.bindValue(':schema', db.databaseName()) query.bindValue(':table', table) if not query.exec(): raise SyntaxError(query.lastError().text()) statements = [] while query.next(): c = query.value('action_statement') if isinstance(c, QByteArray): c = c.data().decode() statements.append(c) return statements
def parse_all_tables(info_schema, db)
-
Expand source code
def parse_all_tables(info_schema, db): TABLES = {} for table in sorted(db.tables()): indices = lookup_indexes(info_schema, db, table) comments = lookup_comments(info_schema, db, table) driver = db.driver() rec = driver.record(table) table_d = {} for i in range(rec.count()): field = rec.field(i) name = field.name() d = {} d['type'] = QMetaType.typeName(field.type()) d['values'] = lookup_allowed_values(db, table, name) d['index'] = indices.get(name, False) doc = comments.get(name, None) if doc is None: d['alias'] = name d['doc'] = None elif ': ' in doc: d['alias'], d['doc'] = doc.split(': ') else: d['alias'] = doc.strip() d['doc'] = None table_d[name] = d TABLES[table] = table_d return TABLES
def parse_subtables(info_schema, db, table)
-
Expand source code
def parse_subtables(info_schema, db, table): statements = lookup_statements(info_schema, db, table) SUBTABLES = [] for statement in statements: sub = parse_trigger_statements(statement) sub['parent'] = table if sub is not None: SUBTABLES.append(sub) return SUBTABLES
def parse_trigger_statements(statement)
-
Expand source code
def parse_trigger_statements(statement): cond_str = search(r"IF NEW.([a-z_]+) = '(.+?)'", statement) if cond_str is None: cond_str = search(r"IF NEW.([a-z_]+) IN \((.+?)\)", statement) if cond_str is None: print('Could not parse trigger condition') return else: parameter = cond_str.group(1) values = [x.strip("' ") for x in cond_str.group(2).split(',')] else: parameter = cond_str.group(1) values = [cond_str.group(2), ] subtable_str = search(r"INSERT INTO ([a-z_]+)", statement) if cond_str is None: print('Could not parse subtable') return else: subtable = subtable_str.group(1) return { 'subtable': subtable, 'parameter': parameter, 'values': values, }