**Breaking Changes**

- **Removed the redundant `Pipe.sync_time` property.** Use `pipe.get_sync_time()` instead.
- **Removed `SQLConnector.get_pipe_backtrack_minutes()`.** Use `pipe.get_backtrack_interval()` instead.
- **Replaced `pipe.parameters['chunk_time_interval']` with `pipe.parameters['verify']['chunk_minutes']`.** For better security and cohesiveness, the TimescaleDB `chunk_time_interval` value is now derived from the standard `chunk_minutes` value. This also means pipes with integer date axes will be created with a new default chunk interval of 1440 (previously 100,000).
- **Moved `choose_subaction()` into `meerschaum.actions`.** This function is for internal use and should not affect any users.

**Features**

- **Added `verify pipes` and `--verify`.** The command `mrsm verify pipes` (or `mrsm sync pipes --verify`) will resync any chunks whose rowcounts differ, catching backfilled data.

```python
import meerschaum as mrsm

foo = mrsm.Pipe(
    'foo', 'src',
    target = 'foo',
    columns = {'datetime': 'dt'},
    instance = 'sql:local',
)
docs = [
    {'dt': '2023-01-01'},
    {'dt': '2023-01-02'},
]
foo.sync(docs)

pipe = mrsm.Pipe(
    'sql:local', 'verify',
    columns = {'datetime': 'dt'},
    parameters = {
        'query': f'SELECT * FROM "{foo.target}"',
    },
    instance = 'sql:local',
)
pipe.sync(docs)

backfilled_docs = [
    {'dt': '2022-12-30'},
    {'dt': '2022-12-31'},
]
foo.sync(backfilled_docs)
mrsm.pprint(pipe.verify())

assert foo.get_rowcount() == pipe.get_rowcount()
```

- **Added `deduplicate pipes` and `--deduplicate`.** Running `mrsm deduplicate pipes` or `mrsm sync pipes --deduplicate` will iterate over pipes' entire intervals, chunking at the configured chunk interval (see `pipe.get_chunk_interval()` below) and clearing + resyncing any chunks with duplicate rows. If your instance connector implements `deduplicate_pipe()` (e.g. `SQLConnector`), this method will override the default `pipe.deduplicate()`.
```python
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'demo', 'deduplicate',
    columns = {'datetime': 'dt'},
    instance = 'sql:local',
)
docs = [
    {'dt': '2023-01-01'},
    {'dt': '2023-01-01'},
]
pipe.sync(docs)
print(pipe.get_rowcount())
# 2

pipe.deduplicate()
print(pipe.get_rowcount())
# 1
```

- **Added `pyarrow` support.** The dtypes enforcement system was overhauled to add support for `pyarrow` data types.

```python
import meerschaum as mrsm
import pandas as pd

df = pd.DataFrame(
    [{'a': 1, 'b': 2.3}]
).convert_dtypes(dtype_backend='pyarrow')

pipe = mrsm.Pipe(
    'demo', 'pyarrow',
    instance = 'sql:local',
    columns = {'a': 'a'},
)
pipe.sync(df)
```

- **Added `bool` support.** Pipes may now sync DataFrames with booleans (even on Oracle and MySQL):

```python
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'demo', 'bools',
    instance = 'sql:local',
    columns = {'id': 'id'},
)
pipe.sync([{'id': 1, 'is_blue': True}])
assert 'bool' in pipe.dtypes['is_blue']

pipe.sync([{'id': 1, 'is_blue': False}])
assert pipe.get_data()['is_blue'][0] == False
```

- **Added preliminary `dask` support.** For example, you may now return Dask DataFrames from your plugins, pass them into `pipe.sync()`, and fetch data as a Dask DataFrame with the new `pipe.get_data()` flag `as_dask`.

```python
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'dask', 'demo',
    columns = {'datetime': 'dt'},
    instance = 'sql:local',
)
pipe.sync([
    {'dt': '2023-01-01', 'val': 1},
    {'dt': '2023-01-02', 'val': 2},
    {'dt': '2023-01-03', 'val': 3},
])
ddf = pipe.get_data(as_dask=True)
print(ddf)
# Dask DataFrame Structure:
#                            dt             val
# npartitions=4
#                datetime64[ns]  int64[pyarrow]
#                           ...             ...
#                           ...             ...
#                           ...             ...
#                           ...             ...
# Dask Name: from-delayed, 5 graph layers

print(ddf.compute())
#           dt  val
# 0 2023-01-01    1
# 0 2023-01-02    2
# 0 2023-01-03    3

pipe2 = mrsm.Pipe(
    'dask', 'insert',
    columns = pipe.columns,
    instance = 'sql:local',
)
pipe2.sync(ddf)
assert pipe.get_data().to_dict() == pipe2.get_data().to_dict()

pipe.sync([{'dt': '2023-01-01', 'val': 10}])
pipe2.sync(pipe.get_data(as_dask=True))
assert pipe.get_data().to_dict() == pipe2.get_data().to_dict()
```

- **Added `chunk_minutes` to `pipe.parameters['verify']`.** Like `pipe.parameters['fetch']['backtrack_minutes']`, you may now specify the default chunk interval to use for verification syncs and for iterating over the datetime axis.

```python
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'a', 'b',
    instance = 'sql:local',
    columns = {'datetime': 'dt'},
    parameters = {
        'verify': {
            'chunk_minutes': 2880,
        },
    },
)
pipe.sync([
    {'dt': '2023-01-01'},
    {'dt': '2023-01-02'},
    {'dt': '2023-01-03'},
    {'dt': '2023-01-04'},
    {'dt': '2023-01-05'},
])
chunk_bounds = pipe.get_chunk_bounds(bounded=True)
for chunk_begin, chunk_end in chunk_bounds:
    print(chunk_begin, '-', chunk_end)
# 2023-01-01 00:00:00 - 2023-01-03 00:00:00
# 2023-01-03 00:00:00 - 2023-01-05 00:00:00
```

- **Added `--chunk-minutes`, `--chunk-hours`, and `--chunk-days`.** You may override a pipe's chunk interval during a verification sync with `--chunk-minutes` (or `--chunk-hours` or `--chunk-days`).

```bash
mrsm verify pipes --chunk-days 3
```

- **Added `pipe.get_chunk_interval()` and `pipe.get_backtrack_interval()`.** These return the `timedelta` (or `int` for integer datetimes) from `verify:chunk_minutes` and `fetch:backtrack_minutes`, respectively.
```python
import meerschaum as mrsm

dt_pipe = mrsm.Pipe(
    'demo', 'intervals', 'datetime',
    instance = 'sql:local',
    columns = {'datetime': 'dt'},
)
print(dt_pipe.get_chunk_interval())
# 1 day, 0:00:00

int_pipe = mrsm.Pipe(
    'demo', 'intervals', 'int',
    instance = 'sql:local',
    columns = {'datetime': 'dt'},
    dtypes = {'dt': 'int'},
)
print(int_pipe.get_chunk_interval())
# 1440
```

- **Added `pipe.get_chunk_bounds()`.** Return a list of `begin` and `end` values to use when iterating over a pipe's datetime axis.

```python
from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'demo', 'chunk_bounds',
    instance = 'sql:local',
    columns = {'datetime': 'dt'},
    parameters = {
        'verify': {
            'chunk_minutes': 1440,
        },
    },
)
pipe.sync([
    {'dt': '2023-01-01'},
    {'dt': '2023-01-02'},
    {'dt': '2023-01-03'},
    {'dt': '2023-01-04'},
])

open_bounds = pipe.get_chunk_bounds()
for i, (begin, end) in enumerate(open_bounds):
    print(f"Chunk {i}: ({begin}, {end})")
# Chunk 0: (None, 2023-01-01 00:00:00)
# Chunk 1: (2023-01-01 00:00:00, 2023-01-02 00:00:00)
# Chunk 2: (2023-01-02 00:00:00, 2023-01-03 00:00:00)
# Chunk 3: (2023-01-03 00:00:00, 2023-01-04 00:00:00)
# Chunk 4: (2023-01-04 00:00:00, None)

closed_bounds = pipe.get_chunk_bounds(bounded=True)
for i, (begin, end) in enumerate(closed_bounds):
    print(f"Chunk {i}: ({begin}, {end})")
# Chunk 0: (2023-01-01 00:00:00, 2023-01-02 00:00:00)
# Chunk 1: (2023-01-02 00:00:00, 2023-01-03 00:00:00)
# Chunk 2: (2023-01-03 00:00:00, 2023-01-04 00:00:00)

sub_bounds = pipe.get_chunk_bounds(
    begin = datetime(2023, 1, 1),
    end = datetime(2023, 1, 3),
)
for i, (begin, end) in enumerate(sub_bounds):
    print(f"Chunk {i}: ({begin}, {end})")
# Chunk 0: (2023-01-01 00:00:00, 2023-01-02 00:00:00)
# Chunk 1: (2023-01-02 00:00:00, 2023-01-03 00:00:00)
```

- **Added `--bounded` to verification syncs.** By default, `verify pipes` is unbounded, meaning it will sync values beyond the existing minimum and maximum datetime values.
Running a verification sync with `--bounded` will bound the search to the existing datetime axis.

```bash
mrsm sync pipes --verify --bounded
```

- **Added `pipe.get_num_workers()`.** Return the number of concurrent threads to be used with this pipe (with respect to its instance connector's thread safety).
- **Added `select_columns` and `omit_columns` to `pipe.get_data()`.** In situations where not all columns are required, you may now specify which columns to include (`select_columns`) and which to filter out (`omit_columns`). You may pass a list of columns or a single column, and the value `'*'` for `select_columns` will be treated as `None` (i.e. `SELECT *`).

```python
import meerschaum as mrsm

pipe = mrsm.Pipe('a', 'b', 'c', instance='sql:local')
pipe.sync([{'a': 1, 'b': 2, 'c': 3}])

pipe.get_data(['a', 'b'])
#    a  b
# 0  1  2

pipe.get_data('*', 'b')
#    a  c
# 0  1  3

pipe.get_data(None, ['a', 'c'])
#    b
# 0  2

pipe.get_data(omit_columns=['b', 'c'])
#    a
# 0  1

pipe.get_data(select_columns=['c', 'a'])
#    c  a
# 0  3  1
```

- **Replaced `daemoniker` with `python-daemon`.** `python-daemon` is a well-maintained and well-behaved daemon process library. This migration, however, removes Windows support for background jobs (which was never fully supported anyway, so no harm there).
- **Added `pause jobs`.** In addition to `start jobs` and `stop jobs`, the command `pause jobs` will suspend a job's daemon. Paused jobs may be resumed with `start jobs` (i.e. `Daemon.resume()`).
- **Added job management to the web UI.** Now that jobs and logs are much more robust, more job management features have been added to the web UI. Jobs may be started, stopped, paused, and resumed from the web console, and their logs are now available for download.
- **Logs now roll over and are preserved on job restarts.** Spin up long-running jobs with peace of mind now that logs are automatically rolled over, keeping five 500 KB files on disk at any moment (you may tweak these values with `mrsm edit config jobs`).
To facilitate this, `meerschaum.utils.daemon.RotatingFile` was added to provide a generic file-like object, complete with its own file descriptor.

- **Starting existing jobs with `-d` will not throw an exception if the arguments match.** Similarly, running without any arguments other than `--name` will run the existing job. This matches the behavior of `start jobs`.
- **Allow for colon-separated paths in `MRSM_PLUGINS_DIR`.** Just like `PATH` in `bash`, you may now specify your plugins' paths in a single variable, separated by colons. Unlike `bash`, however, a blank path will not be interpreted as the current directory.

```bash
export MRSM_PLUGINS_DIR='./plugins:/app/plugins'
```

- **Added `pipe.keys()`.** `pipe.keys()` returns the connector, metric, and location keys (i.e. `pipe.meta` without the `instance`).

```python
import meerschaum as mrsm

pipe = mrsm.Pipe('foo', 'bar')
print(pipe.keys())
# {'connector': 'foo', 'metric': 'bar', 'location': None}
```

- **Pipes are now indexable.** Indexing a pipe directly is the same as accessing `pipe.attributes`:

```python
import meerschaum as mrsm

pipe = mrsm.Pipe('a', 'b', columns={'foo': 'bar'})

print(pipe['connector'])
# 'a'
print(pipe['connector_keys'])
# 'a'
print(pipe['columns'])
# {'foo': 'bar'}
print(pipe['does_not_exist'])
# None
```

**Other changes**

- **Fixed backtracking being incorrectly applied to `--begin`.** Application of the backtracking interval has been consolidated into `pipe.fetch()`.
- **Improved data type enforcement for SQL pipes.** A pipe's data types are now passed to `SQLConnector.read()` when fetching its data.
- **Added `meerschaum.utils.sql.get_db_version()` and `SQLConnector.db_version`.**
- **Moved `print_options()` from `meerschaum.utils.misc` into `meerschaum.utils.formatting`.** This places `print_options()` next to `print_tuple()` and `pprint()`. A placeholder function is still present in `meerschaum.utils.misc` to preserve existing behavior.
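The placeholder pattern described above can be sketched as follows. The module and function bodies here are illustrative stand-ins (not Meerschaum's actual source), using in-memory modules to show the delegation:

```python
import types

# "New home" module with the real implementation:
formatting = types.ModuleType('formatting')

def print_options(options):
    """Return the options formatted as a numbered list."""
    return '\n'.join(f'{i}. {opt}' for i, opt in enumerate(options, 1))

formatting.print_options = print_options

# "Old home" module keeps a placeholder that simply delegates,
# so code importing from the old location keeps working:
misc = types.ModuleType('misc')
misc.print_options = lambda *args, **kwargs: formatting.print_options(*args, **kwargs)

print(misc.print_options(['alpha', 'beta']))
# 1. alpha
# 2. beta
```

Callers of the old path see no behavioral change, while new code can import from the consolidated location.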
- **`mrsm.pprint()` will now pretty-print `SuccessTuples`.**
- **Added `calm` to `print_tuple()`.** Printing a `SuccessTuple` with `calm=True` will use a more muted color scheme and emoji.
- **Removed `round_down` from `get_sync_time()` for instance connectors.** To avoid confusion, sync times are no longer truncated by default. `round_down` is still an optional keyword argument on `pipe.get_sync_time()`.
- **Created `meerschaum.utils.dtypes`.**
- **Added `are_dtypes_equal()` to `meerschaum.utils.dtypes`.**
- **Added `get_db_type_from_pd_type()` to `meerschaum.utils.dtypes.sql`.**
- **Added `get_pd_type_from_db_type()` to `meerschaum.utils.dtypes.sql`.**
- **Moved `to_pandas_dtype()` from `meerschaum.utils.misc` into `meerschaum.utils.dtypes`.**
- **Created `meerschaum.utils.dataframe`.**
- **Added `chunksize_to_npartitions()` to `meerschaum.utils.dataframe`.**
- **Added `get_first_valid_dask_partition()` to `meerschaum.utils.dataframe`.**
- **Moved `filter_unseen_df()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `add_missing_cols_to_df()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `parse_df_datetimes()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `df_from_literal()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `get_json_cols()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `get_unhashable_cols()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `enforce_dtypes()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `get_datetime_bound_from_df()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Moved `df_is_chunk_generator()` from `meerschaum.utils.misc` into `meerschaum.utils.dataframe`.**
- **Refactored the SQL utilities.**
- **Added `format_cte_subquery()` to `meerschaum.utils.sql`.**
- **Added `get_create_table_query()` to `meerschaum.utils.sql`.**
- **Added `get_db_version()` to `meerschaum.utils.sql`.**
- **Added `get_rename_table_queries()` to `meerschaum.utils.sql`.**
- **Moved `choices_docstring()` from `meerschaum.utils.misc` into `meerschaum.actions`.**
- **Fixed handling of backslashes for `stack` on Windows.**
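As a rough illustration of what a CTE-wrapping helper like `format_cte_subquery()` enables, wrapping an arbitrary query in a CTE lets callers select columns from it by alias. This is a hypothetical sketch, not Meerschaum's actual implementation (which lives in `meerschaum.utils.sql`):

```python
def wrap_query_as_cte(sub_query: str, alias: str = 'src', cols: str = '*') -> str:
    """Wrap a raw SQL query in a CTE so it can be selected from by alias.

    Hypothetical helper for illustration only.
    """
    return (
        f"WITH {alias} AS (\n"
        f"    {sub_query}\n"
        f")\n"
        f"SELECT {cols}\n"
        f"FROM {alias}"
    )

print(wrap_query_as_cte('SELECT 1 AS a, 2 AS b', cols='a'))
```

This pattern is how pipes defined by a raw `query` parameter (as in the `verify pipes` example above) can be queried uniformly regardless of the underlying statement.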