# user.py (10.5 KB)
import re
from email.utils import parseaddr
from sqlalchemy import func
from pyramid.view import view_config
from pyramid.httpexceptions import (
    HTTPFound,
    HTTPNotFound,
    )
from pyramid.i18n import TranslationStringFactory
import colander
from deform import (
    Form,
    ValidationFailure,
    Button,
    )
from deform.widget import (
    SelectWidget,
    CheckboxChoiceWidget,
    PasswordWidget,
    HiddenWidget,
    )
from ..models import DBSession
from ..models.ziggurat import (
    User,
    Group,
    UserGroup,
    )
from ..tools.waktu import create_now
from .login import (
    regenerate_security_code,
    send_email_security_code,
    )


# Translation-string factory created with the 'user' domain; called as
# ``_()`` throughout this module to mark user-facing text for translation.
_ = TranslationStringFactory('user')


########                    
# List #
########    

def query_filter(request, q):
    """Narrow ``q`` to users that are members of the requested group.

    The group ID is taken from the ``gid`` query-string parameter of
    ``request`` (a ``KeyError`` propagates when it is absent).  ``User`` is
    tied to ``UserGroup`` through a WHERE-clause condition on the user ID.

    Returns the filtered query; ``q`` itself is not executed here.
    """
    is_member = User.id == UserGroup.user_id
    in_requested_group = UserGroup.group_id == request.GET['gid']
    return q.filter(is_member, in_requested_group)


@view_config(
    route_name='user', renderer='templates/user/list.pt',
    permission='user-edit')
def view_list(request):
    """List users, optionally restricted to a single group.

    A POST is answered with a redirect back to this route that carries the
    chosen group as the ``gid`` query parameter.  On GET, a non-empty ``gid``
    limits the listing to members of that group; a ``gid`` that is not an
    integer yields ``HTTPNotFound``.

    Returns a dict for the template with ``title``, ``count`` and ``groups``,
    plus ``users`` (ordered by email) when at least one user matched.
    """
    if request.POST:
        # A POST without ``gid`` must not crash with KeyError; an empty value
        # is treated by the GET branch below as "no group filter".
        p = dict(gid=request.POST.get('gid', ''))
        return HTTPFound(location=request.route_url('user', _query=p))
    q_user = None
    if 'gid' in request.GET and request.GET['gid']:
        try:
            int(request.GET['gid'])
        except ValueError:
            return HTTPNotFound()
        count = query_filter(request, DBSession.query(func.count())).scalar()
        if count:
            q_user = query_filter(request, DBSession.query(User))
    else:
        count = DBSession.query(func.count(User.id)).scalar()
        if count:
            q_user = DBSession.query(User)
    q_group = DBSession.query(Group).order_by(Group.group_name)
    resp = dict(title=_('Users'), count=count, groups=q_group)
    if count:
        # q_user is only built (and only needed) when something matched.
        resp['users'] = q_user.order_by(User.email)
    return resp
 

#######    
# Add #
#######
@colander.deferred
def status_widget(node, kw):
    """Create the status select widget when the schema is bound.

    The choices come from the ``status_list`` binding; if that binding is
    missing the widget is built with an empty list of choices.
    """
    return SelectWidget(values=kw.get('status_list', []))


@colander.deferred
def group_widget(node, kw):
    """Create the group checkbox widget when the schema is bound.

    The choices come from the ``group_list`` binding; if that binding is
    missing the widget is built with an empty list of choices.
    """
    return CheckboxChoiceWidget(values=kw.get('group_list', []))


class Validator:
    """Base class for validators that carry the user record being edited."""

    def __init__(self, user):
        """Remember ``user`` (may be ``None``) for use by subclasses."""
        # Stored as-is; no copying or validation happens here.
        self.user = user


class EmailValidator(colander.Email, Validator):
    """Check email syntax and that no other user already owns the address."""

    def __init__(self, user):
        """Initialise both bases; ``user`` is the record being edited or None."""
        colander.Email.__init__(self)
        Validator.__init__(self, user)

    def __call__(self, node, value):
        """Validate ``value`` for ``node``.

        Raises ``colander.Invalid`` when the address does not match the
        pattern inherited from ``colander.Email`` or when the lower-cased
        address is found on a ``User`` row.  The lookup is skipped when the
        address equals the email of the user being edited.
        """
        if self.match_object.match(value) is None:
            raise colander.Invalid(node, _('Invalid email format'))
        email = value.lower()
        # Keeping one's own address is always allowed.
        if self.user and self.user.email == email:
            return
        owner = DBSession.query(User).filter_by(email=email).first()
        if owner:
            message = _(
                'email-already-used',
                default='Email ${email} already used by user ID ${uid}',
                mapping=dict(email=email, uid=owner.id))
            raise colander.Invalid(node, message)


# Captures the leading run of allowed characters (a-z, 0-9, hyphen); the
# caller compares the captured text with the whole input to detect strays.
REGEX_ONLY_CONTAIN = re.compile(r'([a-z0-9-]*)')
# Whole-string pattern: alphanumeric runs joined by single hyphens, so the
# value must start and end with a-z or 0-9.
REGEX_BEGIN_END_ALPHANUMERIC = re.compile(r'^[a-z0-9]+(?:[-][a-z0-9]+)*$')

class UsernameValidator(Validator):
    """Check username characters, shape and uniqueness."""

    def __call__(self, node, value):
        """Validate ``value`` for ``node``.

        Accepts immediately when the lower-cased value equals the username
        of the user being edited.  Otherwise raises ``colander.Invalid`` if
        the value contains anything other than a-z, 0-9 and ``-`` (upper
        case included), if it does not start and end with a-z or 0-9, or if
        a ``User`` row already has that username.
        """
        username = value.lower()
        if self.user and self.user.user_name == username:
            return
        allowed = REGEX_ONLY_CONTAIN.search(username)
        # ``username == value`` fails when the input had upper-case letters.
        well_formed = (
            bool(allowed)
            and allowed.group(1) == username
            and username == value)
        if not well_formed:
            raise colander.Invalid(node, _(
                'username-only-contain',
                default='Only a-z, 0-9, and - characters are allowed'))
        if not REGEX_BEGIN_END_ALPHANUMERIC.search(username):
            raise colander.Invalid(node, _(
                'username-first-end-alphanumeric',
                default='Only a-z or 0-9 at the start and end'))
        taken_by = DBSession.query(User).filter_by(user_name=username).first()
        if not taken_by:
            return
        message = _(
            'username-already-used',
            default='Username ${username} already used by ID ${uid}',
            mapping=dict(username=username, uid=taken_by.id))
        raise colander.Invalid(node, message)


@colander.deferred
def email_validator(node, kw):
    """Build an ``EmailValidator`` for the ``user`` binding at bind time.

    A ``KeyError`` propagates if the schema was bound without ``user``.
    """
    bound_user = kw['user']
    return EmailValidator(bound_user)


@colander.deferred
def username_validator(node, kw):
    """Build a ``UsernameValidator`` for the ``user`` binding at bind time.

    A ``KeyError`` propagates if the schema was bound without ``user``.
    """
    bound_user = kw['user']
    return UsernameValidator(bound_user)


class AddSchema(colander.Schema):
    """Form schema for a new user: email, username and group membership."""

    # Declaration order is kept as email, user_name, groups.
    email = colander.SchemaNode(
        colander.String(),
        title=_('Email'),
        validator=email_validator,
    )
    user_name = colander.SchemaNode(
        colander.String(),
        title=_('Username'),
        validator=username_validator,
    )
    groups = colander.SchemaNode(
        colander.Set(),
        widget=group_widget,
        title=_('Group'),
    )


class EditSchema(AddSchema):
    """``AddSchema`` plus a hidden read-only ``id`` and a ``status`` choice."""

    # ``id`` is dropped from the result when the form does not submit it.
    id = colander.SchemaNode(
        colander.String(),
        missing=colander.drop,
        widget=HiddenWidget(readonly=True),
    )
    status = colander.SchemaNode(
        colander.String(),
        widget=status_widget,
        title=_('Status'),
    )
                    

class MyEditSchema(AddSchema):
    """``AddSchema`` plus a hidden read-only ``id``; has no ``status`` field."""

    # ``id`` is dropped from the result when the form does not submit it.
    id = colander.SchemaNode(
        colander.String(),
        missing=colander.drop,
        widget=HiddenWidget(readonly=True),
    )
 

def get_form(request, class_form, user=None):
    """Build the user form with Save and Cancel buttons.

    ``class_form`` is the schema class to instantiate.  The schema is bound
    with the two status choices, every group (ordered by name, IDs rendered
    as strings) and ``user``, which the deferred validators receive.
    ``request`` is accepted but not used here.

    Returns a ``deform.Form``.
    """
    status_list = ((1, _('Active')), (0, _('Inactive')))
    groups = DBSession.query(Group).order_by(Group.group_name)
    group_list = [(str(g.id), _(g.group_name)) for g in groups]
    schema = class_form().bind(
        status_list=status_list, group_list=group_list, user=user)
    buttons = (Button('save', _('Save')), Button('cancel', _('Cancel')))
    return Form(schema, buttons=buttons)


def add_member_count(gid):
    """Add one to the ``member_count`` of the group whose ID is ``gid``.

    The group is loaded, modified and handed back to the session; nothing is
    flushed or committed here.  If no such group exists the lookup yields
    ``None`` and the attribute access raises ``AttributeError``.
    """
    group = DBSession.query(Group).filter_by(id=gid).first()
    group.member_count = group.member_count + 1
    DBSession.add(group)


def reduce_member_count(gid):
    """Subtract one from the ``member_count`` of the group with ID ``gid``.

    The group is loaded, modified and handed back to the session; nothing is
    flushed or committed here.  If no such group exists the lookup yields
    ``None`` and the attribute access raises ``AttributeError``.
    """
    group = DBSession.query(Group).filter_by(id=gid).first()
    group.member_count = group.member_count - 1
    DBSession.add(group)


def insert(request, values):
    """Create a user from validated form ``values`` and join its groups.

    ``values`` must provide ``email``, ``user_name`` and ``groups`` (an
    iterable of group IDs).  Email and username are stored lower-cased.
    ``request`` is accepted but not used here.

    Returns ``(user, remain)`` where ``remain`` is whatever
    ``regenerate_security_code`` returned for the new user.
    """
    new_user = User()
    new_user.email = values['email'].lower()
    new_user.user_name = values['user_name'].lower()
    new_user.security_code_date = create_now()
    remain = regenerate_security_code(new_user)
    DBSession.add(new_user)
    # Flush before reading new_user.id for the membership rows below.
    DBSession.flush()
    for gid in values['groups']:
        DBSession.add(UserGroup(user_id=new_user.id, group_id=gid))
        add_member_count(gid)
    return new_user, remain


@view_config(
    route_name='user-add', renderer='templates/user/add.pt',
    permission='user-edit')
def view_add(request):
    """Show the add-user form and create the user on a valid "save" POST.

    GET renders an empty form.  A POST without the ``save`` button redirects
    to the user list.  A POST that fails validation renders the form again.
    On success the user is inserted, the security-code email is sent, a
    flash message is queued and the client is redirected to the user list.
    """
    form = get_form(request, AddSchema)
    resp = dict(title=_('Add user'))
    posted = request.POST
    if not posted:
        resp['form'] = form.render()
        return resp
    list_url = request.route_url
    if 'save' not in posted:
        return HTTPFound(location=list_url('user'))
    controls = posted.items()
    try:
        appstruct = form.validate(controls)
    except ValidationFailure:
        resp['form'] = form.render()
        return resp
    user, remain = insert(request, dict(appstruct.items()))
    send_email_security_code(
        request, user, remain, 'Welcome new user', 'email-new-user',
        'email-new-user.tpl')
    notice = _(
        'user-added',
        default='${email} has been added and the email has been sent.',
        mapping=dict(email=user.email))
    request.session.flash(notice)
    return HTTPFound(location=list_url('user'))


########
# Edit #
########
def user_group_list(user):
    """Return the IDs of the groups ``user`` belongs to, as strings.

    The IDs are read from the ``UserGroup`` rows whose ``user_id`` matches
    ``user.id``; the result is a plain list in query order.
    """
    memberships = DBSession.query(UserGroup).filter_by(user_id=user.id)
    return [str(row.group_id) for row in memberships]


def update(request, user, values):
    """Apply validated form ``values`` to ``user`` and sync its groups.

    ``values`` must provide ``email``, ``user_name`` and ``groups``;
    ``groups`` must be a set of group-ID strings because it is combined with
    the existing memberships using set difference.  ``status`` is read only
    when ``user`` is not the logged-in user, so nobody changes their own
    status through this path.

    Memberships no longer selected are deleted and newly selected ones are
    added; each change adjusts the affected group's ``member_count``.
    """
    user.email = values['email'].lower()
    user.user_name = values['user_name'].lower()
    if user.id != request.user.id:
        user.status = values['status']
    DBSession.add(user)
    existing = set(user_group_list(user))
    unused = existing - values['groups']
    if unused:
        q = DBSession.query(UserGroup).filter_by(user_id=user.id).filter(
                UserGroup.group_id.in_(unused))
        # Bulk delete; in-session objects are deliberately not synchronised.
        q.delete(synchronize_session=False)
        for gid in unused:
            reduce_member_count(gid)
    new = values['groups'] - existing
    for gid in new:
        ug = UserGroup(user_id=user.id, group_id=gid)
        DBSession.add(ug)
        add_member_count(gid)


@view_config(
    route_name='user-edit', renderer='templates/user/edit.pt',
    permission='user-edit')
def view_edit(request):
    """Show the edit form for one user and save it on a valid "save" POST.

    The user is looked up by the ``id`` route parameter; ``HTTPNotFound`` is
    returned when there is none.  The logged-in user editing their own
    record gets ``MyEditSchema``; everyone else gets ``EditSchema``.

    GET renders the form filled from the user and its groups.  A POST
    without the ``save`` button redirects to the user list.  A POST that
    fails validation renders the form again.  On success the user is
    updated, a flash message is queued and the client is redirected to the
    user list.
    """
    user = DBSession.query(User).filter_by(id=request.matchdict['id']).first()
    if not user:
        return HTTPNotFound()
    editing_self = user.id == request.user.id
    cls = MyEditSchema if editing_self else EditSchema
    form = get_form(request, cls, user)
    resp = dict(title=_('Edit user'))
    posted = request.POST
    if not posted:
        appstruct = user.to_dict()
        appstruct['groups'] = user_group_list(user)
        resp['form'] = form.render(appstruct=appstruct)
        return resp
    if 'save' not in posted:
        return HTTPFound(location=request.route_url('user'))
    controls = posted.items()
    try:
        validated = form.validate(controls)
    except ValidationFailure:
        resp['form'] = form.render()
        return resp
    update(request, user, dict(validated.items()))
    notice = _(
        'user-updated', default='${username} profile updated',
        mapping=dict(username=user.user_name))
    request.session.flash(notice)
    return HTTPFound(location=request.route_url('user'))


##########
# Delete #
##########    
@view_config(
    route_name='user-delete', renderer='templates/user/delete.pt',
    permission='user-edit')
def view_delete(request):
    """Confirm and perform deletion of one user.

    The user is looked up by the ``id`` route parameter; ``HTTPNotFound`` is
    returned when there is none.  GET renders a confirmation form with
    Delete and Cancel buttons.  A POST without the ``delete`` button
    redirects to the user list.  Otherwise each of the user's groups has its
    ``member_count`` reduced, the user row is deleted, a flash message is
    queued and the client is redirected to the user list.
    """
    q = DBSession.query(User).filter_by(id=request.matchdict['id'])
    user = q.first()
    if not user:
        return HTTPNotFound()
    if not request.POST:
        btn_delete = Button('delete', _('Delete'))
        btn_cancel = Button('cancel', _('Cancel'))
        buttons = (btn_delete, btn_cancel)
        form = Form(colander.Schema(), buttons=buttons)
        return dict(title=_('Delete user'), user=user, form=form.render())
    if 'delete' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    # user_group_list() returns plain group-ID strings, so iterate them
    # directly; unpacking each one into two names split the ID's characters.
    for gid in user_group_list(user):
        reduce_member_count(gid)
    # Build the message before the row is deleted, while id/email are loaded.
    data = dict(uid=user.id, email=user.email)
    ts = _(
            'user-deleted',
            default='User ${email} ID ${uid} has been deleted',
            mapping=data)
    # NOTE(review): this is a bulk query delete; the user's UserGroup rows
    # are presumably removed by a database-level cascade -- confirm.
    q.delete()
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))