Skip to content

Commit

Permalink
refactor: check pgcatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Oct 16, 2024
1 parent 440ad81 commit 9980894
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 73 deletions.
24 changes: 0 additions & 24 deletions src/hooks/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,30 +116,6 @@ fn get_postgres_search_path() -> Vec<String> {
schema_vec
}

/// Returns the name of the first schema on the active Postgres search path.
///
/// The schema OID is taken from the head of the list returned by
/// `fetch_search_path(false)` and resolved to a name through the
/// `NAMESPACEOID` system cache.
///
/// Returns an empty string when the search path list has no entries or when
/// the system cache has no tuple for the schema OID.
pub fn get_postgres_current_schema() -> String {
    let active_schemas =
        unsafe { PgList::<pg_sys::Oid>::from_pg(pg_sys::fetch_search_path(false)) };

    // The list can be empty (no usable schema on the search path); the previous
    // `unwrap()` turned that into a panic inside the backend.
    let schema_oid = match active_schemas.get_oid(0) {
        Some(oid) => oid,
        None => return String::new(),
    };
    let schema_datum = match schema_oid.into_datum() {
        Some(datum) => datum,
        None => return String::new(),
    };

    let tuple = unsafe {
        pg_sys::SearchSysCache1(
            pg_sys::SysCacheIdentifier::NAMESPACEOID as i32,
            schema_datum,
        )
    };

    // A null tuple means the cache lookup found nothing; report "no schema".
    if tuple.is_null() {
        return String::new();
    }

    let pg_namespace = unsafe { pg_sys::GETSTRUCT(tuple) as pg_sys::Form_pg_namespace };
    // Copy the name out before releasing the cache entry that backs it.
    let current_schema = pg_sys::name_data_to_str(unsafe { &(*pg_namespace).nspname }).to_string();

    unsafe { pg_sys::ReleaseSysCache(tuple) };

    current_schema
}

pub fn is_duckdb_query(relations: &[PgRelation]) -> bool {
!relations.is_empty()
&& relations.iter().all(|pg_relation| {
Expand Down
4 changes: 3 additions & 1 deletion src/hooks/utility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ pub async fn process_utility_hook(
pstmt.utilityStmt as *mut pg_sys::ExplainStmt,
dest.as_ptr(),
)?,
pg_sys::NodeTag::T_ViewStmt => view_query(query_string)?,
pg_sys::NodeTag::T_ViewStmt => {
view_query(query_string, pstmt.utilityStmt as *mut pg_sys::ViewStmt)?
}
_ => bail!("unexpected statement type in utility hook"),
};

Expand Down
132 changes: 84 additions & 48 deletions src/hooks/utility/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,57 +15,93 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::{bail, Result};
use sqlparser::{dialect::PostgreSqlDialect, parser::Parser};
use std::ops::ControlFlow;
use anyhow::Result;
use pg_sys::{get_relname_relid, RangeVarGetCreationNamespace};
use std::ffi::CStr;

use pgrx::*;
use sqlparser::ast::visit_relations;

use crate::duckdb::connection::{execute, view_exists};

use super::{get_postgres_current_schema, set_search_path_by_pg};

pub fn view_query(query_string: &core::ffi::CStr) -> Result<bool> {
// Use the current schema if the schema is not provided in the query.
let current_schema = get_postgres_current_schema();
// Set the DuckDB search path according to the search path in Postgres.
set_search_path_by_pg()?;

let dialect = PostgreSqlDialect {};
let statements = Parser::parse_sql(&dialect, query_string.to_str()?)?;
// visit statements, capturing relations (table names)
let mut visited = vec![];

visit_relations(&statements, |relation| {
visited.push(relation.clone());
ControlFlow::<()>::Continue(())
});

for relation in visited.iter() {
let (schema_name, relation_name) = if relation.0.len() == 1 {
(current_schema.clone(), relation.0[0].to_string())
} else if relation.0.len() == 2 {
(relation.0[0].to_string(), relation.0[1].to_string())
} else if relation.0.len() == 3 {
// pg_analytics does not create view with database name now
error!(
"pg_analytics does not support creating view with database name: {}",
relation.0[0].to_string()
);
} else {
bail!("unexpected relation name: {:?}", relation.0);
};

// If the table does not exist in DuckDB, do not push down the query to DuckDB
if !view_exists(&relation_name, &schema_name)? {
fallback_warning!(format!(
"{schema_name}.{relation_name} does not exist in DuckDB"
));
return Ok(true);

use crate::{duckdb::connection::execute, hooks::query::is_duckdb_query};

use super::set_search_path_by_pg;

/// Handles a `CREATE VIEW` utility statement.
///
/// When `analyze_query` reports that the view can be pushed down, the DuckDB
/// search path is synchronised via `set_search_path_by_pg` and the original
/// statement text is executed in DuckDB.
///
/// Always returns `Ok(true)` on success; errors from the analysis, the search
/// path update, the UTF-8 conversion of `query_string`, or the DuckDB execution
/// are propagated.
pub fn view_query(query_string: &core::ffi::CStr, stmt: *mut pg_sys::ViewStmt) -> Result<bool> {
    let push_down = analyze_query(stmt)?;
    if !push_down {
        return Ok(true);
    }

    // Push down the view creation query to DuckDB
    set_search_path_by_pg()?;
    let sql = query_string.to_str()?;
    execute(sql, [])?;

    Ok(true)
}

/// Decides whether the query behind a `CREATE VIEW` statement can be pushed
/// down to DuckDB by inspecting its `FROM` clause.
///
/// Returns `Ok(false)` (do not push down) when `stmt` or its query is null,
/// when the query node is not a `SelectStmt`, or when the statement has no
/// `FROM` clause at all. Otherwise returns whatever `analyze_from_clause`
/// reports.
fn analyze_query(stmt: *mut pg_sys::ViewStmt) -> Result<bool> {
    if stmt.is_null() {
        return Ok(false);
    }

    // SAFETY: `stmt` is non-null; the caller obtains it by casting the utility
    // statement of a `T_ViewStmt` node.
    let query = unsafe { (*stmt).query };
    if query.is_null() {
        return Ok(false);
    }

    // Only reinterpret the node as a SelectStmt once its tag confirms it.
    // SAFETY: `query` is non-null and every node starts with its tag.
    let is_select = unsafe { matches!((*query).type_, pg_sys::NodeTag::T_SelectStmt) };
    if !is_select {
        return Ok(false);
    }

    // SAFETY: the tag check above established that `query` is a SelectStmt.
    let from_clause = unsafe { (*(query as *mut pg_sys::SelectStmt)).fromClause };

    // An absent FROM clause is a null list (e.g. `SELECT 1`); there is no
    // relation to inspect, and dereferencing it downstream would crash.
    if from_clause.is_null() {
        return Ok(false);
    }

    analyze_from_clause(from_clause)
}

/// Checks every item of a `FROM` clause list and reports whether all inspected
/// relations are DuckDB relations.
///
/// `RangeVar` items are checked with `analyze_range_var`, `JoinExpr` items with
/// `analyze_join_expr`; other node kinds are skipped, as before.
///
/// Returns `Ok(false)` when the list is null, when no `RangeVar`/`JoinExpr`
/// item was found, or as soon as one item is not a DuckDB relation. Returns
/// `Ok(true)` only when at least one item was inspected and all of them passed.
fn analyze_from_clause(from_clause: *mut pg_sys::List) -> Result<bool> {
    // A null pointer is an empty list; nothing to push down.
    if from_clause.is_null() {
        return Ok(false);
    }

    let mut inspected_any = false;

    // SAFETY: `from_clause` is non-null; `elements` is indexed strictly below
    // the list's own `length`.
    unsafe {
        let elements = (*from_clause).elements;
        for i in 0..(*from_clause).length {
            let element = (*elements.offset(i as isize)).ptr_value as *mut pg_sys::Node;
            if element.is_null() {
                continue;
            }

            let is_duckdb = match (*element).type_ {
                pg_sys::NodeTag::T_RangeVar => {
                    analyze_range_var(element as *mut pg_sys::RangeVar)?
                }
                pg_sys::NodeTag::T_JoinExpr => {
                    analyze_join_expr(element as *mut pg_sys::JoinExpr)?
                }
                _ => continue,
            };

            // Previously the verdict of the first item was returned directly,
            // so later items (e.g. `FROM a, b`) were never looked at.
            if !is_duckdb {
                return Ok(false);
            }
            inspected_any = true;
        }
    }

    Ok(inspected_any)
}

/// Resolves a `RangeVar` to a relation and reports whether it is a DuckDB
/// relation according to `is_duckdb_query`.
///
/// Returns `Ok(false)` when the relation name cannot be resolved to a valid
/// OID or when the relation cannot be opened.
fn analyze_range_var(rv: *mut pg_sys::RangeVar) -> Result<bool> {
    // SAFETY: assumes `rv` points to a valid RangeVar from the parse tree; the
    // callers only pass nodes whose tag is `T_RangeVar`.
    let pg_relation = unsafe {
        // NOTE(review): RangeVarGetCreationNamespace picks the namespace a new
        // relation would be created in; confirm this is the intended lookup for
        // an existing relation referenced without an explicit schema.
        let schema_id = RangeVarGetCreationNamespace(rv);
        let relid = get_relname_relid((*rv).relname, schema_id);

        pgrx::warning!(
            "Relation table name: {:?}",
            CStr::from_ptr((*rv).relname).to_str()
        );

        // An unresolved name must not reach the relation cache lookup.
        if relid == pg_sys::InvalidOid {
            return Ok(false);
        }

        let relation = pg_sys::RelationIdGetRelation(relid);
        // Do not wrap a null pointer in PgRelation.
        if relation.is_null() {
            return Ok(false);
        }

        PgRelation::from_pg_owned(relation)
    };

    Ok(is_duckdb_query(&[pg_relation]))
}

fn analyze_join_expr(join_expr: *mut pg_sys::JoinExpr) -> Result<bool> {
pgrx::warning!("Analyzing JoinExpr");

unsafe {
let ltree = (*join_expr).larg;

Check failure on line 85 in src/hooks/utility/view.rs

View workflow job for this annotation

GitHub Actions / Check Typo using codespell

larg ==> large
let rtree = (*join_expr).rarg;

Ok(analyze_tree(ltree)? && analyze_tree(rtree)?)
}
}

/// Walks one arm of a join tree and reports whether every relation found in it
/// is a DuckDB relation.
///
/// * `RangeVar` — the result of `analyze_range_var` is returned.
/// * `JoinExpr` — the right arm is analysed recursively, then the walk
///   continues down the left arm.
/// * null or any other node kind — the walk stops and `Ok(true)` is returned.
fn analyze_tree(mut tree: *mut pg_sys::Node) -> Result<bool> {
    while !tree.is_null() {
        // SAFETY: `tree` is non-null here; each cast follows a check of the
        // node tag.
        unsafe {
            match (*tree).type_ {
                pg_sys::NodeTag::T_RangeVar => {
                    return analyze_range_var(tree as *mut pg_sys::RangeVar);
                }
                pg_sys::NodeTag::T_JoinExpr => {
                    let nested_join = tree as *mut pg_sys::JoinExpr;

                    // Previously only the left arm of a nested join was
                    // followed, so relations on its right arm were never
                    // checked.
                    if !analyze_tree((*nested_join).rarg)? {
                        return Ok(false);
                    }

                    // `larg` is the `pg_sys::JoinExpr` field name; codespell's
                    // `larg ==> large` report is a false positive.
                    tree = (*nested_join).larg;
                }
                _ => break,
            }
        }
    }
    Ok(true)
}

0 comments on commit 9980894

Please sign in to comment.