diff --git a/dlt/dlt.py b/dlt/dlt.py index 8d19577..64e624e 100644 --- a/dlt/dlt.py +++ b/dlt/dlt.py @@ -613,11 +613,16 @@ def __init__(self, **kwords): self.indexed = False self.end = False self.live_run = kwords.pop("is_live", False) + # Stop event for threading usage in caller self.stop_reading = threading.Event() # Stop event for process usage in caller self.stop_reading_proc = multiprocessing.Event() + self.stop_reading_clean_buffer = kwords.pop("stop_reading_clean_buffer", False) + if self.stop_reading_clean_buffer: + logger.warning("Clean read buffer when receiving stop reading event: %s", self.stop_reading_clean_buffer) + def __repr__(self): # pylint: disable=bad-continuation return "".format( @@ -808,7 +813,9 @@ def __iter__(self): # pylint: disable=too-many-branches self._open_file() found_data = False - while not self._is_stop_reading_set() or corruption_check_try: # pylint: disable=too-many-nested-blocks + while ( + not (self.stop_reading.is_set() or self.stop_reading_proc.is_set()) or corruption_check_try + ): # pylint: disable=too-many-nested-blocks os_stat = os.stat(self.filename) mtime = os_stat.st_mtime @@ -816,9 +823,10 @@ def __iter__(self): # pylint: disable=too-many-branches cached_mtime = mtime corruption_check_try = False - while not self._is_stop_reading_set() and ( - dltlib.dlt_file_read(ctypes.byref(self), self.verbose) >= DLT_RETURN_OK - ): + while not ( + not self.stop_reading_clean_buffer + and (self.stop_reading.is_set() or self.stop_reading_proc.is_set()) + ) and (dltlib.dlt_file_read(ctypes.byref(self), self.verbose) >= DLT_RETURN_OK): found_data = True if ( self.filter @@ -1190,16 +1198,18 @@ def save(messages, filename, append=False): tracefile.write(msg.to_bytes()) -def load(filename, filters=None, split=False, verbose=False, live_run=False): +def load(filename, filters=None, split=False, verbose=False, live_run=False, stop_reading_clean_buffer=False): """Load DLT messages from a file :param str filename: Filename for the DLT log file the messages will be store to :param list filters: List of filters to apply [("APPID", "CTID"), ...] :param bool split: Ignored - compatibility option :param bool verbose: Be verbose + :param bool stop_reading_clean_buffer: Clean read buffer when receive + stop_reading signal from another threads or processes :returns: A DLTFile object :rtype: DLTFile object """ - cfile = cDLTFile(filename=filename, is_live=live_run) + cfile = cDLTFile(filename=filename, is_live=live_run, stop_reading_clean_buffer=stop_reading_clean_buffer) cfile.set_filters(filters) return cfile