security.py 1.7 KB
import logging
from pyramid.authentication import \
        AuthTktAuthenticationPolicy as BaseAuthTktAuthenticationPolicy
from ziggurat_foundations.models.services.user import UserService
from .models import DBSession
from .models.ziggurat import (
    User,
    UserGroup,
    )


log = logging.getLogger(__name__)


class AuthTktAuthenticationPolicy(BaseAuthTktAuthenticationPolicy):
    def unauthenticated_userid(self, request):  # Override
        user_id = super().unauthenticated_userid(request)
        if user_id:
            return user_id
        user_id = request.POST.get('user_id')
        if not user_id:
            log.debug(f'user_id tidak dikirim')
            return
        user_pass = request.POST.get('user_pass')
        if not user_pass:
            log.debug(f'user_pass tidak dikirim')
            return
        user_id = int(user_id)
        q = DBSession.query(User).filter_by(id=user_id)
        user = q.first()
        if not user:
            log.debug(f'user_id {user_id} tidak ada di tabel')
            return
        if UserService.check_password(user, user_pass):
            log.debug(f'user_id {user_id} logged in')
            return user_id
        log.debug(f'user_id {user_id} login failed')


def group_finder(login, request):
    q = DBSession.query(User).filter_by(id=login)
    u = q.first()
    if not u or not u.status:
        return  # None means logout
    r = []
    q = DBSession.query(UserGroup).filter_by(user_id=u.id)
    for ug in q:
        acl_name = 'group:{gid}'.format(gid=ug.group_id)
        r.append(acl_name)
    return r


def get_user(request):
    uid = request.authenticated_userid
    if uid:
        q = DBSession.query(User).filter_by(id=uid)
        return q.first()