# user.py (10.4 KB)
import os
import re
from pkg_resources import resource_filename
from sqlalchemy import (
    func,
    or_,
    event,
    )
from pyramid.view import view_config
from pyramid.httpexceptions import (
    HTTPFound,
    HTTPNotFound,
    )
from pyramid.i18n import TranslationStringFactory
import colander
from deform import (
    Form,
    ValidationFailure,
    Button,
    ZPTRendererFactory,
    )
from deform.widget import (
    SelectWidget,
    CheckboxChoiceWidget,
    )
from colanderalchemy import (
    setup_schema,
    SQLAlchemySchemaNode,
    )
from ..models import DBSession
from ..models.ziggurat import (
    User,
    Group,
    UserGroup,
    )
from ..tools.waktu import create_now
from .login import (
    regenerate_security_code,
    send_email_security_code,
    )
from .tools import (
    none_to_null,
    null_to_none,
    )
from .validators import (
    email_validator,
    username_validator,
    )


# Call colanderalchemy's setup_schema when the User mapper is configured.
event.listen(User, 'mapper_configured', setup_schema)


# Translation string factory for the 'user' domain, used for every
# user-facing string in this module.
_ = TranslationStringFactory('user')


########
# List #
########
@colander.deferred
def group_widget(node, kw):
    """Deferred widget: a select box over the bound ``group_list`` choices.

    ``group_list`` must be supplied at bind time; a missing key raises
    KeyError.
    """
    choices = kw['group_list']
    return SelectWidget(values=choices)


class FilterSchema(colander.Schema):
    """Filter form for the user list: optional group and optional name.

    Both nodes use ``missing=colander.drop``, so a field left empty is
    omitted from the validated result.
    """

    group = colander.SchemaNode(
        colander.String(),
        widget=group_widget,
        missing=colander.drop,
    )
    name = colander.SchemaNode(
        colander.String(),
        missing=colander.drop,
    )


def get_group_list():
    """Return ``(id as str, translated group name)`` pairs for all groups.

    Groups are ordered by ``group_name``; the result is suitable as the
    ``values`` of a deform choice widget.
    """
    groups = DBSession.query(Group).order_by(Group.group_name)
    return [(str(g.id), _(g.group_name)) for g in groups]


# Template lookup for the filter form: this package's templates/user
# directory is searched first, then deform's own bundled templates.
deform_templates = resource_filename('deform', 'templates')
here = os.path.abspath(os.path.dirname(__file__))
my_templates = os.path.join(here, 'templates', 'user')
search_path = [my_templates, deform_templates]
# Renderer passed to the filter Form in get_filter_form().
my_renderer = ZPTRendererFactory(search_path)


def get_filter_form():
    """Build the user-list filter form with a single "Show" button.

    The group choices are prefixed with an empty entry so the group
    filter can be left unselected.
    """
    choices = [('', '')]
    choices.extend(get_group_list())
    bound_schema = FilterSchema().bind(group_list=choices)
    show_button = Button('show', _('Show'))
    return Form(bound_schema, buttons=(show_button,), renderer=my_renderer)


def query_filter(q, p):
    """Apply the optional list filters in ``p`` to query ``q``.

    ``p['group']`` restricts to users that have a UserGroup row for that
    group id; ``p['name']`` is a case-insensitive substring match against
    email or user name. Keys that are absent add no criteria.
    Returns the (possibly unchanged) query.
    """
    if 'group' in p:
        is_member = User.id == UserGroup.user_id
        in_group = UserGroup.group_id == p['group']
        q = q.filter(is_member, in_group)
    if 'name' in p:
        like = '%{}%'.format(p['name'])
        name_matches = or_(User.email.ilike(like), User.user_name.ilike(like))
        q = q.filter(name_matches)
    return q


def get_filter(request):
    """Extract the list filters from the query string.

    Returns a dict that may contain ``group`` (as int) and ``name``
    (as given). Empty values are skipped, and a ``group`` value that is
    not an integer is ignored rather than treated as an error.
    """
    params = request.GET
    found = {}
    raw_group = params.get('group')
    if raw_group:
        try:
            found['group'] = int(raw_group)
        except ValueError:
            # Non-numeric group id: behave as if no group was chosen.
            pass
    raw_name = params.get('name')
    if raw_name:
        found['name'] = raw_name
    return found


@view_config(
    route_name='user', renderer='templates/user/list.pt',
    permission='user-edit')
def view_list(request):
    """List users, optionally filtered by group and/or name.

    POST: validate the filter form and redirect to the same route with
    the validated values (plus ``show=1``) in the query string; on a
    validation failure, re-render the form with its errors.
    GET: render the filter form; when ``show`` is in the query string,
    also put the matching ``count`` and, if non-zero, the ``users`` query
    (ordered by email) in the response dict.
    """
    form = get_filter_form()
    resp = dict(title=_('Users'))
    if request.POST:
        try:
            c = form.validate(request.POST.items())
        except ValidationFailure as e:
            resp['form'] = e.render()
            return resp
        p = dict(c.items())
        p['show'] = 1
        return HTTPFound(location=request.route_url('user', _query=p))
    p = get_filter(request)
    resp['form'] = form.render(appstruct=p)
    if 'show' not in request.GET:
        return resp
    # Count a User column so the statement always selects FROM the users
    # table. A bare count() has no FROM clause when no filter is applied,
    # and then reports 1 regardless of how many users exist.
    q_count = query_filter(DBSession.query(func.count(User.id)), p)
    resp['count'] = count = q_count.scalar()
    if count:
        q_user = query_filter(DBSession.query(User), p)
        resp['users'] = q_user.order_by(User.email)
    return resp


#######
# Add #
#######
@colander.deferred
def widget_status(node, kw):
    """Deferred widget: select box over the bound ``status_list``.

    Falls back to an empty choice list when ``status_list`` is not bound.
    """
    return SelectWidget(values=kw.get('status_list', []))


@colander.deferred
def widget_group(node, kw):
    """Deferred widget: checkbox list over the bound ``group_list``.

    Falls back to an empty choice list when ``group_list`` is not bound.
    """
    return CheckboxChoiceWidget(values=kw.get('group_list', []))


# Hand-built nodes for user_name and email so each carries its own validator.
field_user_name = colander.SchemaNode(
        colander.String(), name='user_name', validator=username_validator)
field_email = colander.SchemaNode(
        colander.String(), name='email', validator=email_validator)

# Fields of the schema used when adding a user and when a user edits
# their own account: the two custom nodes plus User attributes by name.
MY_FIELDS = [
    field_user_name, field_email, 'tempat_lahir', 'tgl_lahir', 'alamat', 'rt',
    'rw', 'kelurahan', 'kecamatan', 'kabupaten', 'provinsi']
my_edit_schema = SQLAlchemySchemaNode(User, includes=MY_FIELDS)

# Custom field: group membership, rendered through the deferred
# widget_group widget and appended to my_edit_schema.
field_groups = colander.SchemaNode(
        colander.Set(), name='groups', widget=widget_group, title=_('Group'))
my_edit_schema.add(field_groups)

# Show the status field when editing another user
field_status = colander.SchemaNode(
        colander.Integer(), name='status', widget=widget_status)
# FIELDS / edit_schema: MY_FIELDS plus status and groups, used by
# view_edit and update when the edited user is not the logged-in user.
FIELDS = MY_FIELDS + [field_status, field_groups]
edit_schema = SQLAlchemySchemaNode(User, includes=FIELDS)


def get_form(request, schema, user=None):
    """Bind ``schema`` and wrap it in a form with Save and Cancel buttons.

    The schema is bound with the Active/Inactive status choices, the
    current group choices and ``user``. ``request`` is accepted but not
    used here.
    """
    statuses = ((1, _('Active')), (0, _('Inactive')))
    bound_schema = schema.bind(
        status_list=statuses, group_list=get_group_list(), user=user)
    buttons = (Button('save', _('Save')), Button('cancel', _('Cancel')))
    return Form(bound_schema, buttons=buttons)


def add_member_count(gid):
    """Increase ``member_count`` by one on the group with id ``gid``."""
    group = DBSession.query(Group).filter_by(id=gid).first()
    group.member_count = group.member_count + 1
    DBSession.add(group)


def reduce_member_count(gid):
    """Decrease ``member_count`` by one on the group with id ``gid``."""
    group = DBSession.query(Group).filter_by(id=gid).first()
    group.member_count = group.member_count - 1
    DBSession.add(group)


def clean_values(values, fnames):
    """Pick the entries named by ``fnames`` out of ``values``.

    Each item of ``fnames`` is either a key string or an object whose
    ``name`` attribute is the key. ``email`` and ``user_name`` are always
    present in the result, lower-cased. Raises KeyError when a requested
    key is missing from ``values``.
    """
    keys = (f if isinstance(f, str) else f.name for f in fnames)
    picked = {key: values[key] for key in keys}
    picked.update(
        email=values['email'].lower(),
        user_name=values['user_name'].lower())
    return picked


def insert(request, values):
    """Create a user from validated form ``values`` and attach its groups.

    Returns ``(user, remain)`` where ``remain`` is whatever
    ``regenerate_security_code`` returned for the new user. ``request``
    is accepted but not used here.
    """
    attrs = clean_values(values, MY_FIELDS)
    attrs['security_code_date'] = create_now()
    user = User(**attrs)
    remain = regenerate_security_code(user)
    DBSession.add(user)
    # Flush before user.id is read for the membership rows below
    # (presumably so the database has assigned it).
    DBSession.flush()
    for group_id in values['groups']:
        DBSession.add(UserGroup(user_id=user.id, group_id=group_id))
        add_member_count(group_id)
    return user, remain


@view_config(
    route_name='user-add', renderer='templates/user/add.pt',
    permission='user-edit')
def view_add(request):
    """Add a user.

    GET renders the empty form. A POST without ``save`` (e.g. Cancel)
    redirects to the user list. A POST with ``save`` validates the form;
    on success the user is inserted, the security-code email is sent, a
    flash message is queued and the browser is redirected to the list.
    """
    form = get_form(request, my_edit_schema)
    resp = dict(title=_('Add user'))
    if not request.POST:
        resp['form'] = form.render()
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    items = request.POST.items()
    try:
        c = form.validate(items)
    except ValidationFailure as e:
        # Render from the failure itself, as view_list does, so the
        # submitted values and error messages are shown again.
        resp['form'] = e.render()
        return resp
    user, remain = insert(request, dict(c.items()))
    send_email_security_code(
        request, user, remain, 'Welcome new user', 'email-new-user',
        'email-new-user.tpl')
    data = dict(email=user.email)
    ts = _(
            'user-added',
            default='${email} has been added and the email has been sent.',
            mapping=data)
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))


########
# Edit #
########
def user_group_set(user):
    """Return the ids of the groups ``user`` belongs to, as a set of str."""
    memberships = DBSession.query(UserGroup).filter_by(user_id=user.id)
    return {str(m.group_id) for m in memberships}


def update(request, user, values):
    """Apply validated form ``values`` to ``user`` and sync its groups.

    When the edited user is the logged-in user only ``MY_FIELDS`` are
    written; otherwise ``FIELDS`` (which additionally carries ``status``)
    is used. Group membership is then reconciled against
    ``values['groups']``: memberships no longer selected are deleted,
    newly selected ones are added, and each affected group's
    ``member_count`` is adjusted.
    """
    editing_self = user.id == request.user.id
    fnames = MY_FIELDS if editing_self else FIELDS
    d = clean_values(values, fnames)
    # Memberships are handled through UserGroup rows below, not through
    # from_dict. 'groups' is only among the keys when FIELDS is used, so
    # pop() it: a plain `del` raised KeyError when editing oneself.
    d.pop('groups', None)
    user.from_dict(d)
    DBSession.add(user)
    existing = user_group_set(user)
    unused = existing - values['groups']
    if unused:
        q = DBSession.query(UserGroup).filter_by(user_id=user.id).filter(
                UserGroup.group_id.in_(unused))
        q.delete(synchronize_session=False)
        for gid in unused:
            reduce_member_count(gid)
    new = values['groups'] - existing
    for gid in new:
        ug = UserGroup(user_id=user.id, group_id=gid)
        DBSession.add(ug)
        add_member_count(gid)


@view_config(
    route_name='user-edit', renderer='templates/user/edit.pt',
    permission='user-edit')
def view_edit(request):
    """Edit the user identified by the ``id`` route segment.

    Returns HTTPNotFound when no such user exists. The logged-in user
    editing their own account gets ``my_edit_schema``; editing anyone
    else uses ``edit_schema``. GET renders the form pre-filled from the
    user; a POST without ``save`` redirects to the list; a POST with
    ``save`` validates, updates the user, queues a flash message and
    redirects to the list.
    """
    q = DBSession.query(User).filter_by(id=request.matchdict['id'])
    user = q.first()
    if not user:
        return HTTPNotFound()
    if user.id == request.user.id:
        schema = my_edit_schema
    else:
        schema = edit_schema
    form = get_form(request, schema, user)
    resp = dict(title=_('Edit user'))
    if not request.POST:
        d = user.to_dict()
        d['groups'] = user_group_set(user)
        resp['form'] = form.render(appstruct=none_to_null(d))
        return resp
    if 'save' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    items = request.POST.items()
    try:
        c = form.validate(items)
    except ValidationFailure as e:
        # Render from the failure itself, as view_list does, so the
        # submitted values and error messages are shown again.
        resp['form'] = e.render()
        return resp
    update(request, user, null_to_none(dict(c.items())))
    data = dict(username=user.user_name)
    ts = _('user-updated', default='${username} profile updated', mapping=data)
    request.session.flash(ts)
    return HTTPFound(location=request.route_url('user'))


##########
# Delete #
##########
@view_config(
    route_name='user-delete', renderer='templates/user/delete.pt',
    permission='user-edit')
def view_delete(request):
    """Confirm and delete the user identified by the ``id`` route segment.

    Returns HTTPNotFound when no such user exists. GET renders a
    confirmation form with Delete and Cancel buttons. A POST without
    ``delete`` redirects to the list. A POST with ``delete`` decrements
    the member count of each of the user's groups, deletes the user,
    queues a flash message and redirects to the list.
    """
    user_query = DBSession.query(User).filter_by(id=request.matchdict['id'])
    user = user_query.first()
    if not user:
        return HTTPNotFound()
    if not request.POST:
        confirm_form = Form(
            colander.Schema(),
            buttons=(
                Button('delete', _('Delete')),
                Button('cancel', _('Cancel'))))
        return dict(
            title=_('Delete user'), user=user, form=confirm_form.render())
    if 'delete' not in request.POST:
        return HTTPFound(location=request.route_url('user'))
    for group_id in user_group_set(user):
        reduce_member_count(group_id)
    # Build the message before the delete, while the user's attributes
    # are still being read from the loaded object.
    message = _(
        'user-deleted',
        default='User ${email} ID ${uid} has been deleted',
        mapping=dict(uid=user.id, email=user.email))
    user_query.delete()
    request.session.flash(message)
    return HTTPFound(location=request.route_url('user'))