# user.py 11.1 KB
import re
from email.utils import parseaddr
from sqlalchemy import func
from pyramid.view import view_config
from pyramid.httpexceptions import (
    HTTPFound,
    HTTPNotFound,
    )
from pyramid.i18n import TranslationStringFactory
import colander
from deform import (
    Form,
    ValidationFailure,
    Button,
    )
from deform.widget import (
    SelectWidget,
    CheckboxChoiceWidget,
    PasswordWidget,
    HiddenWidget,
    )
from ..models.ziggurat import (
    User,
    Group,
    UserGroup,
    )
from ..tools.deform import to_dict
from ..tools.waktu import create_now
from .login import (
    regenerate_security_code,
    send_email_security_code,
    )


_ = TranslationStringFactory('user')


########
# List #
########

def query_filter(request, q):
    """Narrow *q* to users that are members of the requested group.

    The group id is read from ``request.GET['gid']``; the query is joined
    to ``UserGroup`` through the user id.
    """
    is_member = User.id == UserGroup.user_id
    in_group = UserGroup.group_id == request.GET['gid']
    return q.filter(is_member, in_group)


@view_config(
    route_name='user', renderer='templates/user/list.pt',
    permission='user-edit')
def view_list(request):
    """List users, optionally limited to one group.

    A POST (the group selector) is turned into a redirect to the same
    route with ``?gid=...`` so the filter lives in the URL.  On GET the
    response carries the title, the user count, all groups ordered by
    name and - only when there is at least one user - the users ordered
    by email.  A non-integer ``gid`` yields ``HTTPNotFound``.
    """
    if request.POST:
        params = {'gid': request.POST['gid']}
        return HTTPFound(location=request.route_url('user', _query=params))
    db = request.dbsession
    by_group = 'gid' in request.GET and request.GET['gid']
    if by_group:
        # Reject anything that is not an integer before it reaches the DB.
        try:
            int(request.GET['gid'])
        except ValueError:
            return HTTPNotFound()
        count = query_filter(request, db.query(func.count())).scalar()
    else:
        count = db.query(func.count(User.id)).scalar()
    groups = db.query(Group).order_by(Group.group_name)
    resp = dict(title=_('Users'), count=count, groups=groups)
    if count:
        users = db.query(User)
        if by_group:
            users = query_filter(request, users)
        resp['users'] = users.order_by(User.email)
    return resp


#######
# Add #
#######

@colander.deferred
def deferred_status(node, kw):
    """Return a select widget fed by the bound ``status_list`` (or empty)."""
    choices = kw.get('status_list', [])
    widget = SelectWidget(values=choices)
    return widget


@colander.deferred
def deferred_group(node, kw):
    """Return a checkbox widget fed by the bound ``group_list`` (or empty)."""
    choices = kw.get('group_list', [])
    widget = CheckboxChoiceWidget(values=choices)
    return widget


class Validator:
    """Base for validators that need the user record being edited."""

    def __init__(self, user):
        # Kept so subclasses can skip the uniqueness check when the
        # submitted value already belongs to this user.
        self.user = user


class EmailValidator(colander.Email, Validator):
    """Check email syntax and that no other user already owns the address.

    Built from the schema bindings ``kw``, which must provide ``user``
    (the record under edit, or a falsy value) and ``request``.
    """

    def __init__(self, kw):
        current_user = kw['user']
        self.db_session = kw['request'].dbsession
        colander.Email.__init__(self)
        Validator.__init__(self, current_user)

    def __call__(self, node, value):
        """Raise ``colander.Invalid`` on bad syntax or a duplicate email."""
        if self.match_object.match(value) is None:
            raise colander.Invalid(node, _('Invalid email format'))
        email = value.lower()
        # The user being edited may keep their own address.
        if self.user and self.user.email == email:
            return
        owner = self.db_session.query(User).filter_by(email=email).first()
        if owner:
            ts = _(
                'email-already-used',
                default='Email ${email} already used by user ID ${uid}',
                mapping=dict(email=email, uid=owner.id))
            raise colander.Invalid(node, ts)


# Captures a (possibly empty) run of lowercase letters, digits and hyphens;
# the username validator compares the captured run with the whole input.
REGEX_ONLY_CONTAIN = re.compile('([a-z0-9-]*)')
# Alphanumeric groups separated by single hyphens, so a matching value
# begins and ends with a-z or 0-9.
REGEX_BEGIN_END_ALPHANUMERIC = re.compile('^[a-z0-9]+(?:[-][a-z0-9]+)*$')


class UsernameValidator(Validator):
    """Check username characters, shape and uniqueness.

    Built from the schema bindings ``kw``, which must provide ``request``
    and ``user`` (the record under edit, or a falsy value).
    """

    def __init__(self, kw):
        self.db_session = kw['request'].dbsession
        self.user = kw['user']

    def __call__(self, node, value):
        """Raise ``colander.Invalid`` when *value* is not acceptable."""
        username = value.lower()
        # The user being edited may keep their current username.
        if self.user and self.user.user_name == username:
            return
        only_allowed = REGEX_ONLY_CONTAIN.fullmatch(username) is not None
        # ``username != value`` means the input contained uppercase letters.
        if not only_allowed or username != value:
            raise colander.Invalid(node, _(
                'username-only-contain',
                default='Only a-z, 0-9, and - characters are allowed'))
        if REGEX_BEGIN_END_ALPHANUMERIC.search(username) is None:
            raise colander.Invalid(node, _(
                'username-first-end-alphanumeric',
                default='Only a-z or 0-9 at the start and end'))
        lookup = self.db_session.query(User).filter_by(user_name=username)
        holder = lookup.first()
        if holder:
            ts = _(
                'username-already-used',
                default='Username ${username} already used by ID ${uid}',
                mapping=dict(username=username, uid=holder.id))
            raise colander.Invalid(node, ts)


@colander.deferred
def deferred_email_validator(node, kw):
    """Create the email validator from the schema bindings at bind time."""
    validator = EmailValidator(kw)
    return validator


@colander.deferred
def deferred_username_validator(node, kw):
    """Create the username validator from the schema bindings at bind time."""
    validator = UsernameValidator(kw)
    return validator


class AddSchema(colander.Schema):
    """Form schema for creating a user: email, username and groups."""
    # Syntax and uniqueness are checked by the deferred email validator.
    email = colander.SchemaNode(
            colander.String(), title=_('Email'),
            validator=deferred_email_validator)
    # Character rules and uniqueness are checked by the deferred validator.
    user_name = colander.SchemaNode(
            colander.String(), title=_('Username'),
            validator=deferred_username_validator)
    # Set of selected group ids; the choices come from the bound group_list.
    groups = colander.SchemaNode(
            colander.Set(), widget=deferred_group, title=_('Group'))


class EditSchema(AddSchema):
    """AddSchema plus a hidden id and a status selector.

    Used by ``view_edit`` when the edited user is not the logged-in user.
    """
    # Hidden, read-only; dropped from the result when not submitted.
    id = colander.SchemaNode(
            colander.String(), missing=colander.drop,
            widget=HiddenWidget(readonly=True))
    # Choices come from the bound status_list.
    status = colander.SchemaNode(
            colander.String(), widget=deferred_status, title=_('Status'))


class MyEditSchema(AddSchema):
    """AddSchema plus a hidden id, without a status field.

    Used by ``view_edit`` when the logged-in user edits their own record.
    """
    # Hidden, read-only; dropped from the result when not submitted.
    id = colander.SchemaNode(
            colander.String(), missing=colander.drop,
            widget=HiddenWidget(readonly=True))


def get_form(request, class_form, user=None):
    """Build a deform ``Form`` with Save/Cancel buttons for *class_form*.

    The schema is bound with the request, the status choices, every group
    (ordered by name, ids as strings) and the *user* being edited, if any.
    """
    status_list = ((1, _('Active')), (0, _('Inactive')))
    groups = request.dbsession.query(Group).order_by(Group.group_name)
    group_list = [(str(g.id), _(g.group_name)) for g in groups]
    schema = class_form().bind(
        request=request, status_list=status_list, group_list=group_list,
        user=user)
    buttons = (Button('save', _('Save')), Button('cancel', _('Cancel')))
    return Form(schema, buttons=buttons)


def add_member_count(db_session, gid):
    """Increase the ``member_count`` of the group with id *gid* by one."""
    group = db_session.query(Group).filter_by(id=gid).first()
    group.member_count += 1
    db_session.add(group)


def reduce_member_count(db_session, gid):
    """Decrease the ``member_count`` of the group with id *gid* by one."""
    group = db_session.query(Group).filter_by(id=gid).first()
    group.member_count -= 1
    db_session.add(group)


def insert(request, values):
    """Create a user from validated form *values* and link its groups.

    Returns ``(user, remain)`` where ``remain`` is whatever
    ``regenerate_security_code`` returned for the new user.
    """
    db = request.dbsession
    user = User()
    user.email = values['email'].lower()
    user.user_name = values['user_name'].lower()
    user.security_code_date = create_now()
    remain = regenerate_security_code(db, user)
    db.add(user)
    # Flush so ``user.id`` is available for the membership rows below.
    db.flush()
    for gid in values['groups']:
        db.add(UserGroup(user_id=user.id, group_id=gid))
        add_member_count(db, gid)
    return user, remain


@view_config(
    route_name='user-add', renderer='templates/user/add.pt',
    permission='user-edit')
def view_add(request):
    """Show the add-user form and create the user on a valid submit.

    GET renders the empty form.  A POST without the ``save`` button
    returns to the user list.  A valid POST inserts the user, logs it,
    sends the security-code email, flashes a message and redirects to
    the list; an invalid one re-renders the form with its errors.
    """
    form = get_form(request, AddSchema)
    resp = {'title': _('Add user')}
    if not request.POST:
        resp['form'] = form.render()
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    controls = request.POST.items()
    try:
        appstruct = form.validate(controls)
    except ValidationFailure:
        # ``form`` now holds the errors, so render() shows them.
        resp['form'] = form.render()
        return resp
    user, remain = insert(request, dict(appstruct.items()))
    request.log(f'tambah user {user.user_name}')
    send_email_security_code(
        request, user, remain, 'Welcome new user', 'email-new-user',
        'email-new-user.tpl')
    message = _(
        'user-added',
        default='${email} has been added and the email has been sent.',
        mapping=dict(email=user.email))
    request.session.flash(message)
    return HTTPFound(location=request.route_url('user'))


########
# Edit #
########
def user_group_set(db_session, user):
    """Return the ids of *user*'s groups as a set of strings."""
    memberships = db_session.query(UserGroup).filter_by(user_id=user.id)
    return {str(membership.group_id) for membership in memberships}


def update(request, user, values):
    """Apply validated form *values* to *user* and sync group membership.

    Email and username are stored lowercased.  ``status`` is only applied
    when the edited user is not the logged-in user.  Memberships no longer
    selected are deleted and newly selected ones are added; the affected
    groups' ``member_count`` is adjusted either way.

    ``values['groups']`` must be a set of group ids as strings, because it
    is compared against ``user_group_set``.
    """
    user.email = values['email'].lower()
    user.user_name = values['user_name'].lower()
    if user.id != request.user.id:
        user.status = values['status']
    request.dbsession.add(user)
    existing = user_group_set(request.dbsession, user)
    # Groups the user was in but that are no longer selected.
    unused = existing - values['groups']
    if unused:
        q = request.dbsession.query(UserGroup).\
                filter_by(user_id=user.id).\
                filter(UserGroup.group_id.in_(unused))
        q.delete(synchronize_session=False)
        for gid in unused:
            reduce_member_count(request.dbsession, gid)
    # Groups that are selected now but were not before.
    new = values['groups'] - existing
    for gid in new:
        ug = UserGroup(user_id=user.id, group_id=gid)
        request.dbsession.add(ug)
        add_member_count(request.dbsession, gid)


@view_config(
    route_name='user-edit', renderer='templates/user/edit.pt',
    permission='user-edit')
def view_edit(request):
    """Show the edit form for the user in the URL and save valid changes.

    Returns ``HTTPNotFound`` for an unknown id.  The logged-in user gets
    ``MyEditSchema`` for their own record, everyone else ``EditSchema``.
    A POST without the ``save`` button returns to the user list.
    """
    user_id = request.matchdict['id']
    user = request.dbsession.query(User).filter_by(id=user_id).first()
    if not user:
        return HTTPNotFound()
    editing_self = user.id == request.user.id
    schema_class = MyEditSchema if editing_self else EditSchema
    form = get_form(request, schema_class, user)
    resp = {'title': _('Edit user')}
    if not request.POST:
        appstruct = user.to_dict()
        appstruct['groups'] = user_group_set(request.dbsession, user)
        resp['form'] = form.render(appstruct=appstruct)
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    controls = request.POST.items()
    try:
        validated = form.validate(controls)
    except ValidationFailure:
        # ``form`` now holds the errors, so render() shows them.
        resp['form'] = form.render()
        return resp
    update(request, user, dict(validated.items()))
    message = _(
        'user-updated', default='${username} profile updated',
        mapping=dict(username=user.user_name))
    request.session.flash(message)
    return HTTPFound(location=request.route_url('user'))


##########
# Delete #
##########

@view_config(
    route_name='user-delete', renderer='templates/user/delete.pt',
    permission='user-edit')
def view_delete(request):
    """Confirm and perform deletion of the user named in the URL.

    Returns ``HTTPNotFound`` for an unknown id.  GET renders a
    confirmation form with Delete/Cancel buttons.  A POST without the
    ``delete`` button returns to the user list.  A confirmed POST
    decrements ``member_count`` of every group the user belongs to,
    deletes the user, flashes a message and redirects to the list.
    """
    q = request.dbsession.query(User).filter_by(id=request.matchdict['id'])
    user = q.first()
    if not user:
        return HTTPNotFound()
    if not request.POST:
        btn_delete = Button('delete', _('Delete'))
        btn_cancel = Button('cancel', _('Cancel'))
        buttons = (btn_delete, btn_cancel)
        form = Form(colander.Schema(), buttons=buttons)
        return dict(title=_('Delete user'), user=user, form=form.render())
    if 'delete' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    gid_list = user_group_set(request.dbsession, user)
    for gid in gid_list:
        # reduce_member_count needs the session as its first argument;
        # calling it with only the group id raised TypeError.
        reduce_member_count(request.dbsession, gid)
    # Build the message before the row is deleted, while id/email are set.
    data = dict(uid=user.id, email=user.email)
    ts = _(
            'user-deleted',
            default='User ${email} ID ${uid} has been deleted',
            mapping=data)
    q.delete()
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))