Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep up with production doltr #1

Open
wants to merge 47 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
48ccd34
Fix dolt_push set_upstream (#36)
n8layman Dec 22, 2021
112acb5
Remove use of R 4.1+ lambda syntax
Mar 16, 2022
65035e4
Ignore .sqlhistory
May 13, 2022
77ed2f6
Ignore .sqlhistory
May 13, 2022
d3b6791
Kill server not responding to queries as invalid
May 13, 2022
55532cf
Add vignette
May 19, 2022
8d1b14e
Fix dolt_diffs()
May 19, 2022
bb068af
Add parquet export
May 19, 2022
5bcf02d
Add return value to query_as_of in read-table.R
n8layman Jun 9, 2022
cf1856f
Merge pull request #45 from ecohealthalliance/hotfix-1
n8layman Jun 9, 2022
c81c617
Fis server timeout
Jun 20, 2022
0c10c6c
Add delay between server process creation and reading
n8layman Jul 29, 2022
69e9d2f
Add diagnostic step to print process id after first read from proc
n8layman Jul 29, 2022
1256fbd
Add looped tryCatch. It will keep waiting until process id is availab…
n8layman Jul 29, 2022
dc246de
Update server timeout default to match main
n8layman Aug 1, 2022
be8905b
Fix net_write_timeout arg
n8layman Aug 1, 2022
88ef664
Change dolt_server_find to look for both 'running' and 'sleeping' pro…
n8layman Aug 1, 2022
cf6db57
Allow dolt_server_find() to find sleeping processes.
n8layman Aug 1, 2022
dd2c7f3
Remove sleep loop waiting for proc to spawn
n8layman Aug 1, 2022
ac0051a
Merge branch 'main' into hotfix-issue_48
n8layman Aug 1, 2022
69c7db8
Remove default log used in testing
n8layman Aug 1, 2022
e33cb68
Merge pull request #49 from ecohealthalliance/hotfix-issue_48
n8layman Aug 1, 2022
db6cd24
Deleted line apturing proc$as_ps_handle() getting ready for PR
n8layman Aug 1, 2022
a53fabd
Merge branch 'hotfix-issue_48' of https://github.com/ecohealthallianc…
n8layman Aug 1, 2022
10bf075
Merge pull request #51 from ecohealthalliance/hotfix-issue_48
n8layman Aug 1, 2022
e749e51
Add lockfile cleanup to dbDisconnect
n8layman Aug 2, 2022
6454505
Move unlink inside try
n8layman Aug 2, 2022
9286004
Try to kill process in a more elegant way that cleans up sql-lock.fil…
n8layman Aug 2, 2022
f7a9ad7
Switch to using ps::cwd() to identify p_handle directory rather that …
n8layman Aug 2, 2022
086f3ab
Fix missing paranthetical and change ps::cwd to ps::ps_cwd
n8layman Aug 2, 2022
b94afdd
Merge pull request #52 from ecohealthalliance/hotfix-issue_48
n8layman Aug 2, 2022
a07571e
Upate dbReadTable to stop adding space between as of hash and surroun…
n8layman Aug 4, 2022
036577c
Add hash_qualified implementation of 'as_of' argument to dbReadTable …
n8layman Aug 5, 2022
dca0945
Update dbListTable to be as_if friendly and add in dbGetTableType
n8layman Aug 5, 2022
02709ae
Change SELECT syntax on dolt procedures to new CALL format
n8layman Jan 9, 2023
e89589e
dolt_query uses dplyr's tbl which cannot handle dolt's new CALL synta…
n8layman Jan 9, 2023
6cccad8
Updated other instances of CALL to use new dolt_query execute = T syntax
n8layman Jan 9, 2023
3a85762
Collect doesn't apply to dbExecute results. Removed that option when …
n8layman Jan 9, 2023
03836df
Split out dolt_call
n8layman Jan 9, 2023
36ca730
Merge pull request #57 from ecohealthalliance/hotfix-issue_48
n8layman Jan 9, 2023
917432f
Merge pull request #63 from ecohealthalliance/hotfix/issue_41
n8layman Jan 23, 2023
11eb989
Fix as_of for views
n8layman Mar 1, 2023
8de34a5
Set the behavior of `as_of` in dbReadTable() to generate the view usi…
n8layman Mar 2, 2023
5291768
Remove diagnostic print statements
n8layman Mar 2, 2023
8103e0b
Previous method didn't actually work on testing and failed silently. …
n8layman Mar 2, 2023
ffb5bc6
Uncomment dbGetTableType
n8layman Jun 12, 2023
0ea27d4
Merge pull request #66 from ecohealthalliance/hotfix/as_of_bug
n8layman Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ docs
tests/testthat/doltdb
inst/doc
dolt
.sqlhistory
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ VignetteBuilder:
Encoding: UTF-8
Language: en-US
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.1.2
RoxygenNote: 7.2.0
SystemRequirements: dolt
Collate:
'cli.R'
Expand Down
5 changes: 3 additions & 2 deletions R/cli.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ dolt_init <- function(dir = Sys.getenv("DOLT_DIR", "doltdb")) {

#' Export data from a dolt database
#' @param dir path to dolt database on-disk
#' @param format the export data format. One of `"sql"`, `"csv"`, or `"json"`
#' @param format the export data format. One of `"sql"`, `"csv"`, `"json"`, or
#' `"parquet"`
#' @param out the location on-disk for export. In the case of `"sql"`, format,
#' a single file path (default `doltdump.sql`), otherwise a directory for all
#' tables to be dumped as separate files (default "doltdump")
Expand All @@ -49,7 +50,7 @@ dolt_init <- function(dir = Sys.getenv("DOLT_DIR", "doltdb")) {
#' @importFrom R.utils getAbsolutePath
#' @return the path(s) of exported files
#' @export
dolt_dump <- function(format = c("sql", "csv", "json"),
dolt_dump <- function(format = c("sql", "csv", "json", "parquet"),
out = NULL,
overwrite = FALSE,
dir = Sys.getenv("DOLT_DIR", "doltdb")) {
Expand Down
6 changes: 4 additions & 2 deletions R/dolt-diffs.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#'
#' @param table [character] the name of a table in the database
#' @param to commit to compare to
#' @param from commit to compare from
#' @inheritParams dolt_branches
#' @export
#' @rdname dolt-diffs
dolt_diffs <- function(table, to, conn = dolt(), collect = NULL, show_sql = NULL) {
dolt_diffs <- function(table, to, from, conn = dolt(), collect = NULL, show_sql = NULL) {
collect <- .collect(collect); show_sql <- .show_sql(show_sql)
query <- paste0("select * from dolt_commit_diff_", table, " WHERE to_commit = ", to)
query <- paste0("select * from dolt_commit_diff_", table,
" where to_commit='", to, "' and from_commit='", from, "'")
dolt_query(query, conn, collect, show_sql)
}

Expand Down
13 changes: 7 additions & 6 deletions R/dolt-local-connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ setMethod("dbDisconnect", "DoltLocalConnection", function(conn, ...) {

if (dbIsValid(conn) && ps_is_running(conn@server)) {
# On disconnection, kill the server only if it was started by doltr and no
# no other processes connect to it
# no other processes connect to it.

is_doltr_server <- isTRUE(ps_environ(conn@server)["R_DOLT"] == "1")
procs <- ps()
procs <- procs[procs$status == "running" & procs$pid != ps_pid(ps_handle()),]
procs <- procs[(procs$status == "running" | procs$status == "sleeping") & procs$pid != ps_pid(ps_handle()),]
procs <- procs[vapply(procs$ps_handle, function(x) {
conns <- try(ps_connections(x), silent = TRUE)
out <- !inherits(conns, "try-error") && nrow(conns) && conn@port %in% conns$rport
out <- !inherits(conns, "try-error") && nrow(conns) && conn@port %in% conns$lport
out
}, logical(1)),]
other_sessions <- as.logical(nrow(procs))
Expand All @@ -149,16 +149,17 @@ setMethod("dbDisconnect", "DoltLocalConnection", function(conn, ...) {
}
getMethod(dbDisconnect, "DoltConnection")(conn)

if (kill_server)
try(dkill(conn@server), silent = TRUE)
invisible(TRUE)
if (kill_server) {
try(dkill(conn@server), silent = T)
}
})

#' @export
#' @importFrom ps ps_is_running
#' @rdname dolt_local
setMethod("dbIsValid", "DoltLocalConnection", function(dbObj, ...) {
valid <- getMethod(dbIsValid, "MariaDBConnection")(dbObj) &&
class(try(dbGetQuery(dbObj, "SELECT 1"), silent = TRUE)) != "try-error" &&
ps_is_running(dbObj@server)
if (!valid && inherits(dbObj@server, "ps_handle"))
try(dkill(dbObj@server), silent = TRUE)
Expand Down
6 changes: 3 additions & 3 deletions R/dolt-nav.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ dolt_checkout <- function(branch, b = FALSE, start_point = NULL,
branch = sql_quote(branch, "'")
if (b) branch <- paste0(sql_quote("-b", "'"), ", ", branch)
if (!is.null(start_point)) branch <- paste0(branch, ", ", start_point)
query <- paste0("select dolt_checkout(", branch, ")")
dolt_query(query, conn, collect, show_sql)
invisible(dolt_state())
query <- paste0("CALL dolt_checkout(", branch, ")")
dolt_call(query, conn, show_sql)
dolt_state() # I don't think this needs to be invisible.
}

#' @export
Expand Down
24 changes: 14 additions & 10 deletions R/dolt-remote.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' Work with dolt repository remotes
#'
#' @param remote the name of the remote. "origin" is used by default
#' @param remote_branch the name of the remote branch to use with set_upstream. Current local branch is used by default
#' @param ref the branch reference
#' @param set_upstream whether to set the remote branch reference to track
#' @param force whether to overwrite any conflicting history
Expand All @@ -10,17 +11,20 @@
#' @rdname dolt-remote
#' @family dolt-sql-commands
#' @importFrom dbplyr sql_quote
dolt_push <- function(remote = NULL, ref = NULL, set_upstream = FALSE,
force = FALSE, conn = dolt(), collect = NULL,
show_sql = NULL) {
dolt_push <- function(remote = NULL, remote_branch = NULL, ref = NULL,
set_upstream = FALSE, force = FALSE, conn = dolt(),
collect = NULL, show_sql = NULL) {
collect <- .collect(collect); show_sql <- .show_sql(show_sql)
args <- character(0)
if (set_upstream & is.null (remote)) remote = "origin"
if (set_upstream & is.null (remote_branch)) remote_branch = sub(".*/", "", dolt_state()$head_ref)
if (!is.null (remote)) args <- c(args, sql_quote(remote, "'"))
if (!is.null (remote_branch)) args <- c(args, sql_quote(remote_branch, "'"))
if (!is.null (ref)) args <- c(args, sql_quote(ref, "'"))
if (set_upstream) args <- c("'--set-upstream'", args)
if (set_upstream) args <- c("'--set-upstream' ", args)
if (force) args <- c(args, "'--force'")
query <- paste0("select dolt_push(", paste0(args, collapse = ", "), ")")
dolt_query(query, conn, collect, show_sql)
query <- paste0("call dolt_push(", paste0(args, collapse = ", "), ")")
dolt_call(query, conn, show_sql)
invisible(dolt_state())
}

Expand All @@ -35,8 +39,8 @@ dolt_pull <- function(remote = NULL, squash = FALSE, conn = dolt(),
args <- ""
if (!is.null(remote)) args <- c(args, sql_quote(remote, "'"))
if (squash) args <- c(args, "'--squash'")
query <- paste0("select dolt_pull(", paste0(args, collapse = ", "), ")")
dolt_query(query, conn, collect, show_sql)
query <- paste0("call dolt_pull(", paste0(args, collapse = ", "), ")")
dolt_call(query, conn, show_sql)
invisible(dolt_state())
}

Expand All @@ -50,8 +54,8 @@ dolt_fetch <- function(remote = NULL, ref = FALSE, force = FALSE,
if (!is.null(remote)) args <- c(args, sql_quote(remote, "'"))
if (!is.null(ref)) args <- c(args, sql_quote(remote, "'"))
if (force) args <- c(args, "'--force'")
query <- paste0("select dolt_fetch(", paste0(args, collapse = ", "), ")")
dolt_query(query, conn, collect, show_sql)
query <- paste0("call dolt_fetch(", paste0(args, collapse = ", "), ")")
dolt_call(query, conn, show_sql)
invisible(dolt_state())
}

Expand Down
16 changes: 9 additions & 7 deletions R/dolt-stage-commit.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ dolt_add <- function(tables = NULL, conn = dolt(), collect = NULL, show_sql = NU
tables <- "'--all'"
else
tables <- paste0("'", tables, "'", collapse = ", ")
query <- paste0("select dolt_add(", tables, ")");
dolt_query(query, conn, collect, show_sql)
query <- paste0("call dolt_add(", tables, ")");
dolt_call(query, conn, show_sql)
invisible(dolt_status())
}

Expand Down Expand Up @@ -44,9 +44,11 @@ dolt_commit <- function(all = TRUE, message = NULL, author = NULL, date = NULL,
if (!is.null(author)) args <- c(args, "'--author'", paste0("'", author, "'"))
if (!is.null(date)) args <- c(args, "'--date'", paste0("'", date, "'"))
if (allow_empty) args <- c(args, "'--allow-empty'")
query <- paste0("select dolt_commit(", paste0(args, collapse = ", "), ")");
dolt_query(query, conn, collect, show_sql)
invisible(dolt_state())
query <- paste0("call dolt_commit(", paste0(args, collapse = ", "), ")");
dolt_call(query, conn, show_sql)
state <- dolt_state()
message(paste0("head commit: ", state$head))
invisible(state)
}

#' @param hard Reset working and staged tables? If FALSE (default), a "soft"
Expand All @@ -62,7 +64,7 @@ dolt_reset <- function(hard = FALSE, tables = NULL, conn = dolt(),
args <- c()
if (!is.null(tables)) args <- paste0(sql_quote(tables, "'"), collapse = ", ")
if (hard) args <- c(sql_quote("--hard", "'"), args)
query <- paste0("select dolt_reset(", paste(args, collapse = ", ", ")"))
dolt_query(query, conn, collect, show_sql)
query <- paste0("call dolt_reset(", paste(args, collapse = ", ", ")"))
dolt_call(query, conn, show_sql)
invisible(dolt_state())
}
2 changes: 1 addition & 1 deletion R/dolt-types.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ dolt_text_type <- function(obj, min_varchar, max_varchar) {
#' @importFrom blob as_blob
dolt_blob_type <- function(obj) {
if (!all(vapply(obj, is.raw, logical(1)))) "Stop only lists of raw vectors (blobs) allowed"
nb <- max(vapply(obj, \(x) length(x), 1), 1, na.rm = TRUE)
nb <- max(vapply(obj, length, 1), 1, na.rm = TRUE)
if (nb <= 65535) {
return(structure("BLOB", max_size = 65535))
} else if (nb > 65535L && nb <= 16777215) {
Expand Down
11 changes: 10 additions & 1 deletion R/query.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ dolt_query <- function(query, conn = dolt(),
result
}


.collect <- function(collect) {
if (is.null(collect))
return(Sys.getboolenv("DOLT_COLLECT", TRUE))
Expand All @@ -23,3 +22,13 @@ dolt_query <- function(query, conn = dolt(),
else
return(show_sql)
}

dolt_call <- function(query, conn = dolt(),
show_sql = Sys.getboolenv("DOLT_VERBOSE", FALSE)) {

query <- sql(query)
if (show_sql) message(query)
result <- RMariaDB::dbExecute(conn, query)
result
}

56 changes: 41 additions & 15 deletions R/read-table.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ NULL
#' @export
#' @rdname dolt-read
setMethod("dbReadTable", c("DoltConnection", "character"),
function(conn, name, as_of = NULL, ..., row.names = FALSE, check.names = TRUE) {
function(conn, name,
as_of = NULL,
...,
row.names = FALSE,
check.names = TRUE,
show_sql = F) {

row.names <- compatRowNames(row.names)

if ((!is.logical(row.names) && !is.character(row.names)) || length(row.names) != 1L) {
Expand All @@ -39,11 +45,21 @@ setMethod("dbReadTable", c("DoltConnection", "character"),
stopc("`check.names` must be a logical scalar")
}

name <- dbQuoteIdentifier(conn, name)
query <- paste("SELECT * FROM ", name)
if (!is.null(as_of)) query <- query_as_of(query, as_of)
if (!is.null(as_of)) {
table_type <- dbGetTableType(conn, name, as_of)
if(!length(table_type)) warning("table does not exist at as_of commit")
name <- query_hash_qualified(conn, name, as_of)
} else {
name <- dbQuoteIdentifier(conn, name)
}

query <- paste("SELECT * FROM", name)

out <- dbGetQuery(conn, query,
if(!is.null(as_of)) query <- paste(query, "AS OF", DBI::dbQuoteString(dolt(), as_of))
if(show_sql) print(query)

out <- DBI::dbGetQuery(conn,
query,
row.names = row.names)

if (check.names) {
Expand All @@ -54,26 +70,36 @@ setMethod("dbReadTable", c("DoltConnection", "character"),
}
)

query_as_of <- function(query, as_of) {
query_as_of <- function(name, as_of) {
as_of <- tryCatch(
paste0("TIMESTAMP('", as.character(as.POSIXct(as_of)), "')"),
error = function(e) paste("'", as_of, "'")
error = function(e) paste0("'", as_of, "'")
)
query <- paste0(query, " AS OF ", as_of)
name <- paste0(name, " AS OF ", as_of)
name
}

query_hash_qualified <- function(conn, name, as_of) {
dbname <- dbGetQuery(conn, "select DATABASE()")[[1]]
name <- paste0("`", dbname, "/", as_of, "`.", name)
name
}

#' @export
#' @rdname dolt-read
setMethod("dbListTables", "DoltConnection", function(conn, as_of = NULL, ...) {
# DATABASE(): https://stackoverflow.com/a/8096574/946850
query <- paste0("SELECT table_name FROM INFORMATION_SCHEMA.tables\n",
"WHERE table_schema = DATABASE()")
if (!is.null(as_of)) query <- query_as_of(query, as_of)

dbGetQuery(conn, query)[[1]]

query <- 'show full tables'
if(!is.null(as_of)) query <- paste0(query, " as of '", as_of, "'")
out <- RMariaDB::dbGetQuery(conn, query)
out[[1]]
})

dbGetTableType <- function(conn, name, as_of = NULL) {
query <- 'show full tables'
if(!is.null(as_of)) query <- paste0(query, " as of '", as_of, "'")
out <- RMariaDB::dbGetQuery(conn, query)
out[out[,1] == name, 2]
}

#' @export
#' @inheritParams DBI::dbListObjects
Expand Down
30 changes: 24 additions & 6 deletions R/server.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#' to `std_out()`, if `NULL` (default), it is suppressed. Can also take
#' a filename. See [processx::run()].
#' @param timeout Defines the timeout, in seconds, used for connections
#' A value of `0` represents an infinite timeout (default `28800000`)
#' (default `28800000`)
#' @param query_parallelism Set the number of go routines spawned to handle each
#' query (default `2`)
#' @param max_connections Set the number of connections handled by the server
Expand All @@ -44,7 +44,7 @@ dolt_server <- function(dir = Sys.getenv("DOLT_DIR", "doltdb"),
read_only = FALSE,
log_level = "info",
log_out = NULL,
timeout = 0,
timeout = 28800000,
query_parallelism = 2,
max_connections = 100,
config_file = Sys.getenv("DOLT_CONFIG_FILE", "")) {
Expand Down Expand Up @@ -90,6 +90,7 @@ dolt_server <- function(dir = Sys.getenv("DOLT_DIR", "doltdb"),
stdout = log_out, stderr = "2>&1",
env = c("current", R_DOLT=1),
supervise = FALSE, cleanup = FALSE, cleanup_tree = FALSE)

p <- proc$as_ps_handle()
rm(proc)

Expand Down Expand Up @@ -131,7 +132,7 @@ setOldClass("dolt_server")
#' @importFrom ps ps ps_connections ps_cwd ps_environ ps_cmdline
dolt_server_find <- function(dir = NULL, port = NULL, doltr_only = FALSE) {
dp <- ps()
dp <- dp[dp$name == "dolt" & dp$status == "running",]
dp <- dp[dp$name == "dolt" & (dp$status == "running" | dp$status == "sleeping"),]
if (nrow(dp))
dp <- dp[vapply(dp$ps_handle, function(x) {
isTRUE(try(ps_cmdline(x)[2], silent = TRUE) == "sql-server")
Expand Down Expand Up @@ -163,18 +164,35 @@ dolt_server_kill <- function(dir = NULL, port = NULL, doltr_only = FALSE, verbos
dp <- dolt_server_find(dir, port, doltr_only)
lapply(dp$ps_handle, dkill)
if (verbose) message(nrow(dp), " processes killed")

# dolt now uses a lock file which needs to be cleaned up after process killed
# This might be a little complicated with multi_db = T
invisible(dp)
}

#' @importFrom ps signals ps_terminate ps_kill
dkill <- function(p = ps_handle) {
# We should prefer SIGTERM over SIGKILL when possible
# is.null(ps::signals()$SIGTERM)) asks if SIGTERM
# is NOT available.
if (is.null(ps::signals()$SIGTERM)) {
ps_terminate(p)
} else {
# If SIGTERM signal is NOT available kill the
# process and clean up the lock file manually.
ps_kill(p)
unlink(paste0(ps::ps_cwd(p), "/.dolt/sql-server.lock"))
} else {
ps_terminate(p) # sql-server.lock should be cleaned up automatically. Just in case though...
unlink(paste0(ps::ps_cwd(p), "/.dolt/sql-server.lock"))
}

invisible(NULL)
}


#
# is_dolt_server_valid <- function(srv) {
# dbConnect(dolt_remote(), dbname = basename(ps_cwd(srv)), username = username,
# password = password, host = host, port = ps_connections(srv)$lport,
# autocommit = autocommit, ...)
# }
#

5 changes: 5 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,8 @@ as_table <- function(schema, table) {
args <- args[!is.na(args) & args != ""]
do.call(Id, as.list(args))
}

compact <- function(x) {
is_empty <- vapply(x, function(x) length(x) == 0, logical(1))
x[!is_empty]
}
Loading