From 78d6a7bcfe02bb7f88af3d5426dbdb3e48e5fe7d Mon Sep 17 00:00:00 2001 From: bmeares Date: Sun, 4 Dec 2022 23:05:31 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=92=EF=B8=8F=20Security=20patch=20and?= =?UTF-8?q?=20add=20`temporary`=20to=20avoid=20automatic=20table=20creatio?= =?UTF-8?q?n.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 53 +++++++++++++++++++++++ docs/mkdocs/news/changelog.md | 53 +++++++++++++++++++++++ meerschaum/_internal/arguments/_parser.py | 10 ++++- meerschaum/actions/copy.py | 4 +- meerschaum/api/__init__.py | 2 +- meerschaum/api/routes/_pipes.py | 14 +++++- meerschaum/config/_version.py | 2 +- meerschaum/connectors/api/_delete.py | 27 +----------- meerschaum/connectors/api/_get.py | 23 +++++----- meerschaum/connectors/api/_patch.py | 35 +++------------ meerschaum/connectors/api/_post.py | 5 +-- meerschaum/connectors/sql/_pipes.py | 41 +++++++++++------- meerschaum/core/Pipe/__init__.py | 12 +++-- meerschaum/core/Pipe/_attributes.py | 4 +- meerschaum/core/Pipe/_bootstrap.py | 8 ---- meerschaum/core/Pipe/_delete.py | 24 ++++++---- meerschaum/core/Pipe/_edit.py | 6 +++ meerschaum/core/Pipe/_register.py | 3 ++ meerschaum/core/Pipe/_sync.py | 2 +- meerschaum/utils/get_pipes.py | 2 - meerschaum/utils/sql.py | 2 +- tests/test_pipes.py | 34 +++++++++++++++ 22 files changed, 251 insertions(+), 115 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e21d4a6..6bb913a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,59 @@ This is the current release cycle, so stay tuned for future releases! +### v1.4.14 + +- **Added flag `temporary` to `Pipe` (and `--temporary`).** + Pipes built with `temporary=True`, will not create instance tables (`pipes`, `users`, and `plugins`) or be able to modify registration. This is particularly useful when creating pipes from existing tables when automatic registration is not desired. + + ```python + import meerschaum as mrsm + import pandas as pd + conn = mrsm.get_connector('sql:temp', uri='postgresql://user:pass@localhost:5432/db') + + ### Simulating an existing table. + table_name = 'my_table' + conn.to_sql( + pd.DataFrame([{'id_column': 1, 'value': 1.0}]), + name = table_name, + ) + + ### Create a temporary pipe with the existing table as its target. + pipe = mrsm.Pipe( + 'foo', 'bar', + target = table_name, + temporary = True, + instance = conn, + columns = { + 'id': 'id_column', + }, + ) + + docs = [ + { + "id_column": 1, + "value": 123.456, + "new_column": "hello, world!", + }, + ] + + ### Existing table `my_table` is synced without creating other tables + ### or affecting pipes' registration. + pipe.sync(docs) + ``` + +- **Fixed potential security of public instance tables.** + The API now refuses to sync or serve data if the target is a protected instance table (`pipes`, `users`, or `plugins`). + +- **Added not-null check to `pipe.get_sync_time().`** + The `datetime` column should never contain null values, but just in case, `pipe.get_sync_time()` now passes a not-null check to `params` for the datetime column. + +- **Removed prompt for `value` from `pipe.bootstrap()`.** + The prompt for an optional `value` column has been removed from the bootstrapping wizard because `pipe.columns` is now largely used as a collection of indices rather than the original purpose of meta-columns. + +- **Pass `--debug` and other flags in `copy pipes`.** + Command line flags are now passed to the new pipe when copying an existing pipe. + ### v1.4.12 – v1.4.13 - **Fixed an issue when syncing empty DataFrames [(#95)](https://github.com/bmeares/Meerschaum/issues/95).** diff --git a/docs/mkdocs/news/changelog.md b/docs/mkdocs/news/changelog.md index 7e21d4a6..6bb913a9 100644 --- a/docs/mkdocs/news/changelog.md +++ b/docs/mkdocs/news/changelog.md @@ -4,6 +4,59 @@ This is the current release cycle, so stay tuned for future releases! +### v1.4.14 + +- **Added flag `temporary` to `Pipe` (and `--temporary`).** + Pipes built with `temporary=True`, will not create instance tables (`pipes`, `users`, and `plugins`) or be able to modify registration. This is particularly useful when creating pipes from existing tables when automatic registration is not desired. + + ```python + import meerschaum as mrsm + import pandas as pd + conn = mrsm.get_connector('sql:temp', uri='postgresql://user:pass@localhost:5432/db') + + ### Simulating an existing table. + table_name = 'my_table' + conn.to_sql( + pd.DataFrame([{'id_column': 1, 'value': 1.0}]), + name = table_name, + ) + + ### Create a temporary pipe with the existing table as its target. + pipe = mrsm.Pipe( + 'foo', 'bar', + target = table_name, + temporary = True, + instance = conn, + columns = { + 'id': 'id_column', + }, + ) + + docs = [ + { + "id_column": 1, + "value": 123.456, + "new_column": "hello, world!", + }, + ] + + ### Existing table `my_table` is synced without creating other tables + ### or affecting pipes' registration. + pipe.sync(docs) + ``` + +- **Fixed potential security of public instance tables.** + The API now refuses to sync or serve data if the target is a protected instance table (`pipes`, `users`, or `plugins`). + +- **Added not-null check to `pipe.get_sync_time().`** + The `datetime` column should never contain null values, but just in case, `pipe.get_sync_time()` now passes a not-null check to `params` for the datetime column. + +- **Removed prompt for `value` from `pipe.bootstrap()`.** + The prompt for an optional `value` column has been removed from the bootstrapping wizard because `pipe.columns` is now largely used as a collection of indices rather than the original purpose of meta-columns. + +- **Pass `--debug` and other flags in `copy pipes`.** + Command line flags are now passed to the new pipe when copying an existing pipe. + ### v1.4.12 – v1.4.13 - **Fixed an issue when syncing empty DataFrames [(#95)](https://github.com/bmeares/Meerschaum/issues/95).** diff --git a/meerschaum/_internal/arguments/_parser.py b/meerschaum/_internal/arguments/_parser.py index 1a03f536..75ecf708 100644 --- a/meerschaum/_internal/arguments/_parser.py +++ b/meerschaum/_internal/arguments/_parser.py @@ -196,6 +196,7 @@ def get_arguments_triggers() -> Dict[str, Tuple[str]]: '-t', '--tags', nargs='+', help="Only include pipes with these tags.", ) + ### Sync options groups['sync'].add_argument( '--min-seconds', '--cooldown', type=float, help=( @@ -315,7 +316,14 @@ def get_arguments_triggers() -> Dict[str, Tuple[str]]: "--params key1:value1,key2:value2" ) ) - +groups['misc'].add_argument( + '--temporary', '--temp', + action = 'store_true', + help = ( + "Skip creating or modifying instance tables when working with pipes " + + "(plugins and users still trigger table creation)." + ), +) groups['misc'].add_argument( '--gui', action='store_true', help="Open a DataFrame in an interactive pandasgui or matplotlib window." diff --git a/meerschaum/actions/copy.py b/meerschaum/actions/copy.py index 4ae1b2e0..0c8952cc 100644 --- a/meerschaum/actions/copy.py +++ b/meerschaum/actions/copy.py @@ -32,6 +32,7 @@ def copy( } return choose_subaction(action, options, **kw) + def _complete_copy( action : Optional[List[str]] = None, **kw : Any @@ -60,6 +61,7 @@ def _complete_copy( from meerschaum._internal.shell import default_action_completer return default_action_completer(action=(['copy'] + action), **kw) + def _copy_pipes( yes: bool = False, noask: bool = False, @@ -115,7 +117,7 @@ def _copy_pipes( noask=noask, yes=yes ) ): - _new_pipe.sync(p.get_data(debug=debug, **kw)) + _new_pipe.sync(p.get_data(debug=debug, **kw), debug=debug, **kw) msg = ( "No pipes were copied." if successes == 0 diff --git a/meerschaum/api/__init__.py b/meerschaum/api/__init__.py index 617760b0..232054e8 100644 --- a/meerschaum/api/__init__.py +++ b/meerschaum/api/__init__.py @@ -85,7 +85,7 @@ def get_uvicorn_config() -> Dict[str, Any]: _include_dash = (not no_dash) connector = None -def get_api_connector(instance_keys : Optional[str] = None): +def get_api_connector(instance_keys: Optional[str] = None): """Create the instance connector.""" from meerschaum.utils.debug import dprint global connector diff --git a/meerschaum/api/routes/_pipes.py b/meerschaum/api/routes/_pipes.py index 8226bb77..037762c9 100644 --- a/meerschaum/api/routes/_pipes.py +++ b/meerschaum/api/routes/_pipes.py @@ -308,6 +308,12 @@ def sync_pipe( if data is None: data = {} p = get_pipe(connector_keys, metric_key, location_key) + if p.target in ('users', 'plugins', 'pipes'): + raise fastapi.HTTPException( + status_code = 409, + detail = f"Cannot sync data to protected table '{p.target}'.", + ) + if not p.columns and columns is not None: p.columns = json.loads(columns) if not p.columns and not is_pipe_registered(p, pipes(refresh=True)): @@ -363,7 +369,13 @@ def get_pipe_data( if not is_pipe_registered(p, pipes(refresh=True)): raise fastapi.HTTPException( status_code = 409, - detail = "Pipe must be registered with the datetime column specified" + detail = "Pipe must be registered with the datetime column specified." + ) + + if p.target in ('users', 'plugins', 'pipes'): + raise fastapi.HTTPException( + status_code = 409, + detail = f"Cannot retrieve data from protected table '{p.target}'.", ) # chunks = p.get_data( diff --git a/meerschaum/config/_version.py b/meerschaum/config/_version.py index ab90cc5c..d68794ad 100644 --- a/meerschaum/config/_version.py +++ b/meerschaum/config/_version.py @@ -2,4 +2,4 @@ Specify the Meerschaum release version. """ -__version__ = "1.4.13" +__version__ = "1.4.14" diff --git a/meerschaum/connectors/api/_delete.py b/meerschaum/connectors/api/_delete.py index 031a2b20..37a42763 100644 --- a/meerschaum/connectors/api/_delete.py +++ b/meerschaum/connectors/api/_delete.py @@ -17,27 +17,7 @@ def delete( debug : bool = False, **kw : Ahy, ) -> requests.Response: - """Wrapper for requests.delete - - Parameters - ---------- - r_url : str : - - headers : Optional[Dict[str : - - Any]] : - (Default value = None) - use_token : bool : - (Default value = True) - debug : bool : - (Default value = False) - **kw : Ahy : - - - Returns - ------- - - """ + """Wrapper for `requests.delete`.""" if debug: from meerschaum.utils.debug import dprint @@ -51,10 +31,7 @@ def delete( if debug: from meerschaum.utils.formatting import pprint - dprint(f"Sending DELETE request to {self.url + r_url}") - if headers: - pprint(headers) - pprint(kw) + dprint(f"Sending DELETE request to {self.url + r_url}.") return self.session.delete( self.url + r_url, diff --git a/meerschaum/connectors/api/_get.py b/meerschaum/connectors/api/_get.py index 0380fcad..d8ff78cc 100644 --- a/meerschaum/connectors/api/_get.py +++ b/meerschaum/connectors/api/_get.py @@ -11,11 +11,11 @@ def get( self, - r_url : str, - headers : Optional[Dict[str, str]] = None, - use_token : bool = True, - debug : bool = False, - **kw : Any + r_url: str, + headers: Optional[Dict[str, str]] = None, + use_token: bool = True, + debug: bool = False, + **kw: Any ) -> requests.Reponse: """Wrapper for `requests.get`.""" if debug: @@ -27,14 +27,11 @@ def get( if use_token: if debug: dprint(f"Checking login token.") - headers.update({ 'Authorization': f'Bearer {self.token}' }) + headers.update({'Authorization': f'Bearer {self.token}'}) if debug: from meerschaum.utils.formatting import pprint dprint(f"Sending GET request to {self.url + r_url}.") - if headers: - pprint(headers) - pprint(kw) return self.session.get( self.url + r_url, @@ -44,12 +41,12 @@ def get( def wget( self, - r_url : str, - dest : Optional[Union[str, pathlib.Path]] = None, + r_url: str, + dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, - **kw : Any + **kw: Any ) -> pathlib.Path: """Mimic wget with requests. """ @@ -60,5 +57,5 @@ def wget( if use_token: if debug: dprint(f"Checking login token.") - headers.update({ 'Authorization': f'Bearer {self.token}' }) + headers.update({'Authorization': f'Bearer {self.token}'}) return wget(self.url + r_url, dest=dest, headers=headers, **kw) diff --git a/meerschaum/connectors/api/_patch.py b/meerschaum/connectors/api/_patch.py index 79e2532c..643fd98f 100644 --- a/meerschaum/connectors/api/_patch.py +++ b/meerschaum/connectors/api/_patch.py @@ -11,33 +11,13 @@ def patch( self, - r_url : str, - headers : Optional[Dict[str, Any]] = None, - use_token : bool = True, - debug : bool = False, - **kw : Any + r_url: str, + headers: Optional[Dict[str, Any]] = None, + use_token: bool = True, + debug: bool = False, + **kw: Any ) -> requests.Response: - """Wrapper for requests.patch - - Parameters - ---------- - r_url : str : - - headers : Optional[Dict[str : - - Any]] : - (Default value = None) - use_token : bool : - (Default value = True) - debug : bool : - (Default value = False) - **kw : Any : - - - Returns - ------- - - """ + """Wrapper for `requests.patch`.""" if debug: from meerschaum.utils.debug import dprint @@ -52,9 +32,6 @@ def patch( if debug: from meerschaum.utils.formatting import pprint dprint(f"Sending PATCH request to {self.url + r_url}") - if headers: - pprint(headers) - pprint(kw) return self.session.patch( self.url + r_url, diff --git a/meerschaum/connectors/api/_post.py b/meerschaum/connectors/api/_post.py index 2cb1ae45..46829f1c 100644 --- a/meerschaum/connectors/api/_post.py +++ b/meerschaum/connectors/api/_post.py @@ -27,14 +27,11 @@ def post( if use_token: if debug: dprint(f"Checking token...") - headers.update({ 'Authorization': f'Bearer {self.token}' }) + headers.update({'Authorization': f'Bearer {self.token}'}) if debug: from meerschaum.utils.formatting import pprint dprint(f"Sending POST request to {self.url + r_url}") - if headers: - pprint(headers) - pprint(kw) return self.session.post( self.url + r_url, diff --git a/meerschaum/connectors/sql/_pipes.py b/meerschaum/connectors/sql/_pipes.py index d2706c27..a69576f6 100644 --- a/meerschaum/connectors/sql/_pipes.py +++ b/meerschaum/connectors/sql/_pipes.py @@ -25,7 +25,7 @@ def register_pipe( ### ensure pipes table exists from meerschaum.connectors.sql.tables import get_tables - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] + pipes = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] if pipe.get_id(debug=debug) is not None: return False, f"{pipe} is already registered." @@ -109,7 +109,7 @@ def edit_pipe( ### ensure pipes table exists from meerschaum.connectors.sql.tables import get_tables - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] + pipes = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] import json sqlalchemy = attempt_import('sqlalchemy') @@ -166,8 +166,8 @@ def fetch_pipes_keys( from meerschaum.utils.debug import dprint from meerschaum.utils.packages import attempt_import from meerschaum.utils.misc import separate_negation_values - from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS - from meerschaum.config.static import _static_config + from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists + from meerschaum.config.static import STATIC_CONFIG sqlalchemy = attempt_import('sqlalchemy') import json from copy import deepcopy @@ -207,15 +207,18 @@ def fetch_pipes_keys( parameters[col] = vals cols = {k: v for k, v in cols.items() if v != [None]} + if not table_exists('pipes', self, debug=debug): + return [] + from meerschaum.connectors.sql.tables import get_tables - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] + pipes = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] _params = {} for k, v in parameters.items(): _v = json.dumps(v) if isinstance(v, dict) else v _params[k] = _v - negation_prefix = _static_config()['system']['fetch_pipes_keys']['negation_prefix'] + negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] ### Parse regular params. ### If a param begins with '_', negate it instead. _where = [ @@ -548,11 +551,11 @@ def delete_pipe( ### ensure pipes table exists from meerschaum.connectors.sql.tables import get_tables - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] + pipes = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] q = sqlalchemy.delete(pipes).where(pipes.c.pipe_id == pipe.id) if not self.exec(q, debug=debug): - return False, f"Failed to delete registration for '{pipe}'." + return False, f"Failed to delete registration for {pipe}." return True, "Success" @@ -913,15 +916,17 @@ def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False, - ) -> int: + ) -> Any: """ Get a Pipe's ID from the pipes table. """ + if pipe.temporary: + return None from meerschaum.utils.packages import attempt_import import json sqlalchemy = attempt_import('sqlalchemy') from meerschaum.connectors.sql.tables import get_tables - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] + pipes = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] query = sqlalchemy.select([pipes.c.pipe_id]).where( pipes.c.connector_keys == pipe.connector_keys @@ -931,7 +936,7 @@ def get_pipe_id( (pipes.c.location_key == pipe.location_key) if pipe.location_key is not None else pipes.c.location_key.is_(None) ) - _id = self.value(query, debug=debug) + _id = self.value(query, debug=debug, silent=pipe.temporary) if _id is not None: _id = int(_id) return _id @@ -950,11 +955,12 @@ def get_pipe_attributes( from meerschaum.connectors.sql.tables import get_tables from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') - pipes = get_tables(mrsm_instance=self, debug=debug)['pipes'] if pipe.get_id(debug=debug) is None: return {} + pipes = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] + try: q = sqlalchemy.select([pipes]).where(pipes.c.pipe_id == pipe.id) if debug: @@ -1056,8 +1062,7 @@ def sync_pipe( start = time.perf_counter() - ### if Pipe is not registered - if not pipe.get_id(debug=debug): + if not pipe.temporary and not pipe.get_id(debug=debug): register_tuple = pipe.register(debug=debug) if not register_tuple[0]: return register_tuple @@ -1126,6 +1131,7 @@ def sync_pipe( instance = pipe.instance_keys, columns = pipe.columns, target = temp_target, + temporary = True, ) existing_cols = pipe.get_columns_types(debug=debug) @@ -1842,6 +1848,11 @@ def get_sync_time( valid_params = {} if params is not None: valid_params = {k: v for k, v in params.items() if k in existing_cols} + + ### If no bounds are provided for the datetime column, + ### add IS NOT NULL to the WHERE clause. + if _dt not in valid_params: + valid_params[_dt] = '_None' where = "" if not valid_params else build_where(valid_params, self) q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1" if self.flavor == 'mssql': @@ -2050,7 +2061,7 @@ def get_pipe_rowcount( ) ) - result = self.value(query, debug=debug) + result = self.value(query, debug=debug, silent=True) try: return int(result) except Exception as e: diff --git a/meerschaum/core/Pipe/__init__.py b/meerschaum/core/Pipe/__init__.py index 350efc07..c759abf1 100644 --- a/meerschaum/core/Pipe/__init__.py +++ b/meerschaum/core/Pipe/__init__.py @@ -122,6 +122,7 @@ def __init__( target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Optional[Union[str, InstanceConnector]] = None, + temporary: bool = False, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False, @@ -165,6 +166,9 @@ def __init__( instance: Optional[Union[str, InstanceConnector]], default None Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored. + temporary: bool, default False + If `True`, prevent instance tables (pipes, users, plugins) from being created. + cache: bool, default False If `True`, cache fetched data into a local database file. Defaults to `False`. @@ -199,6 +203,7 @@ def __init__( self.connector_key = self.connector_keys ### Alias self.metric_key = metric self.location_key = location + self.temporary = temporary self._attributes = { 'connector_keys': self.connector_keys, @@ -353,9 +358,10 @@ def cache_pipe(self) -> Union['meerschaum.Pipe', None]: self.instance_keys, (self.connector_keys + '_' + self.metric_key + '_cache'), self.location_key, - mrsm_instance=self.cache_connector, - parameters=_parameters, - cache=False, + mrsm_instance = self.cache_connector, + parameters = _parameters, + cache = False, + temporary = True, ) return self._cache_pipe diff --git a/meerschaum/core/Pipe/_attributes.py b/meerschaum/core/Pipe/_attributes.py index 3f631ed4..7dcf85dd 100644 --- a/meerschaum/core/Pipe/_attributes.py +++ b/meerschaum/core/Pipe/_attributes.py @@ -34,7 +34,7 @@ def attributes(self) -> Dict[str, Any]: or (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds) ) - if timed_out: + if not self.temporary and timed_out: self._attributes_sync_time = now local_attributes = self.__dict__.get('_attributes', {}) with Venv(get_connector_plugin(self.instance_connector)): @@ -215,6 +215,8 @@ def get_id(self, **kw: Any) -> Union[int, None]: Fetch a pipe's ID from its instance connector. If the pipe does not exist, return `None`. """ + if self.temporary: + return None from meerschaum.utils.venv import Venv from meerschaum.connectors import get_connector_plugin diff --git a/meerschaum/core/Pipe/_bootstrap.py b/meerschaum/core/Pipe/_bootstrap.py index 53c3f50a..bf9b8aa7 100644 --- a/meerschaum/core/Pipe/_bootstrap.py +++ b/meerschaum/core/Pipe/_bootstrap.py @@ -233,17 +233,9 @@ def _ask_for_columns(pipe, debug: bool=False) -> Dict[str, str]: if id_name == '': id_name = None - try: - value_name = prompt(f"Value column (empty to omit):", icon=False) - except KeyboardInterrupt: - return False, f"Cancelled bootstrapping {pipe}." - if value_name == '': - value_name = None - break return { 'datetime': datetime_name, 'id': id_name, - 'value': value_name, } diff --git a/meerschaum/core/Pipe/_delete.py b/meerschaum/core/Pipe/_delete.py index 5b9904de..dcf2726f 100644 --- a/meerschaum/core/Pipe/_delete.py +++ b/meerschaum/core/Pipe/_delete.py @@ -31,15 +31,23 @@ def delete( from meerschaum.utils.venv import Venv from meerschaum.connectors import get_connector_plugin + if self.temporary: + return ( + False, + "Cannot delete pipes created with `temporary=True` (read-only). " + + "You may want to call `pipe.drop()` instead." + ) + if self.cache_pipe is not None: - _delete_cache_tuple = self.cache_pipe.delete(debug=debug, **kw) - if not _delete_cache_tuple[0]: - warn(_delete_cache_tuple[1]) - _cache_db_path = pathlib.Path(self.cache_connector.database) - try: - os.remove(_cache_db_path) - except Exception as e: - warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}") + _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw) + if not _drop_cache_tuple[0]: + warn(_drop_cache_tuple[1]) + if getattr(self.cache_connector, 'flavor', None) == 'sqlite': + _cache_db_path = pathlib.Path(self.cache_connector.database) + try: + os.remove(_cache_db_path) + except Exception as e: + warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}") with Venv(get_connector_plugin(self.instance_connector)): result = self.instance_connector.delete_pipe(self, debug=debug, **kw) diff --git a/meerschaum/core/Pipe/_edit.py b/meerschaum/core/Pipe/_edit.py index ea006cb4..f1f0580b 100644 --- a/meerschaum/core/Pipe/_edit.py +++ b/meerschaum/core/Pipe/_edit.py @@ -44,6 +44,9 @@ def edit( from meerschaum.utils.venv import Venv from meerschaum.connectors import get_connector_plugin + if self.temporary: + return False, "Cannot edit pipes created with `temporary=True` (read-only)." + if not interactive: with Venv(get_connector_plugin(self.instance_connector)): return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw) @@ -109,6 +112,9 @@ def edit_definition( A `SuccessTuple` of success, message. """ + if self.temporary: + return False, "Cannot edit pipes created with `temporary=True` (read-only)." + from meerschaum.connectors import instance_types if (self.connector is None) or self.connector.type not in instance_types: return self.edit(interactive=True, debug=debug, **kw) diff --git a/meerschaum/core/Pipe/_register.py b/meerschaum/core/Pipe/_register.py index f3437a17..324f8500 100644 --- a/meerschaum/core/Pipe/_register.py +++ b/meerschaum/core/Pipe/_register.py @@ -25,6 +25,9 @@ def register( A `SuccessTuple` of success, message. """ + if self.temporary: + return False, "Cannot register pipes created with `temporary=True` (read-only)." + from meerschaum.utils.formatting import get_console from meerschaum.utils.venv import Venv from meerschaum.connectors import get_connector_plugin, custom_types diff --git a/meerschaum/core/Pipe/_sync.py b/meerschaum/core/Pipe/_sync.py index 2ad47cd0..29196e70 100644 --- a/meerschaum/core/Pipe/_sync.py +++ b/meerschaum/core/Pipe/_sync.py @@ -151,7 +151,7 @@ def _sync( + "Omit the DataFrame to infer fetching.", ) ### Ensure that Pipe is registered. - if p.get_id(debug=debug) is None: + if not p.temporary and p.get_id(debug=debug) is None: ### NOTE: This may trigger an interactive session for plugins! register_tuple = p.register(debug=debug) if not register_tuple[0]: diff --git a/meerschaum/utils/get_pipes.py b/meerschaum/utils/get_pipes.py index fabd0d17..3dc0b196 100644 --- a/meerschaum/utils/get_pipes.py +++ b/meerschaum/utils/get_pipes.py @@ -237,8 +237,6 @@ def fetch_pipes_keys( [('sql:main', 'weather', None)] """ from meerschaum.utils.warnings import error - from meerschaum.connectors.sql.tables import get_tables - tables = get_tables(connector) def _registered( connector_keys: Optional[List[str]] = None, diff --git a/meerschaum/utils/sql.py b/meerschaum/utils/sql.py index aa2ca237..e59acf5c 100644 --- a/meerschaum/utils/sql.py +++ b/meerschaum/utils/sql.py @@ -786,7 +786,7 @@ def get_sqlalchemy_table( from meerschaum.utils.packages import attempt_import if refresh: connector.metadata.clear() - tables = get_tables(mrsm_instance=connector, debug=debug) + tables = get_tables(mrsm_instance=connector, debug=debug, create=False) sqlalchemy = attempt_import('sqlalchemy') truncated_table_name = truncate_item_name(str(table), connector.flavor) if refresh or truncated_table_name not in tables: diff --git a/tests/test_pipes.py b/tests/test_pipes.py index 1fd2f9dd..dc745a30 100644 --- a/tests/test_pipes.py +++ b/tests/test_pipes.py @@ -8,6 +8,7 @@ from tests.pipes import all_pipes, stress_pipes, remote_pipes from tests.connectors import conns from tests.test_users import test_register_user +import meerschaum as mrsm from meerschaum import Pipe from meerschaum.actions import actions @@ -269,3 +270,36 @@ def test_dtype_enforcement(flavor: str): for col, typ in df.dtypes.items(): assert str(typ) == pipe.dtypes[col] return pipe + + +@pytest.mark.parametrize("flavor", list(all_pipes.keys())) +def test_temporary_pipes(flavor: str): + """ + Verify that `temporary=True` will not create instance tables. + """ + from meerschaum.utils.misc import generate_password + from meerschaum.utils.sql import table_exists + session_id = generate_password(6) + db_path = '/tmp/' + session_id + '.db' + conn = mrsm.get_connector('sql', session_id, flavor='sqlite', database=db_path) + pipe = Pipe('foo', 'bar', instance=conn, temporary=True, columns={'id': 'id'}) + _ = pipe.parameters + _ = pipe.id + _ = pipe.get_rowcount(debug=debug) + success, msg = pipe.sync([{'id': 1, 'a': 2}], debug=debug) + assert success, msg + success, msg = pipe.sync([{'id': 2, 'b': 3}], debug=debug) + assert success, msg + success, msg = pipe.sync( + [ + {'id': 1, 'b': 4}, + {'id': 2, 'a': 5}, + ], + debug = debug, + ) + success, msg = pipe.delete(debug=debug) + assert (not success), msg + assert pipe.get_rowcount(debug=debug) == 2 + assert not table_exists('pipes', conn, debug=debug) + assert not table_exists('users', conn, debug=debug) + assert not table_exists('plugins', conn, debug=debug)