Make sure that you have PostgreSQL and Redis servers up and running. If you want the full source code in advance or for comparison, check out the demo source.
Launch these sql scripts to init database and fill it with sample data:
psql template1 < demo/sql/init_db.sql
and
psql template1 < demo/sql/sample_data.sql
Now you have two tables:
- for storing users
users |
---|
id |
login |
passwd |
is_superuser |
disabled |
- for storing their permissions
permissions |
---|
id |
user_id |
permission_name |
You need to implement two entities: IdentityPolicy and AuthorizationPolicy. First one should have these methods: identify, remember and forget. For second one: authorized_userid and permits. We will use built-in SessionIdentityPolicy and write our own database-based authorization policy.
In our example we will lookup database by user login and if presents then return this identity:
async def authorized_userid(self, identity): async with self.dbengine as conn: where = sa.and_(db.users.c.login == identity, sa.not_(db.users.c.disabled)) query = db.users.count().where(where) ret = await conn.scalar(query) if ret: return identity else: return None
For permission checking we will fetch the user first, check if he is superuser (all permissions are allowed), otherwise check if permission is explicitly set for that user:
async def permits(self, identity, permission, context=None): if identity is None: return False async with self.dbengine as conn: where = sa.and_(db.users.c.login == identity, sa.not_(db.users.c.disabled)) query = db.users.select().where(where) ret = await conn.execute(query) user = await ret.fetchone() if user is not None: user_id = user[0] is_superuser = user[3] if is_superuser: return True where = db.permissions.c.user_id == user_id query = db.permissions.select().where(where) ret = await conn.execute(query) result = await ret.fetchall() if ret is not None: for record in result: if record.perm_name == permission: return True return False
Once we have all the code in place we can install it for our application:
from aiohttp_session.redis_storage import RedisStorage from aiohttp_security import setup as setup_security from aiohttp_security import SessionIdentityPolicy from aiopg.sa import create_engine from aioredis import create_pool from .db_auth import DBAuthorizationPolicy async def init(loop): redis_pool = await create_pool(('localhost', 6379)) dbengine = await create_engine(user='aiohttp_security', password='aiohttp_security', database='aiohttp_security', host='127.0.0.1') app = web.Application() setup_session(app, RedisStorage(redis_pool)) setup_security(app, SessionIdentityPolicy(), DBAuthorizationPolicy(dbengine)) return app
Now we have authorization and can decorate every other view with access rights based on permissions. There are already implemented two helpers:
from aiohttp_security import check_authorized, check_permission
For each view you need to protect - just apply the decorator on it:
class Web: async def protected_page(self, request): await check_permission(request, 'protected') response = web.Response(body=b'You are on protected page') return response
or:
class Web: async def logout(self, request): await check_authorized(request) response = web.Response(body=b'You have been logged out') await forget(request, response) return response
If someone try to access that protected page he will see:
403: Forbidden
The best part of it - you can implement any logic you want until it follows the API conventions.
For working with passwords there is a good library passlib. Once you've created some users you want to check their credentials on login. Similar function may do what you are trying to accomplish:
from passlib.hash import sha256_crypt async def check_credentials(db_engine, username, password): async with db_engine as conn: where = sa.and_(db.users.c.login == username, sa.not_(db.users.c.disabled)) query = db.users.select().where(where) ret = await conn.execute(query) user = await ret.fetchone() if user is not None: hash = user[2] return sha256_crypt.verify(password, hash) return False
Final step is to launch your application:
python demo/database_auth/main.py
Try to login with admin/moderator/user accounts (with password password) and access /public or /protected endpoints.