Skip to content

Commit

Permalink
External MySQL abstraction
Browse files Browse the repository at this point in the history
Summary:
The rust/shed/sql crate provides a wrapper for C++ bindings of the Meta internal MySQL client. This client will not be coompatible with AWS (at least this compatibility will not be supported by MySQL team) and we won't be able to control network dependencies in it.

For the AWS Mononoke instance I'm creating a new abstraction that will be able to speak MySQL with any host and port. Since this new implementation will becloser to the OSS version, I am renaming the internal Meta MySQL client bining to `FBMySQL/FBConnection...`.

Reviewed By: mitrandir77

Differential Revision: D49962569

fbshipit-source-id: 096009b3f0427fe5c382f60341460152cbd4f012
  • Loading branch information
Clara Rull authored and facebook-github-bot committed Nov 3, 2023
1 parent 66c954e commit 49d206a
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 5 deletions.
1 change: 1 addition & 0 deletions shed/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = "=1.0.72"
cloned = { version = "0.1.0", path = "../cloned" }
frunk = "0.4.2"
futures = { version = "0.3.28", features = ["async-await", "compat"] }
futures-util = "0.3.7"
futures_ext = { version = "0.1.0", path = "../futures_ext" }
Expand Down
13 changes: 11 additions & 2 deletions shed/sql/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ pub enum Connection {
/// Sqlite lets you use this crate with rusqlite connections such as in memory or on disk Sqlite
/// databases, both useful in case of testing or local sql db use cases.
Sqlite(sqlite::SqliteMultithreaded),
/// A variant used for the new Mysql client connection factory.
/// A variant used for Meta's internal Mysql client connection factory.
Mysql(mysql::Connection),
/// For use in external Mysql DBs
OssMysql(mysql::OssConnection),
}

impl From<sqlite::SqliteMultithreaded> for Connection {
Expand All @@ -108,11 +110,18 @@ impl From<mysql::Connection> for Connection {
}
}

impl From<mysql::OssConnection> for Connection {
fn from(conn: mysql::OssConnection) -> Self {
Connection::OssMysql(conn)
}
}

impl Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Connection::Sqlite(..) => write!(f, "Sqlite"),
Connection::Mysql(..) => write!(f, "Mysql client"),
Connection::Mysql(..) => write!(f, "Meta internal Mysql client"),
Connection::OssMysql(..) => write!(f, "AWS compatible Mysql client"),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions shed/sql/common/mysql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub use facebook::MysqlError;
#[cfg(fbcode_build)]
pub use facebook::OptionalTryFromRowField;
#[cfg(fbcode_build)]
pub use facebook::OssConnection;
#[cfg(fbcode_build)]
pub use facebook::RowField;
#[cfg(fbcode_build)]
pub use facebook::Transaction;
Expand All @@ -43,6 +45,8 @@ pub use mysql_stub::MysqlError;
#[cfg(not(fbcode_build))]
pub use mysql_stub::OptionalTryFromRowField;
#[cfg(not(fbcode_build))]
pub use mysql_stub::OssConnection;
#[cfg(not(fbcode_build))]
pub use mysql_stub::RowField;
#[cfg(not(fbcode_build))]
pub use mysql_stub::Transaction;
Expand Down
49 changes: 49 additions & 0 deletions shed/sql/common/mysql/mysql_stub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;

use mysql_async::Conn as MysqlConnection;
use mysql_async::Pool;
use mysql_async::QueryResult;
use mysql_async::TextProtocol;
use thiserror::Error;

use crate::mysql::ConnectionStats;
use crate::mysql::IsolationLevel;
use crate::mysql::WriteResult;

Expand Down Expand Up @@ -47,6 +53,15 @@ impl From<ValueError> for MysqlError {
#[derive(Clone)]
pub struct Connection;

/// Connection object.
#[derive(Clone)]
pub struct OssConnection {
/// Connection pool
pub pool: Pool,
/// Stats struct for logging performance
pub stats: Arc<ConnectionStats>,
}

/// Transaction result object.
#[allow(dead_code)]
pub struct TransactionResult<T> {
Expand Down Expand Up @@ -108,6 +123,40 @@ impl Connection {
}
}

unsafe impl Send for OssConnection {}

impl OssConnection {
/// Checks out a connection from the pool while collecting stats
pub async fn get_conn_counted(
_pool: Pool,
_stats: &ConnectionStats,
) -> Result<MysqlConnection, mysql_async::Error> {
unimplemented!("This is a stub");
}

/// Performs a given query and returns the result as a vector of rows.
pub async fn read_query<'a>(
&self,
_conn: &'a mut MysqlConnection,
_query: &'a str,
) -> Result<QueryResult<'a, 'a, TextProtocol>, MysqlError> {
unimplemented!("This is a stub");
}

/// Performs a given query and returns the write result.
pub async fn write_query(&self, _query: String) -> Result<WriteResult, MysqlError> {
unimplemented!("This is a stub");
}

/// Begins trasaction and returns Transaction object.
pub async fn begin_transaction(
&self,
_tx_opts: mysql_async::TxOpts,
) -> Result<mysql_async::Transaction<'static>, MysqlError> {
unimplemented!("This is a stub");
}
}

/// Transaction object.
pub struct Transaction;

Expand Down
20 changes: 18 additions & 2 deletions shed/sql/common/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ pub enum Transaction {
/// When a Sqlite transaction is dropped a "rollback" is performed, so one should always make
/// sure to call "commit" if they want to persist the transation.
Sqlite(Option<SqliteConnectionGuard>),
/// A variant used for the new Mysql client connection.
/// A variant used for the internal Mysql client connection.
Mysql(Option<mysql::Transaction>),
/// A variant used for the external Mysql client connection.
OssMysql(Option<mysql_async::Transaction<'static>>),
}

impl Transaction {
Expand All @@ -83,6 +85,12 @@ impl Transaction {
let transaction = conn.begin_transaction().map_err(Error::from).await?;
Ok(Transaction::Mysql(Some(transaction)))
}
super::Connection::OssMysql(conn) => {
let transaction = conn
.begin_transaction(mysql_async::TxOpts::default())
.await?;
Ok(Transaction::OssMysql(Some(transaction)))
}
}
}

Expand All @@ -105,6 +113,10 @@ impl Transaction {
let tr = tr.take().expect("Called commit after drop");
Ok(tr.commit().await?)
}
Transaction::OssMysql(ref mut tr) => {
let tr = tr.take().expect("Called rollback after drop");
Ok(tr.rollback().await?)
}
}
}

Expand All @@ -117,6 +129,10 @@ impl Transaction {
let tr = tr.take().expect("Called rollback after drop");
Ok(tr.rollback().await?)
}
Transaction::OssMysql(ref mut tr) => {
let tr = tr.take().expect("Called rollback after drop");
Ok(tr.rollback().await?)
}
}
}
}
Expand All @@ -136,7 +152,7 @@ impl Drop for Transaction {
panic!("Rollback on drop of Sqlite connection has failed: {err:#?}");
}
}
Transaction::Mysql(_) => {}
Transaction::Mysql(_) | Transaction::OssMysql(_) => {}
}
}
}
89 changes: 88 additions & 1 deletion shed/sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ mod tests;

pub use anyhow;
pub use cloned;
pub use frunk::HList;
pub use futures;
pub use futures_ext;
pub use futures_util;
Expand All @@ -78,6 +79,7 @@ use rusqlite::types::ValueRef as SqliteValueRef;
use rusqlite::Result as SqliteResult;
pub use sql_common;
pub use sql_common::mysql;
pub use sql_common::mysql::OssConnection;
pub use sql_common::sqlite;
pub use sql_common::transaction::Transaction;
pub use sql_common::Connection;
Expand Down Expand Up @@ -307,6 +309,7 @@ macro_rules! _query_common {
// Some users of queries! have redefined Result
use std::result::Result;

use $crate::anyhow::anyhow;
use $crate::anyhow::Context;
use $crate::anyhow::Error;
use $crate::cloned::cloned;
Expand All @@ -320,10 +323,12 @@ macro_rules! _query_common {
use $crate::rusqlite::Result as SqliteResult;
use $crate::rusqlite::Row as SqliteRow;
use $crate::rusqlite::Statement as SqliteStatement;
use $crate::sql_common::mysql::OssConnection;
use $crate::sqlite::SqliteConnectionGuard;
use $crate::sqlite::SqliteMultithreaded;
use $crate::sqlite::SqliteQueryType;
use $crate::Connection;
use $crate::HList;
use $crate::Transaction;
use $crate::ValueWrapper;

Expand Down Expand Up @@ -354,14 +359,47 @@ macro_rules! _read_query_impl {
let query = mysql_query($( $pname, )* $( $lname, )*);
conn.read_query(query).map_err(Error::from).await
}
Connection::OssMysql(conn) => {
let query = mysql_query($( $pname, )* $( $lname, )*);

let mut con = OssConnection::get_conn_counted(conn.pool.clone(), &conn.stats).await?;
let mut res = conn.read_query(&mut con, &query).map_err(Error::from).await?;

let result = res
.map( |row| mysql_async_row_to_tuple(row))
.await?
.into_iter()
.collect::<Result<Vec<($( $rtype, )*)>, Error>>()?;


Ok(result)
}
}
}

fn mysql_async_row_to_tuple(row: $crate::mysql_async::Row) -> Result<($( $rtype, )*), Error> {
#[allow(clippy::eval_order_dependence)]
let mut idx = 0;
let res = (
$({
let res: $crate::mysql_async::Value = row.get(idx).ok_or($crate::anyhow::anyhow!("Failed to parse idx"))?;
idx += 1;
<$rtype as FromValue>::from_value_opt(res)
.unwrap_or_else(|err| {
panic!("Failed to parse `{}`: {}", stringify!($rtype), err)
})
},)*
);
// suppress unused_assignments warning
let _ = idx;
Ok(res)
}

async fn query_internal_with_transaction(
mut transaction: Transaction,
$( $pname: & $ptype, )*
$( $lname: & [ $ltype ], )*
) -> Result<(Transaction, Vec<($( $rtype, )*)>), Error> {
) -> Result<(Transaction, Vec<($( $rtype, )*)>), Error>{
match transaction {
Transaction::Sqlite(ref mut con) => {
let con = con
Expand All @@ -381,6 +419,20 @@ macro_rules! _read_query_impl {
let result = tr.read_query(query).map_err(Error::from).await?;
Ok((Transaction::Mysql(Some(tr)), result))
}
Transaction::OssMysql(ref mut transaction) => {
let query = mysql_query($( $pname, )* $( $lname, )*);

let mut tr = transaction.take().expect("should be Some before transaction ended");
let mut query_result = tr.query_iter(query).map_err(Error::from).await?;
let result = query_result
.map(
|row| mysql_async_row_to_tuple(row)
)
.await?
.into_iter()
.collect::<Result<Vec<($( $rtype, )*)>, Error>>()?;
Ok((Transaction::OssMysql(Some(tr)), result))
}
}
}

Expand Down Expand Up @@ -516,6 +568,11 @@ macro_rules! _write_query_impl {
let res = conn.write_query(query).map_err(Error::from).await?;
Ok(res.into())
}
Connection::OssMysql(conn)=> {
let query = mysql_query(values, $( $pname ),*);
let res = conn.write_query(query).map_err(Error::from).await?;
Ok(res.into())
},
}
}

Expand Down Expand Up @@ -548,6 +605,20 @@ macro_rules! _write_query_impl {
let result = tr.write_query(query).map_err(Error::from).await?;
Ok((Transaction::Mysql(Some(tr)), result.into()))
},
Transaction::OssMysql(ref mut transaction)=>{
let query = mysql_query(values, $( $pname ),*);
let mut tr = transaction.take().expect("should be Some before transaction ended");

let query_result = tr.query_iter(query).await?;

let last_insert_id = query_result.last_insert_id();
let rows_affected = query_result.affected_rows();

let result = WriteResult::new(last_insert_id, rows_affected);

Ok((Transaction::OssMysql(Some(tr)), result.into()))

},
}
}

Expand Down Expand Up @@ -692,6 +763,11 @@ macro_rules! _write_query_impl {
let res = conn.write_query(query).map_err(Error::from).await?;
Ok(res.into())
}
Connection::OssMysql(conn) => {
let query = mysql_query($( $pname, )* $( $lname, )*);
let res = conn.write_query(query).map_err(Error::from).await?;
Ok(res.into())
},
}
}

Expand Down Expand Up @@ -719,6 +795,17 @@ macro_rules! _write_query_impl {
let result = tr.write_query(query).map_err(Error::from).await?;
Ok((Transaction::Mysql(Some(tr)), result.into()))
},
Transaction::OssMysql(ref mut transaction) => {
let query = mysql_query($( $pname, )* $( $lname, )*);
let mut tr = transaction.take()
.expect("should be Some before transaction ended");
let query_result = tr.query_iter(query).await?;

let last_insert_id = query_result.last_insert_id();
let rows_affected = query_result.affected_rows();
let result = WriteResult::new(last_insert_id, rows_affected);
Ok((Transaction::OssMysql(Some(tr)), result))
}
}
}

Expand Down

0 comments on commit 49d206a

Please sign in to comment.