
Commit 4c88bb8
changes from stream mzml reading to sequential
ypriverol committed Mar 8, 2025
1 parent 32914fe commit 4c88bb8
Showing 3 changed files with 66 additions and 65 deletions.
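
The commit title refers to replacing pyopenms' push-style streaming reader with a full in-memory load followed by a sequential loop over spectra. A minimal sketch of the two read modes, illustrative rather than the repository's code ("example.mzML" is a placeholder; older pyopenms versions may require bytes paths):

import pyopenms as oms

# Streaming: a consumer object receives each spectrum while the file is parsed,
# so the full run never has to fit in memory at once.
class CountingConsumer:
    def __init__(self):
        self.n_spectra = 0

    def setExperimentalSettings(self, settings):
        pass

    def setExpectedSize(self, n_spectra, n_chromatograms):
        pass

    def consumeSpectrum(self, spectrum):
        self.n_spectra += 1

    def consumeChromatogram(self, chromatogram):
        pass

consumer = CountingConsumer()
oms.MzMLFile().transform(b"example.mzML", consumer)

# Sequential: load everything into an MSExperiment, then iterate by index.
# This permits random access, e.g. looking back at the MS1 scan that
# precedes an MS2 spectrum.
exp = oms.MSExperiment()
oms.MzMLFile().load("example.mzML", exp)
for i in range(exp.getNrSpectra()):
    spectrum = exp.getSpectrum(i)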
2 changes: 0 additions & 2 deletions README.md
@@ -47,8 +47,6 @@ quantms-utils have multiple scripts to generate mzML stats. These files are used
 
 </details>
 
-
-
 ## Contributions and issues
 
 Contributions and issues are welcome. Please, open an issue in the [GitHub repository](https://github.com/bigbio/quantms) or PR in the [GitHub repository](https://github.com/bigbio/quantms-utils).
127 changes: 65 additions & 62 deletions quantmsutils/mzml/mzml_statistics.py
@@ -2,7 +2,7 @@
 import re
 import sqlite3
 from pathlib import Path
-from typing import Dict, List, Optional, Set, Tuple, Union
+from typing import Dict, List, Optional, Set
 
 import click
 import numpy as np
@@ -24,7 +24,9 @@
     RETENTION_TIME,
     SCAN,
     SUMMED_PEAK_INTENSITY,
-    MAX_INTENSITY, PRECURSOR_RT, PRECURSOR_TOTAL_INTENSITY, PRECURSOR_INTENSITY_WINDOW,
+    PRECURSOR_RT,
+    PRECURSOR_TOTAL_INTENSITY,
+    PRECURSOR_INTENSITY_WINDOW,
 )
 
 logging.basicConfig(format="%(asctime)s [%(funcName)s] - %(message)s", level=logging.INFO)
@@ -63,9 +65,7 @@ def create_id_schema() -> pa.Schema:
     )
 
 
-def batch_write_bruker_d(
-    file_name: str, output_path: str, batch_size: int = 10000
-) -> str:
+def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 10000) -> str:
     """
     Batch processing and writing of Bruker .d files.
@@ -120,7 +120,9 @@ def batch_write_bruker_d(
     query = f"""SELECT {', '.join(safe_columns)} FROM frames"""
 
     # Set up parquet writer with appropriate schema
-    with pq.ParquetWriter(output_path, schema=schema, compression="gzip") as parquet_writer:
+    with pq.ParquetWriter(
+        output_path, schema=schema, compression="gzip"
+    ) as parquet_writer:
         # Stream data in batches
         for chunk in pd.read_sql_query(query, conn, chunksize=batch_size):
             chunk["AcquisitionDateTime"] = acquisition_date_time
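
The pattern above streams rows out of the .d directory's SQLite store and into Parquet one chunk at a time, so memory use stays flat regardless of file size. A self-contained sketch of the same chunked SQL-to-Parquet pattern, assuming an analysis.tdf file with a frames table; the two-column schema is illustrative:

import sqlite3

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Connect to the SQLite database that Bruker stores inside the .d directory.
conn = sqlite3.connect("analysis.tdf")
schema = pa.schema([("Id", pa.int64()), ("Time", pa.float64())])

with pq.ParquetWriter("frames.parquet", schema=schema, compression="gzip") as writer:
    # chunksize turns read_sql_query into an iterator of DataFrames, so only
    # one batch of rows is materialized in memory at a time.
    for chunk in pd.read_sql_query("SELECT Id, Time FROM frames", conn, chunksize=10000):
        writer.write_table(pa.Table.from_pandas(chunk, schema=schema, preserve_index=False))

conn.close()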
@@ -165,13 +167,13 @@ class BatchWritingConsumer:
     """
 
     def __init__(
-            self,
-            parquet_schema: pa.Schema,
-            id_parquet_schema: pa.Schema,
-            output_path: str,
-            batch_size: int = 10000,
-            id_only: bool = False,
-            id_output_path: Optional[str] = None,
+        self,
+        parquet_schema: pa.Schema,
+        id_parquet_schema: pa.Schema,
+        output_path: str,
+        batch_size: int = 10000,
+        id_only: bool = False,
+        id_output_path: Optional[str] = None,
     ):
         """
         Initialize the BatchWritingConsumer.
@@ -205,10 +207,10 @@ def __init__(
         self.scan_pattern = re.compile(r"[scan|spectrum]=(\d+)")
 
     def transform_mzml_spectrum(
-            self,
-            i: int,
-            spectrum: oms.MSSpectrum,
-            mzml_exp: oms.MSExperiment,
+        self,
+        i: int,
+        spectrum: oms.MSSpectrum,
+        mzml_exp: oms.MSExperiment,
     ) -> None:
         """
         Process a spectrum and add it to the batch data.
@@ -251,12 +253,14 @@ def transform_mzml_spectrum(
             # Extract spectrum ID for ID-only mode
             if self.id_only:
                 scan_id = self._extract_scan_id(spectrum)
-                self.psm_parts.append({
-                    SCAN: scan_id,
-                    MS_LEVEL: ms_level,
-                    MZ_ARRAY: mz_array.tolist(),
-                    INTENSITY_ARRAY: intensity_array.tolist(),
-                })
+                self.psm_parts.append(
+                    {
+                        SCAN: scan_id,
+                        MS_LEVEL: ms_level,
+                        MZ_ARRAY: mz_array.tolist(),
+                        INTENSITY_ARRAY: intensity_array.tolist(),
+                    }
+                )
 
             # Working only with the first precursor
             first_precursor = spectrum.getPrecursors()[0]
@@ -277,11 +281,18 @@ def transform_mzml_spectrum(
                 RETENTION_TIME: float(rt),
                 CHARGE: int(charge_state) if charge_state else None,
                 EXPERIMENTAL_MASS_TO_CHARGE: float(exp_mz) if exp_mz else None,
-                PRECURSOR_RT: first_precursor_calculated["rt"] if first_precursor_calculated else None,
+                PRECURSOR_RT: (
+                    first_precursor_calculated["rt"] if first_precursor_calculated else None
+                ),
                 INTENSITY: float(intensity) if intensity else None,
-                PRECURSOR_TOTAL_INTENSITY: first_precursor_calculated[
-                    "total_intensity"] if first_precursor_calculated else None,
-                ACQUISITION_DATETIME: str(self.acquisition_datetime) if self.acquisition_datetime else None,
+                PRECURSOR_TOTAL_INTENSITY: (
+                    first_precursor_calculated["total_intensity"]
+                    if first_precursor_calculated
+                    else None
+                ),
+                ACQUISITION_DATETIME: (
+                    str(self.acquisition_datetime) if self.acquisition_datetime else None
+                ),
             }
         else:
             # Process MS1
@@ -298,7 +309,9 @@ def transform_mzml_spectrum(
                 PRECURSOR_RT: None,
                 PRECURSOR_TOTAL_INTENSITY: None,
                 PRECURSOR_INTENSITY_WINDOW: None,
-                ACQUISITION_DATETIME: str(self.acquisition_datetime) if self.acquisition_datetime else None,
+                ACQUISITION_DATETIME: (
+                    str(self.acquisition_datetime) if self.acquisition_datetime else None
+                ),
             }
 
         self.batch_data.append(row_data)
@@ -315,7 +328,7 @@ def _extract_scan_id(self, spectrum: oms.MSSpectrum) -> str:
         return spectrum.getNativeID()
 
     def _extract_first_precursor_data(
-            self, spectrum: oms.MSSpectrum, i: int, mzml_exp: oms.MSExperiment
+        self, spectrum: oms.MSSpectrum, i: int, mzml_exp: oms.MSExperiment
     ) -> Dict:
         """
         Extract precursor information from the first precursor when it is not annotated in the MS2.
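
The helper's body is collapsed in this view. A hedged sketch of what such a fallback can look like — walk back from spectrum i to the nearest preceding MS1 scan and sum the intensity inside the precursor's isolation window; the logic is an assumption, not necessarily what the repository does:

from typing import Dict, Optional

import pyopenms as oms

def first_precursor_from_ms1(mzml_exp: oms.MSExperiment, i: int) -> Optional[Dict]:
    spectrum = mzml_exp.getSpectrum(i)
    precursor = spectrum.getPrecursors()[0]
    low = precursor.getMZ() - precursor.getIsolationWindowLowerOffset()
    high = precursor.getMZ() + precursor.getIsolationWindowUpperOffset()
    for j in range(i - 1, -1, -1):  # walk back to the closest preceding MS1 scan
        ms1 = mzml_exp.getSpectrum(j)
        if ms1.getMSLevel() == 1:
            mz, intensity = ms1.get_peaks()
            in_window = (mz >= low) & (mz <= high)
            return {"rt": ms1.getRT(), "total_intensity": float(intensity[in_window].sum())}
    return None

This lookback is only possible because the whole experiment is in memory; it is the kind of random access a push-style streaming consumer cannot do without caching MS1 scans.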
@@ -381,16 +394,11 @@ def _write_batch(self) -> None:
             # Initialize writer lazily if not already created
             if self.parquet_writer is None:
                 self.parquet_writer = pq.ParquetWriter(
-                    where=self.output_path,
-                    schema=self.parquet_schema,
-                    compression="gzip"
+                    where=self.output_path, schema=self.parquet_schema, compression="gzip"
                 )
 
             # Create a RecordBatch directly from the current batch
-            batch = pa.RecordBatch.from_pylist(
-                self.batch_data,
-                schema=self.parquet_schema
-            )
+            batch = pa.RecordBatch.from_pylist(self.batch_data, schema=self.parquet_schema)
 
             # Write the batch directly
             self.parquet_writer.write_batch(batch)
@@ -407,10 +415,7 @@ def _write_batch(self) -> None:
             )
 
             # Create and write batch
-            batch = pa.RecordBatch.from_pylist(
-                self.psm_parts,
-                schema=self.id_parquet_schema
-            )
+            batch = pa.RecordBatch.from_pylist(self.psm_parts, schema=self.id_parquet_schema)
             self.id_parquet_writer.write_batch(batch)
             self.psm_parts = []
@@ -437,11 +442,11 @@ def finalize(self) -> None:
 
 
 def batch_write_mzml_streaming(
-        file_name: str,
-        output_path: str,
-        id_only: bool = False,
-        id_output_path: Optional[str] = None,
-        batch_size: int = 10000,
+    file_name: str,
+    output_path: str,
+    id_only: bool = False,
+    id_output_path: Optional[str] = None,
+    batch_size: int = 10000,
 ) -> Optional[str]:
     """
     Process an mzML file using streaming and write to parquet.
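
The body is collapsed here, but the new transform_mzml_spectrum(i, spectrum, mzml_exp) signature implies a driver that loads the experiment and then walks it index by index. A hedged sketch of such a loop; the loop itself is an assumption about the elided body:

import pyopenms as oms

def drive_consumer(file_name: str, consumer) -> None:
    exp = oms.MSExperiment()
    oms.MzMLFile().load(file_name, exp)  # sequential: the whole run is read first
    for i in range(exp.getNrSpectra()):
        consumer.transform_mzml_spectrum(i, exp.getSpectrum(i), exp)
    consumer.finalize()  # flush the last partial batch and close the writers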
@@ -535,9 +540,7 @@ def resolve_ms_path(ms_path: str) -> str:
     candidates = list(path_obj.parent.glob(f"{path_obj.stem}*"))
     valid_extensions = {".d", ".mzml", ".mzML"}
     valid_candidates = [
-        str(c.resolve())
-        for c in candidates
-        if c.suffix.lower() in valid_extensions
+        str(c.resolve()) for c in candidates if c.suffix.lower() in valid_extensions
     ]
 
     if len(valid_candidates) == 1:
@@ -550,16 +553,20 @@
 
 
 @click.command("mzmlstats")
-@click.option("--ms_path", type=click.Path(exists=True), required=True,
-              help="Path to mass spectrometry file (.mzML or .d)")
-@click.option("--id_only", is_flag=True,
-              help="Generate a parquet with the spectrum id and the peaks")
-@click.option("--batch_size", type=int, default=10000,
-              help="Number of rows to write in each batch")
+@click.option(
+    "--ms_path",
+    type=click.Path(exists=True),
+    required=True,
+    help="Path to mass spectrometry file (.mzML or .d)",
+)
+@click.option(
+    "--id_only", is_flag=True, help="Generate a parquet with the spectrum id and the peaks"
+)
+@click.option(
+    "--batch_size", type=int, default=10000, help="Number of rows to write in each batch"
+)
 @click.pass_context
-def mzml_statistics(
-    ctx, ms_path: str, id_only: bool = False, batch_size: int = 10000
-) -> None:
+def mzml_statistics(ctx, ms_path: str, id_only: bool = False, batch_size: int = 10000) -> None:
     """
     Parse mass spectrometry data files (.mzML or Bruker .d formats) to extract
     and compile statistics about the spectra.
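
A usage sketch through click's test runner, mirroring tests/test_commands.py; "sample.mzML" is a placeholder and the cli import path is assumed:

from click.testing import CliRunner

from quantmsutils.quantmsutilsc import cli  # assumed entry-point module

runner = CliRunner()
result = runner.invoke(cli, ["mzmlstats", "--ms_path", "sample.mzML", "--id_only"])
assert result.exit_code == 0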
@@ -578,11 +585,7 @@ def mzml_statistics(
 
     # Process based on file type
     if path_obj.suffix.lower() == ".d":
-        batch_write_bruker_d(
-            file_name=ms_path,
-            output_path=output_path,
-            batch_size=batch_size
-        )
+        batch_write_bruker_d(file_name=ms_path, output_path=output_path, batch_size=batch_size)
     elif path_obj.suffix.lower() in [".mzml"]:
         batch_write_mzml_streaming(
             file_name=ms_path,
2 changes: 1 addition & 1 deletion tests/test_commands.py
@@ -71,6 +71,7 @@ def test_convert_psm_help():
 
     assert result.exit_code == 0
 
+
 def test_check_samplesheet_help():
     runner = CliRunner()
     result = runner.invoke(cli, ["checksamplesheet", "--help"])
@@ -140,5 +141,4 @@ def test_mzml_statistics():
     table1 = table1.set_index("scan")
     assert len(table2) == len(table1)
 
-
     assert result.exit_code == 0
