Skip to content

Commit

Permalink
implement import-db command (#10040)
Browse files Browse the repository at this point in the history
* implement import-db command

* generate .dvc file

* update

* fix tests

* pin dbt-fal

* try to run on ci

* use os.chdir

* fix

* revert

* require dbt-core>=1.5

* add db config schema

* remove platformdirs constraints

* use logger.getChild

* improve error messaging

* rename cli flag --export-format to --output-format

* use feature flag for db config

* hide command from dvc commands list

* refactor dependency._get

* remove todo

* refactor dependency._get

* fix dbt installation check
  • Loading branch information
skshetry authored Nov 21, 2023
1 parent 2b3b15f commit fd7c891
Show file tree
Hide file tree
Showing 19 changed files with 831 additions and 29 deletions.
2 changes: 2 additions & 0 deletions dvc/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
get_url,
git_hook,
imp,
imp_db,
imp_url,
init,
install,
Expand Down Expand Up @@ -65,6 +66,7 @@
data_sync,
gc,
imp,
imp_db,
imp_url,
config,
checkout,
Expand Down
90 changes: 90 additions & 0 deletions dvc/commands/imp_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import argparse

from dvc.cli import completion
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.log import logger

logger = logger.getChild(__name__)


class CmdImportDb(CmdBase):
def run(self):
if not (self.args.sql or self.args.model):
raise argparse.ArgumentTypeError("Either of --sql or --model is required.")

self.repo.imp_db(
url=self.args.url,
rev=self.args.rev,
project_dir=self.args.project_dir,
sql=self.args.sql,
model=self.args.model,
profile=self.args.profile,
target=self.args.target,
output_format=self.args.output_format,
out=self.args.out,
force=self.args.force,
)
return 0


def add_parser(subparsers, parent_parser):
IMPORT_HELP = (
"Download file or directory tracked by DVC or by Git "
"into the workspace, and track it."
)

import_parser = subparsers.add_parser(
"import-db",
parents=[parent_parser],
description=append_doc_link(IMPORT_HELP, "import"),
add_help=False,
)
import_parser.add_argument(
"--url", help="Location of DVC or Git repository to download from"
)
import_parser.add_argument(
"--rev",
nargs="?",
help="Git revision (e.g. SHA, branch, tag)",
metavar="<commit>",
)
import_parser.add_argument(
"--project-dir", nargs="?", help="Subdirectory to the dbt project location"
)

group = import_parser.add_mutually_exclusive_group()
group.add_argument(
"--sql",
help="SQL query",
)
group.add_argument(
"--model",
help="Model name to download",
)
import_parser.add_argument("--profile", help="Profile to use")
import_parser.add_argument("--target", help="Target to use")
import_parser.add_argument(
"--output-format",
default="csv",
const="csv",
nargs="?",
choices=["csv", "json"],
help="Export format",
)
import_parser.add_argument(
"-o",
"--out",
nargs="?",
help="Destination path to download files to",
metavar="<path>",
).complete = completion.FILE
import_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Override destination file or folder if exists.",
)

import_parser.set_defaults(func=CmdImportDb)
2 changes: 2 additions & 0 deletions dvc/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ def __call__(self, data):
"feature": FeatureSchema(
{
Optional("machine", default=False): Bool,
"db_profile": str,
"db_target": str,
},
),
"plots": {
Expand Down
23 changes: 17 additions & 6 deletions dvc/dependency/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dvc.output import ARTIFACT_SCHEMA, DIR_FILES_SCHEMA, Output

from .base import Dependency
from .db import DB_SCHEMA, PARAM_DB, DbDependency, DbtDependency
from .param import ParamsDependency
from .repo import RepoDependency

Expand All @@ -14,20 +15,28 @@
SCHEMA: Mapping[str, Any] = {
**ARTIFACT_SCHEMA,
**RepoDependency.REPO_SCHEMA,
**DB_SCHEMA,
Output.PARAM_FILES: [DIR_FILES_SCHEMA],
Output.PARAM_FS_CONFIG: dict,
}


def _get(stage, p, info, **kwargs):
if info and info.get(RepoDependency.PARAM_REPO):
repo = info.pop(RepoDependency.PARAM_REPO)
return RepoDependency(repo, stage, p, info)
d = info or {}
db = d.get(PARAM_DB, {})
params = d.pop(ParamsDependency.PARAM_PARAMS, None)
repo = d.pop(RepoDependency.PARAM_REPO, None)

if info and info.get(ParamsDependency.PARAM_PARAMS):
params = info.pop(ParamsDependency.PARAM_PARAMS)
if params:
return ParamsDependency(stage, p, params)
if DbDependency.PARAM_QUERY in db:
return DbDependency(stage, info)
if db:
return DbtDependency(repo, stage, info)

assert p
if repo:
return RepoDependency(repo, stage, p, info)
return Dependency(stage, p, info, **kwargs)


Expand All @@ -44,9 +53,11 @@ def loadd_from(stage, d_list):
return ret


def loads_from(stage, s_list, erepo=None, fs_config=None):
def loads_from(stage, s_list, erepo=None, fs_config=None, db=None):
assert isinstance(s_list, list)
info = {RepoDependency.PARAM_REPO: erepo} if erepo else {}
if db:
info.update({"db": db})
return [
_get(
stage,
Expand Down
Loading

0 comments on commit fd7c891

Please sign in to comment.