Module xelo2.bids.mri

Expand source code
from logging import getLogger
from json import dump
from os import environ
from numpy import linspace, r_, tile
from pathlib import Path
from shutil import move, copyfile, copyfileobj
from subprocess import run, DEVNULL
from tempfile import mkstemp, gettempdir
import gzip

from nibabel import save as nisave
from nibabel import load as niload
from bidso.utils import replace_extension

from .io.parrec import convert_parrec_nibabel
from .utils import rename_task, make_bids_name, find_one_file, make_taskdescription, set_notnone

lg = getLogger(__name__)

# DIRECTION might depend on the way that the data is stored in Nifti file.
# Hopefully nibabel is consistent in how it converts the data but we need to check (also the sign of qform)
DIRECTION = {
    'RL': 'i',
    'LR': 'i-',
    'PA': 'j',
    'AP': 'j-',
    'IS': 'k',
    'SI': 'k-',
    }


def convert_mri(run, rec, dest_path, name, deface=True):
    """Return base name for this run"""
    output_nii = dest_path / f'{make_bids_name(name)}_{rec.modality}.nii.gz'

    file = find_one_file(rec, ('parrec', ))
    if file is not None:
        input_nii, PAR = convert_parrec_nibabel(file.path, MagneticFieldStrength=run.session.MagneticFieldStrength)
        move(input_nii, output_nii)

    else:
        file = find_one_file(rec, ('nifti', ))
        PAR = None
        if file is None:
            return None

        else:
            input_nii = file.path

            if input_nii.name.endswith('.nii.gz'):
                copyfile(file.path, output_nii)
            elif input_nii.name.endswith('.nii'):
                gz(file.path, output_nii)
            else:
                lg.warning(f'Unknown extension for nifti for {input_nii}')
                return None

    if run.task_name == 'MP2RAGE':
        lg.info('Keeping only the first volume for MP2RAGE')
        select(output_nii, 'first')

    nii_shape = _fix_tr(output_nii, rec)

    if PAR is not None and 'phase' in PAR['image_types']:
        phase_file = dest_path / f'{make_bids_name(name)}_phase.nii.gz'
        lg.info(f'Splitting phase info to {phase_file.name}')
        phase_nii = select(output_nii, 'split')
        phase_nii.to_filename(phase_file)

    if deface and rec.modality in ('T1w', 'T2w', 'T2star', 'PD', 'FLAIR'):
        run_deface(output_nii)

    sidecar = _convert_sidecar(run, rec, PAR, nii_shape)
    sidecar_file = replace_extension(output_nii, '.json')

    with sidecar_file.open('w') as f:
        dump(sidecar, f, indent=2)

    return output_nii


def select(nii, slicing):
    """If slicing is "split", it returns the nifti of the second half"""
    img = niload(nii)
    half = int(img.shape[3] / 2)
    secondhalf = None

    if slicing == 'first':
        img = img.slicer[:, :, :, 0]

    elif slicing == 'firsthalf':
        img = img.slicer[:, :, :, :half]

    elif slicing == 'secondhalf':
        img = img.slicer[:, :, :, half:]

    elif slicing == 'split':
        secondhalf = img.slicer[:, :, :, half:]
        img = img.slicer[:, :, :, :half]

    img.to_filename(nii)

    return secondhalf


def _fix_tr(nii, rec):
    """
    Returns
    -------
    tuple
        shape of the nifti file
    """
    img = niload(str(nii))

    # this seems a bug in nibabel. It stores time in sec, not in msec
    img.header.set_xyzt_units('mm', 'sec')

    if rec.modality in ('bold', 'epi') and rec.RepetitionTime is not None:
        img.header['pixdim'][4] = rec.RepetitionTime

    nisave(img, str(nii))

    return img.shape


def _convert_sidecar(run, rec, hdr=None, shape=None):
    D = {
        'InstitutionName': 'University Medical Center Utrecht',
        'InstitutionAddress': 'Heidelberglaan 100, 3584 CX Utrecht, the Netherlands',
        }

    if run.session.MagneticFieldStrength is not None:
        D['MagneticFieldStrength'] = float(run.session.MagneticFieldStrength[:-1])

    if rec.PhaseEncodingDirection is not None:
        D['PhaseEncodingDirection'] = DIRECTION[rec.PhaseEncodingDirection]
    if rec.SliceEncodingDirection is not None:
        D['SliceEncodingDirection'] = DIRECTION[rec.SliceEncodingDirection]

    for field in 'EchoTime', 'EffectiveEchoSpacing', 'FlipAngle':
        set_notnone(D, hdr, field)

    for field in 'Sequence', 'MultibandAccelerationFactor':
        set_notnone(D, rec, field)

    if rec.modality in ('bold', 'epi'):
        set_notnone(D, hdr, 'RepetitionTime')  # first get RepetitionTime from Header,
        set_notnone(D, rec, 'RepetitionTime')  # then, use the one specified in rec
        D['TaskName'] = rename_task(run.task_name)
        D['TaskDescription'] = make_taskdescription(run)

        if hdr is not None:
            add_slicetiming(D, hdr, rec)
        if shape is not None and 'EffectiveEchoSpacing' in D and 'PhaseEncodingDirection' in D:
            NIFTI_INDEX = {'i': 0, 'j': 1, 'k': 2}
            ReconMatrixPE = shape[NIFTI_INDEX[D['PhaseEncodingDirection'][0]]]
            D['TotalReadoutTime'] = D['EffectiveEchoSpacing'] * (ReconMatrixPE - 1)

    return D


def add_slicetiming(D, hdr, rec):
    """

    TODO
    ----
    Is there a SliceTiming when the SliceOrder is 3D?
    """
    if rec.SliceOrder is None:
        lg.warning(f'Please specify SliceOrder for Recording(db, id={rec.id})')
        return

    elif rec.SliceOrder == '3D':
        return

    # get TR from SQL, but if it's not specified used PAR/REC
    TR = D.get('RepetitionTime', hdr['RepetitionTime'])

    n_slices = hdr['n_slices']

    multiband = D.get('MultibandAccelerationFactor', 1)
    n_slices = int(n_slices / multiband)

    SliceTiming = linspace(0, TR, n_slices + 1)[:-1]

    if rec.SliceOrder == 'Interleaved':
        SliceTiming = r_[SliceTiming[::2], SliceTiming[1::2]]

    SliceTiming = tile(SliceTiming, multiband)
    D['SliceTiming'] = SliceTiming.tolist()


def run_deface(nii):
    lg.info(f'Defacing {nii.name}, it might take a while')
    path_avg = Path(environ['FREESURFER_HOME']) / 'average'

    # generate a unique file name in the same folder
    if nii.name.endswith('.nii.gz'):
        suffix = '.nii.gz'
    elif nii.name.endswith('.nii'):
        suffix = '.nii'
    nii_tmp = Path(mkstemp(dir=nii.parent, suffix=suffix)[1])
    nii_tmp.unlink()

    run([
        'mri_deface',  # from freesurfer
        nii,
        path_avg / 'talairach_mixed_with_skull.gca',
        path_avg / 'face.gca',
        nii_tmp],
        stdout=DEVNULL, stderr=DEVNULL,
        cwd=gettempdir(),
        )

    nii_tmp.rename(nii)


def gz(input_file, output_file):
    with input_file.open('rb') as f_in:
        with gzip.open(output_file, 'wb') as f_out:
            copyfileobj(f_in, f_out)

Functions

def add_slicetiming(D, hdr, rec)

Todo

Is there a SliceTiming when the SliceOrder is 3D?

Expand source code
def add_slicetiming(D, hdr, rec):
    """

    TODO
    ----
    Is there a SliceTiming when the SliceOrder is 3D?
    """
    if rec.SliceOrder is None:
        lg.warning(f'Please specify SliceOrder for Recording(db, id={rec.id})')
        return

    elif rec.SliceOrder == '3D':
        return

    # get TR from SQL, but if it's not specified used PAR/REC
    TR = D.get('RepetitionTime', hdr['RepetitionTime'])

    n_slices = hdr['n_slices']

    multiband = D.get('MultibandAccelerationFactor', 1)
    n_slices = int(n_slices / multiband)

    SliceTiming = linspace(0, TR, n_slices + 1)[:-1]

    if rec.SliceOrder == 'Interleaved':
        SliceTiming = r_[SliceTiming[::2], SliceTiming[1::2]]

    SliceTiming = tile(SliceTiming, multiband)
    D['SliceTiming'] = SliceTiming.tolist()
def convert_mri(run, rec, dest_path, name, deface=True)

Return base name for this run

Expand source code
def convert_mri(run, rec, dest_path, name, deface=True):
    """Return base name for this run"""
    output_nii = dest_path / f'{make_bids_name(name)}_{rec.modality}.nii.gz'

    file = find_one_file(rec, ('parrec', ))
    if file is not None:
        input_nii, PAR = convert_parrec_nibabel(file.path, MagneticFieldStrength=run.session.MagneticFieldStrength)
        move(input_nii, output_nii)

    else:
        file = find_one_file(rec, ('nifti', ))
        PAR = None
        if file is None:
            return None

        else:
            input_nii = file.path

            if input_nii.name.endswith('.nii.gz'):
                copyfile(file.path, output_nii)
            elif input_nii.name.endswith('.nii'):
                gz(file.path, output_nii)
            else:
                lg.warning(f'Unknown extension for nifti for {input_nii}')
                return None

    if run.task_name == 'MP2RAGE':
        lg.info('Keeping only the first volume for MP2RAGE')
        select(output_nii, 'first')

    nii_shape = _fix_tr(output_nii, rec)

    if PAR is not None and 'phase' in PAR['image_types']:
        phase_file = dest_path / f'{make_bids_name(name)}_phase.nii.gz'
        lg.info(f'Splitting phase info to {phase_file.name}')
        phase_nii = select(output_nii, 'split')
        phase_nii.to_filename(phase_file)

    if deface and rec.modality in ('T1w', 'T2w', 'T2star', 'PD', 'FLAIR'):
        run_deface(output_nii)

    sidecar = _convert_sidecar(run, rec, PAR, nii_shape)
    sidecar_file = replace_extension(output_nii, '.json')

    with sidecar_file.open('w') as f:
        dump(sidecar, f, indent=2)

    return output_nii
def gz(input_file, output_file)
Expand source code
def gz(input_file, output_file):
    with input_file.open('rb') as f_in:
        with gzip.open(output_file, 'wb') as f_out:
            copyfileobj(f_in, f_out)
def run_deface(nii)
Expand source code
def run_deface(nii):
    lg.info(f'Defacing {nii.name}, it might take a while')
    path_avg = Path(environ['FREESURFER_HOME']) / 'average'

    # generate a unique file name in the same folder
    if nii.name.endswith('.nii.gz'):
        suffix = '.nii.gz'
    elif nii.name.endswith('.nii'):
        suffix = '.nii'
    nii_tmp = Path(mkstemp(dir=nii.parent, suffix=suffix)[1])
    nii_tmp.unlink()

    run([
        'mri_deface',  # from freesurfer
        nii,
        path_avg / 'talairach_mixed_with_skull.gca',
        path_avg / 'face.gca',
        nii_tmp],
        stdout=DEVNULL, stderr=DEVNULL,
        cwd=gettempdir(),
        )

    nii_tmp.rename(nii)
def select(nii, slicing)

If slicing is "split", it returns the nifti of the second half

Expand source code
def select(nii, slicing):
    """If slicing is "split", it returns the nifti of the second half"""
    img = niload(nii)
    half = int(img.shape[3] / 2)
    secondhalf = None

    if slicing == 'first':
        img = img.slicer[:, :, :, 0]

    elif slicing == 'firsthalf':
        img = img.slicer[:, :, :, :half]

    elif slicing == 'secondhalf':
        img = img.slicer[:, :, :, half:]

    elif slicing == 'split':
        secondhalf = img.slicer[:, :, :, half:]
        img = img.slicer[:, :, :, :half]

    img.to_filename(nii)

    return secondhalf