pip install ddcDatabases
pip install ddcDatabases[all]
pip install ddcDatabases[mssql]
pip install ddcDatabases[mysql]
pip install ddcDatabases[pgsql]
pip install ddcDatabases[oracle]
pip install ddcDatabases[mongodb]
- Parameters for all classes are declared as OPTIONAL, falling back to .env file variables when not provided
- All examples use db_utils.py
- By default, the MSSQL class opens a session to the database; the underlying engine is available at
session.bind
- SYNC session defaults:
autoflush is True
expire_on_commit is True
echo is False
- ASYNC session defaults:
autoflush is True
expire_on_commit is False
echo is False
class Sqlite(
filepath: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
import sqlalchemy as sa
from ddcDatabases import DBUtils, Sqlite
with Sqlite() as session:
utils = DBUtils(session)
stmt = sa.select(TableModel).where(TableModel.id == 1)
results = utils.fetchall(stmt)
for row in results:
print(row)
class MSSQL(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
schema: Optional[str] = None,
echo: Optional[bool] = None,
pool_size: Optional[int] = None,
max_overflow: Optional[int] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
import sqlalchemy as sa
from ddcDatabases import DBUtils, MSSQL
with MSSQL() as session:
stmt = sa.select(TableModel).where(TableModel.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MSSQL
async with MSSQL() as session:
stmt = sa.select(TableModel).where(TableModel.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
class PostgreSQL(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
engine_args: Optional[dict] = None,
)
import sqlalchemy as sa
from ddcDatabases import DBUtils, PostgreSQL
with PostgreSQL() as session:
stmt = sa.select(TableModel).where(TableModel.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
import sqlalchemy as sa
from ddcDatabases import DBUtils, MySQL
with MySQL() as session:
stmt = sa.text("SELECT * FROM users")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, PostgreSQL
async with PostgreSQL() as session:
stmt = sa.select(TableModel).where(TableModel.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
class Oracle(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
servicename: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
credentials = {
"host": "127.0.0.1",
"user": "system",
"password": "oracle",
"servicename": "xe",
"echo": False,
}
import sqlalchemy as sa
from ddcDatabases import DBUtils, Oracle
with Oracle(**credentials) as session:
stmt = sa.text("SELECT * FROM system.help")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
class MongoDB(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
batch_size: Optional[int] = None,
limit: Optional[int] = None,
)
credentials = {
"host": "127.0.0.1",
"user": "admin",
"password": "admin",
"database": "admin",
}
from ddcDatabases.mongodb import MongoDB
from bson.objectid import ObjectId
with MongoDB(**credentials) as mongodb:
query = {"_id": ObjectId("6772cf60f27e7e068e9d8985")}
collection = "movies"
with mongodb.cursor(collection, query) as cursor:
for each in cursor:
print(each)
Using PostgreSQL as an example
from ddcDatabases import PostgreSQL
with PostgreSQL() as session:
engine = session.bind
...
from ddcDatabases import PostgreSQL
async with PostgreSQL() as session:
engine = await session.bind
...
- Takes an open session as a parameter
- Can use SQLAlchemy statements
- The execute function can be used for updates, inserts, or any SQLAlchemy text statement
from ddcDatabases import DBUtils
db_utils = DBUtils(session)
db_utils.fetchall(stmt) # returns a list of RowMapping
db_utils.fetchvalue(stmt) # fetches a single value, returned as a string
db_utils.insert(stmt) # insert into model table
db_utils.deleteall(model) # delete all records from model
db_utils.insertbulk(model, list[dict]) # insert records into model from a list of dicts
db_utils.execute(stmt) # this is the actual execute from session
poetry build -f wheel
poetry update --with test
poe tests
Released under the MIT License