diff --git a/chem_spectra/controller/transform_api.py b/chem_spectra/controller/transform_api.py index fecf0c3..75837ca 100644 --- a/chem_spectra/controller/transform_api.py +++ b/chem_spectra/controller/transform_api.py @@ -13,6 +13,7 @@ ) from chem_spectra.model.transformer import TransformerModel as TraModel +from chem_spectra.lib.composer.lcms import LCMSComposer from chem_spectra.lib.converter.bagit.base import BagItBaseConverter from chem_spectra.model.molecule import MoleculeModel @@ -62,6 +63,24 @@ def zip_jcamp_n_img(): ) ) rsp.headers['X-Extra-Info-JSON'] = json.dumps({'spc_type': 'bagit', 'invalid_molfile': invalid_molfile}) + elif isinstance(cmpsr, LCMSComposer): + # check if composered model is hplc ms + list_jcamps = cmpsr.data + dst_list = [] + for idx in range(len(list_jcamps)): + tf_jcamp = list_jcamps[idx] + tf_arr = [tf_jcamp] + dst_list.append(tf_arr) + + memory = to_zip_bag_it_response(dst_list) + rsp = make_response( + send_file( + memory, + download_name='spectrum.zip', + as_attachment=True + ) + ) + rsp.headers['X-Extra-Info-JSON'] = json.dumps({'spc_type': 'hplc', 'invalid_molfile': invalid_molfile}) elif isinstance(cmpsr, collections.abc.Sequence): dst_list = [] spc_type = '' diff --git a/chem_spectra/lib/composer/base.py b/chem_spectra/lib/composer/base.py index 704b4aa..0624a62 100644 --- a/chem_spectra/lib/composer/base.py +++ b/chem_spectra/lib/composer/base.py @@ -136,6 +136,7 @@ def generate_original_metadata(self): ) content.append('\n\n') return content + def gen_ending(self): return [ '##END=\n', diff --git a/chem_spectra/lib/composer/lcms.py b/chem_spectra/lib/composer/lcms.py new file mode 100644 index 0000000..3e1bee2 --- /dev/null +++ b/chem_spectra/lib/composer/lcms.py @@ -0,0 +1,310 @@ +import tempfile # noqa: E402 + +from chem_spectra.lib.composer.base import BaseComposer # noqa: E402 +import numpy as np # noqa: E402 + + +TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n' +TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n' # '##XYDATA= (X++(Y..Y))\n' # noqa + +class LCMSComposer: + def __init__(self, core): + self.core = core + self.title = core.fname + self.data = self.__compose() + + def __compose(self): + tic_postive_data, tic_negative_data, uvvis_data, spectra_postive_data, spectra_negative_data = self.core.data + tic_postive = self.__gen_tic(tic_data=tic_postive_data) + tic_negative = self.__gen_tic(tic_data=tic_negative_data, is_negative=True) + tic_positive_jcamp, tic_negative_jcamp = self.tf_jcamp(tic_postive), self.tf_jcamp(tic_negative) + + uvvis = self.__gen_uvvis(data=uvvis_data) + uvvis_jcamp = self.tf_jcamp(uvvis) + + mz_positive = self.__gen_mz_spectra(data=spectra_postive_data) + mz_positive_jcamp = self.tf_jcamp(mz_positive) + + mz_negative = self.__gen_mz_spectra(data=spectra_negative_data, is_negative=True) + mz_negative_jcamp = self.tf_jcamp(mz_negative) + return [tic_positive_jcamp, tic_negative_jcamp, uvvis_jcamp, mz_positive_jcamp, mz_negative_jcamp] + + def __gen_tic(self, tic_data, is_negative = False): + time, intensity = tic_data['time'], tic_data['Intensity'] + max_time, min_time = np.max(time), np.min(time) + max_intensity, min_intensity = np.max(intensity), np.min(intensity) + content = [ + '\n', + TEXT_SPECTRUM_ORIG, + '##TITLE={}\n'.format(self.title), + '##JCAMP-DX=5.00\n', + '##DATA TYPE={}\n'.format('LC/MS'), + '##DATA CLASS= XYDATA\n', + '##ORIGIN=\n', + '##OWNER=\n', + '##SPECTROMETER/DATA SYSTEM=\n', + '##$CSCATEGORY=TIC SPECTRUM\n', + '##XUNITS=time\n', + '##YUNITS=Intensity\n', + '##XFACTOR=1\n', + '##YFACTOR=1\n', + '##FIRSTX={}\n'.format(time[0]), + '##LASTX={}\n'.format(time[len(time)-1]), + '##MAXX={}\n'.format(max_time), + '##MAXY={}\n'.format(max_intensity), + '##MINX={}\n'.format(min_time), + '##MINY={}\n'.format(min_intensity), + '##NPOINTS={}\n'.format(len(time)), + '##XYDATA= (XY..XY)\n', + ] + + for i, _ in enumerate(time): + content.append( + '{}, {}\n'.format(time[i], intensity[i]) + ) + + content.extend(self.__gen_ending()) + + return content + + def __gen_uvvis(self, data): + content = [ + '\n', + TEXT_SPECTRUM_ORIG, + '##TITLE={}\n'.format(self.title), + '##JCAMP-DX=5.00\n', + '##DATA TYPE={}\n'.format('LC/MS'), + '##DATA CLASS= NTUPLES\n', + '##ORIGIN=\n', + '##OWNER=\n', + '##SPECTROMETER/DATA SYSTEM=\n', + '##$CSCATEGORY=UVVIS SPECTRUM\n', + '##VAR_NAME= RETENTION TIME, DETECTOR SIGNAL, WAVELENGTH\n', + '##SYMBOL= X, Y, T\n', + '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', + '##VAR_FORM= AFFN, AFFN, AFFN\n', + '##VAR_DIM= , , 3\n', + '##UNITS= RETENTION TIME, DETECTOR SIGNAL, WAVELENGTH\n', + '##FIRST= , , 1\n', + ] + + msspcs = [] + ms_tempfile = tempfile.TemporaryFile() + for time, value in data.items(): + xs, ys = value['RetentionTime'], value['DetectorSignal'] + msspc = [ + '##PAGE={}\n'.format(time), + '##NPOINTS={}\n'.format(len(xs)), + '##DATA TABLE= (XY..XY), PEAKS\n', + ] + for idx, _ in enumerate(xs): + my_content = '{}, {};\n'.format(xs[idx], ys[idx]) + msspc += my_content + file_content = ''.join(msspc) + ms_tempfile.write(file_content.encode('utf-8')) + + ms_tempfile.seek(0) + lines = ms_tempfile.readlines() + decoded_lines = [line.decode('utf-8').strip() for line in lines] + msspcs = '\n'.join(decoded_lines) + ms_tempfile.close() + + content.extend(msspcs) + content.extend(self.__gen_ending()) + + return content + + def __gen_mz_spectra(self, data, is_negative=False): + category = '##$CSCATEGORY=MZ NEGATIVE SPECTRUM\n' if is_negative else '##$CSCATEGORY=MZ POSITIVE SPECTRUM\n' + content = [ + '\n', + TEXT_SPECTRUM_ORIG, + '##TITLE={}\n'.format(self.title), + '##JCAMP-DX=5.00\n', + '##DATA TYPE={}\n'.format('LC/MS'), + '##DATA CLASS= NTUPLES\n', + '##ORIGIN=\n', + '##OWNER=\n', + '##SPECTROMETER/DATA SYSTEM=\n', + '##$CSCATEGORY=UVVIS SPECTRUM\n', + '##VAR_NAME= RETENTION TIME, DETECTOR SIGNAL, WAVELENGTH\n', + '##SYMBOL= X, Y, T\n', + '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', + '##VAR_FORM= AFFN, AFFN, AFFN\n', + '##VAR_DIM= , , 3\n', + '##UNITS= RETENTION TIME, DETECTOR SIGNAL, WAVELENGTH\n', + '##FIRST= , , 1\n', + category, + ] + + msspcs = [] + ms_tempfile = tempfile.TemporaryFile() + for time, value in data.items(): + xs, ys = value['mz'], value['intensities'] + msspc = [ + '##PAGE={}\n'.format(time), + '##NPOINTS={}\n'.format(len(xs)), + '##DATA TABLE= (XY..XY), PEAKS\n', + ] + for idx, _ in enumerate(xs): + my_content = '{}, {};\n'.format(xs[idx], ys[idx]) + msspc += my_content + file_content = ''.join(msspc) + ms_tempfile.write(file_content.encode('utf-8')) + + ms_tempfile.seek(0) + lines = ms_tempfile.readlines() + decoded_lines = [line.decode('utf-8').strip() for line in lines] + msspcs = '\n'.join(decoded_lines) + ms_tempfile.close() + + content.extend(msspcs) + content.extend(self.__gen_ending()) + + return content + + def __gen_ending(self): + return [ + '\n##END=\n', + '\n' + ] + + def tf_jcamp(self, data): + meta = ''.join(data) + tf = tempfile.NamedTemporaryFile(suffix='.jdx') + tf.write(bytes(meta, 'UTF-8')) + tf.seek(0) + return tf +# class LCMSComposer(BaseComposer): +# def __init__(self, core): +# super().__init__(core) +# self.title = core.fname +# self.meta = self.__compose() + + # def __gen_headers_spectrum_orig(self): + # return [ + # '\n', + # TEXT_SPECTRUM_ORIG, + # '##TITLE={}\n'.format(self.title), + # '##JCAMP-DX=5.00\n', + # '##DATA TYPE={}\n'.format('LC/MS'), + # '##DATA CLASS= NTUPLES\n', + # '##ORIGIN=\n', + # '##OWNER=\n', + # '##SPECTROMETER/DATA SYSTEM=\n', + # # '##.SPECTROMETER TYPE={}\n'.format(self.core.dic.get('SPECTROMETER TYPE', '')), # TRAP # noqa: E501 + # # '##.INLET={}\n'.format(self.core.dic.get('INLET', '')), # GC + # # '##.IONIZATION MODE={}\n'.format(self.core.dic.get('IONIZATION MODE', '')), # EI+ # noqa: E501 + # '##$CSCATEGORY=SPECTRUM\n', + # # '##$CSSCANAUTOTARGET={}\n'.format(self.core.auto_scan), + # # '##$CSSCANEDITTARGET={}\n'.format( + # # self.core.edit_scan or self.core.auto_scan + # # ), + # # '##$CSSCANCOUNT={}\n'.format(len(self.core.datatables)), + # # '##$CSTHRESHOLD={}\n'.format(self.core.thres / 100), + # ] + +# def __gen_ntuples_begin(self): +# return ['##NTUPLES={}\n'.format('MASS SPECTRUM')] + +# def __gen_ntuples_end(self): +# return ['##END NTUPLES={}\n'.format('MASS SPECTRUM')] + +# def __gen_config(self): +# return [ + # '##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n', + # '##SYMBOL= X, Y, T\n', + # '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', + # '##VAR_FORM= AFFN, AFFN, AFFN\n', + # '##VAR_DIM= , , 3\n', + # '##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n', + # '##FIRST= , , 1\n', + # # '##LAST= , , {}\n'.format(len(self.core.datatables)), +# ] + +# def __gen_ms_spectra(self): + # msspcs = [] + # ms_tempfile = tempfile.TemporaryFile() + # spectra_data = self.core.data[3] # the 1st and 2nd is tic positive and negative, the 3rd is uvvis + # for time, value in spectra_data.items(): + # xs, ys = value['mz'], value['intensities'] + # msspc = [ + # '##PAGE={}\n'.format(time), + # '##NPOINTS={}\n'.format(len(value['mz'])), + # '##DATA TABLE= (XY..XY), PEAKS\n', + # ] + # for idx in range(len(xs)): + # my_content = '{}, {};\n'.format(xs[idx], ys[idx]) + # msspc += my_content + # file_content = ''.join(msspc) + # ms_tempfile.write(file_content.encode('utf-8')) + + # ms_tempfile.seek(0) + # lines = ms_tempfile.readlines() + # decoded_lines = [line.decode('utf-8').strip() for line in lines] + # msspcs = '\n'.join(decoded_lines) + # ms_tempfile.close() +# return msspcs + +# def __compose(self): +# meta = [] +# meta.extend(self.__gen_headers_spectrum_orig()) + +# meta.extend(self.__gen_ntuples_begin()) +# meta.extend(self.__gen_config()) +# meta.extend(self.__gen_ms_spectra()) +# meta.extend(self.__gen_ntuples_end()) + +# # meta.extend(self.generate_original_metadata()) + +# meta.extend(self.gen_ending()) +# return meta + +# # def __prism(self, spc): +# # blues_x, blues_y, greys_x, greys_y = [], [], [], [] +# # thres = 0 +# # if spc.shape[0] > 0: # RESOLVE_VSMBNAN2 +# # thres = spc[:, 1].max() * (self.core.thres / 100) + +# # for pt in spc: +# # x, y = pt[0], pt[1] +# # if y >= thres: +# # blues_x.append(x) +# # blues_y.append(y) +# # else: +# # greys_x.append(x) +# # greys_y.append(y) +# # return blues_x, blues_y, greys_x, greys_y + +# # def prism_peaks(self): +# # idx = (self.core.edit_scan or self.core.auto_scan) - 1 +# # spc = self.core.spectra[idx] +# # return self.__prism(spc) + tuple([idx+1]) + +# def tf_img(self): +# # plt.rcParams['figure.figsize'] = [16, 9] +# # plt.rcParams['font.size'] = 14 +# # # PLOT data +# # blues_x, blues_y, greys_x, greys_y, _ = self.prism_peaks() +# # plt.bar(greys_x, greys_y, width=0, edgecolor='#dddddd') +# # plt.bar(blues_x, blues_y, width=0, edgecolor='#1f77b4') + +# # # PLOT label +# # plt.xlabel('X (m/z)', fontsize=18) +# # plt.ylabel('Y (Relative Abundance)', fontsize=18) +# # plt.grid(False) + +# # # Save +# # tf = tempfile.NamedTemporaryFile(suffix='.png') +# # plt.savefig(tf, format='png') +# # tf.seek(0) +# # plt.clf() +# # plt.cla() +# # return tf +# return None + +# def tf_csv(self): +# return None + +# def generate_nmrium(self): +# return None \ No newline at end of file diff --git a/chem_spectra/lib/composer/ms_fix.py b/chem_spectra/lib/composer/ms_fix.py new file mode 100644 index 0000000..c10ad64 --- /dev/null +++ b/chem_spectra/lib/composer/ms_fix.py @@ -0,0 +1,147 @@ +import matplotlib +matplotlib.use('Agg') + +import tempfile # noqa: E402 +import matplotlib.pyplot as plt # noqa: E402 + +from chem_spectra.lib.composer.base import BaseComposer # noqa: E402 + + +TEXT_SPECTRUM_ORIG = '$$ === CHEMSPECTRA SPECTRUM ORIG ===\n' +TEXT_MS_DATA_TABLE = '##DATA TABLE= (XY..XY), PEAKS\n' # '##XYDATA= (X++(Y..Y))\n' # noqa + + +class MSComposer(BaseComposer): + def __init__(self, core): + super().__init__(core) + self.title = core.fname + self.meta = self.__compose() + + def __gen_headers_spectrum_orig(self): + return [ + '\n', + TEXT_SPECTRUM_ORIG, + '##TITLE={}\n'.format(self.title), + '##JCAMP-DX=5.00\n', + '##DATA TYPE={}\n'.format('MASS SPECTRUM'), + '##DATA CLASS= NTUPLES\n', + '##ORIGIN=\n', + '##OWNER=\n', + '##SPECTROMETER/DATA SYSTEM=\n', + '##.SPECTROMETER TYPE={}\n'.format(self.core.dic.get('SPECTROMETER TYPE', '')), # TRAP # noqa: E501 + '##.INLET={}\n'.format(self.core.dic.get('INLET', '')), # GC + '##.IONIZATION MODE={}\n'.format(self.core.dic.get('IONIZATION MODE', '')), # EI+ # noqa: E501 + '##$CSCATEGORY=SPECTRUM\n', + '##$CSSCANAUTOTARGET={}\n'.format(self.core.auto_scan), + '##$CSSCANEDITTARGET={}\n'.format( + self.core.edit_scan or self.core.auto_scan + ), + '##$CSSCANCOUNT={}\n'.format(len(self.core.datatables)), + '##$CSTHRESHOLD={}\n'.format(self.core.thres / 100), + ] + + def __gen_ntuples_begin(self): + return ['##NTUPLES={}\n'.format('MASS SPECTRUM')] + + def __gen_ntuples_end(self): + return ['##END NTUPLES={}\n'.format('MASS SPECTRUM')] + + def __gen_config(self): + return [ + '##VAR_NAME= MASS, INTENSITY, RETENTION TIME\n', + '##SYMBOL= X, Y, T\n', + '##VAR_TYPE= INDEPENDENT, DEPENDENT, INDEPENDENT\n', + '##VAR_FORM= AFFN, AFFN, AFFN\n', + '##VAR_DIM= , , 3\n', + '##UNITS= M/Z, RELATIVE ABUNDANCE, SECONDS\n', + '##FIRST= , , 1\n', + '##LAST= , , {}\n'.format(len(self.core.datatables)), + ] + + def __gen_ms_spectra(self): + msspcs = [] + # with open('ms_compose.txt', 'a') as tmpfile: + # for idx, dt in enumerate(self.core.datatables): + # msspc = [ + # '##PAGE={}\n'.format(idx + 1), + # '##NPOINTS={}\n'.format(dt['pts']), + # TEXT_MS_DATA_TABLE, + # ] + # my_content = msspc + dt['dt'] + # tmpfile.write(''.join(my_content)) + + for idx, dt in enumerate(self.core.datatables): + msspc = [ + '##PAGE={}\n'.format(idx + 1), + '##NPOINTS={}\n'.format(dt['pts']), + TEXT_MS_DATA_TABLE, + ] + msspcs = msspcs + msspc + dt['dt'] + # with open('ms_compose.txt', 'r') as tmpfile: + # # msspcs = tmpfile.read() + # lines = tmpfile.readlines() + # msspcs = ''.join(lines) + return msspcs + + def __compose(self): + meta = [] + meta.extend(self.__gen_headers_spectrum_orig()) + + meta.extend(self.__gen_ntuples_begin()) + meta.extend(self.__gen_config()) + meta.extend(self.__gen_ms_spectra()) + meta.extend(self.__gen_ntuples_end()) + + # meta.extend(self.generate_original_metadata()) + + meta.extend(self.gen_ending()) + return meta + + def __prism(self, spc): + blues_x, blues_y, greys_x, greys_y = [], [], [], [] + thres = 0 + if spc.shape[0] > 0: # RESOLVE_VSMBNAN2 + thres = spc[:, 1].max() * (self.core.thres / 100) + + for pt in spc: + x, y = pt[0], pt[1] + if y >= thres: + blues_x.append(x) + blues_y.append(y) + else: + greys_x.append(x) + greys_y.append(y) + return blues_x, blues_y, greys_x, greys_y + + def prism_peaks(self): + idx = (self.core.edit_scan or self.core.auto_scan) - 1 + spc = self.core.spectra[idx] + return self.__prism(spc) + tuple([idx+1]) + + def tf_img(self): + plt.rcParams['figure.figsize'] = [16, 9] + plt.rcParams['font.size'] = 14 + # PLOT data + blues_x, blues_y, greys_x, greys_y, _ = self.prism_peaks() + plt.bar(greys_x, greys_y, width=0, edgecolor='#dddddd') + plt.bar(blues_x, blues_y, width=0, edgecolor='#1f77b4') + + # PLOT label + plt.xlabel('X (m/z)', fontsize=18) + plt.ylabel('Y (Relative Abundance)', fontsize=18) + plt.grid(False) + + # Save + tf = tempfile.NamedTemporaryFile(suffix='.png') + plt.savefig(tf, format='png') + tf.seek(0) + plt.clf() + plt.cla() + return tf + + def tf_csv(self): + return None + + def generate_nmrium(self): + return None + \ No newline at end of file diff --git a/chem_spectra/lib/converter/lcms/__init__.py b/chem_spectra/lib/converter/lcms/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chem_spectra/lib/converter/lcms/base.py b/chem_spectra/lib/converter/lcms/base.py new file mode 100644 index 0000000..26581d0 --- /dev/null +++ b/chem_spectra/lib/converter/lcms/base.py @@ -0,0 +1,50 @@ +import os +import pandas as pd +from chem_spectra.lib.converter.share import parse_params + +class LCMSBaseConverter: + def __init__(self, target_dir, params=False, fname=''): + self.params = parse_params(params) + self.typ = None + self.fname = fname + if target_dir is None: + self.data = None + else: + self.data = self.__read(target_dir, fname) + + def __read(self, target_dir, fname): + tic_postive_data = self.__read_tic(target_dir) + tic_negative_data = self.__read_tic(target_dir, is_negative=True) + + uvvis_data = self.__read_uvvis(target_dir) + + spectra_postive_data = self.__read_spectra(target_dir) + spectra_negative_data = self.__read_spectra(target_dir, is_negative=True) + + return [tic_postive_data, tic_negative_data, uvvis_data, spectra_postive_data, spectra_negative_data] + + def __read_tic(self, target_dir, is_negative = False): + file_path = os.path.join(target_dir, 'TIC_PLUS.csv') + if is_negative: + file_path = os.path.join(target_dir, 'TIC_MINUS.csv') + data_frame = pd.read_csv(file_path, header=0) + tic_postive_data = data_frame.to_dict(orient='list') + return tic_postive_data + + def __read_uvvis(self, target_dir): + file_path = os.path.join(target_dir, 'LCMS.csv') + data_frame = pd.read_csv(file_path, index_col='wavelength', header=0) + grouped_df = data_frame.groupby('wavelength').agg(list) + data_dict = {wavelength: {'RetentionTime': group['RetentionTime'], 'DetectorSignal': group['DetectorSignal']} for wavelength, group in grouped_df.iterrows()} + return data_dict + + def __read_spectra(self, target_dir, is_negative = False): + file_path = os.path.join(target_dir, 'MZ_PLUS_Spectra.csv') + if is_negative: + file_path = os.path.join(target_dir, 'MZ_MINUS_Spectra.csv') + data_frame = pd.read_csv(file_path, index_col='time', header=0) + grouped_df = data_frame.groupby('time').agg(list) + data_dict = {time: {'mz': group['mz'], 'intensities': group['intensities']} for time, group in grouped_df.iterrows()} + return data_dict + + diff --git a/chem_spectra/model/transformer.py b/chem_spectra/model/transformer.py index fd8b363..735da72 100644 --- a/chem_spectra/model/transformer.py +++ b/chem_spectra/model/transformer.py @@ -13,9 +13,11 @@ from chem_spectra.lib.converter.fid.base import FidBaseConverter from chem_spectra.lib.converter.fid.bruker import FidHasBruckerProcessed from chem_spectra.lib.converter.bagit.base import BagItBaseConverter +from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter from chem_spectra.lib.converter.ms import MSConverter from chem_spectra.lib.composer.ni import NIComposer from chem_spectra.lib.composer.ms import MSComposer +from chem_spectra.lib.composer.lcms import LCMSComposer from chem_spectra.lib.composer.base import BaseComposer # noqa: F401 from chem_spectra.lib.converter.nmrium.base import NMRiumDataConverter import matplotlib.pyplot as plt # noqa: E402 @@ -63,6 +65,13 @@ def search_bag_it_file(td): except: # noqa: E722 return False +def search_lcms_file(td): + try: + target_dir = find_dir(td, 'LCMS.csv') + return target_dir + except: # noqa: E722 + return False + class TransformerModel: def __init__(self, file, molfile=None, params=False, multiple_files=False): @@ -180,9 +189,14 @@ def zip2cvp(self): return nicv, nicp, invalid_molfile else: is_bagit = search_bag_it_file(td) + is_lcms = search_lcms_file(td) if is_bagit: bagcv = BagItBaseConverter(td, self.params, self.file.name) return bagcv, bagcv, False + elif is_lcms: + lcms_cv = LCMSBaseConverter(td, self.params, self.file.name) + lcms_np = LCMSComposer(lcms_cv) + return lcms_cv, lcms_np, False return False, False, False diff --git a/tests/fixtures/source/lcms/lcms.zip b/tests/fixtures/source/lcms/lcms.zip new file mode 100644 index 0000000..03ee21e Binary files /dev/null and b/tests/fixtures/source/lcms/lcms.zip differ diff --git a/tests/lib/composer/test_lcms_composer.py b/tests/lib/composer/test_lcms_composer.py new file mode 100644 index 0000000..9f0d5e6 --- /dev/null +++ b/tests/lib/composer/test_lcms_composer.py @@ -0,0 +1,105 @@ +import json +import pytest +import tempfile +import zipfile +from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter +from chem_spectra.lib.composer.lcms import LCMSComposer + +source = './tests/fixtures/source/lcms/lcms.zip' + +@pytest.fixture +def zip_file(): + return source + +def test_init_lcms_composer_failed(): + with pytest.raises(Exception) as error: + _ = LCMSComposer(None) + + assert error is not None + +def test_init_lcms_composer_success(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + assert lcms_composer is not None + assert lcms_composer.core == lcms_converter + assert len(lcms_composer.data) == 5 + +def test_lcms_composer_tic_postive(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + tic_postive_jcamp = lcms_composer.data[0] + assert tic_postive_jcamp is not None + with open(tic_postive_jcamp.name) as file: + file_content = file.read() + assert file_content != "" + assert '##$CSCATEGORY=TIC SPECTRUM' in file_content + +def test_lcms_composer_tic_negative(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + tic_postive_jcamp = lcms_composer.data[1] + assert tic_postive_jcamp is not None + with open(tic_postive_jcamp.name) as file: + file_content = file.read() + assert file_content != "" + assert '##$CSCATEGORY=TIC SPECTRUM' in file_content + +def test_lcms_composer_tic_uvvis(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + uvvis_jcamp = lcms_composer.data[2] + assert uvvis_jcamp is not None + with open(uvvis_jcamp.name) as file: + file_content = file.read() + assert file_content != "" + assert '##$CSCATEGORY=UVVIS SPECTRUM' in file_content + +def test_lcms_composer_mz_spectra_positive(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + spectra_jcamp = lcms_composer.data[3] + assert spectra_jcamp is not None + with open(spectra_jcamp.name) as file: + file_content = file.read() + assert file_content != "" + assert '##$CSCATEGORY=MZ POSITIVE SPECTRUM' in file_content + +def test_lcms_composer_mz_spectra_positive(zip_file): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(zip_file, 'r') as z: + z.extractall(td) + + lcms_converter = LCMSBaseConverter(td) + lcms_composer = LCMSComposer(core=lcms_converter) + + spectra_jcamp = lcms_composer.data[4] + assert spectra_jcamp is not None + with open(spectra_jcamp.name) as file: + file_content = file.read() + assert file_content != "" + assert '##$CSCATEGORY=MZ NEGATIVE SPECTRUM' in file_content diff --git a/tests/lib/converter/lcms/test_lcms_converter.py b/tests/lib/converter/lcms/test_lcms_converter.py new file mode 100644 index 0000000..64e3c9b --- /dev/null +++ b/tests/lib/converter/lcms/test_lcms_converter.py @@ -0,0 +1,87 @@ +import tempfile +import zipfile +import mimetypes +import base64 +from chem_spectra.lib.converter.lcms.base import LCMSBaseConverter as LCMSConveter + +target_dir = './tests/fixtures/source/lcms/lcms.zip' + +def assertFileType(file, mimeStr): + assert mimetypes.guess_type(file.name)[0] == mimeStr + +def assertJcampContent(jcamp, field): + assertFileType(jcamp, 'chemical/x-jcamp-dx') + jcamp_content = str(jcamp.read()) + assert field in jcamp_content + +def isBase64(encodeString): + plainStr = base64.b64decode(encodeString) + encodedStr = base64.b64encode(plainStr).decode("utf-8") + assert encodedStr == encodeString + +def test_lcms_converter_failed(): + converter = LCMSConveter(None) + assert converter.data is None + +def test_lcms_converter_success(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + assert converter.data is not None + assert len(converter.data) == 5 + +def test_lcms_converter_tic_positive(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + tic_positive = converter.data[0] + assert len(tic_positive['time']) > 0 + assert len(tic_positive['Intensity']) > 0 + +def test_lcms_converter_tic_negative(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + tic_positive = converter.data[1] + assert len(tic_positive['time']) > 0 + assert len(tic_positive['Intensity']) > 0 + +def test_lcms_converter_uvvis(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + uvvis = converter.data[2] + for wavelength in uvvis.keys(): + assert len(uvvis[wavelength]['RetentionTime']) > 0 + assert len(uvvis[wavelength]['DetectorSignal']) > 0 + +def test_lcms_converter_spectra_positive(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + spectra_positive = converter.data[3] + for time in spectra_positive.keys(): + assert len(spectra_positive[time]['mz']) > 0 + assert len(spectra_positive[time]['intensities']) > 0 + +def test_lcms_converter_spectra_negative(): + with tempfile.TemporaryDirectory() as td: + with zipfile.ZipFile(target_dir, 'r') as z: + z.extractall(td) + + converter = LCMSConveter(td, fname='lcms') + spectra_negative = converter.data[4] + for time in spectra_negative.keys(): + assert len(spectra_negative[time]['mz']) > 0 + assert len(spectra_negative[time]['intensities']) > 0 +