Module xelo2.io.tsv
Expand source code
from collections import defaultdict
from numpy import floating, character, issubdtype, isnan, empty, NaN
from numpy.lib.recfunctions import rename_fields, drop_fields
def load_tsv(fname, dtypes):
    """Read a tab-separated file into a structured numpy array.

    The first line of the file is the header. The BIDS column 'group' is
    stored in the field 'groups'. Values equal to 'n/a' become NaN in
    floating fields and '' in all the other fields.

    Parameters
    ----------
    fname : object with an ``open`` method (e.g. pathlib.Path)
        tab-separated file to read
    dtypes : structured dtype
        description of the fields of the output array

    Returns
    -------
    X : structured numpy array
        one element per data line in the file. Columns of the file which are
        not described in `dtypes` are ignored. Fields in `dtypes` which are
        not in the file are left uninitialized.
    """
    columns = defaultdict(list)
    n_rows = 0
    with fname.open() as f:
        # BIDS wants 'group', but the array field is called 'groups'.
        # Rename once here so that every later lookup uses the same key.
        header = [
            'groups' if h == 'group' else h
            for h in f.readline().strip().split('\t')
        ]
        for line in f:
            # count the lines directly: the first column might have been
            # renamed or might not be part of dtypes at all
            n_rows += 1
            for h, v in zip(header, line.strip('\n').split('\t')):
                columns[h].append(v)

    X = empty(n_rows, dtype=dtypes)
    for h in header:
        # columns unknown to dtypes are skipped before any dtype lookup, so
        # that an 'n/a' in such a column cannot raise KeyError
        if h not in X.dtype.names:
            continue
        if issubdtype(X.dtype[h], floating):
            missing = float('nan')
        else:
            missing = ''
        X[h] = [missing if v == 'n/a' else v for v in columns[h]]

    return X
def save_tsv(fname, X, necessary_columns=None):
    """Write a structured array to a tab-separated file.

    The field 'groups' is written as 'group' and the columns where all the
    values are empty are dropped (see `_remove_empty_columns`). NaN in
    floating fields and '' in string fields are written as 'n/a'; the other
    floating values are written with three decimals.

    Parameters
    ----------
    fname : object with an ``open`` method (e.g. pathlib.Path)
        file to write. It is opened for writing (so created or truncated)
        even when there is nothing to write.
    X : structured numpy array
        table to save
    necessary_columns : list of str
        list of columns that you need to keep, even when they are empty

    Notes
    -----
    When no column is left after dropping the empty ones, the file is left
    empty (no header line).
    """
    # BIDS wants 'group' but it's a reserved word in SQL
    X = rename_fields(X, {'groups': 'group'})
    X = _remove_empty_columns(X, necessary_columns)

    with fname.open('w') as f:
        # When all the columns are empty: numpy < 1.18 returned None from
        # drop_fields, newer versions return an array with zero fields.
        # Both cases must stop here, otherwise only blank lines are written.
        if X is None or not X.dtype.names:
            return

        dtypes = X.dtype
        f.write('\t'.join(dtypes.names) + '\n')
        for x in X:
            values = []
            for name in dtypes.names:
                if issubdtype(dtypes[name], floating):
                    if isnan(x[name]):
                        values.append('n/a')
                    else:
                        values.append(f'{x[name]:.3f}')
                elif issubdtype(dtypes[name], character):
                    if x[name] == '':
                        values.append('n/a')
                    else:
                        values.append(x[name])
                else:
                    values.append(str(x[name]))
            f.write('\t'.join(values) + '\n')
def _remove_empty_columns(tsv, necessary_columns=None):
"""Remove column where all the values are empty (either NaN or '')
Parameters
----------
tsv
necessary_columns : list of str
list of columns that you need to keep
Returns
-------
tsv
"""
if necessary_columns is None:
necessary_columns = []
dtypes = tsv.dtype
to_remove = []
for name in dtypes.names:
if name in necessary_columns:
continue
if issubdtype(dtypes[name], floating):
if isnan(tsv[name]).all():
to_remove.append(name)
elif issubdtype(dtypes[name], character):
if (tsv[name] == '').all():
to_remove.append(name)
return drop_fields(tsv, to_remove)
Functions
def load_tsv(fname, dtypes)
-
Expand source code
def load_tsv(fname, dtypes):
    """Read a tab-separated file into a structured numpy array.

    The first line of the file is the header. The BIDS column 'group' is
    stored in the field 'groups'. Values equal to 'n/a' become NaN in
    floating fields and '' in all the other fields.

    Parameters
    ----------
    fname : object with an ``open`` method (e.g. pathlib.Path)
        tab-separated file to read
    dtypes : structured dtype
        description of the fields of the output array

    Returns
    -------
    X : structured numpy array
        one element per data line in the file. Columns of the file which are
        not described in `dtypes` are ignored. Fields in `dtypes` which are
        not in the file are left uninitialized.
    """
    columns = defaultdict(list)
    n_rows = 0
    with fname.open() as f:
        # BIDS wants 'group', but the array field is called 'groups'.
        # Rename once here so that every later lookup uses the same key.
        header = [
            'groups' if h == 'group' else h
            for h in f.readline().strip().split('\t')
        ]
        for line in f:
            # count the lines directly: the first column might have been
            # renamed or might not be part of dtypes at all
            n_rows += 1
            for h, v in zip(header, line.strip('\n').split('\t')):
                columns[h].append(v)

    X = empty(n_rows, dtype=dtypes)
    for h in header:
        # columns unknown to dtypes are skipped before any dtype lookup, so
        # that an 'n/a' in such a column cannot raise KeyError
        if h not in X.dtype.names:
            continue
        if issubdtype(X.dtype[h], floating):
            missing = float('nan')
        else:
            missing = ''
        X[h] = [missing if v == 'n/a' else v for v in columns[h]]

    return X
def save_tsv(fname, X, necessary_columns=None)
-
Expand source code
def save_tsv(fname, X, necessary_columns=None):
    """Write a structured array to a tab-separated file.

    The field 'groups' is written as 'group' and the columns where all the
    values are empty are dropped (see `_remove_empty_columns`). NaN in
    floating fields and '' in string fields are written as 'n/a'; the other
    floating values are written with three decimals.

    Parameters
    ----------
    fname : object with an ``open`` method (e.g. pathlib.Path)
        file to write. It is opened for writing (so created or truncated)
        even when there is nothing to write.
    X : structured numpy array
        table to save
    necessary_columns : list of str
        list of columns that you need to keep, even when they are empty

    Notes
    -----
    When no column is left after dropping the empty ones, the file is left
    empty (no header line).
    """
    # BIDS wants 'group' but it's a reserved word in SQL
    X = rename_fields(X, {'groups': 'group'})
    X = _remove_empty_columns(X, necessary_columns)

    with fname.open('w') as f:
        # When all the columns are empty: numpy < 1.18 returned None from
        # drop_fields, newer versions return an array with zero fields.
        # Both cases must stop here, otherwise only blank lines are written.
        if X is None or not X.dtype.names:
            return

        dtypes = X.dtype
        f.write('\t'.join(dtypes.names) + '\n')
        for x in X:
            values = []
            for name in dtypes.names:
                if issubdtype(dtypes[name], floating):
                    if isnan(x[name]):
                        values.append('n/a')
                    else:
                        values.append(f'{x[name]:.3f}')
                elif issubdtype(dtypes[name], character):
                    if x[name] == '':
                        values.append('n/a')
                    else:
                        values.append(x[name])
                else:
                    values.append(str(x[name]))
            f.write('\t'.join(values) + '\n')