Skip to content

ddc/ddcDatabases

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

36 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Databases Session Connections and ORM Queries

Donate License PyPi PyPI Downloads Code style: black Build Status Python

Install only sqlite

pip install ddcDatabases

Install All databases

pip install ddcDatabases[all]

Install MSSQL

pip install ddcDatabases[mssql]

Install MYSQL

pip install ddcDatabases[mysql]

Install PostgreSQL

pip install ddcDatabases[pgsql]

Install Oracle

pip install ddcDatabases[oracle]

Install MONGODB

pip install ddcDatabases[mongodb]

Databases

  • Parameters for all classes are declared as OPTIONAL falling back to .env file variables
  • All examples are using db_utils.py
  • By default, the MSSQL class will open a session to the database, but the engine can be available at session.bind
  • SYNC sessions defaults:
    • autoflush is True
    • expire_on_commit is True
    • echo is False
  • ASYNC sessions defaults:
    • autoflush is True
    • expire_on_commit is False
    • echo is False

SQLITE

class Sqlite(
    filepath: Optional[str] = None,
    echo: Optional[bool] = None,
    autoflush: Optional[bool] = None,
    expire_on_commit: Optional[bool] = None,
    extra_engine_args: Optional[dict] = None,
)

Session

import sqlalchemy as sa
from ddcDatabases import DBUtils, Sqlite
with Sqlite() as session:
    utils = DBUtils(session)
    stmt = sa.select(TableModel).where(TableModel.id == 1)
    results = utils.fetchall(stmt)
    for row in results:
        print(row)

MSSQL

class MSSQL(        
    host: Optional[str] = None,
    port: Optional[int] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    echo: Optional[bool] = None,
    pool_size: Optional[int] = None,
    max_overflow: Optional[int] = None,
    autoflush: Optional[bool] = None,
    expire_on_commit: Optional[bool] = None,
    extra_engine_args: Optional[dict] = None,
)

Sync Example

import sqlalchemy as sa
from ddcDatabases import DBUtils, MSSQL
with MSSQL() as session:
    stmt = sa.select(TableModel).where(TableModel.id == 1)
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

Async Example

import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MSSQL
async with MSSQL() as session:
    stmt = sa.select(TableModel).where(TableModel.id == 1)
    db_utils = DBUtilsAsync(session)
    results = await db_utils.fetchall(stmt)
    for row in results:
        print(row)

PostgreSQL or MySQL

class PostgreSQL(
    host: Optional[str] = None,
    port: Optional[int] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    echo: Optional[bool] = None,
    autoflush: Optional[bool] = None,
    expire_on_commit: Optional[bool] = None,
    engine_args: Optional[dict] = None,
)

Sync Examples

import sqlalchemy as sa
from ddcDatabases import DBUtils, PostgreSQL
with PostgreSQL() as session:
    stmt = sa.select(TableModel).where(TableModel.id == 1)
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)
import sqlalchemy as sa
from ddcDatabases import DBUtils, MySQL
with MySQL() as session:
    stmt = sa.text("SELECT * FROM users")
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

Async Example

import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, PostgreSQL
async with PostgreSQL() as session:
    stmt = sa.select(TableModel).where(TableModel.id == 1)
    db_utils = DBUtilsAsync(session)
    results = await db_utils.fetchall(stmt)
    for row in results:
        print(row)

Oracle

class Oracle(
    host: Optional[str] = None,
    port: Optional[int] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    servicename: Optional[str] = None,
    echo: Optional[bool] = None,
    autoflush: Optional[bool] = None,
    expire_on_commit: Optional[bool] = None,
    extra_engine_args: Optional[dict] = None,

Sync Example using arguments instead of .env file

credentials = {
    "host": "127.0.0.1",
    "user": "system",
    "password": "oracle",
    "servicename": "xe",
    "echo": False,
}

import sqlalchemy as sa
from ddcDatabases import DBUtils, Oracle
with Oracle(**credentials) as session:
    stmt = sa.text("SELECT * FROM system.help")
    db_utils = DBUtils(session)
    results = db_utils.fetchall(stmt)
    for row in results:
        print(row)

MongoDB

class PostgreSQL(
    host: Optional[str] = None,
    port: Optional[int] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    batch_size: Optional[int] = None,
    limit: Optional[int] = None,
)

Sync Example using arguments instead of .env file

credentials = {
    "host": "127.0.0.1",
    "user": "admin",
    "password": "admin",
    "database": "admin",
}

from ddcDatabases.mongodb import MongoDB
from bson.objectid import ObjectId
with MongoDB(**credentials) as mongodb:
    query = {"_id": ObjectId("6772cf60f27e7e068e9d8985")}
    collection = "movies"
    with mongodb.cursor(collection, query) as cursor:
        for each in cursor:
            print(each)

ORM Engines

Using PostgreSQL as example

Sync Engine

from ddcDatabases import PostgreSQL
with PostgreSQL() as session:
    engine = session.bind
    ...

Async Engine

from ddcDatabases import PostgreSQL
async with PostgreSQL() as session:
    engine = await session.bind
    ...

ORM DBUtils and DBUtilsAsync

  • Take an open session as parameter
  • Can use SQLAlchemy statements
  • Execute function can be used to update, insert or any SQLAlchemy.text
from ddcDatabases import DBUtils
db_utils = DBUtils(session)
db_utils.fetchall(stmt)                     # returns a list of RowMapping
db_utils.fetchvalue(stmt)                   # fetch a single value, returning as string
db_utils.insert(stmt)                       # insert into model table
db_utils.deleteall(model)                   # delete all records from model
db_utils.insertbulk(model, list[dict])      # insert records into model from a list of dicts
db_utils.execute(stmt)                      # this is the actual execute from session

Source Code

Build

poetry build -f wheel

Run Tests and Get Coverage Report using Poe

poetry update --with test
poe tests

License

Released under the MIT License

Buy me a cup of coffee