user.py 9.39 KB
import re
from email.utils import parseaddr
from pyramid.view import view_config
from pyramid.httpexceptions import HTTPFound
from pyramid.i18n import TranslationStringFactory
import colander
from deform import (
    Form,
    ValidationFailure,
    Button,
    )
from deform.widget import (
    SelectWidget,
    CheckboxChoiceWidget,
    PasswordWidget,
    HiddenWidget,
    )
from ..models import DBSession
from ..models.ziggurat import (
    User,
    Group,
    UserGroup,
    )
from ..tools.deform import to_dict
from ..tools.waktu import create_now
from .login import (
    regenerate_security_code,
    send_email_security_code,
    )


_ = TranslationStringFactory('user')


########                    
# List #
########    
@view_config(
    route_name='user', renderer='templates/user/list.pt',
    permission='user-edit')
def view_list(request):
    q = DBSession.query(User).order_by(User.email)
    return dict(title=_('Users'), users=q)
    

#######    
# Add #
#######
@colander.deferred
def deferred_status(node, kw):
    values = kw.get('status_list', [])
    return SelectWidget(values=values)


@colander.deferred
def deferred_group(node, kw):
    values = kw.get('group_list', [])
    return CheckboxChoiceWidget(values=values)


class Validator:
    def __init__(self, request):
        self.request = request
        if 'id' in self.request.matchdict:
            q = DBSession.query(User).filter_by(id=self.request.matchdict['id'])
            self.user = q.first()
        else:
            self.user = None


class EmailValidator(colander.Email, Validator):
    def __init__(self, request):
        colander.Email.__init__(self)
        Validator.__init__(self, request)

    def __call__(self, node, value):
        if self.match_object.match(value) is None:
            raise colander.Invalid(node, _('Invalid email format'))
        email = value.lower()
        if self.user and self.user.email == email:
            return
        q = DBSession.query(User).filter_by(email=email)
        found = q.first()
        if not found:
            return
        data = dict(email=email, uid=found.id)
        ts = _(
                'email-already-used',
                default='Email ${email} already used by user ID ${uid}',
                mapping=data)
        raise colander.Invalid(node, ts)


REGEX_ONLY_CONTAIN = re.compile('([a-z0-9-]*)')
REGEX_BEGIN_END_ALPHANUMERIC = re.compile('^[a-z0-9]+(?:[-][a-z0-9]+)*$')

class UsernameValidator(Validator):
    def __init__(self, request):
        Validator.__init__(self, request)

    def __call__(self, node, value):
        username = value.lower()
        if self.user and self.user.user_name == username:
            return
        match = REGEX_ONLY_CONTAIN.search(username)
        if not match or match.group(1) != username:
            ts = _(
                    'username-only-contain',
                    default='Only a-z, 0-9, and - characters are allowed')
            raise colander.Invalid(node, ts)
        match = REGEX_BEGIN_END_ALPHANUMERIC.search(username)
        if not match:
            ts = _(
                    'username-first-end-alphanumeric',
                    default='Only a-z or 0-9 at the start and end')
            raise colander.Invalid(node, ts)
        q = DBSession.query(User).filter_by(user_name=username)
        found = q.first()
        if not found:
            return
        data = dict(username=username, uid=found.id)
        ts = _(
                'username-already-used',
                default='Username ${username} already used by ID ${uid}',
                mapping=data)
        raise colander.Invalid(node, ts)


@colander.deferred
def deferred_email_validator(node, kw):
    return EmailValidator(kw['request'])


@colander.deferred
def deferred_username_validator(node, kw):
    return UsernameValidator(kw['request'])


class AddSchema(colander.Schema):
    email = colander.SchemaNode(
            colander.String(), title=_('Email'),
            validator=deferred_email_validator)
    user_name = colander.SchemaNode(colander.String(), title=_('Username'),
            validator=deferred_username_validator)
    groups = colander.SchemaNode(
            colander.Set(), widget=deferred_group, title=_('Group'))


class EditSchema(AddSchema):
    id = colander.SchemaNode(
            colander.String(), missing=colander.drop,
            widget=HiddenWidget(readonly=True))
    status = colander.SchemaNode(
            colander.String(), widget=deferred_status, title=_('Status'))
                    

class MyEditSchema(AddSchema):
    id = colander.SchemaNode(
            colander.String(), missing=colander.drop,
            widget=HiddenWidget(readonly=True))
 

def get_form(request, class_form):
    status_list = (
        (1, _('Active')),
        (0, _('Inactive')))
    group_list = []
    q = DBSession.query(Group).order_by(Group.group_name)
    for row in q:
        group = (str(row.id), _(row.group_name))
        group_list.append(group)
    schema = class_form()
    schema = schema.bind(
            status_list=status_list, group_list=group_list, request=request)
    schema.request = request
    btn_save = Button('save', _('Save'))
    btn_cancel = Button('cancel', _('Cancel'))
    return Form(schema, buttons=(btn_save, btn_cancel))


def insert(request, values):
    user = User()
    user.email = values['email'].lower()
    user.user_name = values['user_name'].lower()
    user.security_code_date = create_now()
    remain = regenerate_security_code(user)
    DBSession.add(user)
    DBSession.flush()
    for gid in values['groups']: 
        ug = UserGroup(user_id=user.id, group_id=gid)
        DBSession.add(ug)
    return user, remain


@view_config(
    route_name='user-add', renderer='templates/user/add.pt',
    permission='user-edit')
def view_add(request):
    form = get_form(request, AddSchema)
    resp = dict(title=_('Add user'))
    if not request.POST:
        resp['form'] = form.render()
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    items = request.POST.items()
    try:
        c = form.validate(items)
    except ValidationFailure:
        resp['form'] = form.render()
        return resp
    user, remain = insert(request, dict(c.items())) 
    send_email_security_code(
        request, user, remain, 'Welcome new user', 'email-new-user',
        'email-new-user.tpl')
    data = dict(email=user.email)
    ts = _(
            'user-added',
            default='${email} has been added and the email has been sent.',
            mapping=data)
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))


########
# Edit #
########
def user_group_set(user):
    q = DBSession.query(UserGroup).filter_by(user_id=user.id)
    r = []
    for ug in q:
        r.append(str(ug.group_id))
    return set(r)


def update(request, values, user):
    fnames = ['email', 'user_name']
    user.email = values['email'].lower()
    user.user_name = values['user_name'].lower()
    if user.id != request.user.id:
        user.status = values['status']
    DBSession.add(user)
    existing = user_group_set(user)
    unused = existing - values['groups']
    if unused:
        q = DBSession.query(UserGroup).filter_by(user_id=user.id).filter(
                UserGroup.group_id.in_([2]))
        q.delete(synchronize_session=False)
    new = values['groups'] - existing
    for gid in new: 
        ug = UserGroup(user_id=user.id, group_id=gid)
        DBSession.add(ug)


@view_config(
    route_name='user-edit', renderer='templates/user/edit.pt',
    permission='user-edit')
def view_edit(request):
    q = DBSession.query(User).filter_by(id=request.matchdict['id'])
    user = q.first()
    if not user:
        return HTTPNotFound()
    if user.id == request.user.id:
        form = get_form(request, MyEditSchema)
    else:
        form = get_form(request, EditSchema)
    resp = dict(title=_('Edit user'))
    if not request.POST:
        d = user.to_dict()
        d['groups'] = user_group_set(user)
        resp['form'] = form.render(appstruct=d)
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    items = request.POST.items()
    try:
        c = form.validate(items)
    except ValidationFailure:
        resp['form'] = form.render()
        return resp
    update(request, dict(c.items()), user)
    data = dict(username=user.user_name)
    ts = _('user-updated', default='${username} profile updated', mapping=data)
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))


##########
# Delete #
##########    
@view_config(
    route_name='user-delete', renderer='templates/user/delete.pt',
    permission='user-edit')
def view_delete(request):
    q = DBSession.query(User).filter_by(id=request.matchdict['id'])
    user = q.first()
    if not user:
        return HTTPNotFound()
    btn_delete = Button('delete', _('Delete'))
    btn_cancel = Button('cancel', _('Cancel'))
    buttons = (btn_delete, btn_cancel)
    form = Form(colander.Schema(), buttons=buttons)
    if not request.POST:
        return dict(title=_('Delete user'), user=user, form=form.render())
    if 'delete' in request.POST:
        data = dict(uid=user.id, email=user.email)
        ts = _(
                'user-deleted',
                default='User ${email} ID ${uid} has been deleted',
                mapping=data)
        q.delete()
        request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))