-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtiter_block.py
436 lines (372 loc) · 19.7 KB
/
titer_block.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import xlrd
from collections import defaultdict
import re
def parse_args():
"""
Parse command line arguments.
"""
parser = argparse.ArgumentParser(
description="Find the block of titers in an Excel worksheet.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--file",
default="~/nextstrain/fludata/VIDRL-Melbourne-WHO-CC/raw-data/A/H1N1pdm/HI/2024/20240528H1N1.xlsx",
required=False,
help="Path to the Excel file",
)
parser.add_argument(
"--source",
default="vidrl",
required=False,
help="Either vidrl, niid, or crick. Used to select appropriate pattern"
)
parser.add_argument(
"--log-human-sera",
default=False,
required=False,
help="Whether to log human serum samples"
)
return parser.parse_args()
def is_numeric(value):
"""
Check if the value is numeric or a string representing a numeric value with '<'.
Basically checking for titer values e.g. "80", "< 10", "1400", ">2560", etc.
"""
if isinstance(value, (int, float)):
return True
if isinstance(value, str):
value = value.strip()
if value.startswith("<") or value.startswith('<'):
try:
float(value[1:].strip())
return True
except ValueError:
return False
if value.startswith(">"):
try:
float(value[1:].strip())
return True
except ValueError:
return False
return False
def find_titer_block(worksheet):
"""
Find the block of titers in the worksheet.
To reduce false positives, the function looks for rows and columns where at least two consecutive cells contain numeric values.
:param worksheet: The worksheet object
:return: Dictionary containing the most likely row and column indices for the titer block sorted by vote count
"""
col_start_dict = defaultdict(int)
col_end_dict = defaultdict(int)
row_start_dict = defaultdict(int)
row_end_dict = defaultdict(int)
# Iterate over each row
for row_idx in range(worksheet.nrows):
first_numeric_index = None
last_numeric_index = None
for col_idx in range(worksheet.ncols):
cell_value = worksheet.cell_value(row_idx, col_idx)
next_cell_value = (
worksheet.cell_value(row_idx, col_idx + 1)
if col_idx + 1 < worksheet.ncols
else None
)
# Check for two consecutive numeric values in a row before incrementing the count
if is_numeric(cell_value) and is_numeric(next_cell_value):
if first_numeric_index is None:
first_numeric_index = col_idx
last_numeric_index = col_idx + 1
# Only increment if any numeric values were found in the row
if first_numeric_index is not None:
col_start_dict[first_numeric_index] += 1
col_end_dict[last_numeric_index] += 1
# Iterate over each column
for col_idx in range(worksheet.ncols):
first_numeric_index = None
last_numeric_index = None
for row_idx in range(worksheet.nrows):
cell_value = worksheet.cell_value(row_idx, col_idx)
next_cell_value = (
worksheet.cell_value(row_idx + 1, col_idx)
if row_idx + 1 < worksheet.nrows
else None
)
# Check for two consecutive numeric values in a column before incrementing the count
if is_numeric(cell_value) and is_numeric(next_cell_value):
if first_numeric_index is None:
first_numeric_index = row_idx
last_numeric_index = row_idx + 1
# Only increment if any numeric values were found in the column
if first_numeric_index is not None:
row_start_dict[first_numeric_index] += 1
row_end_dict[last_numeric_index] += 1
# Sort the dictionaries by frequency in descending order
sorted_col_start = sorted(col_start_dict.items(), key=lambda item: item[1], reverse=True)
sorted_col_end = sorted(col_end_dict.items(), key=lambda item: item[1], reverse=True)
sorted_row_start = sorted(row_start_dict.items(), key=lambda item: item[1], reverse=True)
sorted_row_end = sorted(row_end_dict.items(), key=lambda item: item[1], reverse=True)
return {
"col_start": sorted_col_start,
"col_end": sorted_col_end,
"row_start": sorted_row_start,
"row_end": sorted_row_end,
}
def find_virus_columns(worksheet, titer_coords, virus_pattern=None, virus_passage_pattern=None):
"""
Find the columns containing virus names based on the most likely column indices for the titer block.
:param worksheet: The worksheet object
:param titer_block: Dictionary containing the titer block coordinates
:param virus_pattern: Regular expression pattern to match virus names (optional)
:param virus_passage_pattern: Regular expression pattern to match virus passage data (optional)
:return: Dictionary containing the column indices for virus names, virus passage data, and a list of virus names
"""
# List of outputs
virus_col_idx = None # Index of the column containing virus names
virus_passage_col_idx = None # Index of the column containing virus passage data
virus_names = (
[]
) # List of virus names, will be used by find_antigen_rows to map abbreviated antigen names to full names
# By default use VIDRL-Melbourne-WHO-CC specific patterns
# Define a regular expression pattern to match virus names
if virus_pattern is None:
virus_pattern = r"[A-Z]/[\w\s-]+/.+/\d{4}"
# Define a regular expression pattern to match virus passage column
if virus_passage_pattern is None:
virus_passage_pattern = r"(MDCK|SIAT|E\d+|hCK)"
# Find the column containing virus names searching to the left of the titer block
for col_idx in range(titer_coords['col_start'] - 1, -1, -1):
virus_count = 0
for row_idx in range(titer_coords['row_start'], titer_coords['row_end'] + 1):
cell_value = str(worksheet.cell_value(row_idx, col_idx))
if re.search(virus_pattern, cell_value):
virus_count += 1
# Index of the first column that contains more than 50% rows matching the virus pattern
if virus_count > (titer_coords['row_end'] - titer_coords['row_start']) / 2:
virus_col_idx = col_idx
break
# Get the virus names from the column containing virus names
# This allows for some lienency in matching the virus pattern in the column
for row_idx in range(titer_coords['row_start'], titer_coords['row_end'] + 1):
cell_value = str(worksheet.cell_value(row_idx, virus_col_idx))
if r"/" in cell_value:
virus_names.append(cell_value)
# Find the column containing virus passage data searching to the right of the titer block
# If not found, search left
for col_idx in list(range(titer_coords['col_end'], worksheet.ncols)) + list(range(titer_coords['col_start'] - 1, -1, -1)):
virus_passage_count = 0
for row_idx in range(titer_coords['row_end']):
cell_value = str(worksheet.cell_value(row_idx, col_idx))
if re.search(virus_passage_pattern, cell_value):
virus_passage_count += 1
# Index of the first column that contains more than 50% rows matching the virus passage pattern
if virus_passage_count > (titer_coords['row_end'] - titer_coords['row_start']) / 2:
virus_passage_col_idx = col_idx
break
return {
"virus_col_idx": virus_col_idx,
"virus_passage_col_idx": virus_passage_col_idx,
"virus_names": virus_names,
}
def find_serum_rows(worksheet, titer_coords, virus_names=None, serum_id_pattern=None, serum_passage_pattern=None, serum_abbrev_pattern=None, crick=False, ignore_serum_pattern=None, log_human_sera=False):
"""
Find the row containing cell passage data and the row containing abbreviated serum names.
:param worksheet: The worksheet object
:param titer_coords: Dictionary containing the titer block coordinates
:param virus_names: List of virus names
:param serum_id_pattern: Regular expression pattern to match serum ID (optional)
:param serum_passage_pattern: Regular expression pattern to match cell passage data (optional)
:param serum_abbrev_pattern: Regular expression pattern to match abbreviated serum names (optional)
:return: Dictionary containing the row indices for serum ID, cell passage data, abbreviated serum names, and a mapping of abbreviated names to full names
"""
# List of outputs
serum_id_row_idx = None # Index of the row containing serum ID
serum_passage_row_idx = None # Index of the row containing cell passage data
serum_abbrev_row_idx = None # Index of the row containing abbreviated antigen names
serum_mapping = {} # Mapping of abbreviated antigen names to full names
human_serum_data = [] # Will store a list of dictionaries containing human serum abbreviated name, ID, and passage data, and column index
# By default, use VIDRL-Melbourne-WHO-CC specific patterns
# Define a regular expression pattern to match serum ID
if serum_id_pattern is None:
serum_id_pattern = r"^[A-Z]\d{4,8}$"
# Define a regular expression pattern to match cell passage data
if serum_passage_pattern is None:
serum_passage_pattern = r"(MDCK\d+|SIAT\d+|E\d+)"
# Define a regular expression pattern to match abbreviated antigen names
if serum_abbrev_pattern is None:
serum_abbrev_pattern = r"\w+\s{0,1}\w+/\d+.*"
if ignore_serum_pattern is None:
ignore_serum_pattern = r"(^SH\d+|SHVAX|SHvax|sera|vaxpool|Pool).*"
# Find the row containing serum ID searching from the top of the titer block upwards
for row_idx in range(titer_coords['row_start'] - 1, -1, -1): # Iterate from row_start to the top
serum_id_count = 0
for col_idx in range(titer_coords['col_start'], titer_coords['col_end'] + 1):
cell_value = str(worksheet.cell_value(row_idx, col_idx))
cell_value = re.sub(r'[\r\n ]+', '', cell_value)
if re.search(serum_id_pattern, cell_value):
serum_id_count += 1
# Index of the first row that contains more than 50% columns matching the serum ID pattern
if serum_id_count > (titer_coords['col_end'] - titer_coords['col_start']) / 2:
serum_id_row_idx = row_idx
break
# Find the row containing cell passage data searching from the top of the titer block upwards
for row_idx in range(titer_coords['row_start'] - 1, -1, -1): # Iterate from row_start to the top
serum_passage_count = 0
for col_idx in range(titer_coords['col_start'], titer_coords['col_end'] + 1):
cell_value = str(worksheet.cell_value(row_idx, col_idx))
cell_value = re.sub(r'[\r\n ]+', '', cell_value)
if re.search(serum_passage_pattern, cell_value):
serum_passage_count += 1
if serum_passage_count > (titer_coords['col_end'] - titer_coords['col_start']) / 2:
serum_passage_row_idx = row_idx
break
# Find the row containing abbreviated serum names searching from the top of the titer block upwards
for row_idx in range(titer_coords['row_start'] -1, -1, -1):
serum_abbrev_count = 0
for col_idx in range(titer_coords['col_start'], titer_coords['col_end'] + 1):
cell_value = str(worksheet.cell_value(row_idx, col_idx))
cell_value = re.sub(r'[\r\n ]+', '', cell_value)
if re.search(serum_abbrev_pattern, cell_value):
serum_abbrev_count += 1
if serum_abbrev_count > (titer_coords['col_end'] - titer_coords['col_start']) / 2:
serum_abbrev_row_idx = row_idx
break
if serum_abbrev_row_idx is not None:
# Map abbreviated serum names to full names
virus_idx = 0
for col_idx in range(titer_coords['col_start'], titer_coords['col_end'] + 1):
cell_value = str(worksheet.cell_value(serum_abbrev_row_idx, col_idx))
if crick:
cell_value = cell_value + str(worksheet.cell_value(serum_abbrev_row_idx+1, col_idx))
cell_value = re.sub(r'[\r\n ]+', '', cell_value)
if cell_value == "":
break
# Ignore human serum (e.g. "SH2002", "sera", "SHVAX2002")
if re.search(ignore_serum_pattern, cell_value):
if log_human_sera:
max_row = max(serum_abbrev_row_idx, serum_id_row_idx, serum_passage_row_idx)
human_serum_data.append({
"col_idx": col_idx,
"serum_abbrev": cell_value,
"serum_id": str(worksheet.cell_value(serum_id_row_idx, col_idx)),
"serum_passage": str(worksheet.cell_value(serum_passage_row_idx, col_idx)),
# Sometimes egg/cell distinction is stored in a separate row
"extra_info": str(worksheet.cell_value(max_row + 1, col_idx))
})
continue
# Deal with duplicate serum abbreviations which can get out of sync with virus full names
if cell_value not in serum_mapping:
serum_mapping[cell_value] = virus_names[virus_idx]
# Increment virus_idx until it does not equal last_full_name
current_full_name = virus_names[virus_idx].lower().strip()
while virus_names[virus_idx].lower().strip() == current_full_name and virus_idx < len(virus_names) - 1:
virus_idx += 1
return {
"serum_id_row_idx": serum_id_row_idx,
"serum_passage_row_idx": serum_passage_row_idx,
"serum_abbrev_row_idx": serum_abbrev_row_idx,
"serum_mapping": serum_mapping,
"human_serum_data": human_serum_data
}
def main():
args = parse_args()
# Default patterns, VIDRL
virus_pattern = r"[A-Z]/[\w\s-]+/.+/\d{4}"
virus_passage_pattern = r"(MDCK|SIAT|E\d+|hCK)"
serum_id_pattern = r"^[A-Z]\d{4,8}"
serum_passage_pattern = r"(MDCK\d+|SIAT\d+|E\d+|CELL|cell|Cell)"
serum_abbrev_pattern = r"\w+\s{0,1}\w+/\d+.*"
crick = False
if(args.source == "niid"):
serum_id_pattern = r".+(No\.|no\.).+"
serum_passage_pattern = r".+(Egg|Cell).+"
if(args.source == "crick"):
virus_pattern = r"[A-Z]/[\w\s-]+"
serum_id_pattern = r"F\d+/\d+"
serum_passage_pattern = r"(MDCK|SIAT|Egg)"
serum_abbrev_pattern = r"[A-Z]/[\w\s-]+"
crick = True
# Load the Excel file
workbook = xlrd.open_workbook(args.file)
for worksheet_index, worksheet in enumerate(workbook.sheets(), start=1):
print(f"Reading worksheet {worksheet_index} '{worksheet.name}' in file '{args.file}'")
# Find the block of titers in the worksheet
titer_block = find_titer_block(worksheet)
if len(titer_block["col_start"]) == 0:
print("No titer block found.")
break
titer_coords = {
'col_start': titer_block["col_start"][0][0],
'col_end': titer_block["col_end"][0][0],
'row_start': titer_block["row_start"][0][0],
'row_end': titer_block["row_end"][0][0]
}
virus_block = find_virus_columns(
worksheet=worksheet,
titer_coords=titer_coords,
virus_pattern=virus_pattern,
virus_passage_pattern=virus_passage_pattern,
)
# If no virus names are found, might not be a valid worksheet, skip worksheet to avoid breaking find_serum_rows
if virus_block["virus_names"] is None:
print(f"Virus names not found. Check the virus pattern: '{virus_pattern}'")
break
serum_block = find_serum_rows(
worksheet=worksheet,
titer_coords=titer_coords,
virus_names=virus_block["virus_names"],
serum_id_pattern=serum_id_pattern,
serum_passage_pattern=serum_passage_pattern,
serum_abbrev_pattern=serum_abbrev_pattern,
crick=crick,
log_human_sera=args.log_human_sera
)
# Print the most likely row and column indices for the titer block
print(f"Titer block: n = {titer_block['row_start'][0][1]}x{titer_block['col_start'][0][1]} = {titer_block['row_start'][0][1]*titer_block['col_start'][0][1]}")
print(f" Most likely (n={titer_block['col_start'][0][1]}) col_start: {titer_block['col_start'][0][0]}")
print(f" Most likely (n={titer_block['col_end'][0][1]}) col_end: {titer_block['col_end'][0][0]}")
print(f" Most likely (n={titer_block['row_start'][0][1]}) row_start: {titer_block['row_start'][0][0]}")
print(f" Most likely (n={titer_block['row_end'][0][1]}) row_end: {titer_block['row_end'][0][0]}")
# For debugging purposes, print alternative indices (e.g. col_start, col_end, row_start, row_end)
# print("Alternative indices:")
# for i in range(1, len(titer_block['row_start'])):
# print(f" Alternative (n={titer_block['row_start'][i][1]}) row_start: {titer_block['row_start'][i][0]}")
# Print Virus and Serum annotations row and column indices
print("Virus (antigen) block: left and right of the titer block")
print(f" virus column index: {virus_block['virus_col_idx']}")
print(f" virus passage column index: {virus_block['virus_passage_col_idx']}")
print(f" virus names: {virus_block['virus_names']}")
print("Serum (antisera) block: above the titer block")
print(f" serum ID row index: {serum_block['serum_id_row_idx']}")
print(f" serum passage row index: {serum_block['serum_passage_row_idx']}")
print(f" serum abbreviated name row index: {serum_block['serum_abbrev_row_idx']}")
# Match abbreviated names across the top to the full names along the left side and auto convert to full names
if serum_block["serum_abbrev_row_idx"] is not None:
# print("Serum mapping:")
# for abbrev, full in serum_block["serum_mapping"].items():
# print(f" {abbrev} -> {full}")
print("serum_mapping = {")
for abbrev, full in serum_block["serum_mapping"].items():
print(f" '{abbrev}': '{full}',")
print("}")
# Print human serum data
if args.log_human_sera:
print("HumanSerumData\tcol_idx\tserum_abbrev\tserum_id\tserum_passage\tfilename")
for serum_data in serum_block["human_serum_data"]:
print(f"HumanSerumData\t{serum_data['col_idx']}\t{serum_data['serum_abbrev']}\t{serum_data['serum_id']}\t{serum_data['serum_passage']}\t{args.file}")
# Check if all the necessary indices were found
if virus_block["virus_col_idx"] is None:
print(f"Virus column index not found. Check the virus pattern: '{virus_pattern}'")
if virus_block["virus_passage_col_idx"] is None:
print(f"Virus passage column index not found. Check the virus passage pattern: '{virus_passage_pattern}'")
if serum_block["serum_id_row_idx"] is None:
print(f"Serum ID row index not found. Check the serum ID pattern: '{serum_id_pattern}'")
if serum_block["serum_passage_row_idx"] is None:
print(f"Serum passage row index not found. Check the serum passage pattern: '{serum_passage_pattern}'")
if serum_block["serum_abbrev_row_idx"] is None:
print(f"Serum abbreviated name row index not found. Check the serum abbreviated name pattern: '{serum_abbrev_pattern}'")
if __name__ == "__main__":
main()