Skip to content

Commit

Permalink
Merge pull request #283 from clamsproject/develop
Browse files Browse the repository at this point in the history
releasing 1.0.15
  • Loading branch information
keighrim authored Jun 7, 2024
2 parents b24722a + c42234f commit 30f67ff
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 114 deletions.
232 changes: 120 additions & 112 deletions mmif/serialize/mmif.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,104 @@
__all__ = ['Mmif']


class MmifMetadata(MmifObject):
    """
    Container for the top-level metadata of a MMIF file.

    On construction, the ``mmif`` attribute is pre-set to the specification
    URL built from ``mmif.__specver__`` and ``"mmif"`` is registered as the
    only required attribute; the given JSON data is then handed over to the
    base class initializer.

    :param metadata_obj: the JSON data
    """

    def __init__(self, metadata_obj: Optional[Union[bytes, str, dict]] = None) -> None:
        # TODO (krim @ 10/7/20): there could be a better name and a better way to give a value to this
        spec_url: str = f"http://mmif.clams.ai/{mmif.__specver__}"
        # defaults are put in place before the base initializer receives the input
        self.mmif: str = spec_url
        self._required_attributes = ["mmif"]
        super().__init__(metadata_obj)


class DocumentsList(DataList[Document]):
    """
    A :class:`mmif.serialize.model.DataList` specialization that stores
    :class:`mmif.serialize.document.Document` objects keyed by document ID.
    """
    _items: Dict[str, Document]

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends the base ``_deserialize`` method: rebuilds ``_items`` as a
        dict that maps each document ID (read from ``properties.id`` of the
        JSON entry) to a :class:`mmif.serialize.document.Document` object.

        :param input_list: the JSON data that defines the list of documents
        :return: None
        """
        documents_by_id = {}
        for raw_document in input_list:
            # the key is resolved first, so an entry without an ID fails
            # before a Document object is constructed from it
            document_id = raw_document['properties']['id']
            documents_by_id[document_id] = Document(raw_document)
        self._items = documents_by_id

    def append(self, value: Document, overwrite=False) -> None:
        """
        Adds a document to the list, using the document's own ID as the key.

        The actual insertion is delegated to the base ``_append_with_key``
        method (not shown here), which is expected to reject a duplicated
        ID unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.document.Document`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing document with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a document with the same ID exists
                          in the list
        :return: None
        """
        document_id = value.id
        super()._append_with_key(document_id, value, overwrite)


class ViewsList(DataList[View]):
    """
    ViewsList object that implements :class:`mmif.serialize.model.DataList`
    for :class:`mmif.serialize.view.View`. Views are stored in a dict keyed
    by view ID.
    """
    _items: Dict[str, View]

    def __init__(self, mmif_obj: Optional[Union[bytes, str, list]] = None):
        super().__init__(mmif_obj)

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends base ``_deserialize`` method to initialize ``items`` as a dict from
        view IDs to :class:`mmif.serialize.view.View` objects.

        :param input_list: the JSON data that defines the list of views
        :return: None
        """
        # an empty/None input leaves ``_items`` untouched
        if input_list:
            self._items = {item['id']: View(item) for item in input_list}

    def append(self, value: View, overwrite=False) -> None:
        """
        Appends a view to the list.

        Fails if there is already a view with the same ID
        in the list, unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.view.View`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing view with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a view with the same ID exists
                          in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)

    def get_last(self) -> Optional[View]:
        """
        Returns the last view in the list whose metadata contains neither
        an ``error`` nor a ``warning`` entry.

        The stored views are walked in reverse order and the first one
        passing that check is returned, so trailing views that only report
        an error or a warning are skipped.

        :return: the matching view, or None if the list is empty or every
                 view carries an ``error`` or ``warning``
        """
        for view in reversed(self._items.values()):
            if 'error' not in view.metadata and 'warning' not in view.metadata:
                return view
        # made explicit to match the Optional[View] return annotation
        return None


class Mmif(MmifObject):
"""
MmifObject that represents a full MMIF file.
Expand Down Expand Up @@ -560,131 +658,41 @@ def get_end(self, annotation: Annotation) -> Union[int, float]:
"""
return self._get_linear_anchor_point(annotation, start=False)

# pytype: disable=bad-return-type
def __getitem__(self, item: str) -> Union[Document, View, Annotation]:
def __getitem__(self, item: str) \
-> Union[Document, View, Annotation, MmifMetadata, DocumentsList, ViewsList]:
"""
getitem implementation for Mmif. When nothing is found, this will raise an error
rather than returning a None (although pytype doesn't think so...)
getitem implementation for Mmif. This will try to find any object, given an identifier or an immediate
attribute name. When nothing is found, this will raise an error rather than returning a None
:raises KeyError: if the item is not found or if the search results are ambiguous
:param item: the search string, a document ID, a view ID, or a view-scoped annotation ID
:param item: an attribute name or an object identifier (a document ID, a view ID, or an annotation ID). When
annotation ID is given as a "short" ID (without view ID prefix), the method will try to find a
match from the first view, and return immediately if found.
:return: the object searched for
:raise KeyError: if the item is not found or multiple objects are found with the same ID
"""
if item in self._named_attributes():
return self.__dict__[item]
split_attempt = item.split(self.id_delimiter)

document_result = self.documents.get(split_attempt[0])
view_result = self.views.get(split_attempt[0])
found = []

if len(split_attempt) == 1:
anno_result = None
elif view_result:
anno_result = view_result[split_attempt[1]]
found.append(self.documents.get(split_attempt[0]))
found.append(self.views.get(split_attempt[0]))
for view in self.views:
found.append(view.annotations.get(split_attempt[0]))
elif len(split_attempt) == 2:
v = self.get_view_by_id(split_attempt[0])
if v is not None:
found.append(v.annotations.get(split_attempt[1]))
else:
raise KeyError("Tried to subscript into a view that doesn't exist")
found = [x for x in found if x is not None]

if view_result and document_result:
if len(found) > 1:
raise KeyError("Ambiguous ID search result")
if not (view_result or document_result):
elif len(found) == 0:
raise KeyError("ID not found: %s" % item)
return anno_result or view_result or document_result
# pytype: enable=bad-return-type


class MmifMetadata(MmifObject):
    """
    Container for the top-level metadata of a MMIF file.

    On construction, the ``mmif`` attribute is pre-set to the specification
    URL built from ``mmif.__specver__`` and ``"mmif"`` is registered as the
    only required attribute; the given JSON data is then handed over to the
    base class initializer.

    :param metadata_obj: the JSON data
    """

    def __init__(self, metadata_obj: Optional[Union[bytes, str, dict]] = None) -> None:
        # TODO (krim @ 10/7/20): there could be a better name and a better way to give a value to this
        spec_url: str = f"http://mmif.clams.ai/{mmif.__specver__}"
        # defaults are put in place before the base initializer receives the input
        self.mmif: str = spec_url
        self._required_attributes = ["mmif"]
        super().__init__(metadata_obj)


class DocumentsList(DataList[Document]):
    """
    A :class:`mmif.serialize.model.DataList` specialization that stores
    :class:`mmif.serialize.document.Document` objects keyed by document ID.
    """
    _items: Dict[str, Document]

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends the base ``_deserialize`` method: rebuilds ``_items`` as a
        dict that maps each document ID (read from ``properties.id`` of the
        JSON entry) to a :class:`mmif.serialize.document.Document` object.

        :param input_list: the JSON data that defines the list of documents
        :return: None
        """
        documents_by_id = {}
        for raw_document in input_list:
            # the key is resolved first, so an entry without an ID fails
            # before a Document object is constructed from it
            document_id = raw_document['properties']['id']
            documents_by_id[document_id] = Document(raw_document)
        self._items = documents_by_id

    def append(self, value: Document, overwrite=False) -> None:
        """
        Adds a document to the list, using the document's own ID as the key.

        The actual insertion is delegated to the base ``_append_with_key``
        method (not shown here), which is expected to reject a duplicated
        ID unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.document.Document`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing document with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a document with the same ID exists
                          in the list
        :return: None
        """
        document_id = value.id
        super()._append_with_key(document_id, value, overwrite)


class ViewsList(DataList[View]):
    """
    ViewsList object that implements :class:`mmif.serialize.model.DataList`
    for :class:`mmif.serialize.view.View`. Views are stored in a dict keyed
    by view ID.
    """
    _items: Dict[str, View]

    def __init__(self, mmif_obj: Optional[Union[bytes, str, list]] = None):
        super().__init__(mmif_obj)

    def _deserialize(self, input_list: list) -> None:  # pytype: disable=signature-mismatch
        """
        Extends base ``_deserialize`` method to initialize ``items`` as a dict from
        view IDs to :class:`mmif.serialize.view.View` objects.

        :param input_list: the JSON data that defines the list of views
        :return: None
        """
        # an empty/None input leaves ``_items`` untouched
        if input_list:
            self._items = {item['id']: View(item) for item in input_list}

    def append(self, value: View, overwrite=False) -> None:
        """
        Appends a view to the list.

        Fails if there is already a view with the same ID
        in the list, unless ``overwrite`` is set to True.

        :param value: the :class:`mmif.serialize.view.View`
                      object to add
        :param overwrite: if set to True, will overwrite an
                          existing view with the same ID
        :raises KeyError: if ``overwrite`` is set to False and
                          a view with the same ID exists
                          in the list
        :return: None
        """
        super()._append_with_key(value.id, value, overwrite)

    def get_last(self) -> Optional[View]:
        """
        Returns the last view in the list whose metadata contains neither
        an ``error`` nor a ``warning`` entry.

        The stored views are walked in reverse order and the first one
        passing that check is returned, so trailing views that only report
        an error or a warning are skipped.

        :return: the matching view, or None if the list is empty or every
                 view carries an ``error`` or ``warning``
        """
        for view in reversed(self._items.values()):
            if 'error' not in view.metadata and 'warning' not in view.metadata:
                return view
        # made explicit to match the Optional[View] return annotation
        return None
else:
return found[-1]
42 changes: 40 additions & 2 deletions mmif/utils/video_document_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import mmif
from mmif import Annotation, Document, Mmif
from mmif.utils.timeunit_helper import convert
from mmif.vocabulary import DocumentTypes
from mmif.vocabulary import DocumentTypes, AnnotationTypes

for cv_dep in ('cv2', 'ffmpeg', 'PIL'):
try:
Expand Down Expand Up @@ -83,14 +83,16 @@ def extract_frames_as_images(video_document: Document, framenums: List[int], as_
frames = []
video = capture(video_document)
cur_f = 0
tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY)
while True:
if not framenums or cur_f > video_document.get_property(FRAMECOUNT_DOCPROP_KEY):
if not framenums or cur_f > tot_fcount:
break
ret, frame = video.read()
if cur_f == framenums[0]:
if not ret:
sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY))
warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id}.')
cur_f += 1
continue
frames.append(Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame)
framenums.pop(0)
Expand Down Expand Up @@ -125,6 +127,42 @@ def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False):
return extract_frames_as_images(vd, [get_mid_framenum(mmif, time_frame)], as_PIL=as_PIL)[0]


def get_representative_framenum(mmif: Mmif, time_frame: Annotation):
    """
    Calculates the representative frame number from an annotation.

    Only the first entry of the ``representatives`` property is used. A
    representative ID that does not contain the ID delimiter is looked up
    in the parent view of ``time_frame``; an ID that already contains the
    delimiter is looked up as given.

    :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance
    :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``)
    :return: representative frame number as an integer
    :raises ValueError: if ``time_frame`` has no ``representatives`` property,
                        if that property is empty, or if the first
                        representative cannot be found in ``mmif``
    """
    if 'representatives' not in time_frame.properties:
        raise ValueError(f'The time frame {time_frame.id} does not have a representative.')
    representatives = time_frame.get_property('representatives')
    if not representatives:
        # an empty list would otherwise leak out as a bare IndexError
        raise ValueError(f'The time frame {time_frame.id} does not have a representative.')
    top_representative_id = representatives[0]

    delimiter = time_frame.id_delimiter
    if delimiter in top_representative_id:
        # already view-prefixed; prefixing it again could never resolve
        representative_long_id = top_representative_id
    else:
        representative_long_id = time_frame._parent_view_id + delimiter + top_representative_id
    try:
        representative_timepoint_anno = mmif[representative_long_id]
    except KeyError as e:
        # keep the original lookup failure attached as the cause
        raise ValueError(f'Representative timepoint {top_representative_id} not found in any view.') from e

    # NOTE(review): the time point is converted using the time frame's
    # ``timeUnit``, not a unit read from the time point itself - confirm
    # both annotations are guaranteed to share a unit.
    timeunit = time_frame.get_property('timeUnit')
    video_document = mmif[time_frame.get_property('document')]
    fps = get_framerate(video_document)
    return convert(representative_timepoint_anno.get_property('timePoint'), timeunit, 'frame', fps)


def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False):
    """
    Extracts the representative frame of an annotation as a numpy ndarray or PIL Image.

    The video document is resolved from the ``document`` property of
    ``time_frame``, the frame number comes from
    :py:func:`get_representative_framenum`, and the single-frame extraction
    is delegated to :py:func:`extract_frames_as_images`.

    :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance
    :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``)
    :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray`
    :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image`
    """
    source_video = mmif[time_frame.get_property('document')]
    wanted_framenums = [get_representative_framenum(mmif, time_frame)]
    extracted = extract_frames_as_images(source_video, wanted_framenums, as_PIL=as_PIL)
    # exactly one frame number was requested, so take the first extracted image
    return extracted[0]


def sample_frames(start_frame: int, end_frame: int, sample_rate: float = 1) -> List[int]:
"""
Helper function to sample frames from a time interval.
Expand Down
23 changes: 23 additions & 0 deletions tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,29 @@ def test_mmif_getitem_document(self):
except KeyError:
self.fail("didn't get document 'm1'")

def test_mmif_getitem_idconflict(self):
    """
    Subscripting a Mmif with an ambiguous ID must raise ``KeyError``:
    first with two views sharing one view ID, then with two annotations
    in different views sharing one short ID (their long IDs must still
    resolve).
    """
    # case 1: two views forced onto the same view ID
    clashing_views = Mmif(validate=False)
    first_view = clashing_views.new_view()
    first_view.id = 'v1'
    second_view = clashing_views.new_view()
    second_view.id = 'v1'
    with pytest.raises(KeyError):
        _ = clashing_views['v1']

    # case 2: distinct views, annotations created with the same short ID
    clashing_annos = Mmif(validate=False)
    first_view = clashing_annos.new_view()
    first_anno = first_view.new_annotation(AnnotationTypes.Annotation, id='a1')
    second_view = clashing_annos.new_view()
    second_anno = second_view.new_annotation(AnnotationTypes.Annotation, id='a1')
    self.assertIsNotNone(clashing_annos[first_view.id])
    self.assertIsNotNone(clashing_annos[second_view.id])
    # conflict short IDs
    self.assertEqual(first_anno.id, second_anno.id)
    with pytest.raises(KeyError):
        _ = clashing_annos[first_anno.id]
    # view-prefixed IDs stay unambiguous
    self.assertIsNotNone(clashing_annos[first_anno.long_id])
    self.assertIsNotNone(clashing_annos[second_anno.long_id])

def test_mmif_getitem_view(self):
try:
v1 = self.mmif_obj['v1']
Expand Down
17 changes: 17 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ def test_extract_mid_frame(self):
tf = self.a_view.new_annotation(AnnotationTypes.TimeFrame, start=0, end=3, timeUnit='seconds', document='d1')
self.assertEqual(vdh.convert(1.5, 's', 'f', self.fps), vdh.get_mid_framenum(self.mmif_obj, tf))

def test_extract_representative_frame(self):
    """
    ``get_representative_framenum`` must return the frame number of the
    referenced time point, and raise ``ValueError`` when the time frame
    has no representatives or references a time point that does not exist.
    """
    time_point = self.a_view.new_annotation(
        AnnotationTypes.TimePoint, timePoint=1500, timeUnit='milliseconds', document='d1')
    tf_with_rep = self.a_view.new_annotation(
        AnnotationTypes.TimeFrame, start=1000, end=2000, timeUnit='milliseconds', document='d1')
    tf_with_rep.add_property('representatives', [time_point.id])
    actual = vdh.get_representative_framenum(self.mmif_obj, tf_with_rep)
    expected = vdh.millisecond_to_framenum(self.video_doc, time_point.get_property('timePoint'))
    self.assertEqual(expected, actual)

    # check there is an error if no representatives
    tf_without_rep = self.a_view.new_annotation(
        AnnotationTypes.TimeFrame, start=1000, end=2000, timeUnit='milliseconds', document='d1')
    with pytest.raises(ValueError):
        vdh.get_representative_framenum(self.mmif_obj, tf_without_rep)

    # check there is an error if there is a representative referencing a timepoint that
    # does not exist
    tf_without_rep.add_property('representatives', ['fake_tp_id'])
    with pytest.raises(ValueError):
        vdh.get_representative_framenum(self.mmif_obj, tf_without_rep)

def test_get_framerate(self):
self.assertAlmostEqual(29.97, vdh.get_framerate(self.video_doc), places=0)

Expand Down

0 comments on commit 30f67ff

Please sign in to comment.