Module xelo2.io.xelo.add_to_sql

Expand source code
from datetime import datetime
from pathlib import Path
from lxml.etree import parse
from ...api.filetype import parse_filetype

def import_all():
    """I don't like importing all"""
    subj_iter = XML_SUBJECTS.iterdir()

    for p_xml_subj in subj_iter:

        if p_xml_subj.stem in ('UNP001_SubjectXML', 'unp004_SubjectXML', 'unp005_SubjectXML'):  # also test, Newtest
            continue

        xml_subj, tasks = read_xml_subject(p_xml_subj, CUTOFF)

        subjectcode = _match_subject(xml_subj['SubjectCode'])
        print(subjectcode)
        try:
            sql_subj = Subject(code=subjectcode)
        except ValueError:
            sql_subj = Subject.add(subjectcode)

        sql_subj = add_subject_to_sql(xml_subj, sql_subj)

        for task in tasks:
            if len(task) <= 4:
                continue
            print('\tchanged')
            subsets = prepare_subset(f'xelo_stem == "{task["xelo_stem"]}"')
            run_ids = set(subsets['runs'])
            if len(run_ids) == 1:
                run = Run(id=list(run_ids)[0])
                add_task_to_sql(task, run)
            elif len(run_ids) == 0:
                add_run_from_task(task, sql_subj)
            else:
                raise ValueError(f'Too many sql tasks matching "{task["xelo_stem"]}"')



def add_run_from_task(task, sql_subj):
    task_full = read_xml_task_only(task)
    sql_sess = get_session(sql_subj, _match_session(task_full['Technique']))
    if sql_sess is None:
        print(f'Could not determine session. Not adding {task}')
        return

    taskname = _match_taskname(task['TaskName'])
    run = sql_sess.add_run(taskname)

    add_task_to_sql(task, run)


def add_experimentlocation(run, task):
    ExperimentLocation = task.pop('ExperimentLocation', '')
    if ExperimentLocation:
        if ExperimentLocation.endswith('.REC'):
            ExperimentLocation = ExperimentLocation[:-4] + '.PAR'

        rec_modality = get_recording_name(task)
        rec = run.add_recording(rec_modality)
        format = parse_filetype(ExperimentLocation)
        rec.add_file(format, ExperimentLocation)


def _match_subject(code):
    if code.startswith('intraop'):
        return 'intraop' + code[-3:]
    else:
        return code


def _match_session(name):
    if name.lower() in ('ecog', 'seeg'):
        return 'IEMU'
    elif name.lower() in ('fmri', 'mri'):
        return 'MRI'

def _match_taskname(taskname):
    if taskname == 'visualattention':
        return 'visual_attention'
    else:
        return taskname


def add_subject_to_sql(xml_subj, sql_subj):

    if xml_subj.get('ProtocolSigned', None) is not None:
        protocol_sql = ', '.join(p.metc for p in sql_subj.list_protocols())
        print(f'SQL  has {protocol_sql}\nxelo has {xml_subj["ProtocolSigned"]}')

    COLUMNS_DONE = [
        'SubjectCode',
        'SubjectType',
        'Hemisphere',
        'ProtocolSigned',
        'SubjectFolderLocation',
        'GridDensity',
        ]
    [xml_subj.pop(col, None) for col in COLUMNS_DONE]

    SQLXML_FIELDS = [
        ('DateOfBirth', 'date_of_birth'),
        ('Sex', 'Sex'),
        ('Handedness', 'handedness'),
        ]
    for xml_param, sql_param in SQLXML_FIELDS:
        assign_value(sql_subj, sql_param, xml_subj.pop(xml_param, None))

    for session_name in ('IEMU', 'MRI'):
        if session_name == 'IEMU':
            SQLXML_FIELDS = [
                ('ImplantationDate', 'date_of_implantation'),
                ('ExplantationDate', 'date_of_explantation'),
                ]
        elif session_name == 'MRI':
            SQLXML_FIELDS = [  # I don't knowwhat we have here
                ]

        sql_sess = get_session(sql_subj, session_name)
        if sql_sess is not None:
            for xml_param, sql_param in SQLXML_FIELDS:
                assign_value(sql_sess, sql_param, xml_subj.pop(xml_param, None))
        else:
            for xml_param, sql_param in SQLXML_FIELDS:
                xml_value = xml_subj.pop(xml_param, '')
                if xml_value:
                    print(f'this subject has multiple {session_name}, you need to manually add {xml_param}={xml_value}')

    if xml_subj:
        raise ValueError(f'You need to add {", ".join(xml_subj)}')

    return sql_subj


def add_task_to_sql(task, run):

    add_experimentlocation(run, task)

    task['start_time'], task['duration'] = get_starttime_duration(task)
    task['TaskDescription'] = task.get('TaskDescription', '').strip() + ' ' + task.pop('Task_Instruction', '').strip()
    if task.get('Phonemes', '').strip():
        task['TaskDescription'] += ('; Phonemes ' + task.pop('Phonemes'))
    if task.get('Syllables', '').strip():
        task['TaskDescription'] += ('; Syllables ' + task.pop('Syllables'))
    task['Acquisition'] = task.get('Acquisition', '').strip() + ' ' + task.pop('TaskCodes', '').strip() + ' ' + task.pop('InsertFile', '').strip()
    if task.get('Attachments', '').strip():
        for attach in task['Attachments'].split('\\n'):
            if attach.strip().endswith('.PAR'):
                continue  # we don't care about PAR file in attachment
            run.add_file('task_log', attach.strip())

    SQLXML_FIELDS = [
        ('xelo_stem', 'xelo_stem'),
        ('Performance', 'performance'),
        ('Experimenters', 'experimenters'),
        ('TaskDescription', 'task_description'),
        ('Acquisition', 'acquisition'),
        ('BodyPart', 'body_part'),
        ('LeftRight', 'left_right'),
        ('ExecutionImagery', 'execution_imagery'),
        ('OvertCovert', 'overt_covert'),
        ('start_time', 'start_time'),
        ('duration', 'duration'),
        ]

    for xml_param, sql_param in SQLXML_FIELDS:
        assign_value(run, sql_param, task.pop(xml_param, None))

    SQLXML_FIELDS = [
        ('Location', 'region_of_interest'),
        ]
    for xml_param, sql_param in SQLXML_FIELDS:
        if task.get(xml_param, '').strip():
            rec = get_recording(run, task)  # only get recording if there is info to put in there
            assign_value(rec, sql_param, task.pop(xml_param, None))

    COLUMNS_DONE = [
        'Attachments',
        'SubjectCode',
        'TaskName',  # this should be already in there
        'TaskMetadataLocation',
        'Age',
        'ExperimentGridDensity',
        'Protocol',  # TODO: I don't know how to handle Protocol
        'BadElectrodes',  # TODO: how to handle this
        'ExperimentDate',
        'ExperimentStartTime',
        'ExperimentStopTime',
        'Technique',
        'FieldStrength',  # this should go in session
        'DataGlove',  # it's only yes or no
        'FingerMappingSequence',  # this should be in the triggers / events
        'ISI',  # this should be in the triggers / events
        'Rest_duration',  # this should be in the triggers / events
    ]
    [task.pop(col, None) for col in COLUMNS_DONE]

    # remove empty parameters
    task = {k: v for k, v in task.items() if v.strip()}
    if task:
        raise ValueError(f'You need to add {", ".join(task)}')


def assign_value(run, param, xml_value):
    if xml_value is None or (isinstance(xml_value, str) and xml_value.strip() in ('', 'na')):
        return

    if param == 'experimenters':
        _assign_list(run, xml_value)

    else:

        if param.startswith('date_of'):
            xml_value = datetime.strptime(xml_value, '%Y-%b-%d').date()
        elif param in ('left_right', 'body_part', 'execution_imagery', 'overt_covert'):
            xml_value = xml_value.lower()
        elif param == 'handedness':
            HANDEDNESS = {'Right': 'Right-handed', 'Left': 'Left-handed'}
            xml_value = HANDEDNESS[xml_value]
        elif isinstance(xml_value, str):
            xml_value = xml_value.strip()

        sql_value = getattr(run, param)
        if sql_value is None:
            # print(f'updating {param} with {xml_value}')
            setattr(run, param, xml_value)
        elif sql_value != xml_value:
            print(f'SQL  has {sql_value}\nxelo has {xml_value}')


def _assign_list(run, xml_value):
    sql_value = run.experimenters
    if xml_value is not None:
        if len(sql_value) == 0:
            run.experimenters = [x.strip().capitalize() for x in xml_value.split(', ')]
        elif sql_value != xml_value:
            print(f'SQL  has {",".join(sql_value)}\nxelo has {xml_value}')


def get_session(sql_subj, sess_name='IEMU'):
    sessions = [sess for sess in sql_subj.list_sessions() if sess.name == sess_name]
    if len(sessions) == 1:
        return sessions[0]
    elif len(sessions) == 0:
        return sql_subj.add_session(sess_name)
    else:
        # print(f'There are {len(sessions)} {sess_name} sessions for {sql_subj.codes}')
        return None

def get_recording_name(task):

    if task['Technique'].lower() in ('ecog', 'seeg'):
        return 'ieeg'
    elif task['Technique'].lower() == 'mri':
        return 'T1w'
    elif task['Technique'].lower() == 'fmri':
        return 'bold'
    else:
        raise ValueError(task)


def get_recording(sql_run, task):
    rec_name = get_recording_name(task)

    recordings = [rec for rec in sql_run.list_recordings() if rec.modality == rec_name]
    if len(recordings) == 1:
        return recordings[0]
    elif len(recordings) == 0:
        return sql_run.add_recording(rec_name)
    else:
        # print(f'There are {len(recordings)} {rec_name} sessions for {sql_run.task_name}')
        return None


def _convert_datetime(ExperimentDate, ExperimentTime):
    ExperimentDate = datetime.strptime(ExperimentDate.strip(), '%Y-%b-%d').date()
    ExperimentTime = datetime.strptime(ExperimentTime.strip(), '%H:%M').time()
    return datetime.combine(ExperimentDate, ExperimentTime)


def get_starttime_duration(task):
    if not task.get('ExperimentStartTime', '').strip():
        return None, None
    start_time = _convert_datetime(task['ExperimentDate'], task['ExperimentStartTime'])

    if not task.get('ExperimentStopTime', '').strip():
        return start_time, None

    duration = _convert_datetime(task['ExperimentDate'], task['ExperimentStopTime']) - start_time

    return start_time, duration.total_seconds()

Functions

def add_experimentlocation(run, task)
Expand source code
def add_experimentlocation(run, task):
    ExperimentLocation = task.pop('ExperimentLocation', '')
    if ExperimentLocation:
        if ExperimentLocation.endswith('.REC'):
            ExperimentLocation = ExperimentLocation[:-4] + '.PAR'

        rec_modality = get_recording_name(task)
        rec = run.add_recording(rec_modality)
        format = parse_filetype(ExperimentLocation)
        rec.add_file(format, ExperimentLocation)
def add_run_from_task(task, sql_subj)
Expand source code
def add_run_from_task(task, sql_subj):
    task_full = read_xml_task_only(task)
    sql_sess = get_session(sql_subj, _match_session(task_full['Technique']))
    if sql_sess is None:
        print(f'Could not determine session. Not adding {task}')
        return

    taskname = _match_taskname(task['TaskName'])
    run = sql_sess.add_run(taskname)

    add_task_to_sql(task, run)
def add_subject_to_sql(xml_subj, sql_subj)
Expand source code
def add_subject_to_sql(xml_subj, sql_subj):

    if xml_subj.get('ProtocolSigned', None) is not None:
        protocol_sql = ', '.join(p.metc for p in sql_subj.list_protocols())
        print(f'SQL  has {protocol_sql}\nxelo has {xml_subj["ProtocolSigned"]}')

    COLUMNS_DONE = [
        'SubjectCode',
        'SubjectType',
        'Hemisphere',
        'ProtocolSigned',
        'SubjectFolderLocation',
        'GridDensity',
        ]
    [xml_subj.pop(col, None) for col in COLUMNS_DONE]

    SQLXML_FIELDS = [
        ('DateOfBirth', 'date_of_birth'),
        ('Sex', 'Sex'),
        ('Handedness', 'handedness'),
        ]
    for xml_param, sql_param in SQLXML_FIELDS:
        assign_value(sql_subj, sql_param, xml_subj.pop(xml_param, None))

    for session_name in ('IEMU', 'MRI'):
        if session_name == 'IEMU':
            SQLXML_FIELDS = [
                ('ImplantationDate', 'date_of_implantation'),
                ('ExplantationDate', 'date_of_explantation'),
                ]
        elif session_name == 'MRI':
            SQLXML_FIELDS = [  # I don't knowwhat we have here
                ]

        sql_sess = get_session(sql_subj, session_name)
        if sql_sess is not None:
            for xml_param, sql_param in SQLXML_FIELDS:
                assign_value(sql_sess, sql_param, xml_subj.pop(xml_param, None))
        else:
            for xml_param, sql_param in SQLXML_FIELDS:
                xml_value = xml_subj.pop(xml_param, '')
                if xml_value:
                    print(f'this subject has multiple {session_name}, you need to manually add {xml_param}={xml_value}')

    if xml_subj:
        raise ValueError(f'You need to add {", ".join(xml_subj)}')

    return sql_subj
def add_task_to_sql(task, run)
Expand source code
def add_task_to_sql(task, run):

    add_experimentlocation(run, task)

    task['start_time'], task['duration'] = get_starttime_duration(task)
    task['TaskDescription'] = task.get('TaskDescription', '').strip() + ' ' + task.pop('Task_Instruction', '').strip()
    if task.get('Phonemes', '').strip():
        task['TaskDescription'] += ('; Phonemes ' + task.pop('Phonemes'))
    if task.get('Syllables', '').strip():
        task['TaskDescription'] += ('; Syllables ' + task.pop('Syllables'))
    task['Acquisition'] = task.get('Acquisition', '').strip() + ' ' + task.pop('TaskCodes', '').strip() + ' ' + task.pop('InsertFile', '').strip()
    if task.get('Attachments', '').strip():
        for attach in task['Attachments'].split('\\n'):
            if attach.strip().endswith('.PAR'):
                continue  # we don't care about PAR file in attachment
            run.add_file('task_log', attach.strip())

    SQLXML_FIELDS = [
        ('xelo_stem', 'xelo_stem'),
        ('Performance', 'performance'),
        ('Experimenters', 'experimenters'),
        ('TaskDescription', 'task_description'),
        ('Acquisition', 'acquisition'),
        ('BodyPart', 'body_part'),
        ('LeftRight', 'left_right'),
        ('ExecutionImagery', 'execution_imagery'),
        ('OvertCovert', 'overt_covert'),
        ('start_time', 'start_time'),
        ('duration', 'duration'),
        ]

    for xml_param, sql_param in SQLXML_FIELDS:
        assign_value(run, sql_param, task.pop(xml_param, None))

    SQLXML_FIELDS = [
        ('Location', 'region_of_interest'),
        ]
    for xml_param, sql_param in SQLXML_FIELDS:
        if task.get(xml_param, '').strip():
            rec = get_recording(run, task)  # only get recording if there is info to put in there
            assign_value(rec, sql_param, task.pop(xml_param, None))

    COLUMNS_DONE = [
        'Attachments',
        'SubjectCode',
        'TaskName',  # this should be already in there
        'TaskMetadataLocation',
        'Age',
        'ExperimentGridDensity',
        'Protocol',  # TODO: I don't know how to handle Protocol
        'BadElectrodes',  # TODO: how to handle this
        'ExperimentDate',
        'ExperimentStartTime',
        'ExperimentStopTime',
        'Technique',
        'FieldStrength',  # this should go in session
        'DataGlove',  # it's only yes or no
        'FingerMappingSequence',  # this should be in the triggers / events
        'ISI',  # this should be in the triggers / events
        'Rest_duration',  # this should be in the triggers / events
    ]
    [task.pop(col, None) for col in COLUMNS_DONE]

    # remove empty parameters
    task = {k: v for k, v in task.items() if v.strip()}
    if task:
        raise ValueError(f'You need to add {", ".join(task)}')
def assign_value(run, param, xml_value)
Expand source code
def assign_value(run, param, xml_value):
    if xml_value is None or (isinstance(xml_value, str) and xml_value.strip() in ('', 'na')):
        return

    if param == 'experimenters':
        _assign_list(run, xml_value)

    else:

        if param.startswith('date_of'):
            xml_value = datetime.strptime(xml_value, '%Y-%b-%d').date()
        elif param in ('left_right', 'body_part', 'execution_imagery', 'overt_covert'):
            xml_value = xml_value.lower()
        elif param == 'handedness':
            HANDEDNESS = {'Right': 'Right-handed', 'Left': 'Left-handed'}
            xml_value = HANDEDNESS[xml_value]
        elif isinstance(xml_value, str):
            xml_value = xml_value.strip()

        sql_value = getattr(run, param)
        if sql_value is None:
            # print(f'updating {param} with {xml_value}')
            setattr(run, param, xml_value)
        elif sql_value != xml_value:
            print(f'SQL  has {sql_value}\nxelo has {xml_value}')
def get_recording(sql_run, task)
Expand source code
def get_recording(sql_run, task):
    rec_name = get_recording_name(task)

    recordings = [rec for rec in sql_run.list_recordings() if rec.modality == rec_name]
    if len(recordings) == 1:
        return recordings[0]
    elif len(recordings) == 0:
        return sql_run.add_recording(rec_name)
    else:
        # print(f'There are {len(recordings)} {rec_name} sessions for {sql_run.task_name}')
        return None
def get_recording_name(task)
Expand source code
def get_recording_name(task):

    if task['Technique'].lower() in ('ecog', 'seeg'):
        return 'ieeg'
    elif task['Technique'].lower() == 'mri':
        return 'T1w'
    elif task['Technique'].lower() == 'fmri':
        return 'bold'
    else:
        raise ValueError(task)
def get_session(sql_subj, sess_name='IEMU')
Expand source code
def get_session(sql_subj, sess_name='IEMU'):
    sessions = [sess for sess in sql_subj.list_sessions() if sess.name == sess_name]
    if len(sessions) == 1:
        return sessions[0]
    elif len(sessions) == 0:
        return sql_subj.add_session(sess_name)
    else:
        # print(f'There are {len(sessions)} {sess_name} sessions for {sql_subj.codes}')
        return None
def get_starttime_duration(task)
Expand source code
def get_starttime_duration(task):
    if not task.get('ExperimentStartTime', '').strip():
        return None, None
    start_time = _convert_datetime(task['ExperimentDate'], task['ExperimentStartTime'])

    if not task.get('ExperimentStopTime', '').strip():
        return start_time, None

    duration = _convert_datetime(task['ExperimentDate'], task['ExperimentStopTime']) - start_time

    return start_time, duration.total_seconds()
def import_all()

I don't like importing all

Expand source code
def import_all():
    """I don't like importing all"""
    subj_iter = XML_SUBJECTS.iterdir()

    for p_xml_subj in subj_iter:

        if p_xml_subj.stem in ('UNP001_SubjectXML', 'unp004_SubjectXML', 'unp005_SubjectXML'):  # also test, Newtest
            continue

        xml_subj, tasks = read_xml_subject(p_xml_subj, CUTOFF)

        subjectcode = _match_subject(xml_subj['SubjectCode'])
        print(subjectcode)
        try:
            sql_subj = Subject(code=subjectcode)
        except ValueError:
            sql_subj = Subject.add(subjectcode)

        sql_subj = add_subject_to_sql(xml_subj, sql_subj)

        for task in tasks:
            if len(task) <= 4:
                continue
            print('\tchanged')
            subsets = prepare_subset(f'xelo_stem == "{task["xelo_stem"]}"')
            run_ids = set(subsets['runs'])
            if len(run_ids) == 1:
                run = Run(id=list(run_ids)[0])
                add_task_to_sql(task, run)
            elif len(run_ids) == 0:
                add_run_from_task(task, sql_subj)
            else:
                raise ValueError(f'Too many sql tasks matching "{task["xelo_stem"]}"')