From 88787759bfbaea28a6428e84fcf32509878389b6 Mon Sep 17 00:00:00 2001 From: bmeares Date: Thu, 17 Nov 2022 22:27:21 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20v1.4.10=20Fixed=20edge=20case=20?= =?UTF-8?q?bugs=20in=20syncing,=20data=20type=20enforcement,=20and=20more.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 25 +++++++++++++++++ docs/mkdocs/news/changelog.md | 25 +++++++++++++++++ meerschaum/__init__.py | 1 + meerschaum/config/_version.py | 2 +- meerschaum/connectors/__init__.py | 9 ++++-- meerschaum/connectors/api/_login.py | 2 +- meerschaum/connectors/sql/_pipes.py | 43 +++++++++++++++++------------ meerschaum/core/Pipe/_register.py | 12 +++----- meerschaum/core/Pipe/_sync.py | 20 ++++++++++---- meerschaum/plugins/_Plugin.py | 18 +----------- meerschaum/utils/misc.py | 13 ++++++++- meerschaum/utils/venv/__init__.py | 5 +++- meerschaum/utils/warnings.py | 1 + 13 files changed, 120 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4276c233..e44bb560 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,31 @@ This is the current release cycle, so stay tuned for future releases! +### v1.4.10 + +- **Fixed an issue with syncing background jobs.** + The `--name` flag of background jobs was colliding with the `name` keyword argument of `SQLConnector.to_sql()`. + +- **Fixed a datetime bounding issue when `datetime` index is omitted.** + If the minimum datetime value of the incoming dataframe cannot be determined, do not bound the `get_data()` request. + +- **Keep existing parameters when registering plugin pipes.** + When a pipe is registered with a plugin as its connector, the return value of the `register()` function will be patched with the existing in-memory parameters. + +- **Fixed a data type syncing issue.** + In cases where fetched data types do not match the data types in the pipe's table (e.g. 
automatic datetime columns), a bug has been patched to ensure the correct data types are enforced. + +- **Added `Venv` to the root namespace.** + Now you can access virtual environments directly from `mrsm`: + + ```python + import meerschaum as mrsm + + with mrsm.Venv('noaa'): + import pandas as pd + ``` + + ### v1.4.9 - **Fixed in-place syncs for aggregate queries.** diff --git a/docs/mkdocs/news/changelog.md b/docs/mkdocs/news/changelog.md index 4276c233..e44bb560 100644 --- a/docs/mkdocs/news/changelog.md +++ b/docs/mkdocs/news/changelog.md @@ -4,6 +4,31 @@ This is the current release cycle, so stay tuned for future releases! +### v1.4.10 + +- **Fixed an issue with syncing background jobs.** + The `--name` flag of background jobs was colliding with the `name` keyword argument of `SQLConnector.to_sql()`. + +- **Fixed a datetime bounding issue when `datetime` index is omitted.** + If the minimum datetime value of the incoming dataframe cannot be determined, do not bound the `get_data()` request. + +- **Keep existing parameters when registering plugin pipes.** + When a pipe is registered with a plugin as its connector, the return value of the `register()` function will be patched with the existing in-memory parameters. + +- **Fixed a data type syncing issue.** + In cases where fetched data types do not match the data types in the pipe's table (e.g. automatic datetime columns), a bug has been patched to ensure the correct data types are enforced. 
+ +- **Added `Venv` to the root namespace.** + Now you can access virtual environments directly from `mrsm`: + + ```python + import meerschaum as mrsm + + with mrsm.Venv('noaa'): + import pandas as pd + ``` + + ### v1.4.9 - **Fixed in-place syncs for aggregate queries.** diff --git a/meerschaum/__init__.py b/meerschaum/__init__.py index 8d112f93..bfd6b15e 100644 --- a/meerschaum/__init__.py +++ b/meerschaum/__init__.py @@ -21,6 +21,7 @@ from meerschaum.core.Pipe import Pipe from meerschaum.plugins import Plugin from meerschaum.utils import get_pipes +from meerschaum.utils.venv import Venv from meerschaum._internal.docs import index as __doc__ from meerschaum.connectors import get_connector from meerschaum.config import __version__ diff --git a/meerschaum/config/_version.py b/meerschaum/config/_version.py index a52988ff..a165a19c 100644 --- a/meerschaum/config/_version.py +++ b/meerschaum/config/_version.py @@ -2,4 +2,4 @@ Specify the Meerschaum release version. """ -__version__ = "1.4.9" +__version__ = "1.4.10" diff --git a/meerschaum/connectors/__init__.py b/meerschaum/connectors/__init__.py index f91944a9..f70f178f 100644 --- a/meerschaum/connectors/__init__.py +++ b/meerschaum/connectors/__init__.py @@ -338,10 +338,13 @@ def get_connector_plugin( if not hasattr(connector, 'type'): return None from meerschaum import Plugin - return ( - Plugin(connector.__module__.replace('plugins.', '').split('.')[0]) + plugin_name = ( + connector.__module__.replace('plugins.', '').split('.')[0] if connector.type in custom_types else ( - Plugin(connector.label) if connector.type == 'plugin' + connector.label + if connector.type == 'plugin' else 'mrsm' ) ) + plugin = Plugin(plugin_name) + return plugin if plugin.is_installed() else None diff --git a/meerschaum/connectors/api/_login.py b/meerschaum/connectors/api/_login.py index 779a268b..d1419d1e 100644 --- a/meerschaum/connectors/api/_login.py +++ b/meerschaum/connectors/api/_login.py @@ -43,7 +43,7 @@ def login( else: msg = ( 
f"Failed to log into '{self}' as user '{login_data['username']}'.\n" + - f" Please verify login details for connector '{self}'." + f" Please verify login details for connector '{self}'." ) if warn: _warn(msg, stack=False) diff --git a/meerschaum/connectors/sql/_pipes.py b/meerschaum/connectors/sql/_pipes.py index a19ab792..c3bbc110 100644 --- a/meerschaum/connectors/sql/_pipes.py +++ b/meerschaum/connectors/sql/_pipes.py @@ -65,6 +65,7 @@ def register_pipe( return False, f"Failed to register {pipe}." return True, f"Successfully registered {pipe}." + def edit_pipe( self, pipe : meerschaum.Pipe = None, @@ -1046,6 +1047,7 @@ def sync_pipe( from meerschaum.utils.misc import generate_password from meerschaum import Pipe import time + import copy pd = import_pandas() if df is None: msg = f"DataFrame is None. Cannot sync {pipe}." @@ -1094,7 +1096,12 @@ def sync_pipe( unseen_df, update_df, delta_df = ( pipe.filter_existing( - df, chunksize=chunksize, begin=begin, end=end, debug=debug, **kw + df, + chunksize = chunksize, + begin = begin, + end = end, + debug = debug, + **kw ) if check_existing else (df, None, df) ) if debug: @@ -1106,14 +1113,14 @@ def sync_pipe( if update_df is not None and not update_df.empty: transact_id = generate_password(6) temp_target = '_' + transact_id + '_' + pipe.target - self.to_sql( - update_df, - name = temp_target, - if_exists = 'append', - chunksize = chunksize, - debug = debug, - **kw - ) + update_kw = copy.deepcopy(kw) + update_kw.update({ + 'name': temp_target, + 'if_exists': 'append', + 'chunksize': chunksize, + 'debug': debug, + }) + self.to_sql(update_df, **update_kw) temp_pipe = Pipe( pipe.connector_keys + '_', pipe.metric_key, pipe.location_key, instance = pipe.instance_keys, @@ -1148,15 +1155,15 @@ def sync_pipe( kw.pop('name') ### Insert new data into Pipe's table. 
- stats = self.to_sql( - unseen_df, - name = pipe.target, - if_exists = if_exists, - debug = debug, - as_dict = True, - chunksize = chunksize, - **kw - ) + unseen_kw = copy.deepcopy(kw) + unseen_kw.update({ + 'name': pipe.target, + 'if_exists': if_exists, + 'debug': debug, + 'as_dict': True, + 'chunksize': chunksize, + }) + stats = self.to_sql(unseen_df, **unseen_kw) if is_new: if not self.create_indices(pipe, debug=debug): if debug: diff --git a/meerschaum/core/Pipe/_register.py b/meerschaum/core/Pipe/_register.py index 6e396015..f3437a17 100644 --- a/meerschaum/core/Pipe/_register.py +++ b/meerschaum/core/Pipe/_register.py @@ -25,10 +25,10 @@ def register( A `SuccessTuple` of success, message. """ - from meerschaum.connectors import custom_types from meerschaum.utils.formatting import get_console from meerschaum.utils.venv import Venv - from meerschaum.connectors import get_connector_plugin + from meerschaum.connectors import get_connector_plugin, custom_types + from meerschaum.config._patch import apply_patch_to_config import warnings with warnings.catch_warnings(): @@ -45,12 +45,8 @@ def register( and getattr(_conn, 'register', None) is not None ): - if _conn.type == 'plugin': - venv = _conn._plugin - elif _conn.__module__.startswith('plugins'): - venv = _conn.__module__[len('plugins:'):].split('.')[0] try: - with Venv(venv, debug=debug): + with Venv(get_connector_plugin(_conn), debug=debug): params = self.connector.register(self) except Exception as e: get_console().print_exception() @@ -63,7 +59,7 @@ def register( + f"{params}" ) else: - self.parameters = params + self.parameters = apply_patch_to_config(params, self.parameters) if not self.parameters: cols = self.columns if self.columns else {'datetime': None, 'id': None} diff --git a/meerschaum/core/Pipe/_sync.py b/meerschaum/core/Pipe/_sync.py index c833421d..272842dd 100644 --- a/meerschaum/core/Pipe/_sync.py +++ b/meerschaum/core/Pipe/_sync.py @@ -437,12 +437,13 @@ def filter_existing( df = 
self.enforce_dtypes(df, debug=debug) ### begin is the oldest data in the new dataframe try: - min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime() + min_dt = pd.to_datetime( + df[self.columns['datetime']].min(skipna=True) + ).to_pydatetime() except Exception as e: min_dt = None if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT': - ### min_dt might be None, a user-supplied value, or the sync time. - min_dt = begin + min_dt = None begin = ( round_time( min_dt, @@ -452,9 +453,11 @@ def filter_existing( ### end is the newest data in the new dataframe try: - max_dt = pd.to_datetime(df[self.get_columns('datetime')].max(skipna=True)).to_pydatetime() + max_dt = pd.to_datetime( + df[self.columns['datetime']].max(skipna=True) + ).to_pydatetime() except Exception as e: - max_dt = end + max_dt = None if not isinstance(max_dt, datetime.datetime) or str(max_dt) == 'NaT': max_dt = None @@ -507,7 +510,12 @@ def filter_existing( ### Detect changes between the old target and new source dataframes. from meerschaum.utils.misc import filter_unseen_df, add_missing_cols_to_df delta_df = add_missing_cols_to_df( - filter_unseen_df(backtrack_df, df, dtypes=self.dtypes, debug=debug), + filter_unseen_df( + backtrack_df, + df, + dtypes = self.dtypes, + debug = debug + ), on_cols_dtypes, ) joined_df = pd.merge( diff --git a/meerschaum/plugins/_Plugin.py b/meerschaum/plugins/_Plugin.py index bb9df35e..27a40368 100644 --- a/meerschaum/plugins/_Plugin.py +++ b/meerschaum/plugins/_Plugin.py @@ -153,31 +153,15 @@ def requirements_file_path(self) -> Union[pathlib.Path, None]: return path - def is_installed(self, try_import: bool = True) -> bool: + def is_installed(self, **kw) -> bool: """ Check whether a plugin is correctly installed. - **NOTE:** This plugin will import the plugin's module. - Set `try_import` to `False` to avoid importing. 
- - Parameters - ---------- - try_import: bool, default True - If `True`, attempt importing the plugin's module. Returns ------- A `bool` indicating whether a plugin exists and is successfully imported. """ - # if not self.__file__: - # return False return self.__file__ is not None - # try: - # _installed = ( - # self.__dict__.get('_module', None) is not None and self.__file__ is not None - # ) if try_import else (self.__file__ is not None) - # except ModuleNotFoundError as e: - # _installed = False - # return _installed def make_tar(self, debug: bool = False) -> pathlib.Path: diff --git a/meerschaum/utils/misc.py b/meerschaum/utils/misc.py index 3d79a1fa..18d46d27 100644 --- a/meerschaum/utils/misc.py +++ b/meerschaum/utils/misc.py @@ -652,6 +652,7 @@ def round_time( return dt + datetime.timedelta(0, rounding - seconds, - dt.microsecond) + def parse_df_datetimes( df: 'pd.DataFrame', debug: bool = False @@ -961,11 +962,21 @@ def filter_unseen_df( ### assume the old_df knows what it's doing, even if it's technically wrong. if dtypes is None: dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} + dtypes = { col: ( str(typ) if str(typ) != 'int64' else 'Int64' - ) for col, typ in new_df.dtypes.items() + ) for col, typ in dtypes.items() + if col in new_df_dtypes and col in old_df_dtypes } + for col, typ in new_df_dtypes.items(): + if col not in dtypes: + dtypes[col] = typ + + for col, typ in {k: v for k, v in dtypes.items()}.items(): + if new_df_dtypes.get(col, None) != old_df_dtypes.get(col, None): + ### Fallback to object if the types don't match. 
+ dtypes[col] = 'object' cast_cols = True try: diff --git a/meerschaum/utils/venv/__init__.py b/meerschaum/utils/venv/__init__.py index 45f087f0..11737310 100644 --- a/meerschaum/utils/venv/__init__.py +++ b/meerschaum/utils/venv/__init__.py @@ -309,7 +309,10 @@ def get_python_version(python_path: pathlib.Path) -> Union[str, None]: if filename == python_versioned_name: real_path = pathlib.Path(os.path.realpath(python_path)) if not real_path.exists(): - python_path.unlink() + try: + python_path.unlink() + except Exception as e: + pass init_venv(venv, verify=False, force=True, debug=debug) if not python_path.exists(): raise FileNotFoundError(f"Unable to verify Python symlink:\n{python_path}") diff --git a/meerschaum/utils/warnings.py b/meerschaum/utils/warnings.py index f1e42618..c4f341e9 100644 --- a/meerschaum/utils/warnings.py +++ b/meerschaum/utils/warnings.py @@ -93,6 +93,7 @@ def _no_stack_sw(message, category, filename, lineno, file=None, line=None): if not stack: warnings.showwarning = _old_sw + def exception_with_traceback( message: str, exception_class = Exception,