Skip to content

Commit

Permalink
Merge pull request #58 from molssi-seamm/dev
Browse files Browse the repository at this point in the history
Added support for URI's
  • Loading branch information
seamm authored Jun 27, 2024
2 parents 4c20d30 + 0307806 commit 7c29f30
Show file tree
Hide file tree
Showing 10 changed files with 290 additions and 162 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
=======
History
=======
2024.6.27 -- Added support for URI's
* Now recognize URI's in the form local:path/to/file
* An optional URI handler can be passed in to resolve such URI's.

2024.6.5 -- Bugfix: Handling of duplicates in lists
* Roundoff in floating point numbers caused some duplicates to be missed. The code
now checks for duplicates in a more robust way.
Expand Down
191 changes: 116 additions & 75 deletions seamm_util/include_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,92 @@
import collections
import gzip
import logging
import os
import os.path
from pathlib import Path

module_logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)

# logger.setLevel(logging.DEBUG)

def splitext(filename):

def default_uri_handler(path):
"""Do nothing! Just return the path"""
return Path(path)


def splitext(path):
"""
Get the extension of a file, ignoring .gz or .bz2 on the end
Args:
filename ('str'): the name, including any path, of the file
Parameters
----------
path : str or pathlib.Path
The name or path of the file
"""
name, ext = os.path.splitext(filename)
if not isinstance(path, Path):
path = Path(path)

ext = path.suffix
if ext in (".gz", ".bz2"):
name, ext = os.path.splitext(name)
ext = path.with_suffix("").suffix
return ext


class Open(object):
def __init__(self, filename, mode="r", logger=None, include=None, history=10):
def __init__(
self,
path,
mode="r",
logger=logger,
include="#include",
history=10,
uri_handler=None,
):
"""Open a file, automatically handling 'include'
Args:
filename ('str'): The path to the file
mode ('str', optional): The mode to open the file. Defaults to
read-only.
logger (obj, optional): The logging object to use
include ('str', optional): The keyword that triggers an include
history (integer, optional): Length of history to keep
Parameters
----------
path : str or pathlib.Path
The path to the file
mode : str (optional)
The mode to open the file. Defaults to read-only.
logger : logging.Logger (optional)
The logging object to use
include : str (optional)
The keyword that triggers an include. Defaults to "#include"
history : int (optional)
Length of history to keep
uri_handler : function (optional)
A method to handle any URIs, like 'local:'. Defaults to None.
"""
if not isinstance(filename, str):
raise RuntimeError("Filename must be a string path")
self._filename = filename
if not isinstance(path, Path):
path = Path(path)
self.mode = mode
if logger:
self.logger = logger
else:
self.logger = module_logger
self.logger = logger
self.include = include
self._history = history
self._depth = -1
if uri_handler is None:
self._uri_handler = default_uri_handler
else:
self._uri_handler = uri_handler

self._deque = collections.deque(maxlen=history)

self._total_lines = 0
self._linenos = []
self._files = []
self._paths = []
self._paths = [self._uri_handler(path).expanduser().resolve()]
self._visited = [("", self.path)]
self._fds = []
self._cwd = None
self._cwd = Path.cwd()

def __enter__(self):
"""Handle the enter event for the context manager by opening the file"""
self.logger.debug("in __enter__")
self._cwd = os.getcwd()
self._linenos.append(0)
self._files.append(self._filename)
filepath = os.path.join(self._cwd, self._filename)
path = os.path.dirname(filepath)
self._paths.append(path)
self._fds.append(self._open(self.path))

self._fds.append(self._open(filepath))

self.logger.debug(" opened {}".format(self._filename))
self.logger.debug(f" opened {self.path}")
return self

def __exit__(self, *args, **kwargs):
Expand All @@ -92,18 +113,34 @@ def __next__(self):

line = self._next()
words = line.split()
if self.include and len(words) > 0 and words[0] == self.include:
filename = line.split()[1]
if self.include is not None and len(words) > 0 and words[0] == self.include:
if len(words) > 2:
filename, tmp = words[1:3]
missing_ok = tmp.lower() == "missing_ok"
else:
filename = words[1]
missing_ok = False
self.logger.debug(" opening include file {}".format(filename))
self._linenos.append(0)
self._files.append(filename)
filepath = os.path.join(self._paths[-1], filename)
path = os.path.dirname(filepath)
self._paths.append(path)

self._fds.append(self._open(filepath))

self.logger.debug(" opened it")
try:
path = self.path.parent / self._uri_handler(filename)
except FileNotFoundError:
self.logger.debug(" URI handler did not find the file")
if not missing_ok:
raise
else:
# only use a file once
if path not in self.visited:
# Check that the file exists in case the URI handler doesn't
if path.exists():
self._paths.append(path.expanduser().resolve())
self._fds.append(self._open(self.path))
self.logger.debug(" opened it")
self.visited.append((filename, path))
elif missing_ok:
pass
else:
raise FileNotFoundError(str(path))
line = self.__next__()
self.logger.log(0, line)

Expand All @@ -112,30 +149,34 @@ def __next__(self):

def __iter__(self):
"""Need to be an iterator"""
self.logger.log(0, "__iter__")
return self

def __getattr__(self, attr):
"""Pass any attribute requests to the actual file handle"""
self.logger.debug("attr = '{}'".format(attr))
return getattr(self._fds[-1], attr)

@property
def depth(self):
"""The depth of the current line stack"""
return self._depth

@property
def path(self):
if len(self._paths) > 0:
return self._paths[-1]
"""The path of the current working file"""
if len(self.paths) > 0:
return self.paths[-1]
else:
return None

@property
def filename(self):
if len(self._files) > 0:
return self._files[-1]
else:
return None
def paths(self):
"""The paths of all the opened files."""
return self._paths

@property
def lineno(self):
"""THe line number in the current file."""
if len(self._linenos) > 0:
return self._linenos[-1]
else:
Expand All @@ -146,26 +187,21 @@ def total_lines(self):
return self._total_lines

@property
def depth(self):
return self._depth
def visited(self):
"""The set of files used"""
return self._visited

def push(self, n=1):
"""
push the n last lines back onto the device (virtually)
"""
"""Push the n last lines back onto the device (virtually)"""
if self._depth + n > len(self._deque):
raise RuntimeError("Exceeded the currently available history")
self._depth += n

def stack(self):
"""Provide the traceback of the included files"""
result = []
for i in range(len(self._paths) - 1, 0, -1):
path = self._paths[i]
filename = self._files[i]
filepath = os.path.join(path, filename)
lineno = self._linenos[i]
result.append("{}:{}".format(filepath, lineno))
for path, lineno in zip(reversed(self.paths), reversed(self._linenos)):
result.append(f"{path}:{lineno}")
return result

def _close(self, fd, *args, **kwargs):
Expand All @@ -183,36 +219,41 @@ def _close(self, fd, *args, **kwargs):
def _next(self):
"""Helper routine to get the next line, handling EOF and errors"""
try:
# Get the next line from the current file
line = self._fds[-1].__next__()
except StopIteration:
# Hit EOF, so go back to previous file, if any
fd = self._fds.pop()
self._close(fd)
self._files.pop()
self._paths.pop()
self.paths.pop()
n = self._linenos.pop()
self.logger.info(" read {} lines from fd".format(n))
if len(self._fds) <= 0:
# At the end of the first file, so all done
raise StopIteration()
# and get the next line from the previous file
return self._next()
except: # noqa: E722
raise

self._linenos[-1] += 1
self._total_lines += 1

return line

def _open(self, filename):
def _open(self, path):
"""Open 'filename' using gzip or bzip if needed
Args:
filename ('str'): the name, including any path, of the file
Parameters
----------
path : pathlib.Path
The path to the file
"""
if not isinstance(filename, str):
raise RuntimeError("Filename must be a string to open")
name, ext = os.path.splitext(filename.strip().lower())
if not isinstance(path, Path):
raise RuntimeError("path must be a pathlib.Path")
ext = path.suffix
if ext == ".bz2":
fd = bz2.open(filename, self.mode + "t")
fd = bz2.open(path, self.mode + "t")
elif ext == ".gz":
fd = gzip.open(filename, self.mode + "t")
fd = gzip.open(path, self.mode + "t")
else:
fd = open(filename, self.mode)
fd = open(path, self.mode)
return fd
2 changes: 1 addition & 1 deletion seamm_util/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@

# Unit handling!
ureg = pint.UnitRegistry(auto_reduce_dimensions=True)
ureg.default_format = "~P"
ureg.formatter.default_format = "~P"
pint.set_application_registry(ureg)
Q_ = ureg.Quantity
units_class = ureg("1 km").__class__
Expand Down
4 changes: 4 additions & 0 deletions tests/data/file_uri.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
file_uri line 1
file_uri line 2
include data:file1.txt
file_uri line 3
Binary file added tests/data/file_uri.txt.bz2
Binary file not shown.
Binary file added tests/data/file_uri.txt.gz
Binary file not shown.
Loading

0 comments on commit 7c29f30

Please sign in to comment.