Module xelo2.gui.interface
Expand source code
from logging import getLogger
from pathlib import Path
from datetime import date, datetime
from functools import partial
from numpy import isin, array
from PyQt5.QtWidgets import (
QAbstractItemView,
QAction,
QComboBox,
QDateEdit,
QDateTimeEdit,
QDialog,
QDockWidget,
QGroupBox,
QFileDialog,
QHBoxLayout,
QInputDialog,
QLineEdit,
QListWidget,
QListWidgetItem,
QMainWindow,
QMenu,
QMessageBox,
QPushButton,
QProgressDialog,
QDoubleSpinBox,
QSpinBox,
QTableView,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
from PyQt5.QtGui import (
QBrush,
QColor,
QDesktopServices,
QGuiApplication,
QPalette,
)
from PyQt5.QtCore import (
Qt,
pyqtSlot,
QDate,
QDateTime,
QSettings,
QUrl,
)
from PyQt5.QtSql import (
QSqlQuery,
QSqlTableModel,
)
from ..api import list_subjects, Subject, Session, Run, Channels, Electrodes
from ..api.utils import collect_columns
from ..database import access_database, lookup_allowed_values
from ..database.tables import LEVELS
from ..bids.root import create_bids
from ..bids.io.parrec import convert_parrec_nibabel
from ..bids.utils import find_one_file
from ..io.parrec import add_parrec
from ..io.ephys import add_ephys_to_sess
from ..io.channels import create_channels
from ..io.electrodes import import_electrodes
from ..io.events import read_events_from_ephys
from ..io.tsv import load_tsv, save_tsv
from .utils import _protocol_name, _name, _session_name, guess_modality
from .actions import create_menubar, Search, create_shortcuts
from .models import FilesWidget, EventsModel
from .modal import (
NewFile,
EditElectrodes,
Popup_Experimenters,
Popup_Protocols,
Popup_IntendedFor,
CompareEvents,
CalculateOffset,
parse_accessdatabase,
)
EXTRA_LEVELS = ['channels', 'electrodes']
NULL_TEXT = 'Unknown / Unspecified'
settings = QSettings("xelo2", "xelo2")
lg = getLogger(__name__)
class Interface(QMainWindow):
"""TODO: disable everything until you load database"""
db = None
test = False
unsaved_changes = False
def __init__(self, db_name=None, username=None, password=None, hostname='localhost'):
super().__init__()
create_menubar(self)
create_shortcuts(self)
lists = {}
groups = {}
for k in LEVELS + EXTRA_LEVELS:
groups[k] = QGroupBox(k.capitalize())
lists[k] = QListWidget()
if k in LEVELS:
lists[k].currentItemChanged.connect(self.proc_all)
elif k in EXTRA_LEVELS:
lists[k].currentItemChanged.connect(self.show_channels_electrodes)
# right click
lists[k].setContextMenuPolicy(Qt.CustomContextMenu)
lists[k].customContextMenuRequested.connect(partial(self.rightclick_list, level=k))
layout = QVBoxLayout()
layout.addWidget(lists[k])
if k == 'runs':
b = QPushButton('Add to export list')
b.clicked.connect(self.exporting)
layout.addWidget(b)
groups[k].setLayout(layout)
# PARAMETERS: Widget
t_params = QTableWidget()
t_params.horizontalHeader().setStretchLastSection(True)
t_params.setSelectionBehavior(QAbstractItemView.SelectRows)
t_params.setColumnCount(3)
t_params.setHorizontalHeaderLabels(['Level', 'Parameter', 'Value'])
t_params.verticalHeader().setVisible(False)
# EVENTS: Widget
self.events_view = QTableView(self)
self.events_view.horizontalHeader().setStretchLastSection(True)
self.events_view.setAlternatingRowColors(True)
self.events_view.setContextMenuPolicy(Qt.CustomContextMenu)
self.events_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='events'))
# CHANNELS: Widget
self.channels_view = QTableView(self)
self.channels_view.horizontalHeader().setStretchLastSection(True)
self.channels_view.setContextMenuPolicy(Qt.CustomContextMenu)
self.channels_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='channels'))
# ELECTRODES: Form
self.elec_form = QTableWidget()
self.elec_form.horizontalHeader().setStretchLastSection(True)
self.elec_form.setSelectionBehavior(QAbstractItemView.SelectRows)
self.elec_form.setColumnCount(2)
self.elec_form.setHorizontalHeaderLabels(['Parameter', 'Value'])
self.elec_form.verticalHeader().setVisible(False)
# ELECTRODES: Widget
self.electrodes_view = QTableView(self)
self.electrodes_view.horizontalHeader().setStretchLastSection(True)
self.electrodes_view.setContextMenuPolicy(Qt.CustomContextMenu)
self.electrodes_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='electrodes'))
# FILES: Widget
t_files = FilesWidget(self)
t_files.horizontalHeader().setStretchLastSection(True)
t_files.setSelectionBehavior(QAbstractItemView.SelectRows)
t_files.setColumnCount(3)
t_files.setHorizontalHeaderLabels(['Level', 'Format', 'File'])
t_files.verticalHeader().setVisible(False)
# right click
t_files.setContextMenuPolicy(Qt.CustomContextMenu)
t_files.customContextMenuRequested.connect(self.rightclick_files)
# EXPORT: Widget
w_export = QWidget()
col_export = QVBoxLayout()
t_export = QTableWidget()
t_export.horizontalHeader().setStretchLastSection(True)
t_export.setSelectionBehavior(QAbstractItemView.SelectRows)
t_export.setColumnCount(4)
t_export.setHorizontalHeaderLabels(['Subject', 'Session', 'Run', 'Start Time'])
t_export.verticalHeader().setVisible(False)
p_clearexport = QPushButton('Clear list')
p_clearexport.clicked.connect(self.clear_export)
p_doexport = QPushButton('Export ...')
p_doexport.clicked.connect(self.do_export)
col_export.addWidget(t_export)
col_export.addWidget(p_clearexport)
col_export.addWidget(p_doexport)
w_export.setLayout(col_export)
# session and protocol in the same column
col_sessmetc = QVBoxLayout()
col_sessmetc.addWidget(groups['sessions'])
col_sessmetc.addWidget(groups['protocols'])
# recordings, channels and electrodes
col_recchanelec = QVBoxLayout()
col_recchanelec.addWidget(groups['recordings'])
col_recchanelec.addWidget(groups['channels'])
col_recchanelec.addWidget(groups['electrodes'])
# TOP PANELS
layout_top = QHBoxLayout()
layout_top.addWidget(groups['subjects'])
layout_top.addLayout(col_sessmetc)
layout_top.addWidget(groups['runs'])
layout_top.addLayout(col_recchanelec)
# FULL LAYOUT
# central widget
central_widget = QWidget()
layout = QVBoxLayout()
layout.addLayout(layout_top)
central_widget.setLayout(layout)
self.setCentralWidget(central_widget)
self.setCorner(Qt.TopLeftCorner, Qt.LeftDockWidgetArea)
self.setCorner(Qt.TopRightCorner, Qt.RightDockWidgetArea)
self.setCorner(Qt.BottomLeftCorner, Qt.LeftDockWidgetArea)
self.setCorner(Qt.BottomRightCorner, Qt.RightDockWidgetArea)
# parameters
dockwidget = QDockWidget('Parameters', self)
dockwidget.setWidget(t_params)
dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_parameters') # savestate
self.addDockWidget(Qt.RightDockWidgetArea, dockwidget)
# events
dockwidget = QDockWidget('Events', self)
dockwidget.setWidget(self.events_view)
dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_events') # savestate
self.addDockWidget(Qt.RightDockWidgetArea, dockwidget)
# channels
dockwidget = QDockWidget('Channels', self)
dockwidget.setWidget(self.channels_view)
dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_channels') # savestate
self.addDockWidget(Qt.RightDockWidgetArea, dockwidget)
# electrodes
dockwidget = QDockWidget('Electrodes', self)
temp_widget = QWidget() # you need extra widget to set layout in qdockwidget
elec_layout = QVBoxLayout(temp_widget)
elec_layout.addWidget(self.elec_form)
elec_layout.addWidget(self.electrodes_view, stretch=1)
dockwidget.setWidget(temp_widget)
dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_electrodes') # savestate
self.addDockWidget(Qt.RightDockWidgetArea, dockwidget)
# files
dockwidget = QDockWidget('Files', self)
dockwidget.setWidget(t_files)
dockwidget.setAllowedAreas(Qt.TopDockWidgetArea | Qt.BottomDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_files') # savestate
self.addDockWidget(Qt.BottomDockWidgetArea, dockwidget)
# export
dockwidget = QDockWidget('Export', self)
dockwidget.setWidget(w_export)
dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea)
dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable)
dockwidget.setObjectName('dock_export') # savestate
self.addDockWidget(Qt.RightDockWidgetArea, dockwidget)
# restore geometry
window_geometry = settings.value('window/geometry')
if window_geometry is not None:
self.restoreGeometry(window_geometry)
window_state = settings.value('window/state')
if window_state is not None:
self.restoreState(window_state)
# SAVE THESE ITEMS
self.groups = groups
self.lists = lists
self.t_params = t_params
self.t_files = t_files
self.t_export = t_export
self.exports = []
self.search = Search()
self.statusBar()
self.show()
if db_name is None:
DB_ARGS = parse_accessdatabase(self)
if DB_ARGS is not None:
self.sql_access(**DB_ARGS)
else:
self.sql_access(db_name, username, password, hostname=hostname)
def sql_access(self, db_name=None, username=None, password=None, hostname=None):
"""This is where you access the database
"""
self.db = access_database(db_name, username, hostname, password)
self.db['db'].transaction()
self.events_model = EventsModel(self.db)
self.events_view.setModel(self.events_model)
self.events_view.hideColumn(0)
self.channels_model = QSqlTableModel(self, self.db['db'])
self.channels_model.setTable('channels')
self.channels_view.setModel(self.channels_model)
self.channels_view.hideColumn(0)
self.electrodes_model = QSqlTableModel(self, self.db['db'])
self.electrodes_model.setTable('electrodes')
self.electrodes_view.setModel(self.electrodes_model)
self.electrodes_view.hideColumn(0)
self.list_subjects()
def sql_commit(self):
self.db['db'].commit()
self.setWindowTitle('')
self.unsaved_changes = False
self.db['db'].transaction()
def sql_rollback(self):
self.db['db'].rollback()
self.unsaved_changes = False
self.setWindowTitle('')
self.db['db'].transaction()
self.list_subjects()
def list_subjects(self, code_to_select=None):
"""
code_to_select : str
code of the subject to select
"""
for line in self.lists.values():
line.clear()
to_select = None
if self.subjsort.isChecked():
args = {
'alphabetical': True,
'reverse': False,
}
else:
args = {
'alphabetical': False,
'reverse': True,
}
for subj in list_subjects(self.db, **args):
item = QListWidgetItem(str(subj))
if subj.id in self.search.subjects:
highlight(item)
item.setData(Qt.UserRole, subj)
self.lists['subjects'].addItem(item)
if code_to_select is not None and code_to_select == str(subj):
to_select = item
if to_select is None: # select first one
to_select = item
self.lists['subjects'].setCurrentItem(to_select)
@pyqtSlot(QListWidgetItem, QListWidgetItem)
def proc_all(self, current=None, previous=None, item=None):
"""GUI calls current and previous. You can call item"""
self.list_channels_electrodes()
if item is None:
# when clicking on a previously selected list, it sends a signal where current is None, but I don't understand why
if current is None:
return
item = current.data(Qt.UserRole)
if item.t == 'subject':
self.list_sessions_and_protocols(item)
elif item.t == 'session':
self.list_runs(item)
elif item.t == 'protocol':
pass
elif item.t == 'run':
self.list_recordings(item)
self.show_events(item)
elif item.t == 'recording':
self.list_channels_electrodes(item)
self.list_params()
self.list_files()
def list_sessions_and_protocols(self, subj=None):
if subj is None or subj is False:
subj = self.current('subjects')
for level, l in self.lists.items():
if level in ('subjects', ):
continue
l.clear()
for sess in subj.list_sessions():
item = QListWidgetItem_time(sess, _session_name(sess))
if sess.id in self.search.sessions:
highlight(item)
self.lists['sessions'].addItem(item)
self.lists['sessions'].setCurrentRow(0)
for protocol in subj.list_protocols():
item = QListWidgetItem(_protocol_name(protocol))
item.setData(Qt.UserRole, protocol)
self.lists['protocols'].addItem(item)
self.lists['protocols'].setCurrentRow(0)
def list_runs(self, sess=None):
if sess is None or sess is False:
sess = self.current('sessions')
for level, l in self.lists.items():
if level in ('subjects', 'sessions', 'protocols'):
continue
l.clear()
for i, run in enumerate(sess.list_runs()):
item = QListWidgetItem_time(run, f'#{i + 1: 3d}: {run.task_name}')
if run.id in self.search.runs:
highlight(item)
self.lists['runs'].addItem(item)
self.lists['runs'].setCurrentRow(0)
def list_recordings(self, run=None):
if run is None or run is False:
run = self.current('runs')
for level, l in self.lists.items():
if level in ('subjects', 'sessions', 'protocols', 'runs'):
continue
l.clear()
for recording in run.list_recordings():
item = QListWidgetItem(recording.modality)
item.setData(Qt.UserRole, recording)
if recording.id in self.search.recordings:
highlight(item)
self.lists['recordings'].addItem(item)
self.lists['recordings'].setCurrentRow(0)
def list_channels_electrodes(self, recording=None):
for level, l in self.lists.items():
if level in ('channels', 'electrodes'):
l.clear()
self.channels_model.setFilter('channel_group_id = 0')
self.channels_model.select()
self.channels_view.setEnabled(False)
self.electrodes_model.setFilter('electrode_group_id = 0')
self.electrodes_model.select()
self.electrodes_view.setEnabled(False)
self.elec_form.clearContents()
if recording is None:
return
sess = self.current('sessions')
if recording.modality in ('ieeg', 'eeg', 'meg'):
for chan in sess.list_channels():
item = QListWidgetItem(_name(chan.name))
item.setData(Qt.UserRole, chan)
self.lists['channels'].addItem(item)
for elec in sess.list_electrodes():
item = QListWidgetItem(_name(elec.name))
item.setData(Qt.UserRole, elec)
self.lists['electrodes'].addItem(item)
def statusbar_selected(self):
statusbar = []
for k, v in self.lists.items():
item = v.currentItem()
if item is None:
continue
obj = item.data(Qt.UserRole)
statusbar.append(repr(obj))
self.statusBar().showMessage('\t'.join(statusbar))
def list_params(self):
self.statusbar_selected()
self.t_params.blockSignals(True)
self.t_params.clearContents()
all_params = []
for k, v in self.lists.items():
item = v.currentItem()
if item is None:
continue
obj = item.data(Qt.UserRole)
parameters = {}
parameters.update(list_parameters(self, obj))
if k == 'runs':
w = Popup_Experimenters(obj, self)
parameters.update({'Experimenters': w})
w = Popup_Protocols(obj, self)
parameters.update({'Protocols': w})
if obj.task_name == 'top_up':
w = Popup_IntendedFor(obj, self)
parameters.update({'Intended For': w})
elif k == 'recordings':
if obj.modality in ('ieeg', 'eeg', 'meg'):
parameters.update(list_parameters(self, obj))
sess = self.current('sessions')
w = QComboBox() # add callback here
w.addItem('(undefined channels)', None)
for chan in sess.list_channels():
w.addItem(_name(chan.name), chan)
channels = obj.channels
if channels is None:
w.setCurrentText('')
else:
w.setCurrentText(_name(channels.name))
w.activated.connect(partial(self.combo_chanelec, widget=w))
parameters.update({'Channels': w})
w = QComboBox()
w.addItem('(undefined electrodes)', None)
for elec in sess.list_electrodes():
w.addItem(_name(elec.name), elec)
electrodes = obj.electrodes
if electrodes is None:
w.setCurrentText('')
else:
w.setCurrentText(_name(electrodes.name))
w.activated.connect(partial(self.combo_chanelec, widget=w))
parameters.update({'Electrodes': w})
for p_k, p_v in parameters.items():
all_params.append({
'level': self.groups[k].title(),
'parameter': p_k,
'value': p_v,
})
self.t_params.setRowCount(len(all_params))
for i, val in enumerate(all_params):
item = QTableWidgetItem(val['level'])
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
item.setBackground(QBrush(QColor('lightGray')))
self.t_params.setItem(i, 0, item)
item = QTableWidgetItem(val['parameter'])
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
self.t_params.setItem(i, 1, item)
item = QTableWidgetItem(str(val['value']))
self.t_params.setCellWidget(i, 2, val['value'])
self.t_params.blockSignals(False)
def combo_chanelec(self, i, widget):
data = widget.currentData()
recording = self.current('recordings')
if data is None:
if widget.currentText() == '(undefined channels)':
recording.detach_channels()
else:
recording.detach_electrodes()
elif data.t == 'channel_group':
recording.attach_channels(data)
elif data.t == 'electrode_group':
recording.attach_electrodes(data)
def electrode_intendedfor(self, index, elec, combobox):
if index == 0:
elec.IntendedFor = None
else:
elec.IntendedFor = combobox[index]
self.modified()
def current(self, level):
item = self.lists[level].currentItem()
if item is not None:
return item.data(Qt.UserRole)
def list_files(self):
self.t_files.blockSignals(True)
self.t_files.clearContents()
all_files = []
for k, v in self.lists.items():
item = v.currentItem()
if item is None:
continue
obj = item.data(Qt.UserRole)
for file in obj.list_files():
all_files.append({
'level': self.groups[k].title(),
'format': file.format,
'path': file.path,
'obj': [obj, file],
})
self.t_files.setRowCount(len(all_files))
for i, val in enumerate(all_files):
item = QTableWidgetItem(val['level'])
item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
item.setBackground(QBrush(QColor('lightGray')))
item.setData(Qt.UserRole, val['obj'])
self.t_files.setItem(i, 0, item)
item = QTableWidgetItem(val['format'])
item.setData(Qt.UserRole, val['obj'])
self.t_files.setItem(i, 1, item)
item = QTableWidgetItem(str(val['path']))
try:
path_exists = val['path'].exists()
except PermissionError as err:
lg.warning(err)
item.setForeground(QBrush(QColor('orange')))
else:
if not path_exists:
item.setForeground(QBrush(QColor('red')))
item.setData(Qt.UserRole, val['obj'])
self.t_files.setItem(i, 2, item)
self.t_files.blockSignals(False)
def changed(self, obj, column, x):
if isinstance(x, QDate):
x = x.toPyDate()
elif isinstance(x, QDateTime):
x = x.toPyDateTime()
else:
if isinstance(x, QLineEdit):
x = x.text()
if x == NULL_TEXT:
x = None
else:
x = f'{x}'
setattr(obj, column, x)
self.modified()
def show_events(self, item):
self.events_model.update(item.events)
def compare_events_with_file(self):
run = self.current('runs')
rec = self.current('recordings')
file = find_one_file(rec, ('blackrock', 'micromed', 'bci2000'))
if file is None:
print('Could not find electrophysiology file')
return
self.events_model.compare(run, rec, file)
@pyqtSlot(QListWidgetItem, QListWidgetItem)
def show_channels_electrodes(self, current=None, previous=None, item=None):
if current is not None:
item = current.data(Qt.UserRole)
if item is None:
return
self.statusbar_selected()
if item.t == 'channel_group':
self.channels_view.setEnabled(True)
self.channels_model.setFilter(f'channel_group_id = {item.id}')
self.channels_model.select()
elif item.t == 'electrode_group':
self.elec_form.blockSignals(True)
parameters = list_parameters(self, item)
parameters['Intended For'] = make_electrode_combobox(self, item)
self.elec_form.setRowCount(len(parameters))
for i, kv in enumerate(parameters.items()):
k, v = kv
table_item = QTableWidgetItem(k)
self.elec_form.setItem(i, 0, table_item)
self.elec_form.setCellWidget(i, 1, v)
self.elec_form.blockSignals(False)
self.electrodes_view.setEnabled(True)
self.electrodes_model.setFilter(f'electrode_group_id = {item.id}')
self.electrodes_model.select()
def exporting(self, checked=None, subj=None, sess=None, run=None):
if subj is None:
subj = self.lists['subjects'].currentItem().data(Qt.UserRole)
sess = self.lists['sessions'].currentItem().data(Qt.UserRole)
run = self.lists['runs'].currentItem().data(Qt.UserRole)
d = {}
d['subjects'] = str(subj)
if sess.name == 'MRI':
d['sessions'] = f'{sess.name} ({sess.MagneticFieldStrength})'
else:
d['sessions'] = sess.name
d['run_id'] = run.id
d['runs'] = f'{run.task_name}'
d['start_time'] = f'{run.start_time:%d %b %Y %H:%M:%S}'
self.exports.append(d)
self.list_exports()
def clear_export(self):
self.exports = []
self.list_exports()
def list_exports(self):
self.t_export.clearContents()
n_exports = len(self.exports)
self.t_export.setRowCount(n_exports)
for i, l in enumerate(self.exports):
item = QTableWidgetItem(l['subjects'])
self.t_export.setItem(i, 0, item)
item = QTableWidgetItem(l['sessions'])
self.t_export.setItem(i, 1, item)
item = QTableWidgetItem(l['runs'])
self.t_export.setItem(i, 2, item)
item = QTableWidgetItem(l['start_time'])
self.t_export.setItem(i, 3, item)
def rightclick_table(self, pos, table=None):
if table == 'events':
view = self.events_view
elif table == 'channels':
view = self.channels_view
elif table == 'electrodes':
view = self.electrodes_view
menu = QMenu(self)
action = QAction(f'Import {table} from tsv ...', self)
action.triggered.connect(lambda x: self.tsv_import(table=table))
menu.addAction(action)
action = QAction(f'Export {table} to tsv ...', self)
action.triggered.connect(lambda x: self.tsv_export(table=table))
menu.addAction(action)
menu.popup(view.mapToGlobal(pos))
def tsv_import(self, table):
tsv_file = QFileDialog.getOpenFileName(
self,
f"Import {table} from file",
None,
"Tab-separated values (*.tsv)")[0]
if tsv_file == '':
return
if table == 'events':
run = self.current('runs')
X = run.events
else:
current = self.current(table)
X = current.data
X = load_tsv(Path(tsv_file), X.dtype)
if table == 'events':
run.events = X
self.show_events(run)
else:
current.data = _fake_names(X)
recording = self.current('recordings')
self.list_channels_electrodes(recording=recording)
self.modified()
def tsv_export(self, table):
tsv_file = QFileDialog.getSaveFileName(
self,
f"Save {table} to file",
None,
"Tab-separated values (*.tsv)")[0]
if tsv_file == '':
return
if table == 'events':
run = self.current('runs')
X = run.events
else:
current = self.current(table)
X = current.data
save_tsv(Path(tsv_file), X)
def rightclick_list(self, pos, level=None):
item = self.lists[level].itemAt(pos)
menu = QMenu(self)
if item is None:
action = QAction(f'Add {level}', self)
action.triggered.connect(lambda x: self.new_item(level=level))
menu.addAction(action)
else:
obj = item.data(Qt.UserRole)
if obj.t in ('channel_group', 'electrode_group'):
action_rename = QAction('Rename', self)
action_rename.triggered.connect(lambda x: self.rename_item(obj))
menu.addAction(action_rename)
action_delete = QAction('Delete', self)
action_delete.triggered.connect(lambda x: self.delete_item(obj))
menu.addAction(action_delete)
menu.popup(self.lists[level].mapToGlobal(pos))
def rename_item(self, item):
text, ok = QInputDialog.getText(
self,
f'Rename {item.t.split("_")[0]}',
'New title:',
)
if ok and text != '':
item.name = text
def delete_item(self, item):
item.delete()
if item.t == 'subject':
self.list_subjects()
elif item.t == 'session':
self.list_sessions_and_protocols(item.subject)
elif item.t == 'protocol':
self.list_sessions_and_protocols(item.subject)
elif item.t == 'run':
self.list_runs(item.session)
elif item.t == 'recording':
self.list_recordings(item.run)
self.list_params()
self.list_files()
def rightclick_files(self, pos):
item = self.t_files.itemAt(pos)
if item is None:
menu = QMenu(self)
action = QAction('Add File', self)
action.triggered.connect(lambda x: self.new_file(self))
menu.addAction(action)
menu.popup(self.t_files.mapToGlobal(pos))
else:
level_obj, file_obj = item.data(Qt.UserRole)
file_path = file_obj.path.resolve()
url_directory = QUrl.fromLocalFile(str(file_path.parent))
action_edit = QAction('Edit File', self)
action_edit.triggered.connect(lambda x: self.edit_file(level_obj, file_obj))
action_copy = QAction('Copy Path to File', self)
action_copy.triggered.connect(lambda x: copy_to_clipboard(str(file_obj.path)))
action_openfile = QAction('Open File', self)
action_openfile.triggered.connect(lambda x: self.open_file(file_path))
action_opendirectory = QAction('Open Containing Folder', self)
action_opendirectory.triggered.connect(lambda x: QDesktopServices.openUrl(url_directory))
action_delete = QAction('Delete', self)
action_delete.triggered.connect(lambda x: self.delete_file(level_obj, file_obj))
menu = QMenu('File Information', self)
menu.addAction(action_edit)
menu.addAction(action_copy)
menu.addAction(action_openfile)
menu.addAction(action_opendirectory)
menu.addSeparator()
menu.addAction(action_delete)
menu.popup(self.t_files.mapToGlobal(pos))
def open_file(self, file_path):
if file_path.suffix.lower() == '.par':
print(f'converting {file_path}')
file_path = convert_parrec_nibabel(file_path)[0]
print(f'converted to {file_path}')
url_file = QUrl.fromLocalFile(str(file_path))
QDesktopServices.openUrl(url_file)
def sql_search(self):
text, ok = QInputDialog.getText(
self,
'Search the database',
'WHERE statement' + ' ' * 200,
QLineEdit.Normal,
self.search.previous,
)
if ok and text != '':
self.search.where(self.db, text)
self.list_subjects()
def sql_search_clear(self):
self.search.clear()
self.list_subjects()
def add_search_results_to_export(self):
for subj_id, sess_id, run_id in zip(self.search.subjects, self.search.sessions, self.search.runs):
self.exporting(
subj=Subject(self.db, id=subj_id),
sess=Session(self.db, id=sess_id),
run=Run(self.db, id=run_id),
)
def modified(self):
self.unsaved_changes = True
self.setWindowTitle('*' + self.windowTitle())
def do_export(self, checked=None):
subset = {'subjects': [], 'sessions': [], 'runs': []}
run_ids = '(' + ', '.join([str(x['run_id']) for x in self.exports]) + ')'
query = QSqlQuery(self.db['db'])
query.prepare(f"""\
SELECT subjects.id, sessions.id, runs.id FROM runs
JOIN sessions ON sessions.id = runs.session_id
JOIN subjects ON subjects.id = sessions.subject_id
WHERE runs.id IN {run_ids}
""")
if not query.exec():
raise SyntaxError(query.lastError().text())
while query.next():
subset['subjects'].append(query.value(0))
subset['sessions'].append(query.value(1))
subset['runs'].append(query.value(2))
data_path = QFileDialog.getSaveFileName(
self,
"Choose directory where to save the recordings in BIDS format",
'bids_output',
'Folder (*)',
)[0]
if data_path == '':
return
data_path = Path(data_path).resolve()
if data_path.exists():
QMessageBox.warning(
self,
'Folder exists',
'This folder already exists. Make sure you choose a folder name that does not exist',
)
return
create_bids(self.db, data_path, deface=False, subset=subset)
def new_item(self, checked=None, level=None):
if level == 'subjects':
text, ok = QInputDialog.getText(
self,
'Add New Subject',
'Subject Code:',
)
elif level == 'sessions':
current_subject = self.current('subjects')
text, ok = QInputDialog.getItem(
self,
f'Add New Session for {current_subject}',
'Session Name:',
lookup_allowed_values(self.db['db'], 'sessions', 'name'),
0, False)
elif level == 'protocols':
current_subject = self.current('subjects')
text, ok = QInputDialog.getItem(
self,
f'Add New Protocol for {current_subject}',
'Protocol Name:',
lookup_allowed_values(self.db['db'], 'protocols', 'metc'),
0, False)
elif level == 'runs':
current_session = self.current('sessions')
text, ok = QInputDialog.getItem(
self,
f'Add New Run for {current_session.name}',
'Task Name:',
lookup_allowed_values(self.db['db'], 'runs', 'task_name'),
0, False)
elif level == 'recordings':
current_run = self.current('runs')
modalities = lookup_allowed_values(self.db['db'], 'recordings', 'modality')
guess = guess_modality(current_run)
if guess is None or guess not in modalities:
idx = 0
else:
idx = modalities.index(guess)
text, ok = QInputDialog.getItem(
self,
f'Add New Recording for {current_run.task_name}',
'Modality:',
modalities,
idx,
False)
elif level in ('channels', 'electrodes'):
current_recording = self.current('recordings')
if current_recording is None or current_recording.modality not in ('ieeg', 'eeg', 'meg'):
QMessageBox.warning(
self,
f'Cannot add {level}',
'You should first select an "ieeg" / "eeg" / "meg" recording')
return
text, ok = QInputDialog.getText(
self,
f'Add new {level}',
'Name to identify this setup:',
)
if ok and text != '':
if level == 'subjects':
code = text.strip()
Subject.add(self.db, code)
self.list_subjects(code)
elif level == 'sessions':
current_subject.add_session(text)
self.list_sessions_and_protocols(current_subject)
elif level == 'protocols':
current_subject.add_protocol(text)
self.list_sessions_and_protocols(current_subject)
elif level == 'runs':
current_session.add_run(text)
self.list_runs(current_session)
elif level == 'recordings':
current_run.add_recording(text)
self.list_recordings(current_run)
elif level in ('channels', 'electrodes'):
if level in 'channels':
chan = Channels.add(self.db)
chan.name = text
current_recording.attach_channels(chan)
elif level in 'electrodes':
elec = Electrodes.add(self.db)
elec.name = text
current_recording.attach_electrodes(elec)
self.list_recordings(self.current('runs'))
self.list_channels_electrodes(current_recording)
self.list_params()
self.modified()
def edit_subject_codes(self):
subject = self.current('subjects')
text = str(subject)
if text == '(subject without code)':
text = ''
text, ok = QInputDialog.getText(
self,
'Edit Subject Codes',
'Separate each code by a comma (spaces are ignored)',
text=text,
)
if ok and text != '':
text = text.strip(', ')
subject.codes = [x.strip() for x in text.split(',')]
self.list_subjects()
self.modified()
def new_file(self, checked=None, filename=None):
get_new_file = NewFile(self, filename=filename)
result = get_new_file.exec()
if result:
level = get_new_file.level.currentText().lower() + 's'
item = self.current(level)
format = get_new_file.format.currentText()
path = get_new_file.filepath.text()
item.add_file(format, path)
self.list_files()
self.modified()
def edit_file(self, level_obj, file_obj):
get_new_file = NewFile(self, file_obj, level_obj)
result = get_new_file.exec()
if result:
format = get_new_file.format.currentText()
path = get_new_file.filepath.text()
file_obj.path = path
file_obj.format = format
self.list_files()
self.modified()
def calculate_offset(self):
warning_title = 'Cannot calculate offset'
run = self.current('runs')
recordings = run.list_recordings()
if len(recordings) == 0:
QMessageBox.warning(self, warning_title, 'There are no recordings')
return
rec_fixed = recordings[0]
rec_moving = self.current('recordings')
if rec_fixed.id == rec_moving.id:
QMessageBox.warning(self, warning_title, 'This function compares the first recording with the highlighted recording. Please select another recording to compute the offset')
return
file_fixed = find_one_file(rec_fixed, ('blackrock', 'micromed', 'bci2000'))
if file_fixed is None:
QMessageBox.warning(self, warning_title, 'The first recording does not have an ephys file')
return
file_moving = find_one_file(rec_moving, ('blackrock', 'micromed', 'bci2000'))
if file_moving is None:
QMessageBox.warning(self, warning_title, 'The selected recording does not have an ephys file')
return
calcoffset = CalculateOffset(self, file_fixed, file_moving)
result = calcoffset.exec()
if result:
text = calcoffset.offset.text()
if text.endswith(')'):
return
offset = float(text[:-1])
rec_moving.offset = offset
self.list_params()
self.modified()
def edit_electrode_data(self):
elec = self.current('electrodes')
data = elec.data
edit_electrodes = EditElectrodes(self, data)
result = edit_electrodes.exec()
if result:
parameter = edit_electrodes.parameter.currentText()
value = edit_electrodes.value.text()
data[parameter] = array(value).astype(data.dtype[parameter])
elec.data = data
self.show_channels_electrodes(item=elec)
self.modified()
def io_parrec(self):
run = self.current('runs')
recording = self.current('recordings')
success = False
for file in recording.list_files():
if file.format == 'parrec':
add_parrec(file.path, run=run, recording=recording, update=True)
success = True
break
if success:
self.list_recordings(run)
self.list_params()
self.modified()
else:
self.statusBar().showMessage('Cound not find PAR/REC to collect info from')
def io_parrec_sess(self):
sess = self.current('sessions')
par_folder = QFileDialog.getExistingDirectory()
if par_folder == '':
return
list_parrec = list(Path(par_folder).glob('*.PAR'))
progress = QProgressDialog('', 'Cancel', 0, len(list_parrec), self)
progress.setWindowTitle(f'Importing PAR/REC files to "{sess.subject}"/"{sess.name}"')
progress.setMinimumDuration(0)
progress.setWindowModality(Qt.WindowModal)
for i, par_file in enumerate(list_parrec):
progress.setValue(i)
progress.setLabelText(f'Importing {par_file.name}')
QGuiApplication.processEvents()
add_parrec(par_file, sess=sess)
if progress.wasCanceled():
break
progress.setValue(i + 1)
self.list_runs(sess)
self.list_params()
self.modified()
def io_ephys(self):
sess = self.current('sessions')
ephys_file = QFileDialog.getOpenFileName(
self,
"Select File",
None)[0]
if ephys_file == '':
return
add_ephys_to_sess(self.db, sess, Path(ephys_file))
self.list_runs(sess)
self.list_params()
self.modified()
def io_events_only(self):
run = self.current('runs')
recording = self.current('recordings')
ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000'))
if ephys_file is None:
return
events = read_events_from_ephys(ephys_file, run, recording)
if len(events) > 0:
run.events = events
self.show_events(run)
self.modified()
else:
print('there were no events')
def io_events(self):
run = self.current('runs')
recording = self.current('recordings')
ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000'))
if ephys_file is None:
return
compare_events = CompareEvents(self, run, ephys_file.path)
result = compare_events.exec()
if result == QDialog.Accepted:
run.start_time = compare_events.info['start_time']
run.duration = compare_events.info['duration']
run.events = compare_events.info['events']
self.list_params()
self.show_events(run)
self.modified()
def io_channels(self):
recording = self.current('recordings')
ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000'))
if ephys_file is None:
return
chan = create_channels(self.db, ephys_file.path)
if chan is None:
return
chan.name = '(imported)'
recording.attach_channels(chan)
self.modified()
self.list_recordings()
def io_electrodes(self):
mat_file = QFileDialog.getOpenFileName(
self,
"Open File",
None,
"Matlab (*.mat)")[0]
if mat_file == '':
return
rec = self.current('recordings')
chan = rec.channels
chan_data = chan.data
idx = isin(chan_data['type'], ('ECOG', 'SEEG'))
n_chan = idx.sum()
lg.warning(f'# of ECOG/SEEG channels for this recording: {n_chan}')
xyz = import_electrodes(mat_file, n_chan)
if xyz is None:
print('you need to do this manually')
return
elec = Electrodes.add(self.db)
elec_data = elec.empty(n_chan)
elec_data['name'] = chan_data['name'][idx]
elec_data['x'] = xyz[:, 0]
elec_data['y'] = xyz[:, 1]
elec_data['z'] = xyz[:, 2]
elec.data = elec_data
rec.attach_electrodes(elec)
self.modified()
self.list_recordings()
def delete_file(self, level_obj, file_obj):
level_obj.delete_file(file_obj)
self.list_files()
self.modified()
def closeEvent(self, event):
if self.unsaved_changes:
answer = QMessageBox.question(
self,
'Confirm Closing',
'There are unsaved changes. Are you sure you want to exit?',
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No)
if answer == QMessageBox.No:
event.ignore()
return
settings.setValue('window/geometry', self.saveGeometry())
settings.setValue('window/state', self.saveState())
event.accept()
def list_parameters(parent, obj):
db = parent.db
columns = collect_columns(db, obj=obj)
d = {}
for col, t in columns.items():
col_info = db['tables'][t][col]
if not (col_info['index'] is False):
continue
value = getattr(obj, col)
if col_info['type'] == 'QDateTime':
w = make_datetime(value)
w.dateTimeChanged.connect(partial(parent.changed, obj, col))
elif col_info['type'] == 'QDate':
w = make_date(value)
w.dateChanged.connect(partial(parent.changed, obj, col))
elif col_info['type'] == 'double':
w = make_float(value)
w.valueChanged.connect(partial(parent.changed, obj, col))
elif col_info['type'] == 'int':
w = make_integer(value)
w.valueChanged.connect(partial(parent.changed, obj, col))
elif col_info['type'] == 'QString':
if col_info['values']:
values = col_info['values']
if len(values) > 20:
values = sorted(values)
w = make_combobox(value, values)
w.currentTextChanged.connect(partial(parent.changed, obj, col))
else:
w = make_edit(value)
w.editingFinished.connect(partial(parent.changed, obj, col, w))
else:
raise ValueError(f'unknown type "{col_info["type"]}"')
if col_info['doc'] is not None:
w.setToolTip(col_info['doc'])
d[col_info['alias']] = w
return d
def make_edit(value):
w = QLineEdit()
w.insert(value)
return w
def make_integer(value):
w = QSpinBox()
w.setRange(-2e7, 2e7)
if value is None:
w.setValue(0)
palette = QPalette()
palette.setColor(QPalette.Text, Qt.red)
w.setPalette(palette)
else:
w.setValue(value)
return w
def make_float(value):
w = QDoubleSpinBox()
w.setDecimals(3)
w.setRange(-1e8, 1e8)
if value is None:
w.setValue(0)
palette = QPalette()
palette.setColor(QPalette.Text, Qt.red)
w.setPalette(palette)
else:
try:
w.setValue(value)
except TypeError:
print(value)
print(type(value))
return w
def make_combobox(value, possible_values):
w = QComboBox()
values = [NULL_TEXT, ] + possible_values
w.addItems(values)
w.setCurrentText(value)
return w
def make_electrode_combobox(self, elec):
subj = self.lists['subjects'].currentItem().data(Qt.UserRole)
INTENDED = {'Unknown': 0}
for sess in subj.list_sessions():
sess_name = _session_name(sess)
for i, one_run in enumerate(sess.list_runs()):
if one_run.task_name in ('ct_anatomy_scan', 'flair_anatomy_scan', 't1_anatomy_scan', 't2_anatomy_scan', 't2star_anatomy_scan', 'MP2RAGE'):
name = f'#{i + 1: 2d}: {one_run.task_name}'
INTENDED[sess_name + ' / ' + name] = one_run.id
w = QComboBox()
for k in INTENDED:
w.addItem(k)
intendedfor = elec.IntendedFor
if intendedfor is not None:
w.setCurrentIndex(list(INTENDED.values()).index(intendedfor))
w.currentIndexChanged.connect(partial(self.electrode_intendedfor, elec=elec, combobox=list(INTENDED.values())))
return w
def make_date(value):
w = QDateEdit()
w.setCalendarPopup(True)
w.setDisplayFormat('dd/MM/yyyy')
if value is None:
w.setDate(date(1900, 1, 1))
palette = QPalette()
palette.setColor(QPalette.Text, Qt.red)
w.setPalette(palette)
else:
w.setDate(value)
return w
def make_datetime(value):
w = QDateTimeEdit()
w.setCalendarPopup(True)
w.setDisplayFormat('dd/MM/yyyy HH:mm:ss.zzz')
if value is None:
w.setDateTime(datetime(1900, 1, 1, 0, 0, 0))
palette = QPalette()
palette.setColor(QPalette.Text, Qt.red)
w.setPalette(palette)
else:
w.setDateTime(value)
return w
def copy_to_clipboard(text):
clipboard = QGuiApplication.clipboard()
clipboard.setText(text)
def highlight(item):
item.setBackground(Qt.yellow)
font = item.font()
font.setBold(True)
item.setFont(font)
class QListWidgetItem_time(QListWidgetItem):
def __init__(self, obj, title):
self.obj = obj
super().__init__(title)
self.setData(Qt.UserRole, obj)
def __lt__(self, other):
return self.obj.start_time < other.obj.start_time
def _fake_names(X):
"""We cannot have empty channel names, so we use it the MICROMED
convention
"""
for i in range(X['name'].shape[0]):
if X['name'][i] == '':
X['name'][i] = f'el{i + 1}'
return X
Functions
def copy_to_clipboard(text)
-
Expand source code
def copy_to_clipboard(text): clipboard = QGuiApplication.clipboard() clipboard.setText(text)
def highlight(item)
-
Expand source code
def highlight(item): item.setBackground(Qt.yellow) font = item.font() font.setBold(True) item.setFont(font)
def list_parameters(parent, obj)
-
Expand source code
def list_parameters(parent, obj): db = parent.db columns = collect_columns(db, obj=obj) d = {} for col, t in columns.items(): col_info = db['tables'][t][col] if not (col_info['index'] is False): continue value = getattr(obj, col) if col_info['type'] == 'QDateTime': w = make_datetime(value) w.dateTimeChanged.connect(partial(parent.changed, obj, col)) elif col_info['type'] == 'QDate': w = make_date(value) w.dateChanged.connect(partial(parent.changed, obj, col)) elif col_info['type'] == 'double': w = make_float(value) w.valueChanged.connect(partial(parent.changed, obj, col)) elif col_info['type'] == 'int': w = make_integer(value) w.valueChanged.connect(partial(parent.changed, obj, col)) elif col_info['type'] == 'QString': if col_info['values']: values = col_info['values'] if len(values) > 20: values = sorted(values) w = make_combobox(value, values) w.currentTextChanged.connect(partial(parent.changed, obj, col)) else: w = make_edit(value) w.editingFinished.connect(partial(parent.changed, obj, col, w)) else: raise ValueError(f'unknown type "{col_info["type"]}"') if col_info['doc'] is not None: w.setToolTip(col_info['doc']) d[col_info['alias']] = w return d
def make_combobox(value, possible_values)
-
Expand source code
def make_combobox(value, possible_values): w = QComboBox() values = [NULL_TEXT, ] + possible_values w.addItems(values) w.setCurrentText(value) return w
def make_date(value)
-
Expand source code
def make_date(value): w = QDateEdit() w.setCalendarPopup(True) w.setDisplayFormat('dd/MM/yyyy') if value is None: w.setDate(date(1900, 1, 1)) palette = QPalette() palette.setColor(QPalette.Text, Qt.red) w.setPalette(palette) else: w.setDate(value) return w
def make_datetime(value)
-
Expand source code
def make_datetime(value): w = QDateTimeEdit() w.setCalendarPopup(True) w.setDisplayFormat('dd/MM/yyyy HH:mm:ss.zzz') if value is None: w.setDateTime(datetime(1900, 1, 1, 0, 0, 0)) palette = QPalette() palette.setColor(QPalette.Text, Qt.red) w.setPalette(palette) else: w.setDateTime(value) return w
def make_edit(value)
-
Expand source code
def make_edit(value): w = QLineEdit() w.insert(value) return w
def make_electrode_combobox(self, elec)
-
Expand source code
def make_electrode_combobox(self, elec): subj = self.lists['subjects'].currentItem().data(Qt.UserRole) INTENDED = {'Unknown': 0} for sess in subj.list_sessions(): sess_name = _session_name(sess) for i, one_run in enumerate(sess.list_runs()): if one_run.task_name in ('ct_anatomy_scan', 'flair_anatomy_scan', 't1_anatomy_scan', 't2_anatomy_scan', 't2star_anatomy_scan', 'MP2RAGE'): name = f'#{i + 1: 2d}: {one_run.task_name}' INTENDED[sess_name + ' / ' + name] = one_run.id w = QComboBox() for k in INTENDED: w.addItem(k) intendedfor = elec.IntendedFor if intendedfor is not None: w.setCurrentIndex(list(INTENDED.values()).index(intendedfor)) w.currentIndexChanged.connect(partial(self.electrode_intendedfor, elec=elec, combobox=list(INTENDED.values()))) return w
def make_float(value)
-
Expand source code
def make_float(value): w = QDoubleSpinBox() w.setDecimals(3) w.setRange(-1e8, 1e8) if value is None: w.setValue(0) palette = QPalette() palette.setColor(QPalette.Text, Qt.red) w.setPalette(palette) else: try: w.setValue(value) except TypeError: print(value) print(type(value)) return w
def make_integer(value)
-
Expand source code
def make_integer(value): w = QSpinBox() w.setRange(-2e7, 2e7) if value is None: w.setValue(0) palette = QPalette() palette.setColor(QPalette.Text, Qt.red) w.setPalette(palette) else: w.setValue(value) return w
Classes
class Interface (db_name=None, username=None, password=None, hostname='localhost')
-
TODO: disable everything until you load database
Expand source code
class Interface(QMainWindow): """TODO: disable everything until you load database""" db = None test = False unsaved_changes = False def __init__(self, db_name=None, username=None, password=None, hostname='localhost'): super().__init__() create_menubar(self) create_shortcuts(self) lists = {} groups = {} for k in LEVELS + EXTRA_LEVELS: groups[k] = QGroupBox(k.capitalize()) lists[k] = QListWidget() if k in LEVELS: lists[k].currentItemChanged.connect(self.proc_all) elif k in EXTRA_LEVELS: lists[k].currentItemChanged.connect(self.show_channels_electrodes) # right click lists[k].setContextMenuPolicy(Qt.CustomContextMenu) lists[k].customContextMenuRequested.connect(partial(self.rightclick_list, level=k)) layout = QVBoxLayout() layout.addWidget(lists[k]) if k == 'runs': b = QPushButton('Add to export list') b.clicked.connect(self.exporting) layout.addWidget(b) groups[k].setLayout(layout) # PARAMETERS: Widget t_params = QTableWidget() t_params.horizontalHeader().setStretchLastSection(True) t_params.setSelectionBehavior(QAbstractItemView.SelectRows) t_params.setColumnCount(3) t_params.setHorizontalHeaderLabels(['Level', 'Parameter', 'Value']) t_params.verticalHeader().setVisible(False) # EVENTS: Widget self.events_view = QTableView(self) self.events_view.horizontalHeader().setStretchLastSection(True) self.events_view.setAlternatingRowColors(True) self.events_view.setContextMenuPolicy(Qt.CustomContextMenu) self.events_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='events')) # CHANNELS: Widget self.channels_view = QTableView(self) self.channels_view.horizontalHeader().setStretchLastSection(True) self.channels_view.setContextMenuPolicy(Qt.CustomContextMenu) self.channels_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='channels')) # ELECTRODES: Form self.elec_form = QTableWidget() self.elec_form.horizontalHeader().setStretchLastSection(True) self.elec_form.setSelectionBehavior(QAbstractItemView.SelectRows) self.elec_form.setColumnCount(2) self.elec_form.setHorizontalHeaderLabels(['Parameter', 'Value']) self.elec_form.verticalHeader().setVisible(False) # ELECTRODES: Widget self.electrodes_view = QTableView(self) self.electrodes_view.horizontalHeader().setStretchLastSection(True) self.electrodes_view.setContextMenuPolicy(Qt.CustomContextMenu) self.electrodes_view.customContextMenuRequested.connect(partial(self.rightclick_table, table='electrodes')) # FILES: Widget t_files = FilesWidget(self) t_files.horizontalHeader().setStretchLastSection(True) t_files.setSelectionBehavior(QAbstractItemView.SelectRows) t_files.setColumnCount(3) t_files.setHorizontalHeaderLabels(['Level', 'Format', 'File']) t_files.verticalHeader().setVisible(False) # right click t_files.setContextMenuPolicy(Qt.CustomContextMenu) t_files.customContextMenuRequested.connect(self.rightclick_files) # EXPORT: Widget w_export = QWidget() col_export = QVBoxLayout() t_export = QTableWidget() t_export.horizontalHeader().setStretchLastSection(True) t_export.setSelectionBehavior(QAbstractItemView.SelectRows) t_export.setColumnCount(4) t_export.setHorizontalHeaderLabels(['Subject', 'Session', 'Run', 'Start Time']) t_export.verticalHeader().setVisible(False) p_clearexport = QPushButton('Clear list') p_clearexport.clicked.connect(self.clear_export) p_doexport = QPushButton('Export ...') p_doexport.clicked.connect(self.do_export) col_export.addWidget(t_export) col_export.addWidget(p_clearexport) col_export.addWidget(p_doexport) w_export.setLayout(col_export) # session and protocol in the same column col_sessmetc = QVBoxLayout() col_sessmetc.addWidget(groups['sessions']) col_sessmetc.addWidget(groups['protocols']) # recordings, channels and electrodes col_recchanelec = QVBoxLayout() col_recchanelec.addWidget(groups['recordings']) col_recchanelec.addWidget(groups['channels']) col_recchanelec.addWidget(groups['electrodes']) # TOP PANELS layout_top = QHBoxLayout() layout_top.addWidget(groups['subjects']) layout_top.addLayout(col_sessmetc) layout_top.addWidget(groups['runs']) layout_top.addLayout(col_recchanelec) # FULL LAYOUT # central widget central_widget = QWidget() layout = QVBoxLayout() layout.addLayout(layout_top) central_widget.setLayout(layout) self.setCentralWidget(central_widget) self.setCorner(Qt.TopLeftCorner, Qt.LeftDockWidgetArea) self.setCorner(Qt.TopRightCorner, Qt.RightDockWidgetArea) self.setCorner(Qt.BottomLeftCorner, Qt.LeftDockWidgetArea) self.setCorner(Qt.BottomRightCorner, Qt.RightDockWidgetArea) # parameters dockwidget = QDockWidget('Parameters', self) dockwidget.setWidget(t_params) dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_parameters') # savestate self.addDockWidget(Qt.RightDockWidgetArea, dockwidget) # events dockwidget = QDockWidget('Events', self) dockwidget.setWidget(self.events_view) dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_events') # savestate self.addDockWidget(Qt.RightDockWidgetArea, dockwidget) # channels dockwidget = QDockWidget('Channels', self) dockwidget.setWidget(self.channels_view) dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_channels') # savestate self.addDockWidget(Qt.RightDockWidgetArea, dockwidget) # electrodes dockwidget = QDockWidget('Electrodes', self) temp_widget = QWidget() # you need extra widget to set layout in qdockwidget elec_layout = QVBoxLayout(temp_widget) elec_layout.addWidget(self.elec_form) elec_layout.addWidget(self.electrodes_view, stretch=1) dockwidget.setWidget(temp_widget) dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_electrodes') # savestate self.addDockWidget(Qt.RightDockWidgetArea, dockwidget) # files dockwidget = QDockWidget('Files', self) dockwidget.setWidget(t_files) dockwidget.setAllowedAreas(Qt.TopDockWidgetArea | Qt.BottomDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_files') # savestate self.addDockWidget(Qt.BottomDockWidgetArea, dockwidget) # export dockwidget = QDockWidget('Export', self) dockwidget.setWidget(w_export) dockwidget.setAllowedAreas(Qt.RightDockWidgetArea | Qt.LeftDockWidgetArea) dockwidget.setFeatures(QDockWidget.DockWidgetMovable | QDockWidget.DockWidgetFloatable) dockwidget.setObjectName('dock_export') # savestate self.addDockWidget(Qt.RightDockWidgetArea, dockwidget) # restore geometry window_geometry = settings.value('window/geometry') if window_geometry is not None: self.restoreGeometry(window_geometry) window_state = settings.value('window/state') if window_state is not None: self.restoreState(window_state) # SAVE THESE ITEMS self.groups = groups self.lists = lists self.t_params = t_params self.t_files = t_files self.t_export = t_export self.exports = [] self.search = Search() self.statusBar() self.show() if db_name is None: DB_ARGS = parse_accessdatabase(self) if DB_ARGS is not None: self.sql_access(**DB_ARGS) else: self.sql_access(db_name, username, password, hostname=hostname) def sql_access(self, db_name=None, username=None, password=None, hostname=None): """This is where you access the database """ self.db = access_database(db_name, username, hostname, password) self.db['db'].transaction() self.events_model = EventsModel(self.db) self.events_view.setModel(self.events_model) self.events_view.hideColumn(0) self.channels_model = QSqlTableModel(self, self.db['db']) self.channels_model.setTable('channels') self.channels_view.setModel(self.channels_model) self.channels_view.hideColumn(0) self.electrodes_model = QSqlTableModel(self, self.db['db']) self.electrodes_model.setTable('electrodes') self.electrodes_view.setModel(self.electrodes_model) self.electrodes_view.hideColumn(0) self.list_subjects() def sql_commit(self): self.db['db'].commit() self.setWindowTitle('') self.unsaved_changes = False self.db['db'].transaction() def sql_rollback(self): self.db['db'].rollback() self.unsaved_changes = False self.setWindowTitle('') self.db['db'].transaction() self.list_subjects() def list_subjects(self, code_to_select=None): """ code_to_select : str code of the subject to select """ for line in self.lists.values(): line.clear() to_select = None if self.subjsort.isChecked(): args = { 'alphabetical': True, 'reverse': False, } else: args = { 'alphabetical': False, 'reverse': True, } for subj in list_subjects(self.db, **args): item = QListWidgetItem(str(subj)) if subj.id in self.search.subjects: highlight(item) item.setData(Qt.UserRole, subj) self.lists['subjects'].addItem(item) if code_to_select is not None and code_to_select == str(subj): to_select = item if to_select is None: # select first one to_select = item self.lists['subjects'].setCurrentItem(to_select) @pyqtSlot(QListWidgetItem, QListWidgetItem) def proc_all(self, current=None, previous=None, item=None): """GUI calls current and previous. You can call item""" self.list_channels_electrodes() if item is None: # when clicking on a previously selected list, it sends a signal where current is None, but I don't understand why if current is None: return item = current.data(Qt.UserRole) if item.t == 'subject': self.list_sessions_and_protocols(item) elif item.t == 'session': self.list_runs(item) elif item.t == 'protocol': pass elif item.t == 'run': self.list_recordings(item) self.show_events(item) elif item.t == 'recording': self.list_channels_electrodes(item) self.list_params() self.list_files() def list_sessions_and_protocols(self, subj=None): if subj is None or subj is False: subj = self.current('subjects') for level, l in self.lists.items(): if level in ('subjects', ): continue l.clear() for sess in subj.list_sessions(): item = QListWidgetItem_time(sess, _session_name(sess)) if sess.id in self.search.sessions: highlight(item) self.lists['sessions'].addItem(item) self.lists['sessions'].setCurrentRow(0) for protocol in subj.list_protocols(): item = QListWidgetItem(_protocol_name(protocol)) item.setData(Qt.UserRole, protocol) self.lists['protocols'].addItem(item) self.lists['protocols'].setCurrentRow(0) def list_runs(self, sess=None): if sess is None or sess is False: sess = self.current('sessions') for level, l in self.lists.items(): if level in ('subjects', 'sessions', 'protocols'): continue l.clear() for i, run in enumerate(sess.list_runs()): item = QListWidgetItem_time(run, f'#{i + 1: 3d}: {run.task_name}') if run.id in self.search.runs: highlight(item) self.lists['runs'].addItem(item) self.lists['runs'].setCurrentRow(0) def list_recordings(self, run=None): if run is None or run is False: run = self.current('runs') for level, l in self.lists.items(): if level in ('subjects', 'sessions', 'protocols', 'runs'): continue l.clear() for recording in run.list_recordings(): item = QListWidgetItem(recording.modality) item.setData(Qt.UserRole, recording) if recording.id in self.search.recordings: highlight(item) self.lists['recordings'].addItem(item) self.lists['recordings'].setCurrentRow(0) def list_channels_electrodes(self, recording=None): for level, l in self.lists.items(): if level in ('channels', 'electrodes'): l.clear() self.channels_model.setFilter('channel_group_id = 0') self.channels_model.select() self.channels_view.setEnabled(False) self.electrodes_model.setFilter('electrode_group_id = 0') self.electrodes_model.select() self.electrodes_view.setEnabled(False) self.elec_form.clearContents() if recording is None: return sess = self.current('sessions') if recording.modality in ('ieeg', 'eeg', 'meg'): for chan in sess.list_channels(): item = QListWidgetItem(_name(chan.name)) item.setData(Qt.UserRole, chan) self.lists['channels'].addItem(item) for elec in sess.list_electrodes(): item = QListWidgetItem(_name(elec.name)) item.setData(Qt.UserRole, elec) self.lists['electrodes'].addItem(item) def statusbar_selected(self): statusbar = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) statusbar.append(repr(obj)) self.statusBar().showMessage('\t'.join(statusbar)) def list_params(self): self.statusbar_selected() self.t_params.blockSignals(True) self.t_params.clearContents() all_params = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) parameters = {} parameters.update(list_parameters(self, obj)) if k == 'runs': w = Popup_Experimenters(obj, self) parameters.update({'Experimenters': w}) w = Popup_Protocols(obj, self) parameters.update({'Protocols': w}) if obj.task_name == 'top_up': w = Popup_IntendedFor(obj, self) parameters.update({'Intended For': w}) elif k == 'recordings': if obj.modality in ('ieeg', 'eeg', 'meg'): parameters.update(list_parameters(self, obj)) sess = self.current('sessions') w = QComboBox() # add callback here w.addItem('(undefined channels)', None) for chan in sess.list_channels(): w.addItem(_name(chan.name), chan) channels = obj.channels if channels is None: w.setCurrentText('') else: w.setCurrentText(_name(channels.name)) w.activated.connect(partial(self.combo_chanelec, widget=w)) parameters.update({'Channels': w}) w = QComboBox() w.addItem('(undefined electrodes)', None) for elec in sess.list_electrodes(): w.addItem(_name(elec.name), elec) electrodes = obj.electrodes if electrodes is None: w.setCurrentText('') else: w.setCurrentText(_name(electrodes.name)) w.activated.connect(partial(self.combo_chanelec, widget=w)) parameters.update({'Electrodes': w}) for p_k, p_v in parameters.items(): all_params.append({ 'level': self.groups[k].title(), 'parameter': p_k, 'value': p_v, }) self.t_params.setRowCount(len(all_params)) for i, val in enumerate(all_params): item = QTableWidgetItem(val['level']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) item.setBackground(QBrush(QColor('lightGray'))) self.t_params.setItem(i, 0, item) item = QTableWidgetItem(val['parameter']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) self.t_params.setItem(i, 1, item) item = QTableWidgetItem(str(val['value'])) self.t_params.setCellWidget(i, 2, val['value']) self.t_params.blockSignals(False) def combo_chanelec(self, i, widget): data = widget.currentData() recording = self.current('recordings') if data is None: if widget.currentText() == '(undefined channels)': recording.detach_channels() else: recording.detach_electrodes() elif data.t == 'channel_group': recording.attach_channels(data) elif data.t == 'electrode_group': recording.attach_electrodes(data) def electrode_intendedfor(self, index, elec, combobox): if index == 0: elec.IntendedFor = None else: elec.IntendedFor = combobox[index] self.modified() def current(self, level): item = self.lists[level].currentItem() if item is not None: return item.data(Qt.UserRole) def list_files(self): self.t_files.blockSignals(True) self.t_files.clearContents() all_files = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) for file in obj.list_files(): all_files.append({ 'level': self.groups[k].title(), 'format': file.format, 'path': file.path, 'obj': [obj, file], }) self.t_files.setRowCount(len(all_files)) for i, val in enumerate(all_files): item = QTableWidgetItem(val['level']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) item.setBackground(QBrush(QColor('lightGray'))) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 0, item) item = QTableWidgetItem(val['format']) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 1, item) item = QTableWidgetItem(str(val['path'])) try: path_exists = val['path'].exists() except PermissionError as err: lg.warning(err) item.setForeground(QBrush(QColor('orange'))) else: if not path_exists: item.setForeground(QBrush(QColor('red'))) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 2, item) self.t_files.blockSignals(False) def changed(self, obj, column, x): if isinstance(x, QDate): x = x.toPyDate() elif isinstance(x, QDateTime): x = x.toPyDateTime() else: if isinstance(x, QLineEdit): x = x.text() if x == NULL_TEXT: x = None else: x = f'{x}' setattr(obj, column, x) self.modified() def show_events(self, item): self.events_model.update(item.events) def compare_events_with_file(self): run = self.current('runs') rec = self.current('recordings') file = find_one_file(rec, ('blackrock', 'micromed', 'bci2000')) if file is None: print('Could not find electrophysiology file') return self.events_model.compare(run, rec, file) @pyqtSlot(QListWidgetItem, QListWidgetItem) def show_channels_electrodes(self, current=None, previous=None, item=None): if current is not None: item = current.data(Qt.UserRole) if item is None: return self.statusbar_selected() if item.t == 'channel_group': self.channels_view.setEnabled(True) self.channels_model.setFilter(f'channel_group_id = {item.id}') self.channels_model.select() elif item.t == 'electrode_group': self.elec_form.blockSignals(True) parameters = list_parameters(self, item) parameters['Intended For'] = make_electrode_combobox(self, item) self.elec_form.setRowCount(len(parameters)) for i, kv in enumerate(parameters.items()): k, v = kv table_item = QTableWidgetItem(k) self.elec_form.setItem(i, 0, table_item) self.elec_form.setCellWidget(i, 1, v) self.elec_form.blockSignals(False) self.electrodes_view.setEnabled(True) self.electrodes_model.setFilter(f'electrode_group_id = {item.id}') self.electrodes_model.select() def exporting(self, checked=None, subj=None, sess=None, run=None): if subj is None: subj = self.lists['subjects'].currentItem().data(Qt.UserRole) sess = self.lists['sessions'].currentItem().data(Qt.UserRole) run = self.lists['runs'].currentItem().data(Qt.UserRole) d = {} d['subjects'] = str(subj) if sess.name == 'MRI': d['sessions'] = f'{sess.name} ({sess.MagneticFieldStrength})' else: d['sessions'] = sess.name d['run_id'] = run.id d['runs'] = f'{run.task_name}' d['start_time'] = f'{run.start_time:%d %b %Y %H:%M:%S}' self.exports.append(d) self.list_exports() def clear_export(self): self.exports = [] self.list_exports() def list_exports(self): self.t_export.clearContents() n_exports = len(self.exports) self.t_export.setRowCount(n_exports) for i, l in enumerate(self.exports): item = QTableWidgetItem(l['subjects']) self.t_export.setItem(i, 0, item) item = QTableWidgetItem(l['sessions']) self.t_export.setItem(i, 1, item) item = QTableWidgetItem(l['runs']) self.t_export.setItem(i, 2, item) item = QTableWidgetItem(l['start_time']) self.t_export.setItem(i, 3, item) def rightclick_table(self, pos, table=None): if table == 'events': view = self.events_view elif table == 'channels': view = self.channels_view elif table == 'electrodes': view = self.electrodes_view menu = QMenu(self) action = QAction(f'Import {table} from tsv ...', self) action.triggered.connect(lambda x: self.tsv_import(table=table)) menu.addAction(action) action = QAction(f'Export {table} to tsv ...', self) action.triggered.connect(lambda x: self.tsv_export(table=table)) menu.addAction(action) menu.popup(view.mapToGlobal(pos)) def tsv_import(self, table): tsv_file = QFileDialog.getOpenFileName( self, f"Import {table} from file", None, "Tab-separated values (*.tsv)")[0] if tsv_file == '': return if table == 'events': run = self.current('runs') X = run.events else: current = self.current(table) X = current.data X = load_tsv(Path(tsv_file), X.dtype) if table == 'events': run.events = X self.show_events(run) else: current.data = _fake_names(X) recording = self.current('recordings') self.list_channels_electrodes(recording=recording) self.modified() def tsv_export(self, table): tsv_file = QFileDialog.getSaveFileName( self, f"Save {table} to file", None, "Tab-separated values (*.tsv)")[0] if tsv_file == '': return if table == 'events': run = self.current('runs') X = run.events else: current = self.current(table) X = current.data save_tsv(Path(tsv_file), X) def rightclick_list(self, pos, level=None): item = self.lists[level].itemAt(pos) menu = QMenu(self) if item is None: action = QAction(f'Add {level}', self) action.triggered.connect(lambda x: self.new_item(level=level)) menu.addAction(action) else: obj = item.data(Qt.UserRole) if obj.t in ('channel_group', 'electrode_group'): action_rename = QAction('Rename', self) action_rename.triggered.connect(lambda x: self.rename_item(obj)) menu.addAction(action_rename) action_delete = QAction('Delete', self) action_delete.triggered.connect(lambda x: self.delete_item(obj)) menu.addAction(action_delete) menu.popup(self.lists[level].mapToGlobal(pos)) def rename_item(self, item): text, ok = QInputDialog.getText( self, f'Rename {item.t.split("_")[0]}', 'New title:', ) if ok and text != '': item.name = text def delete_item(self, item): item.delete() if item.t == 'subject': self.list_subjects() elif item.t == 'session': self.list_sessions_and_protocols(item.subject) elif item.t == 'protocol': self.list_sessions_and_protocols(item.subject) elif item.t == 'run': self.list_runs(item.session) elif item.t == 'recording': self.list_recordings(item.run) self.list_params() self.list_files() def rightclick_files(self, pos): item = self.t_files.itemAt(pos) if item is None: menu = QMenu(self) action = QAction('Add File', self) action.triggered.connect(lambda x: self.new_file(self)) menu.addAction(action) menu.popup(self.t_files.mapToGlobal(pos)) else: level_obj, file_obj = item.data(Qt.UserRole) file_path = file_obj.path.resolve() url_directory = QUrl.fromLocalFile(str(file_path.parent)) action_edit = QAction('Edit File', self) action_edit.triggered.connect(lambda x: self.edit_file(level_obj, file_obj)) action_copy = QAction('Copy Path to File', self) action_copy.triggered.connect(lambda x: copy_to_clipboard(str(file_obj.path))) action_openfile = QAction('Open File', self) action_openfile.triggered.connect(lambda x: self.open_file(file_path)) action_opendirectory = QAction('Open Containing Folder', self) action_opendirectory.triggered.connect(lambda x: QDesktopServices.openUrl(url_directory)) action_delete = QAction('Delete', self) action_delete.triggered.connect(lambda x: self.delete_file(level_obj, file_obj)) menu = QMenu('File Information', self) menu.addAction(action_edit) menu.addAction(action_copy) menu.addAction(action_openfile) menu.addAction(action_opendirectory) menu.addSeparator() menu.addAction(action_delete) menu.popup(self.t_files.mapToGlobal(pos)) def open_file(self, file_path): if file_path.suffix.lower() == '.par': print(f'converting {file_path}') file_path = convert_parrec_nibabel(file_path)[0] print(f'converted to {file_path}') url_file = QUrl.fromLocalFile(str(file_path)) QDesktopServices.openUrl(url_file) def sql_search(self): text, ok = QInputDialog.getText( self, 'Search the database', 'WHERE statement' + ' ' * 200, QLineEdit.Normal, self.search.previous, ) if ok and text != '': self.search.where(self.db, text) self.list_subjects() def sql_search_clear(self): self.search.clear() self.list_subjects() def add_search_results_to_export(self): for subj_id, sess_id, run_id in zip(self.search.subjects, self.search.sessions, self.search.runs): self.exporting( subj=Subject(self.db, id=subj_id), sess=Session(self.db, id=sess_id), run=Run(self.db, id=run_id), ) def modified(self): self.unsaved_changes = True self.setWindowTitle('*' + self.windowTitle()) def do_export(self, checked=None): subset = {'subjects': [], 'sessions': [], 'runs': []} run_ids = '(' + ', '.join([str(x['run_id']) for x in self.exports]) + ')' query = QSqlQuery(self.db['db']) query.prepare(f"""\ SELECT subjects.id, sessions.id, runs.id FROM runs JOIN sessions ON sessions.id = runs.session_id JOIN subjects ON subjects.id = sessions.subject_id WHERE runs.id IN {run_ids} """) if not query.exec(): raise SyntaxError(query.lastError().text()) while query.next(): subset['subjects'].append(query.value(0)) subset['sessions'].append(query.value(1)) subset['runs'].append(query.value(2)) data_path = QFileDialog.getSaveFileName( self, "Choose directory where to save the recordings in BIDS format", 'bids_output', 'Folder (*)', )[0] if data_path == '': return data_path = Path(data_path).resolve() if data_path.exists(): QMessageBox.warning( self, 'Folder exists', 'This folder already exists. Make sure you choose a folder name that does not exist', ) return create_bids(self.db, data_path, deface=False, subset=subset) def new_item(self, checked=None, level=None): if level == 'subjects': text, ok = QInputDialog.getText( self, 'Add New Subject', 'Subject Code:', ) elif level == 'sessions': current_subject = self.current('subjects') text, ok = QInputDialog.getItem( self, f'Add New Session for {current_subject}', 'Session Name:', lookup_allowed_values(self.db['db'], 'sessions', 'name'), 0, False) elif level == 'protocols': current_subject = self.current('subjects') text, ok = QInputDialog.getItem( self, f'Add New Protocol for {current_subject}', 'Protocol Name:', lookup_allowed_values(self.db['db'], 'protocols', 'metc'), 0, False) elif level == 'runs': current_session = self.current('sessions') text, ok = QInputDialog.getItem( self, f'Add New Run for {current_session.name}', 'Task Name:', lookup_allowed_values(self.db['db'], 'runs', 'task_name'), 0, False) elif level == 'recordings': current_run = self.current('runs') modalities = lookup_allowed_values(self.db['db'], 'recordings', 'modality') guess = guess_modality(current_run) if guess is None or guess not in modalities: idx = 0 else: idx = modalities.index(guess) text, ok = QInputDialog.getItem( self, f'Add New Recording for {current_run.task_name}', 'Modality:', modalities, idx, False) elif level in ('channels', 'electrodes'): current_recording = self.current('recordings') if current_recording is None or current_recording.modality not in ('ieeg', 'eeg', 'meg'): QMessageBox.warning( self, f'Cannot add {level}', 'You should first select an "ieeg" / "eeg" / "meg" recording') return text, ok = QInputDialog.getText( self, f'Add new {level}', 'Name to identify this setup:', ) if ok and text != '': if level == 'subjects': code = text.strip() Subject.add(self.db, code) self.list_subjects(code) elif level == 'sessions': current_subject.add_session(text) self.list_sessions_and_protocols(current_subject) elif level == 'protocols': current_subject.add_protocol(text) self.list_sessions_and_protocols(current_subject) elif level == 'runs': current_session.add_run(text) self.list_runs(current_session) elif level == 'recordings': current_run.add_recording(text) self.list_recordings(current_run) elif level in ('channels', 'electrodes'): if level in 'channels': chan = Channels.add(self.db) chan.name = text current_recording.attach_channels(chan) elif level in 'electrodes': elec = Electrodes.add(self.db) elec.name = text current_recording.attach_electrodes(elec) self.list_recordings(self.current('runs')) self.list_channels_electrodes(current_recording) self.list_params() self.modified() def edit_subject_codes(self): subject = self.current('subjects') text = str(subject) if text == '(subject without code)': text = '' text, ok = QInputDialog.getText( self, 'Edit Subject Codes', 'Separate each code by a comma (spaces are ignored)', text=text, ) if ok and text != '': text = text.strip(', ') subject.codes = [x.strip() for x in text.split(',')] self.list_subjects() self.modified() def new_file(self, checked=None, filename=None): get_new_file = NewFile(self, filename=filename) result = get_new_file.exec() if result: level = get_new_file.level.currentText().lower() + 's' item = self.current(level) format = get_new_file.format.currentText() path = get_new_file.filepath.text() item.add_file(format, path) self.list_files() self.modified() def edit_file(self, level_obj, file_obj): get_new_file = NewFile(self, file_obj, level_obj) result = get_new_file.exec() if result: format = get_new_file.format.currentText() path = get_new_file.filepath.text() file_obj.path = path file_obj.format = format self.list_files() self.modified() def calculate_offset(self): warning_title = 'Cannot calculate offset' run = self.current('runs') recordings = run.list_recordings() if len(recordings) == 0: QMessageBox.warning(self, warning_title, 'There are no recordings') return rec_fixed = recordings[0] rec_moving = self.current('recordings') if rec_fixed.id == rec_moving.id: QMessageBox.warning(self, warning_title, 'This function compares the first recording with the highlighted recording. Please select another recording to compute the offset') return file_fixed = find_one_file(rec_fixed, ('blackrock', 'micromed', 'bci2000')) if file_fixed is None: QMessageBox.warning(self, warning_title, 'The first recording does not have an ephys file') return file_moving = find_one_file(rec_moving, ('blackrock', 'micromed', 'bci2000')) if file_moving is None: QMessageBox.warning(self, warning_title, 'The selected recording does not have an ephys file') return calcoffset = CalculateOffset(self, file_fixed, file_moving) result = calcoffset.exec() if result: text = calcoffset.offset.text() if text.endswith(')'): return offset = float(text[:-1]) rec_moving.offset = offset self.list_params() self.modified() def edit_electrode_data(self): elec = self.current('electrodes') data = elec.data edit_electrodes = EditElectrodes(self, data) result = edit_electrodes.exec() if result: parameter = edit_electrodes.parameter.currentText() value = edit_electrodes.value.text() data[parameter] = array(value).astype(data.dtype[parameter]) elec.data = data self.show_channels_electrodes(item=elec) self.modified() def io_parrec(self): run = self.current('runs') recording = self.current('recordings') success = False for file in recording.list_files(): if file.format == 'parrec': add_parrec(file.path, run=run, recording=recording, update=True) success = True break if success: self.list_recordings(run) self.list_params() self.modified() else: self.statusBar().showMessage('Cound not find PAR/REC to collect info from') def io_parrec_sess(self): sess = self.current('sessions') par_folder = QFileDialog.getExistingDirectory() if par_folder == '': return list_parrec = list(Path(par_folder).glob('*.PAR')) progress = QProgressDialog('', 'Cancel', 0, len(list_parrec), self) progress.setWindowTitle(f'Importing PAR/REC files to "{sess.subject}"/"{sess.name}"') progress.setMinimumDuration(0) progress.setWindowModality(Qt.WindowModal) for i, par_file in enumerate(list_parrec): progress.setValue(i) progress.setLabelText(f'Importing {par_file.name}') QGuiApplication.processEvents() add_parrec(par_file, sess=sess) if progress.wasCanceled(): break progress.setValue(i + 1) self.list_runs(sess) self.list_params() self.modified() def io_ephys(self): sess = self.current('sessions') ephys_file = QFileDialog.getOpenFileName( self, "Select File", None)[0] if ephys_file == '': return add_ephys_to_sess(self.db, sess, Path(ephys_file)) self.list_runs(sess) self.list_params() self.modified() def io_events_only(self): run = self.current('runs') recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return events = read_events_from_ephys(ephys_file, run, recording) if len(events) > 0: run.events = events self.show_events(run) self.modified() else: print('there were no events') def io_events(self): run = self.current('runs') recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return compare_events = CompareEvents(self, run, ephys_file.path) result = compare_events.exec() if result == QDialog.Accepted: run.start_time = compare_events.info['start_time'] run.duration = compare_events.info['duration'] run.events = compare_events.info['events'] self.list_params() self.show_events(run) self.modified() def io_channels(self): recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return chan = create_channels(self.db, ephys_file.path) if chan is None: return chan.name = '(imported)' recording.attach_channels(chan) self.modified() self.list_recordings() def io_electrodes(self): mat_file = QFileDialog.getOpenFileName( self, "Open File", None, "Matlab (*.mat)")[0] if mat_file == '': return rec = self.current('recordings') chan = rec.channels chan_data = chan.data idx = isin(chan_data['type'], ('ECOG', 'SEEG')) n_chan = idx.sum() lg.warning(f'# of ECOG/SEEG channels for this recording: {n_chan}') xyz = import_electrodes(mat_file, n_chan) if xyz is None: print('you need to do this manually') return elec = Electrodes.add(self.db) elec_data = elec.empty(n_chan) elec_data['name'] = chan_data['name'][idx] elec_data['x'] = xyz[:, 0] elec_data['y'] = xyz[:, 1] elec_data['z'] = xyz[:, 2] elec.data = elec_data rec.attach_electrodes(elec) self.modified() self.list_recordings() def delete_file(self, level_obj, file_obj): level_obj.delete_file(file_obj) self.list_files() self.modified() def closeEvent(self, event): if self.unsaved_changes: answer = QMessageBox.question( self, 'Confirm Closing', 'There are unsaved changes. Are you sure you want to exit?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if answer == QMessageBox.No: event.ignore() return settings.setValue('window/geometry', self.saveGeometry()) settings.setValue('window/state', self.saveState()) event.accept()
Ancestors
- PyQt5.QtWidgets.QMainWindow
- PyQt5.QtWidgets.QWidget
- PyQt5.QtCore.QObject
- sip.wrapper
- PyQt5.QtGui.QPaintDevice
- sip.simplewrapper
Class variables
var db
var test
var unsaved_changes
Methods
def add_search_results_to_export(self)
-
Expand source code
def add_search_results_to_export(self): for subj_id, sess_id, run_id in zip(self.search.subjects, self.search.sessions, self.search.runs): self.exporting( subj=Subject(self.db, id=subj_id), sess=Session(self.db, id=sess_id), run=Run(self.db, id=run_id), )
def calculate_offset(self)
-
Expand source code
def calculate_offset(self): warning_title = 'Cannot calculate offset' run = self.current('runs') recordings = run.list_recordings() if len(recordings) == 0: QMessageBox.warning(self, warning_title, 'There are no recordings') return rec_fixed = recordings[0] rec_moving = self.current('recordings') if rec_fixed.id == rec_moving.id: QMessageBox.warning(self, warning_title, 'This function compares the first recording with the highlighted recording. Please select another recording to compute the offset') return file_fixed = find_one_file(rec_fixed, ('blackrock', 'micromed', 'bci2000')) if file_fixed is None: QMessageBox.warning(self, warning_title, 'The first recording does not have an ephys file') return file_moving = find_one_file(rec_moving, ('blackrock', 'micromed', 'bci2000')) if file_moving is None: QMessageBox.warning(self, warning_title, 'The selected recording does not have an ephys file') return calcoffset = CalculateOffset(self, file_fixed, file_moving) result = calcoffset.exec() if result: text = calcoffset.offset.text() if text.endswith(')'): return offset = float(text[:-1]) rec_moving.offset = offset self.list_params() self.modified()
def changed(self, obj, column, x)
-
Expand source code
def changed(self, obj, column, x): if isinstance(x, QDate): x = x.toPyDate() elif isinstance(x, QDateTime): x = x.toPyDateTime() else: if isinstance(x, QLineEdit): x = x.text() if x == NULL_TEXT: x = None else: x = f'{x}' setattr(obj, column, x) self.modified()
def clear_export(self)
-
Expand source code
def clear_export(self): self.exports = [] self.list_exports()
def closeEvent(self, event)
-
closeEvent(self, QCloseEvent)
Expand source code
def closeEvent(self, event): if self.unsaved_changes: answer = QMessageBox.question( self, 'Confirm Closing', 'There are unsaved changes. Are you sure you want to exit?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if answer == QMessageBox.No: event.ignore() return settings.setValue('window/geometry', self.saveGeometry()) settings.setValue('window/state', self.saveState()) event.accept()
def combo_chanelec(self, i, widget)
-
Expand source code
def combo_chanelec(self, i, widget): data = widget.currentData() recording = self.current('recordings') if data is None: if widget.currentText() == '(undefined channels)': recording.detach_channels() else: recording.detach_electrodes() elif data.t == 'channel_group': recording.attach_channels(data) elif data.t == 'electrode_group': recording.attach_electrodes(data)
def compare_events_with_file(self)
-
Expand source code
def compare_events_with_file(self): run = self.current('runs') rec = self.current('recordings') file = find_one_file(rec, ('blackrock', 'micromed', 'bci2000')) if file is None: print('Could not find electrophysiology file') return self.events_model.compare(run, rec, file)
def current(self, level)
-
Expand source code
def current(self, level): item = self.lists[level].currentItem() if item is not None: return item.data(Qt.UserRole)
def delete_file(self, level_obj, file_obj)
-
Expand source code
def delete_file(self, level_obj, file_obj): level_obj.delete_file(file_obj) self.list_files() self.modified()
def delete_item(self, item)
-
Expand source code
def delete_item(self, item): item.delete() if item.t == 'subject': self.list_subjects() elif item.t == 'session': self.list_sessions_and_protocols(item.subject) elif item.t == 'protocol': self.list_sessions_and_protocols(item.subject) elif item.t == 'run': self.list_runs(item.session) elif item.t == 'recording': self.list_recordings(item.run) self.list_params() self.list_files()
def do_export(self, checked=None)
-
Expand source code
def do_export(self, checked=None): subset = {'subjects': [], 'sessions': [], 'runs': []} run_ids = '(' + ', '.join([str(x['run_id']) for x in self.exports]) + ')' query = QSqlQuery(self.db['db']) query.prepare(f"""\ SELECT subjects.id, sessions.id, runs.id FROM runs JOIN sessions ON sessions.id = runs.session_id JOIN subjects ON subjects.id = sessions.subject_id WHERE runs.id IN {run_ids} """) if not query.exec(): raise SyntaxError(query.lastError().text()) while query.next(): subset['subjects'].append(query.value(0)) subset['sessions'].append(query.value(1)) subset['runs'].append(query.value(2)) data_path = QFileDialog.getSaveFileName( self, "Choose directory where to save the recordings in BIDS format", 'bids_output', 'Folder (*)', )[0] if data_path == '': return data_path = Path(data_path).resolve() if data_path.exists(): QMessageBox.warning( self, 'Folder exists', 'This folder already exists. Make sure you choose a folder name that does not exist', ) return create_bids(self.db, data_path, deface=False, subset=subset)
def edit_electrode_data(self)
-
Expand source code
def edit_electrode_data(self): elec = self.current('electrodes') data = elec.data edit_electrodes = EditElectrodes(self, data) result = edit_electrodes.exec() if result: parameter = edit_electrodes.parameter.currentText() value = edit_electrodes.value.text() data[parameter] = array(value).astype(data.dtype[parameter]) elec.data = data self.show_channels_electrodes(item=elec) self.modified()
def edit_file(self, level_obj, file_obj)
-
Expand source code
def edit_file(self, level_obj, file_obj): get_new_file = NewFile(self, file_obj, level_obj) result = get_new_file.exec() if result: format = get_new_file.format.currentText() path = get_new_file.filepath.text() file_obj.path = path file_obj.format = format self.list_files() self.modified()
def edit_subject_codes(self)
-
Expand source code
def edit_subject_codes(self): subject = self.current('subjects') text = str(subject) if text == '(subject without code)': text = '' text, ok = QInputDialog.getText( self, 'Edit Subject Codes', 'Separate each code by a comma (spaces are ignored)', text=text, ) if ok and text != '': text = text.strip(', ') subject.codes = [x.strip() for x in text.split(',')] self.list_subjects() self.modified()
def electrode_intendedfor(self, index, elec, combobox)
-
Expand source code
def electrode_intendedfor(self, index, elec, combobox): if index == 0: elec.IntendedFor = None else: elec.IntendedFor = combobox[index] self.modified()
def exporting(self, checked=None, subj=None, sess=None, run=None)
-
Expand source code
def exporting(self, checked=None, subj=None, sess=None, run=None): if subj is None: subj = self.lists['subjects'].currentItem().data(Qt.UserRole) sess = self.lists['sessions'].currentItem().data(Qt.UserRole) run = self.lists['runs'].currentItem().data(Qt.UserRole) d = {} d['subjects'] = str(subj) if sess.name == 'MRI': d['sessions'] = f'{sess.name} ({sess.MagneticFieldStrength})' else: d['sessions'] = sess.name d['run_id'] = run.id d['runs'] = f'{run.task_name}' d['start_time'] = f'{run.start_time:%d %b %Y %H:%M:%S}' self.exports.append(d) self.list_exports()
def io_channels(self)
-
Expand source code
def io_channels(self): recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return chan = create_channels(self.db, ephys_file.path) if chan is None: return chan.name = '(imported)' recording.attach_channels(chan) self.modified() self.list_recordings()
def io_electrodes(self)
-
Expand source code
def io_electrodes(self): mat_file = QFileDialog.getOpenFileName( self, "Open File", None, "Matlab (*.mat)")[0] if mat_file == '': return rec = self.current('recordings') chan = rec.channels chan_data = chan.data idx = isin(chan_data['type'], ('ECOG', 'SEEG')) n_chan = idx.sum() lg.warning(f'# of ECOG/SEEG channels for this recording: {n_chan}') xyz = import_electrodes(mat_file, n_chan) if xyz is None: print('you need to do this manually') return elec = Electrodes.add(self.db) elec_data = elec.empty(n_chan) elec_data['name'] = chan_data['name'][idx] elec_data['x'] = xyz[:, 0] elec_data['y'] = xyz[:, 1] elec_data['z'] = xyz[:, 2] elec.data = elec_data rec.attach_electrodes(elec) self.modified() self.list_recordings()
def io_ephys(self)
-
Expand source code
def io_ephys(self): sess = self.current('sessions') ephys_file = QFileDialog.getOpenFileName( self, "Select File", None)[0] if ephys_file == '': return add_ephys_to_sess(self.db, sess, Path(ephys_file)) self.list_runs(sess) self.list_params() self.modified()
def io_events(self)
-
Expand source code
def io_events(self): run = self.current('runs') recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return compare_events = CompareEvents(self, run, ephys_file.path) result = compare_events.exec() if result == QDialog.Accepted: run.start_time = compare_events.info['start_time'] run.duration = compare_events.info['duration'] run.events = compare_events.info['events'] self.list_params() self.show_events(run) self.modified()
def io_events_only(self)
-
Expand source code
def io_events_only(self): run = self.current('runs') recording = self.current('recordings') ephys_file = find_one_file(recording, ('blackrock', 'micromed', 'bci2000')) if ephys_file is None: return events = read_events_from_ephys(ephys_file, run, recording) if len(events) > 0: run.events = events self.show_events(run) self.modified() else: print('there were no events')
def io_parrec(self)
-
Expand source code
def io_parrec(self): run = self.current('runs') recording = self.current('recordings') success = False for file in recording.list_files(): if file.format == 'parrec': add_parrec(file.path, run=run, recording=recording, update=True) success = True break if success: self.list_recordings(run) self.list_params() self.modified() else: self.statusBar().showMessage('Cound not find PAR/REC to collect info from')
def io_parrec_sess(self)
-
Expand source code
def io_parrec_sess(self): sess = self.current('sessions') par_folder = QFileDialog.getExistingDirectory() if par_folder == '': return list_parrec = list(Path(par_folder).glob('*.PAR')) progress = QProgressDialog('', 'Cancel', 0, len(list_parrec), self) progress.setWindowTitle(f'Importing PAR/REC files to "{sess.subject}"/"{sess.name}"') progress.setMinimumDuration(0) progress.setWindowModality(Qt.WindowModal) for i, par_file in enumerate(list_parrec): progress.setValue(i) progress.setLabelText(f'Importing {par_file.name}') QGuiApplication.processEvents() add_parrec(par_file, sess=sess) if progress.wasCanceled(): break progress.setValue(i + 1) self.list_runs(sess) self.list_params() self.modified()
def list_channels_electrodes(self, recording=None)
-
Expand source code
def list_channels_electrodes(self, recording=None): for level, l in self.lists.items(): if level in ('channels', 'electrodes'): l.clear() self.channels_model.setFilter('channel_group_id = 0') self.channels_model.select() self.channels_view.setEnabled(False) self.electrodes_model.setFilter('electrode_group_id = 0') self.electrodes_model.select() self.electrodes_view.setEnabled(False) self.elec_form.clearContents() if recording is None: return sess = self.current('sessions') if recording.modality in ('ieeg', 'eeg', 'meg'): for chan in sess.list_channels(): item = QListWidgetItem(_name(chan.name)) item.setData(Qt.UserRole, chan) self.lists['channels'].addItem(item) for elec in sess.list_electrodes(): item = QListWidgetItem(_name(elec.name)) item.setData(Qt.UserRole, elec) self.lists['electrodes'].addItem(item)
def list_exports(self)
-
Expand source code
def list_exports(self): self.t_export.clearContents() n_exports = len(self.exports) self.t_export.setRowCount(n_exports) for i, l in enumerate(self.exports): item = QTableWidgetItem(l['subjects']) self.t_export.setItem(i, 0, item) item = QTableWidgetItem(l['sessions']) self.t_export.setItem(i, 1, item) item = QTableWidgetItem(l['runs']) self.t_export.setItem(i, 2, item) item = QTableWidgetItem(l['start_time']) self.t_export.setItem(i, 3, item)
def list_files(self)
-
Expand source code
def list_files(self): self.t_files.blockSignals(True) self.t_files.clearContents() all_files = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) for file in obj.list_files(): all_files.append({ 'level': self.groups[k].title(), 'format': file.format, 'path': file.path, 'obj': [obj, file], }) self.t_files.setRowCount(len(all_files)) for i, val in enumerate(all_files): item = QTableWidgetItem(val['level']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) item.setBackground(QBrush(QColor('lightGray'))) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 0, item) item = QTableWidgetItem(val['format']) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 1, item) item = QTableWidgetItem(str(val['path'])) try: path_exists = val['path'].exists() except PermissionError as err: lg.warning(err) item.setForeground(QBrush(QColor('orange'))) else: if not path_exists: item.setForeground(QBrush(QColor('red'))) item.setData(Qt.UserRole, val['obj']) self.t_files.setItem(i, 2, item) self.t_files.blockSignals(False)
def list_params(self)
-
Expand source code
def list_params(self): self.statusbar_selected() self.t_params.blockSignals(True) self.t_params.clearContents() all_params = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) parameters = {} parameters.update(list_parameters(self, obj)) if k == 'runs': w = Popup_Experimenters(obj, self) parameters.update({'Experimenters': w}) w = Popup_Protocols(obj, self) parameters.update({'Protocols': w}) if obj.task_name == 'top_up': w = Popup_IntendedFor(obj, self) parameters.update({'Intended For': w}) elif k == 'recordings': if obj.modality in ('ieeg', 'eeg', 'meg'): parameters.update(list_parameters(self, obj)) sess = self.current('sessions') w = QComboBox() # add callback here w.addItem('(undefined channels)', None) for chan in sess.list_channels(): w.addItem(_name(chan.name), chan) channels = obj.channels if channels is None: w.setCurrentText('') else: w.setCurrentText(_name(channels.name)) w.activated.connect(partial(self.combo_chanelec, widget=w)) parameters.update({'Channels': w}) w = QComboBox() w.addItem('(undefined electrodes)', None) for elec in sess.list_electrodes(): w.addItem(_name(elec.name), elec) electrodes = obj.electrodes if electrodes is None: w.setCurrentText('') else: w.setCurrentText(_name(electrodes.name)) w.activated.connect(partial(self.combo_chanelec, widget=w)) parameters.update({'Electrodes': w}) for p_k, p_v in parameters.items(): all_params.append({ 'level': self.groups[k].title(), 'parameter': p_k, 'value': p_v, }) self.t_params.setRowCount(len(all_params)) for i, val in enumerate(all_params): item = QTableWidgetItem(val['level']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) item.setBackground(QBrush(QColor('lightGray'))) self.t_params.setItem(i, 0, item) item = QTableWidgetItem(val['parameter']) item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled) self.t_params.setItem(i, 1, item) item = QTableWidgetItem(str(val['value'])) self.t_params.setCellWidget(i, 2, val['value']) self.t_params.blockSignals(False)
def list_recordings(self, run=None)
-
Expand source code
def list_recordings(self, run=None): if run is None or run is False: run = self.current('runs') for level, l in self.lists.items(): if level in ('subjects', 'sessions', 'protocols', 'runs'): continue l.clear() for recording in run.list_recordings(): item = QListWidgetItem(recording.modality) item.setData(Qt.UserRole, recording) if recording.id in self.search.recordings: highlight(item) self.lists['recordings'].addItem(item) self.lists['recordings'].setCurrentRow(0)
def list_runs(self, sess=None)
-
Expand source code
def list_runs(self, sess=None): if sess is None or sess is False: sess = self.current('sessions') for level, l in self.lists.items(): if level in ('subjects', 'sessions', 'protocols'): continue l.clear() for i, run in enumerate(sess.list_runs()): item = QListWidgetItem_time(run, f'#{i + 1: 3d}: {run.task_name}') if run.id in self.search.runs: highlight(item) self.lists['runs'].addItem(item) self.lists['runs'].setCurrentRow(0)
def list_sessions_and_protocols(self, subj=None)
-
Expand source code
def list_sessions_and_protocols(self, subj=None): if subj is None or subj is False: subj = self.current('subjects') for level, l in self.lists.items(): if level in ('subjects', ): continue l.clear() for sess in subj.list_sessions(): item = QListWidgetItem_time(sess, _session_name(sess)) if sess.id in self.search.sessions: highlight(item) self.lists['sessions'].addItem(item) self.lists['sessions'].setCurrentRow(0) for protocol in subj.list_protocols(): item = QListWidgetItem(_protocol_name(protocol)) item.setData(Qt.UserRole, protocol) self.lists['protocols'].addItem(item) self.lists['protocols'].setCurrentRow(0)
def list_subjects(self, code_to_select=None)
-
code_to_select : str code of the subject to select
Expand source code
def list_subjects(self, code_to_select=None): """ code_to_select : str code of the subject to select """ for line in self.lists.values(): line.clear() to_select = None if self.subjsort.isChecked(): args = { 'alphabetical': True, 'reverse': False, } else: args = { 'alphabetical': False, 'reverse': True, } for subj in list_subjects(self.db, **args): item = QListWidgetItem(str(subj)) if subj.id in self.search.subjects: highlight(item) item.setData(Qt.UserRole, subj) self.lists['subjects'].addItem(item) if code_to_select is not None and code_to_select == str(subj): to_select = item if to_select is None: # select first one to_select = item self.lists['subjects'].setCurrentItem(to_select)
def modified(self)
-
Expand source code
def modified(self): self.unsaved_changes = True self.setWindowTitle('*' + self.windowTitle())
def new_file(self, checked=None, filename=None)
-
Expand source code
def new_file(self, checked=None, filename=None): get_new_file = NewFile(self, filename=filename) result = get_new_file.exec() if result: level = get_new_file.level.currentText().lower() + 's' item = self.current(level) format = get_new_file.format.currentText() path = get_new_file.filepath.text() item.add_file(format, path) self.list_files() self.modified()
def new_item(self, checked=None, level=None)
-
Expand source code
def new_item(self, checked=None, level=None): if level == 'subjects': text, ok = QInputDialog.getText( self, 'Add New Subject', 'Subject Code:', ) elif level == 'sessions': current_subject = self.current('subjects') text, ok = QInputDialog.getItem( self, f'Add New Session for {current_subject}', 'Session Name:', lookup_allowed_values(self.db['db'], 'sessions', 'name'), 0, False) elif level == 'protocols': current_subject = self.current('subjects') text, ok = QInputDialog.getItem( self, f'Add New Protocol for {current_subject}', 'Protocol Name:', lookup_allowed_values(self.db['db'], 'protocols', 'metc'), 0, False) elif level == 'runs': current_session = self.current('sessions') text, ok = QInputDialog.getItem( self, f'Add New Run for {current_session.name}', 'Task Name:', lookup_allowed_values(self.db['db'], 'runs', 'task_name'), 0, False) elif level == 'recordings': current_run = self.current('runs') modalities = lookup_allowed_values(self.db['db'], 'recordings', 'modality') guess = guess_modality(current_run) if guess is None or guess not in modalities: idx = 0 else: idx = modalities.index(guess) text, ok = QInputDialog.getItem( self, f'Add New Recording for {current_run.task_name}', 'Modality:', modalities, idx, False) elif level in ('channels', 'electrodes'): current_recording = self.current('recordings') if current_recording is None or current_recording.modality not in ('ieeg', 'eeg', 'meg'): QMessageBox.warning( self, f'Cannot add {level}', 'You should first select an "ieeg" / "eeg" / "meg" recording') return text, ok = QInputDialog.getText( self, f'Add new {level}', 'Name to identify this setup:', ) if ok and text != '': if level == 'subjects': code = text.strip() Subject.add(self.db, code) self.list_subjects(code) elif level == 'sessions': current_subject.add_session(text) self.list_sessions_and_protocols(current_subject) elif level == 'protocols': current_subject.add_protocol(text) self.list_sessions_and_protocols(current_subject) elif level == 'runs': current_session.add_run(text) self.list_runs(current_session) elif level == 'recordings': current_run.add_recording(text) self.list_recordings(current_run) elif level in ('channels', 'electrodes'): if level in 'channels': chan = Channels.add(self.db) chan.name = text current_recording.attach_channels(chan) elif level in 'electrodes': elec = Electrodes.add(self.db) elec.name = text current_recording.attach_electrodes(elec) self.list_recordings(self.current('runs')) self.list_channels_electrodes(current_recording) self.list_params() self.modified()
def open_file(self, file_path)
-
Expand source code
def open_file(self, file_path): if file_path.suffix.lower() == '.par': print(f'converting {file_path}') file_path = convert_parrec_nibabel(file_path)[0] print(f'converted to {file_path}') url_file = QUrl.fromLocalFile(str(file_path)) QDesktopServices.openUrl(url_file)
def proc_all(self, current=None, previous=None, item=None)
-
GUI calls current and previous. You can call item
Expand source code
@pyqtSlot(QListWidgetItem, QListWidgetItem) def proc_all(self, current=None, previous=None, item=None): """GUI calls current and previous. You can call item""" self.list_channels_electrodes() if item is None: # when clicking on a previously selected list, it sends a signal where current is None, but I don't understand why if current is None: return item = current.data(Qt.UserRole) if item.t == 'subject': self.list_sessions_and_protocols(item) elif item.t == 'session': self.list_runs(item) elif item.t == 'protocol': pass elif item.t == 'run': self.list_recordings(item) self.show_events(item) elif item.t == 'recording': self.list_channels_electrodes(item) self.list_params() self.list_files()
def rename_item(self, item)
-
Expand source code
def rename_item(self, item): text, ok = QInputDialog.getText( self, f'Rename {item.t.split("_")[0]}', 'New title:', ) if ok and text != '': item.name = text
def rightclick_files(self, pos)
-
Expand source code
def rightclick_files(self, pos): item = self.t_files.itemAt(pos) if item is None: menu = QMenu(self) action = QAction('Add File', self) action.triggered.connect(lambda x: self.new_file(self)) menu.addAction(action) menu.popup(self.t_files.mapToGlobal(pos)) else: level_obj, file_obj = item.data(Qt.UserRole) file_path = file_obj.path.resolve() url_directory = QUrl.fromLocalFile(str(file_path.parent)) action_edit = QAction('Edit File', self) action_edit.triggered.connect(lambda x: self.edit_file(level_obj, file_obj)) action_copy = QAction('Copy Path to File', self) action_copy.triggered.connect(lambda x: copy_to_clipboard(str(file_obj.path))) action_openfile = QAction('Open File', self) action_openfile.triggered.connect(lambda x: self.open_file(file_path)) action_opendirectory = QAction('Open Containing Folder', self) action_opendirectory.triggered.connect(lambda x: QDesktopServices.openUrl(url_directory)) action_delete = QAction('Delete', self) action_delete.triggered.connect(lambda x: self.delete_file(level_obj, file_obj)) menu = QMenu('File Information', self) menu.addAction(action_edit) menu.addAction(action_copy) menu.addAction(action_openfile) menu.addAction(action_opendirectory) menu.addSeparator() menu.addAction(action_delete) menu.popup(self.t_files.mapToGlobal(pos))
def rightclick_list(self, pos, level=None)
-
Expand source code
def rightclick_list(self, pos, level=None): item = self.lists[level].itemAt(pos) menu = QMenu(self) if item is None: action = QAction(f'Add {level}', self) action.triggered.connect(lambda x: self.new_item(level=level)) menu.addAction(action) else: obj = item.data(Qt.UserRole) if obj.t in ('channel_group', 'electrode_group'): action_rename = QAction('Rename', self) action_rename.triggered.connect(lambda x: self.rename_item(obj)) menu.addAction(action_rename) action_delete = QAction('Delete', self) action_delete.triggered.connect(lambda x: self.delete_item(obj)) menu.addAction(action_delete) menu.popup(self.lists[level].mapToGlobal(pos))
def rightclick_table(self, pos, table=None)
-
Expand source code
def rightclick_table(self, pos, table=None): if table == 'events': view = self.events_view elif table == 'channels': view = self.channels_view elif table == 'electrodes': view = self.electrodes_view menu = QMenu(self) action = QAction(f'Import {table} from tsv ...', self) action.triggered.connect(lambda x: self.tsv_import(table=table)) menu.addAction(action) action = QAction(f'Export {table} to tsv ...', self) action.triggered.connect(lambda x: self.tsv_export(table=table)) menu.addAction(action) menu.popup(view.mapToGlobal(pos))
def show_channels_electrodes(self, current=None, previous=None, item=None)
-
Expand source code
@pyqtSlot(QListWidgetItem, QListWidgetItem) def show_channels_electrodes(self, current=None, previous=None, item=None): if current is not None: item = current.data(Qt.UserRole) if item is None: return self.statusbar_selected() if item.t == 'channel_group': self.channels_view.setEnabled(True) self.channels_model.setFilter(f'channel_group_id = {item.id}') self.channels_model.select() elif item.t == 'electrode_group': self.elec_form.blockSignals(True) parameters = list_parameters(self, item) parameters['Intended For'] = make_electrode_combobox(self, item) self.elec_form.setRowCount(len(parameters)) for i, kv in enumerate(parameters.items()): k, v = kv table_item = QTableWidgetItem(k) self.elec_form.setItem(i, 0, table_item) self.elec_form.setCellWidget(i, 1, v) self.elec_form.blockSignals(False) self.electrodes_view.setEnabled(True) self.electrodes_model.setFilter(f'electrode_group_id = {item.id}') self.electrodes_model.select()
def show_events(self, item)
-
Expand source code
def show_events(self, item): self.events_model.update(item.events)
def sql_access(self, db_name=None, username=None, password=None, hostname=None)
-
This is where you access the database
Expand source code
def sql_access(self, db_name=None, username=None, password=None, hostname=None): """This is where you access the database """ self.db = access_database(db_name, username, hostname, password) self.db['db'].transaction() self.events_model = EventsModel(self.db) self.events_view.setModel(self.events_model) self.events_view.hideColumn(0) self.channels_model = QSqlTableModel(self, self.db['db']) self.channels_model.setTable('channels') self.channels_view.setModel(self.channels_model) self.channels_view.hideColumn(0) self.electrodes_model = QSqlTableModel(self, self.db['db']) self.electrodes_model.setTable('electrodes') self.electrodes_view.setModel(self.electrodes_model) self.electrodes_view.hideColumn(0) self.list_subjects()
def sql_commit(self)
-
Expand source code
def sql_commit(self): self.db['db'].commit() self.setWindowTitle('') self.unsaved_changes = False self.db['db'].transaction()
def sql_rollback(self)
-
Expand source code
def sql_rollback(self): self.db['db'].rollback() self.unsaved_changes = False self.setWindowTitle('') self.db['db'].transaction() self.list_subjects()
def sql_search(self)
-
Expand source code
def sql_search(self): text, ok = QInputDialog.getText( self, 'Search the database', 'WHERE statement' + ' ' * 200, QLineEdit.Normal, self.search.previous, ) if ok and text != '': self.search.where(self.db, text) self.list_subjects()
def sql_search_clear(self)
-
Expand source code
def sql_search_clear(self): self.search.clear() self.list_subjects()
def statusbar_selected(self)
-
Expand source code
def statusbar_selected(self): statusbar = [] for k, v in self.lists.items(): item = v.currentItem() if item is None: continue obj = item.data(Qt.UserRole) statusbar.append(repr(obj)) self.statusBar().showMessage('\t'.join(statusbar))
def tsv_export(self, table)
-
Expand source code
def tsv_export(self, table): tsv_file = QFileDialog.getSaveFileName( self, f"Save {table} to file", None, "Tab-separated values (*.tsv)")[0] if tsv_file == '': return if table == 'events': run = self.current('runs') X = run.events else: current = self.current(table) X = current.data save_tsv(Path(tsv_file), X)
def tsv_import(self, table)
-
Expand source code
def tsv_import(self, table): tsv_file = QFileDialog.getOpenFileName( self, f"Import {table} from file", None, "Tab-separated values (*.tsv)")[0] if tsv_file == '': return if table == 'events': run = self.current('runs') X = run.events else: current = self.current(table) X = current.data X = load_tsv(Path(tsv_file), X.dtype) if table == 'events': run.events = X self.show_events(run) else: current.data = _fake_names(X) recording = self.current('recordings') self.list_channels_electrodes(recording=recording) self.modified()
class QListWidgetItem_time (obj, title)
-
QListWidgetItem(parent: QListWidget = None, type: int = QListWidgetItem.Type) QListWidgetItem(str, parent: QListWidget = None, type: int = QListWidgetItem.Type) QListWidgetItem(QIcon, str, parent: QListWidget = None, type: int = QListWidgetItem.Type) QListWidgetItem(QListWidgetItem)
Expand source code
class QListWidgetItem_time(QListWidgetItem): def __init__(self, obj, title): self.obj = obj super().__init__(title) self.setData(Qt.UserRole, obj) def __lt__(self, other): return self.obj.start_time < other.obj.start_time
Ancestors
- PyQt5.QtWidgets.QListWidgetItem
- sip.wrapper
- sip.simplewrapper