from collections import namedtuple
import numpy as np
import logging
import logconfig
import time
import datetime
import csv
import glob
import functools
import os
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
# candidate record produced by get_unified_candidate_info_list (no malignancy label)
Candidate_Info = namedtuple("Candidate_Info",
                            "isNodule_bool, diameter_mm, series_uid, center_xyz")
# richer candidate record produced by get_candidate_info_list (adds annotation and malignancy flags)
CandidateInfoTuple = namedtuple('CandidateInfoTuple',
                                'is_nodule, has_annotations, is_malignant, diameter_mm, series_uid, center_xyz')
AugmentationInfo = namedtuple("AugmentationInfo",
"flip, offset, scale, rotate, noise")
DATASET_DIR_PATH = "luna/"
# lightweight coordinate tuples, kept separate for consistency:
# IRC_tuple lives in voxel space (index, row, column); XYZ_tuple lives in patient space (mm)
IRC_tuple = namedtuple('IRC_tuple', ["index", "row", "column"])
XYZ_tuple = namedtuple('XYZ_tuple', ["x", "y", "z"])
def irc2xyz(irc_coord, xyz_origin, xyz_sizes, irc_transform_mat):
"""
Map the voxel-based coordinates to the patient coordidates through data included in the meta-data file of CT.
Parameters:
- irc_coord (IRC_tuple): the voxel coordinates to be transformed (index, row, column).
- xyz_origin (XYZ_tuple): the exact origin point in the patient space for reference.
- xyz_sizes (XYZ_tuple): the voxel size for scaling purposes.
- irc_transform_mat (np.array): the transformation matrix between the two spaces.
return:
transformed coordinate as XYZ_tuple
"""
cri_coord = np.array(irc_coord)[::-1]
physical_origin = np.array(xyz_origin)
physical_sizes = np.array(xyz_sizes)
xyz_coord = irc_transform_mat @ (cri_coord * physical_sizes) + physical_origin
return XYZ_tuple(*xyz_coord)
def xyz2irc(xyz_coord, xyz_origin, xyz_sizes, irc_transform_mat):
"""
Map the patient coordidates to the voxel-based coordinates through data included in the meta-data file of CT.
Parameters:
- xyz_coord (XYZ_tuple): the patient coordinates to be transformed.
- xyz_origin (XYZ_tuple): the exact origin point in the patient space for reference.
- xyz_sizes (XYZ_tuple): the voxel size for scaling purposes.
- irc_transform_mat (np.array): the transformation matrix between the two spaces.
return:
transformed coordinate as IRC_tuple
"""
coordinate_xyz = np.array(xyz_coord)
physical_origin = np.array(xyz_origin)
physical_sizes = np.array(xyz_sizes)
    # reverse the forward computation: undo the offset, invert the direction matrix, then undo the scaling
    cri_coord = (np.linalg.inv(irc_transform_mat) @ (coordinate_xyz - physical_origin)) / physical_sizes
rounded_cri_coord = np.round(cri_coord).astype(int)
return IRC_tuple(*rounded_cri_coord[::-1])
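# A minimal, self-contained round-trip sketch (not part of the original module): the
# values below are made up, assuming an axis-aligned scan with an identity direction matrix.
def _demo_coordinate_roundtrip():
    origin = XYZ_tuple(-200.0, -200.0, -300.0)  # hypothetical scan origin in mm
    voxel_sizes = XYZ_tuple(0.7, 0.7, 2.5)      # hypothetical voxel spacing in mm
    direction = np.eye(3)                       # identity direction matrix
    irc = IRC_tuple(50, 100, 100)
    xyz = irc2xyz(irc, origin, voxel_sizes, direction)
    assert xyz2irc(xyz, origin, voxel_sizes, direction) == irc  # converting back lands on the same voxel
    return xyz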
@functools.lru_cache(maxsize=1)  # caches the result so repeated calls with the same arguments are not recomputed
def get_unified_candidate_info_list(dataset_dir_path, required_on_desk=True, subsets_included=(0, 1, 2, 3, 4)):
    """
    Generate a sanitized, cleaned, unified interface to the human-annotated data.
    Args:
        dataset_dir_path (str): The path to the dataset directory.
        required_on_desk (bool): If True, keeps only candidates whose CT scans are present on disk.
        subsets_included (tuple): the subsets to be included from the on-disk dataset.
            This is specifically useful in case of computational limitations.
    Returns:
        list: List of 'Candidate_Info' tuples for all the subjects with existing CT scans.
    """
    mhd_list = glob.glob("/kaggle/input/luna16/subset*/subset*/*.mhd")  # collect every CT volume in the Kaggle copy of LUNA16
    filtered_mhd_list = [mhd for mhd in mhd_list if any(f"subset{id}" in mhd for id in subsets_included)]
    uids_present_on_disk = {os.path.split(p)[-1][:-4] for p in filtered_mhd_list}  # the unique series_uids, used below for filtering
annotation_info = dict()
with open(os.path.join(dataset_dir_path, "annotations.csv")) as f:
        for row in list(csv.reader(f))[1:]:  # skip the header row
series_uid = row[0]
nodule_center = np.array([float(x) for x in row[1:4]]) # x, y, z
nodule_diameter = float(row[4])
annotation_info.setdefault(series_uid, []).append((nodule_center, nodule_diameter))
            # the same series_uid can carry multiple annotations, one per nodule
candidate_list = list()
with open(os.path.join(dataset_dir_path, "candidates.csv")) as f:
for row in list(csv.reader(f))[1:]:
series_uid = row[0]
if series_uid not in uids_present_on_disk and required_on_desk:
                continue  # this subject's CT scan is not present on disk
nodule_flag = bool(int(row[4])) # is it an actual nodule?
cand_nodule_center = np.array([float(x) for x in row[1:4]])
            cand_nodule_diameter = 0.0  # default for candidates with no matching annotation (not strictly accurate, but harmless here)
            for annotation_tup in annotation_info.get(series_uid, []):  # .get() returns an empty list for missing keys, so no KeyError
nodule_center, nodule_diameter = annotation_tup
diff = np.abs(cand_nodule_center - nodule_center)
                # check whether any axis deviates by more than half the radius (i.e. diameter / 4)
if not (np.all(diff < nodule_diameter / 4)):
pass # leave the candidate diameter zero, as it's actually a fake candidate
else:
cand_nodule_diameter = nodule_diameter
                    break  # found the matching annotation, so stop searching
candidate_list.append(
Candidate_Info(nodule_flag,
cand_nodule_diameter,
series_uid,
cand_nodule_center))
    candidate_list.sort(key=lambda x: (x.isNodule_bool, x.diameter_mm), reverse=True)  # sort by the nodule flag first, then by diameter
    # so overall, the returned list starts with the actual nodules, ordered by descending diameter,
    # followed by the non-nodule candidates
    return candidate_list
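# Hedged usage sketch (hypothetical, not in the original file): split the unified list into
# real nodules and non-nodule candidates, assuming the CSVs and subsets referenced above exist.
def _demo_split_candidates(dataset_dir_path=DATASET_DIR_PATH):
    candidates = get_unified_candidate_info_list(dataset_dir_path, subsets_included=(0,))
    nodules = [c for c in candidates if c.isNodule_bool]
    non_nodules = [c for c in candidates if not c.isNodule_bool]
    log.info("%d nodules, %d non-nodule candidates", len(nodules), len(non_nodules))
    return nodules, non_nodules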
@functools.lru_cache(maxsize=1)
def get_candidate_info_list(dataset_dir_path, required_on_desk=True, subsets_included=(0, 1, 2, 3, 4)):
"""
    Retrieves a list of candidate nodules from the dataset: annotated nodules (with malignancy information) plus non-nodule candidates.
Args:
dataset_dir_path (str): The path to the dataset directory containing the candidate information.
required_on_desk (bool, optional): If True, filters the files based on whether they are in the specified subsets. Defaults to True.
subsets_included (tuple of int, optional): Subsets to include for filtering if required_on_desk is True. Defaults to (0, 1, 2, 3, 4).
Returns:
list: A list of CandidateInfoTuple objects containing information about nodules and non-nodules.
"""
    mhd_list = glob.glob("/kaggle/input/luna16/subset*/subset*/*.mhd")
    if required_on_desk:
        mhd_list = [mhd for mhd in mhd_list if any(f"subset{id}" in mhd for id in subsets_included)]
    # the unique series_uids of the CT volumes actually present on disk
    uids_present_on_disk = {os.path.split(p)[-1][:-4] for p in mhd_list}
candidates_list = []
    # Extract nodules that have annotation data (including malignancy labels)
with open("/kaggle/input/annotations-for-segmentation/annotations_for_malignancy.csv", "r") as f:
for row in list(csv.reader(f))[1:]:
series_uid = row[0]
            if required_on_desk and series_uid not in uids_present_on_disk:
                continue
center_xyz = tuple([float(x) for x in row[1:4]])
diameter = float(row[4])
is_malignant = {"False": False, "True": True}[row[5]]
candidates_list.append(
CandidateInfoTuple(
True, # It's a nodule
True, # Has annotation data
is_malignant,
diameter,
series_uid,
center_xyz
)
)
# Extract non-nodules (negative examples)
with open(os.path.join(dataset_dir_path, "candidates.csv"), "r") as f:
for row in list(csv.reader(f))[1:]:
series_uid = row[0]
            if required_on_desk and series_uid not in uids_present_on_disk:
                continue
is_nodule = bool(int(row[4]))
center_xyz = tuple([float(x) for x in row[1:4]])
if not is_nodule:
candidates_list.append(
CandidateInfoTuple(
False,
False,
False,
0.0,
series_uid,
center_xyz
)
)
    candidates_list.sort(reverse=True)  # nodules first; ties broken by annotation flag, malignancy, then diameter (all descending)
return candidates_list
@functools.lru_cache(maxsize=1)
def get_candidate_info_dict(dataset_dir_path, required_on_desk=True, subsets_included=(0, 1, 2, 3, 4)):
"""
Retrieves a dictionary mapping series UIDs to lists of candidate nodules.
Args:
dataset_dir_path (str): The path to the dataset directory containing the candidate information.
required_on_desk (bool, optional): If True, filters the files based on whether they are in the specified subsets. Defaults to True.
subsets_included (tuple of int, optional): Subsets to include for filtering if required_on_desk is True. Defaults to (0, 1, 2, 3, 4).
Returns:
dict: A dictionary where each key is a series UID, and the value is a list of CandidateInfoTuple objects.
"""
candidate_list = get_candidate_info_list(dataset_dir_path, required_on_desk, subsets_included)
candidate_dict = {}
for candidate_tuple in candidate_list:
candidate_dict.setdefault(candidate_tuple.series_uid, []).append(candidate_tuple)
return candidate_dict
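# Hedged usage sketch (hypothetical, not in the original file): look up the annotated
# malignant nodules of a single scan via the per-series dictionary built above.
def _demo_malignant_nodules_for_series(series_uid, dataset_dir_path=DATASET_DIR_PATH):
    candidate_dict = get_candidate_info_dict(dataset_dir_path)
    return [c for c in candidate_dict.get(series_uid, []) if c.is_malignant]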
"""
Source: https://github.com/deep-learning-with-pytorch/dlwpt-code/blob/master/util/util.py
"""
def enumerateWithEstimate(
iter,
desc_str,
start_ndx=0,
print_ndx=4,
backoff=None,
iter_len=None,
):
"""
In terms of behavior, `enumerateWithEstimate` is almost identical
to the standard `enumerate` (the differences are things like how
our function returns a generator, while `enumerate` returns a
specialized `<enumerate object at 0x...>`).
However, the side effects (logging, specifically) are what make the
function interesting.
:param iter: `iter` is the iterable that will be passed into
`enumerate`. Required.
:param desc_str: This is a human-readable string that describes
what the loop is doing. The value is arbitrary, but should be
kept reasonably short. Things like `"epoch 4 training"` or
`"deleting temp files"` or similar would all make sense.
:param start_ndx: This parameter defines how many iterations of the
loop should be skipped before timing actually starts. Skipping
a few iterations can be useful if there are startup costs like
caching that are only paid early on, resulting in a skewed
average when those early iterations dominate the average time
per iteration.
        NOTE: Using `start_ndx` to skip some iterations means the time
        spent performing those iterations is not included in the
        displayed duration. Please account for this if you use the
        displayed duration for anything formal.
        This parameter defaults to `0`.
    :param print_ndx: determines which loop iteration the timing
        logging will start on. The intent is that we don't start
        logging until we've given the loop a few iterations to let the
        average time-per-iteration stabilize a bit. We
        require that `print_ndx` not be less than `start_ndx` times
        `backoff`, since a `start_ndx` greater than `0` implies that the
        early N iterations are unstable from a timing perspective.
        `print_ndx` defaults to `4`.
    :param backoff: This controls how many iterations to skip before
        logging again. Frequent logging is less interesting later on,
        so by default we double the gap between logging messages each
        time after the first.
        `backoff` defaults to `2` and is doubled while `backoff ** 7`
        is still smaller than `iter_len`, so longer loops log less often.
:param iter_len: Since we need to know the number of items to
estimate when the loop will finish, that can be provided by
passing in a value for `iter_len`. If a value isn't provided,
then it will be set by using the value of `len(iter)`.
:return:
"""
if iter_len is None:
iter_len = len(iter)
if backoff is None: # backoff controls the rate at which logging occurs
backoff = 2
while backoff ** 7 < iter_len: # finding a suitable rate
backoff *= 2
assert backoff >= 2
    while print_ndx < start_ndx * backoff:  # 'print_ndx' is the next iteration index at which progress will be logged
        print_ndx *= backoff
log.warning("{} ----/{}, starting".format(
desc_str,
iter_len,
))
start_ts = time.time()
for (current_ndx, item) in enumerate(iter):
yield (current_ndx, item) # generator return
if current_ndx == print_ndx:
            # estimate the total loop duration (measured from start_ndx) from the average time per iteration so far
            duration_sec = ((time.time() - start_ts) / (current_ndx - start_ndx + 1)) * (iter_len - start_ndx)
done_dt = datetime.datetime.fromtimestamp(start_ts + duration_sec)
done_td = datetime.timedelta(seconds=duration_sec)
log.info("{} {:-4}/{}, done at {}, {}".format(
desc_str,
current_ndx,
iter_len,
str(done_dt).rsplit('.', 1)[0], # date & time get up to seconds (exclude microseconds)
str(done_td).rsplit('.', 1)[0], # elapsed time get up to seconds (exclude microseconds)
))
print_ndx *= backoff # update the upcoming print_ndx using the selected rate
if current_ndx + 1 == start_ndx:
            start_ts = time.time()  # restart the clock once the skipped warm-up iterations are done
log.warning("{} ----/{}, done at {}".format(
desc_str,
iter_len,
str(datetime.datetime.now()).rsplit('.', 1)[0],
))
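# Hedged, self-contained demo (not part of the original file): wrap a toy loop with
# enumerateWithEstimate so that completion estimates are logged at increasing intervals.
if __name__ == "__main__":
    for _ndx, _item in enumerateWithEstimate(range(200), "demo loop"):
        time.sleep(0.01)  # stand-in for real per-item work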