Skip to content

Commit

Permalink
Merge pull request #72 from bmeares/dev
Browse files Browse the repository at this point in the history
🔖 v1.2.8 More verbose tracebacks, added `register()` for custom connectors, and more.
  • Loading branch information
bmeares authored Sep 8, 2022
2 parents 0035d2f + 1d03893 commit 4dbac54
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 40 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@

This is the current release cycle, so future features will be updated below.

### v1.2.8

- **Custom connectors may now have `register(pipe)` methods.**
Just like the module-level `register(pipe)` plugin function, custom connectors may also provide this function as a class member.
- **Print a traceback if `fetch(pipe)` breaks.**
A more verbose traceback is printed if a plugin breaks during the syncing process.
- **Cleaned up `sync pipes` output.**
This patch cleans up the syncing process's pretty output.
- **Respect `--nopretty` in `sync pipes`.**
This flag will only print JSON-encoded dictionaries for `sync pipes`. Tracebacks may still interfere without standard output, however.

### v1.2.5 – v1.2.7

- **`Venv` context managers do not deactivate previously activated venvs.**
Expand Down
11 changes: 11 additions & 0 deletions docs/mkdocs/news/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@

This is the current release cycle, so future features will be updated below.

### v1.2.8

- **Custom connectors may now have `register(pipe)` methods.**
Just like the module-level `register(pipe)` plugin function, custom connectors may also provide this function as a class member.
- **Print a traceback if `fetch(pipe)` breaks.**
A more verbose traceback is printed if a plugin breaks during the syncing process.
- **Cleaned up `sync pipes` output.**
This patch cleans up the syncing process's pretty output.
- **Respect `--nopretty` in `sync pipes`.**
This flag will only print JSON-encoded dictionaries for `sync pipes`. Tracebacks may still interfere without standard output, however.

### v1.2.5 – v1.2.7

- **`Venv` context managers do not deactivate previously activated venvs.**
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main(sysargs: list = None) -> None:
### Print success or failure message.
return_tuple = entry(sysargs)
rc = 0
if isinstance(return_tuple, tuple):
if isinstance(return_tuple, tuple) and '--nopretty' not in sysargs:
from meerschaum.utils.formatting import print_tuple
print_tuple(return_tuple, upper_padding=1)
rc = 0 if (return_tuple[0] is True) else 1
Expand Down
80 changes: 63 additions & 17 deletions meerschaum/actions/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def _pipes_lap(
min_seconds: int = 1,
mrsm_instance: Optional[str] = None,
timeout_seconds: Optional[int] = None,
nopretty: bool = False,
_progress: Optional['rich.progress.Progress'] = None,
**kw: Any
) -> Tuple[List[meerschaum.Pipe], List[meerschaum.Pipe]]:
Expand Down Expand Up @@ -128,7 +129,16 @@ def worker_fn():
return_tuple = sync_pipe(pipe)
results_dict[pipe] = return_tuple

print_tuple(return_tuple, _progress=_progress)
if not nopretty:
success, msg = return_tuple
msg = (
f"Finished syncing {pipe}:\n" if success
else f"Error while syncing {pipe}:\n"
) + msg + '\n'
print_tuple(
(success, msg),
_progress = _progress,
)
_checkpoint(_progress=_progress, _task=_task)
if _progress is not None:
nonlocal remaining_count
Expand Down Expand Up @@ -241,6 +251,7 @@ def _sync_pipes(
min_seconds: int = 1,
unblock: bool = False,
shell: bool = False,
nopretty: bool = False,
debug: bool = False,
**kw: Any
) -> SuccessTuple:
Expand All @@ -259,16 +270,20 @@ def _sync_pipes(
"""
from meerschaum.utils.debug import dprint
from meerschaum.utils.warnings import warn, info
from meerschaum.utils.formatting import UNICODE
from meerschaum.utils.formatting._shell import progress, live
from meerschaum.utils.formatting._shell import clear_screen, flush_with_newlines
from meerschaum.utils.misc import print_options
import contextlib
import time, sys
import time
import sys
import json
import asyncio
run = True
msg = ""
interrupt_warning_msg = "Syncing was interrupted due to a keyboard interrupt."
cooldown = 2 * (min_seconds + 1)
underline = '\u2015' if UNICODE else '-'
success = []
while run:
_progress = progress() if shell else None
Expand All @@ -283,6 +298,7 @@ def _sync_pipes(
_progress = _progress,
unblock = unblock,
debug = debug,
nopretty = nopretty,
**kw
)
except Exception as e:
Expand Down Expand Up @@ -310,28 +326,58 @@ def _sync_pipes(
lap_end = time.perf_counter()
print()

if success is not None and not loop and shell:

def get_options_to_print(
pipes_list: List['meerschaum.Pipe'],
include_msg: bool = True
) -> List[str]:
"""
Format the output strings.
"""
default_tuple = False, "No message returned."
options = []
for pipe in pipes_list:
result = results_dict.get(pipe, default_tuple)
if not isinstance(result, tuple):
result = default_tuple

option = (
str(pipe)
+ '\n'
+ ((underline * len(str(pipe))) if include_msg else '')
+ '\n'
+ (str(result[1]) if include_msg else '')
+ '\n\n'
) if not nopretty else (
json.dumps({
'pipe': pipe.meta,
'result': result,
})
)
options.append(option)

return options


if success is not None and not loop and shell and not nopretty:
clear_screen(debug=debug)
if fail is not None and len(fail) > 0:
print_options(
[str(p) + "\n"
+ (
results_dict[p][1] if isinstance(results_dict.get(p, None), tuple)
else "No message was returned."
) + "\n" for p in fail],
header = "Failed to sync pipes:"
)

if success is not None and len(success) > 0:
success_msg = "Successfully synced pipes:"
if unblock:
success_msg = "Successfully spawned threads for pipes:"
print_options([str(p) + "\n" for p in success], header=success_msg)
print_options(
get_options_to_print(success),
header = success_msg,
nopretty = nopretty,
)

if debug:
from meerschaum.utils.formatting import pprint
dprint("\n" + f"Return values for each pipe:")
pprint(results_dict)
if fail is not None and len(fail) > 0:
print_options(
get_options_to_print(fail),
header = 'Failed to sync pipes:',
nopretty = nopretty,
)

msg = (
f"It took {round(lap_end - lap_begin, 2)} seconds to sync " +
Expand Down
2 changes: 1 addition & 1 deletion meerschaum/config/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Specify the Meerschaum release version.
"""

__version__ = "1.2.7"
__version__ = "1.2.8"
11 changes: 7 additions & 4 deletions meerschaum/connectors/sql/_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ def sync_pipe(
from meerschaum.utils.debug import dprint
from meerschaum.utils.packages import import_pandas
from meerschaum.utils.misc import parse_df_datetimes
from meerschaum.utils.sql import get_update_queries
from meerschaum.utils.sql import get_update_queries, sql_item_name
from meerschaum import Pipe
import time
if df is None:
Expand Down Expand Up @@ -897,11 +897,14 @@ def sync_pipe(
if not success:
return success, stats['msg']
msg = (
f"It took {round(end-start, 2)} seconds to update {pipe} using method {stats['method']} "
+ f"and chunksize {stats['chunksize']}.\n"
+ f" Inserted {len(unseen_df)} rows, "
f"Inserted {len(unseen_df)}, "
+ f"updated {len(update_df) if update_df is not None else 0} rows."
)
if debug:
msg = msg[:-1] + (
f"\non table {sql_item_name(pipe.target, self.flavor)}\n"
+ f"in {round(end-start, 2)} seconds."
)
return success, msg


Expand Down
22 changes: 17 additions & 5 deletions meerschaum/core/Pipe/_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def register(
A `SuccessTuple` of success, message.
"""
from meerschaum.connectors import custom_types
from meerschaum.utils.formatting import get_console
import warnings
with warnings.catch_warnings():
warnings.simplefilter('ignore')
Expand All @@ -33,22 +35,32 @@ def register(
except Exception as e:
_conn = None

if _conn is not None and _conn.type == 'plugin' and _conn.register is not None:
params = self.connector.register(self)
if (
_conn is not None
and
(_conn.type == 'plugin' or _conn.type in custom_types)
and
getattr(_conn, 'register', None) is not None
):
try:
params = self.connector.register(self)
except Exception as e:
get_console().print_exception()
params = None
params = {} if params is None else params
if not isinstance(params, dict):
from meerschaum.utils.warnings import warn
warn(
f"Invalid parameters returned from `register()` in plugin {self.connector}:\n"
f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
+ f"{params}"
)
else:
self.parameters = params

if not self.parameters:
cols = self.columns if self.columns else {'datetime': None, 'id': None}
self.parameters = {
'columns': self.columns,
'columns': cols,
}

return self.instance_connector.register_pipe(self, debug=debug)

19 changes: 12 additions & 7 deletions meerschaum/core/Pipe/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,14 @@ def sync(
from meerschaum.utils.warnings import warn, error
from meerschaum.connectors import custom_types
from meerschaum.plugins import Plugin
from meerschaum.utils.formatting import get_console
import datetime
import time
if (callback is not None or error_callback is not None) and blocking:
warn("Callback functions are only executed when blocking = False. Ignoring...")

_checkpoint(_total=2, **kw)

# if (
# not self.connector_keys.startswith('plugin:')
# and not self.get_columns('datetime', error=False)
# ):
# return False, f"Cannot sync {self} without a datetime column."

### NOTE: Setting begin to the sync time for Simple Sync.
### TODO: Add flag for specifying syncing method.
begin = _determine_begin(self, begin, debug=debug)
Expand Down Expand Up @@ -195,6 +190,7 @@ def _sync(
return return_tuple

except Exception as e:
get_console().print_exception()
msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
if debug:
error(msg, silent=False)
Expand All @@ -219,9 +215,18 @@ def _sync(
if plugin is not None and deactivate_plugin_venv:
plugin.deactivate_venv(debug=debug)
except Exception as e:
get_console().print_exception(
suppress = [
'meerschaum/core/Pipe/_sync.py',
'meerschaum/core/Pipe/_fetch.py',
]
)
msg = f"Failed to fetch data from {p.connector}:\n {e}"
df = None

if df is None:
return False, f"No data was fetched for {p}."
return False, f"No data were fetched for {p}."

### TODO: Depreciate async?
if df is True:
return True, f"{p} is being synced in parallel."
Expand Down
16 changes: 11 additions & 5 deletions meerschaum/utils/formatting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def colored(text: str, *colors, as_rich_text: bool=False, **kw) -> Union[str, 'r

console = None
def get_console():
""" """
"""Get the rich console."""
global console
from meerschaum.utils.packages import import_rich, attempt_import
rich = import_rich()
Expand All @@ -188,6 +188,7 @@ def get_console():
console = None
return console


def print_tuple(
tup: tuple,
skip_common: bool = True,
Expand All @@ -197,14 +198,14 @@ def print_tuple(
_progress: Optional['rich.progress.Progress'] = None,
) -> None:
"""Print `meerschaum.utils.typing.SuccessTuple`."""
from meerschaum.config.static import _static_config
from meerschaum.config.static import STATIC_CONFIG
try:
status = 'success' if tup[0] else 'failure'
except TypeError:
status = 'failure'
tup = None, None

omit_messages = _static_config()['system']['success']['ignore']
omit_messages = STATIC_CONFIG['system']['success']['ignore']

do_print = True

Expand All @@ -221,9 +222,14 @@ def print_tuple(
status_config = get_config('formatting', status, patch=True)

msg = ' ' + status_config[CHARSET]['icon'] + ' ' + str(tup[1])
lines = msg.split('\n')
lines = [lines[0]] + [
((' ' + line if not line.startswith(' ') else line))
for line in lines[1:]
]
if ANSI:
msg = fill_ansi(highlight_pipes(msg), **status_config['ansi']['rich'])

lines[0] = fill_ansi(highlight_pipes(lines[0]), **status_config['ansi']['rich'])
msg = '\n'.join(lines)
msg = ('\n' * upper_padding) + msg + ('\n' * lower_padding)
print(msg)

Expand Down
1 change: 1 addition & 0 deletions meerschaum/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def choices_docstring(action: str, globs : Optional[Dict[str, Any]] = None) -> s
options_str = options_str[:-2] + "]`"
return options_str


def print_options(
options: Optional[Dict[str, Any]] = None,
nopretty: bool = False,
Expand Down

0 comments on commit 4dbac54

Please sign in to comment.