From 3dd7eaad20c8708ecd2e24846be278da2ac7abdd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Saugat=20Pachhai=20=28=E0=A4=B8=E0=A5=8C=E0=A4=97=E0=A4=BE?= =?UTF-8?q?=E0=A4=A4=29?= Date: Thu, 19 Oct 2023 17:41:22 +0545 Subject: [PATCH] implement import-db command --- dvc/cli/parser.py | 2 + dvc/commands/imp_db.py | 87 ++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 4 +- 3 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 dvc/commands/imp_db.py diff --git a/dvc/cli/parser.py b/dvc/cli/parser.py index a71b9fb9467..b4abc707778 100644 --- a/dvc/cli/parser.py +++ b/dvc/cli/parser.py @@ -27,6 +27,7 @@ get_url, git_hook, imp, + imp_db, imp_url, init, install, @@ -65,6 +66,7 @@ data_sync, gc, imp, + imp_db, imp_url, config, checkout, diff --git a/dvc/commands/imp_db.py b/dvc/commands/imp_db.py new file mode 100644 index 00000000000..b061d9dfcf2 --- /dev/null +++ b/dvc/commands/imp_db.py @@ -0,0 +1,87 @@ +import argparse +import logging + +from dvc.cli import completion +from dvc.cli.command import CmdBase +from dvc.cli.utils import append_doc_link +from dvc.ui import ui + +logger = logging.getLogger(__name__) + + +class CmdImportDb(CmdBase): + def run(self): + from fal.dbt import FalDbt + from funcy import print_durations + + from dvc.repo.open_repo import _cached_clone + + clone = _cached_clone(self.args.url, self.args.rev) + faldbt = FalDbt(profiles_dir="~/.dbt", project_dir=clone) + + if not self.args.sql: + name = self.args.to_materialize + out = self.args.out or f"{name}.csv" + with print_durations(f"ref {name}"), ui.status(f"Downloading {name}"): + model = faldbt.ref(name) + else: + query = self.args.to_materialize + out = self.args.out or "result.csv" + with print_durations(f"execute_sql {query}"), ui.status( + "Executing sql query" + ): + model = faldbt.execute_sql(query) + + with print_durations(f"to_csv {out}"), ui.status(f"Saving to {out}"): + model.to_csv(out) + + ui.write(f"Saved file to {out}", styled=True) + + +def add_parser(subparsers, parent_parser): + IMPORT_HELP = ( + "Download file or directory tracked by DVC or by Git " + "into the workspace, and track it." + ) + + import_parser = subparsers.add_parser( + "import-db", + parents=[parent_parser], + description=append_doc_link(IMPORT_HELP, "import"), + help=IMPORT_HELP, + formatter_class=argparse.RawTextHelpFormatter, + ) + import_parser.add_argument( + "url", help="Location of DVC or Git repository to download from" + ) + import_parser.add_argument( + "to_materialize", help="Name of the dbt model or SQL query (if --sql)" + ) + import_parser.add_argument( + "--sql", + help="is a sql query", + action="store_true", + default=False, + ) + import_parser.add_argument( + "-o", + "--out", + nargs="?", + help="Destination path to download files to", + metavar="", + ).complete = completion.FILE + import_parser.add_argument( + "-f", + "--force", + action="store_true", + default=False, + help="Override destination file or folder if exists.", + ) + import_parser.add_argument( + "--rev", + nargs="?", + help="Git revision (e.g. SHA, branch, tag)", + metavar="", + ) + + import_parser.set_defaults(func=CmdImportDb) diff --git a/pyproject.toml b/pyproject.toml index 2915a034879..eef6ff46adb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,8 @@ dependencies = [ [project.optional-dependencies] all = ["dvc[azure,gdrive,gs,hdfs,oss,s3,ssh,webdav,webhdfs]"] azure = ["dvc-azure>=2.23.0"] -dev = ["dvc[azure,gdrive,gs,hdfs,lint,oss,s3,ssh,tests,webdav,webhdfs]"] +db = ["dbt-core", "dbt-fal", "dbt-snowflake"] +dev = ["dvc[azure,gdrive,gs,hdfs,lint,oss,s3,ssh,tests,webdav,webhdfs,db]"] gdrive = ["dvc-gdrive==2.20"] gs = ["dvc-gs==2.22.1"] hdfs = ["dvc-hdfs==2.19"] @@ -214,6 +215,7 @@ ignore_missing_imports = true module = [ "celery.*", "configobj.*", + "fal.dbt.*", "dpath.*", "distro", "dvc_http",