Commit 0459a542 by Owo Sugiana

Penggunaan ColanderAlchemy

1 parent 20f4869a
0.2 2022-04-09
--------------
- Form User menggunakan ColanderAlchemy
- Ada contoh field tanggal
- Ada contoh penggunaan SQLAlchemy Datatables
0.1.5 2020-04-01 0.1.5 2020-04-01
---------------- ----------------
- Mengganti ZopeTransactionExtension() dengan register(). - Mengganti ZopeTransactionExtension() dengan register().
......
"""Biodata user
Revision ID: 641fb58fdaaa
Revises: 074b33635316
Create Date: 2022-04-08 23:45:28.499799
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '641fb58fdaaa'
down_revision = '074b33635316'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('users', sa.Column('nama_lengkap', sa.String(64)))
op.add_column('users', sa.Column('tempat_lahir', sa.String(64)))
op.add_column('users', sa.Column('tgl_lahir', sa.Date))
op.add_column('users', sa.Column('alamat', sa.String(128)))
op.add_column('users', sa.Column('rt', sa.Integer))
op.add_column('users', sa.Column('rw', sa.Integer))
op.add_column('users', sa.Column('kelurahan', sa.String(64)))
op.add_column('users', sa.Column('kecamatan', sa.String(64)))
op.add_column('users', sa.Column('kabupaten', sa.String(64)))
op.add_column('users', sa.Column('provinsi', sa.String(64)))
def downgrade():
pass
...@@ -18,6 +18,8 @@ sqlalchemy.url = postgresql://user:pass@localhost/dbname ...@@ -18,6 +18,8 @@ sqlalchemy.url = postgresql://user:pass@localhost/dbname
timezone = Asia/Jakarta timezone = Asia/Jakarta
localization = id_ID.UTF-8 localization = id_ID.UTF-8
auth_key = s0secret
mail.host = localhost mail.host = localhost
mail.port = 25 mail.port = 25
mail.username = user@example.com mail.username = user@example.com
......
...@@ -16,6 +16,8 @@ sqlalchemy.url = postgresql://user:pass@localhost/dbname ...@@ -16,6 +16,8 @@ sqlalchemy.url = postgresql://user:pass@localhost/dbname
timezone = Asia/Jakarta timezone = Asia/Jakarta
localization = id_ID.UTF-8 localization = id_ID.UTF-8
auth_key = s0secret
mail.host = localhost mail.host = localhost
mail.port = 25 mail.port = 25
mail.username = user@example.com mail.username = user@example.com
......
...@@ -32,7 +32,8 @@ requires = [ ...@@ -32,7 +32,8 @@ requires = [
'deform', 'deform',
'pyramid_beaker', 'pyramid_beaker',
'pyramid_mailer', 'pyramid_mailer',
] 'sqlalchemy-datatables',
'ColanderAlchemy']
tests_require = [ tests_require = [
'WebTest >= 1.3.1', # py3 compat 'WebTest >= 1.3.1', # py3 compat
......
...@@ -27,3 +27,8 @@ class CommonModel(object): ...@@ -27,3 +27,8 @@ class CommonModel(object):
if val is not None: if val is not None:
values[column.name] = val values[column.name] = val
return values return values
def from_dict(self, d):
for key in d:
val = d[key]
setattr(self, key, val)
...@@ -3,7 +3,13 @@ from pyramid.security import ( ...@@ -3,7 +3,13 @@ from pyramid.security import (
Authenticated, Authenticated,
ALL_PERMISSIONS, ALL_PERMISSIONS,
) )
from sqlalchemy import PrimaryKeyConstraint from sqlalchemy import (
PrimaryKeyConstraint,
Column,
String,
Date,
Integer,
)
import ziggurat_foundations.models import ziggurat_foundations.models
from ziggurat_foundations.models.base import BaseModel from ziggurat_foundations.models.base import BaseModel
from ziggurat_foundations.models.external_identity import ExternalIdentityMixin from ziggurat_foundations.models.external_identity import ExternalIdentityMixin
...@@ -76,9 +82,25 @@ class UserResourcePermission(UserResourcePermissionMixin, Base): ...@@ -76,9 +82,25 @@ class UserResourcePermission(UserResourcePermissionMixin, Base):
pass pass
rt_info = dict(colanderalchemy=dict(title='RT'))
rw_info = dict(colanderalchemy=dict(title='RW'))
kabupaten_info = dict(colanderalchemy=dict(title='Kota / Kabupaten'))
class User(UserMixin, Base, CommonModel): class User(UserMixin, Base, CommonModel):
# ... your own properties.... nama_lengkap = Column(String(64))
pass tempat_lahir = Column(String(64))
tgl_lahir = Column(
Date, info=dict(
colanderalchemy=dict(
title='Tanggal Lahir', description='dd-mm-yyyy')))
alamat = Column(String(128))
rt = Column(Integer, info=rt_info)
rw = Column(Integer, info=rw_info)
kelurahan = Column(String(64))
kecamatan = Column(String(64))
kabupaten = Column(String(64), info=kabupaten_info)
provinsi = Column(String(64))
class ExternalIdentity(ExternalIdentityMixin, Base): class ExternalIdentity(ExternalIdentityMixin, Base):
......
...@@ -8,6 +8,8 @@ reset-password ...@@ -8,6 +8,8 @@ reset-password
reset-password-sent reset-password-sent
login-by-code-failed login-by-code-failed
user user
user-grid,/user/grid
user-act,/user/{act}/act
user-add,/user/add user-add,/user/add
user-edit,/user/{id} user-edit,/user/{id}
user-delete,/user/{id}/delete user-delete,/user/{id}/delete
......
/*
* This combined file was created by the DataTables downloader builder:
* https://datatables.net/download
*
* To rebuild or modify this file with the latest versions of the included
* software please visit:
* https://datatables.net/download/#bs/dt-1.10.25
*
* Included libraries:
* DataTables 1.10.25
*/
table.dataTable{clear:both;margin-top:6px !important;margin-bottom:6px !important;max-width:none !important;border-collapse:separate !important}table.dataTable td,table.dataTable th{-webkit-box-sizing:content-box;box-sizing:content-box}table.dataTable td.dataTables_empty,table.dataTable th.dataTables_empty{text-align:center}table.dataTable.nowrap th,table.dataTable.nowrap td{white-space:nowrap}div.dataTables_wrapper div.dataTables_length label{font-weight:normal;text-align:left;white-space:nowrap}div.dataTables_wrapper div.dataTables_length select{width:75px;display:inline-block}div.dataTables_wrapper div.dataTables_filter{text-align:right}div.dataTables_wrapper div.dataTables_filter label{font-weight:normal;white-space:nowrap;text-align:left}div.dataTables_wrapper div.dataTables_filter input{margin-left:.5em;display:inline-block;width:auto}div.dataTables_wrapper div.dataTables_info{padding-top:8px;white-space:nowrap}div.dataTables_wrapper div.dataTables_paginate{margin:0;white-space:nowrap;text-align:right}div.dataTables_wrapper div.dataTables_paginate ul.pagination{margin:2px 0;white-space:nowrap}div.dataTables_wrapper div.dataTables_processing{position:absolute;top:50%;left:50%;width:200px;margin-left:-100px;margin-top:-26px;text-align:center;padding:1em 0}table.dataTable thead>tr>th.sorting_asc,table.dataTable thead>tr>th.sorting_desc,table.dataTable thead>tr>th.sorting,table.dataTable thead>tr>td.sorting_asc,table.dataTable thead>tr>td.sorting_desc,table.dataTable thead>tr>td.sorting{padding-right:30px}table.dataTable thead>tr>th:active,table.dataTable thead>tr>td:active{outline:none}table.dataTable thead .sorting,table.dataTable thead .sorting_asc,table.dataTable thead .sorting_desc,table.dataTable thead .sorting_asc_disabled,table.dataTable thead .sorting_desc_disabled{cursor:pointer;position:relative}table.dataTable thead .sorting:after,table.dataTable thead .sorting_asc:after,table.dataTable thead .sorting_desc:after,table.dataTable thead .sorting_asc_disabled:after,table.dataTable thead .sorting_desc_disabled:after{position:absolute;bottom:8px;right:8px;display:block;font-family:"Glyphicons Halflings";opacity:.5}table.dataTable thead .sorting:after{opacity:.2;content:""}table.dataTable thead .sorting_asc:after{opacity:.5;content:""}table.dataTable thead .sorting_desc:after{opacity:.5;content:""}table.dataTable thead .sorting_asc_disabled:after,table.dataTable thead .sorting_desc_disabled:after{color:#eee}div.dataTables_scrollHead table.dataTable{margin-bottom:0 !important}div.dataTables_scrollBody>table{border-top:none;margin-top:0 !important;margin-bottom:0 !important}div.dataTables_scrollBody>table>thead .sorting:after,div.dataTables_scrollBody>table>thead .sorting_asc:after,div.dataTables_scrollBody>table>thead .sorting_desc:after{display:none}div.dataTables_scrollBody>table>tbody>tr:first-child>th,div.dataTables_scrollBody>table>tbody>tr:first-child>td{border-top:none}div.dataTables_scrollFoot>.dataTables_scrollFootInner{box-sizing:content-box}div.dataTables_scrollFoot>.dataTables_scrollFootInner>table{margin-top:0 !important;border-top:none}@media screen and (max-width: 767px){div.dataTables_wrapper div.dataTables_length,div.dataTables_wrapper div.dataTables_filter,div.dataTables_wrapper div.dataTables_info,div.dataTables_wrapper div.dataTables_paginate{text-align:center}}table.dataTable.table-condensed>thead>tr>th{padding-right:20px}table.dataTable.table-condensed .sorting:after,table.dataTable.table-condensed .sorting_asc:after,table.dataTable.table-condensed .sorting_desc:after{top:6px;right:6px}table.table-bordered.dataTable{border-right-width:0}table.table-bordered.dataTable th,table.table-bordered.dataTable td{border-left-width:0}table.table-bordered.dataTable th:last-child,table.table-bordered.dataTable th:last-child,table.table-bordered.dataTable td:last-child,table.table-bordered.dataTable td:last-child{border-right-width:1px}table.table-bordered.dataTable tbody th,table.table-bordered.dataTable tbody td{border-bottom-width:0}div.dataTables_scrollHead table.table-bordered{border-bottom-width:0}div.table-responsive>div.dataTables_wrapper>div.row{margin:0}div.table-responsive>div.dataTables_wrapper>div.row>div[class^=col-]:first-child{padding-left:0}div.table-responsive>div.dataTables_wrapper>div.row>div[class^=col-]:last-child{padding-right:0}
...@@ -13,7 +13,6 @@ from pyramid.httpexceptions import ( ...@@ -13,7 +13,6 @@ from pyramid.httpexceptions import (
from pyramid.security import ( from pyramid.security import (
remember, remember,
forget, forget,
authenticated_userid,
) )
from pyramid.i18n import TranslationStringFactory from pyramid.i18n import TranslationStringFactory
import colander import colander
...@@ -90,7 +89,7 @@ def login_default_response(): ...@@ -90,7 +89,7 @@ def login_default_response():
@view_config(route_name='login', renderer='templates/login.pt') @view_config(route_name='login', renderer='templates/login.pt')
def view_login(request): def view_login(request):
if authenticated_userid(request): if request.authenticated_userid:
return HTTPFound(location=request.route_url('home')) return HTTPFound(location=request.route_url('home'))
if '_LOCALE_' in request.GET: if '_LOCALE_' in request.GET:
resp = Response() resp = Response()
...@@ -206,6 +205,8 @@ def security_code_age(user): ...@@ -206,6 +205,8 @@ def security_code_age(user):
def send_email_security_code( def send_email_security_code(
request, user, time_remain, subject, body_msg_id, body_default_file): request, user, time_remain, subject, body_msg_id, body_default_file):
settings = get_settings() settings = get_settings()
if not settings['mail.host']:
return
up = urlparse(request.url) up = urlparse(request.url)
url = '{}://{}/login?code={}'.format( url = '{}://{}/login?code={}'.format(
up.scheme, up.netloc, user.security_code) up.scheme, up.netloc, user.security_code)
...@@ -240,7 +241,7 @@ def regenerate_security_code(user): ...@@ -240,7 +241,7 @@ def regenerate_security_code(user):
@view_config( @view_config(
route_name='reset-password', renderer='templates/reset-password.pt') route_name='reset-password', renderer='templates/reset-password.pt')
def view_reset_password(request): def view_reset_password(request):
if authenticated_userid(request): if request.authenticated_userid:
return HTTPFound(location=request.route_url('home')) return HTTPFound(location=request.route_url('home'))
resp = dict(title=_('Reset password')) resp = dict(title=_('Reset password'))
schema = ResetPassword(validator=reset_password_validator) schema = ResetPassword(validator=reset_password_validator)
......
<div metal:use-macro="load: ../layout-menu.pt">
<div metal:fill-slot="head">
<link rel="stylesheet" type="text/css" href="/static/datatables.min.css"/>
</div>
<div metal:fill-slot="content" i18n:domain="user">
<h1 i18n:translate="">Users</h1>
<table id="table-grid" class="table table-striped table-hover">
<thead>
<tr>
<th i18n:translate="">ID</th>
<th i18n:translate="">Username</th>
<th i18n:translate="">Email</th>
<th i18n:translate="">Status</th>
<th i18n:translate="">Last login</th>
</tr>
</thead>
<tbody>
</tbody>
</table>
</div>
<div metal:fill-slot="content-script">
<script type="text/javascript" src="/static/datatables.min.js"></script>
<script type="text/javascript">
$(document).ready(function() {
$('#table-grid').DataTable({
ajax: '/user/grid/act',
processing: true,
serverSide: true,
});
});
</script>
</div>
</div>
import colander
def none_to_null(d):
r = dict()
for key in d:
val = d[key]
if val is None:
val = colander.null
r[key] = val
return r
def null_to_none(d):
r = dict()
for key in d:
val = d[key]
if val is colander.null:
val = None
r[key] = val
return r
import os import os
import re import re
from email.utils import parseaddr
from pkg_resources import resource_filename from pkg_resources import resource_filename
from sqlalchemy import ( from sqlalchemy import (
func, func,
or_, or_,
event,
) )
from pyramid.view import view_config from pyramid.view import view_config
from pyramid.httpexceptions import ( from pyramid.httpexceptions import (
...@@ -22,8 +22,10 @@ from deform import ( ...@@ -22,8 +22,10 @@ from deform import (
from deform.widget import ( from deform.widget import (
SelectWidget, SelectWidget,
CheckboxChoiceWidget, CheckboxChoiceWidget,
PasswordWidget, )
HiddenWidget, from colanderalchemy import (
setup_schema,
SQLAlchemySchemaNode,
) )
from ..models import DBSession from ..models import DBSession
from ..models.ziggurat import ( from ..models.ziggurat import (
...@@ -31,12 +33,22 @@ from ..models.ziggurat import ( ...@@ -31,12 +33,22 @@ from ..models.ziggurat import (
Group, Group,
UserGroup, UserGroup,
) )
from ..tools.deform import to_dict
from ..tools.waktu import create_now from ..tools.waktu import create_now
from .login import ( from .login import (
regenerate_security_code, regenerate_security_code,
send_email_security_code, send_email_security_code,
) )
from .tools import (
none_to_null,
null_to_none,
)
from .validators import (
email_validator,
username_validator,
)
event.listen(User, 'mapper_configured', setup_schema)
_ = TranslationStringFactory('user') _ = TranslationStringFactory('user')
...@@ -142,119 +154,44 @@ def view_list(request): ...@@ -142,119 +154,44 @@ def view_list(request):
# Add # # Add #
####### #######
@colander.deferred @colander.deferred
def deferred_status(node, kw): def widget_status(node, kw):
values = kw.get('status_list', []) values = kw.get('status_list', [])
return SelectWidget(values=values) return SelectWidget(values=values)
@colander.deferred @colander.deferred
def deferred_group(node, kw): def widget_group(node, kw):
values = kw.get('group_list', []) values = kw.get('group_list', [])
return CheckboxChoiceWidget(values=values) return CheckboxChoiceWidget(values=values)
class Validator: field_user_name = colander.SchemaNode(
def __init__(self, user): colander.String(), name='user_name', validator=username_validator)
self.user = user field_email = colander.SchemaNode(
colander.String(), name='email', validator=email_validator)
class EmailValidator(colander.Email, Validator):
def __init__(self, user):
colander.Email.__init__(self)
Validator.__init__(self, user)
def __call__(self, node, value):
if self.match_object.match(value) is None:
raise colander.Invalid(node, _('Invalid email format'))
email = value.lower()
if self.user and self.user.email == email:
return
q = DBSession.query(User).filter_by(email=email)
found = q.first()
if not found:
return
data = dict(email=email, uid=found.id)
ts = _(
'email-already-used',
default='Email ${email} already used by user ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
REGEX_ONLY_CONTAIN = re.compile('([a-z0-9-]*)')
REGEX_BEGIN_END_ALPHANUMERIC = re.compile('^[a-z0-9]+(?:[-][a-z0-9]+)*$')
class UsernameValidator(Validator):
def __call__(self, node, value):
username = value.lower()
if self.user and self.user.user_name == username:
return
match = REGEX_ONLY_CONTAIN.search(username)
if not match or match.group(1) != username or username != value:
ts = _(
'username-only-contain',
default='Only a-z, 0-9, and - characters are allowed')
raise colander.Invalid(node, ts)
match = REGEX_BEGIN_END_ALPHANUMERIC.search(username)
if not match:
ts = _(
'username-first-end-alphanumeric',
default='Only a-z or 0-9 at the start and end')
raise colander.Invalid(node, ts)
q = DBSession.query(User).filter_by(user_name=username)
found = q.first()
if not found:
return
data = dict(username=username, uid=found.id)
ts = _(
'username-already-used',
default='Username ${username} already used by ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
@colander.deferred
def deferred_email_validator(node, kw):
return EmailValidator(kw['user'])
@colander.deferred
def deferred_username_validator(node, kw):
return UsernameValidator(kw['user'])
class AddSchema(colander.Schema):
email = colander.SchemaNode(
colander.String(), title=_('Email'),
validator=deferred_email_validator)
user_name = colander.SchemaNode(
colander.String(), title=_('Username'),
validator=deferred_username_validator)
groups = colander.SchemaNode(
colander.Set(), widget=deferred_group, title=_('Group'))
class EditSchema(AddSchema): MY_FIELDS = [
id = colander.SchemaNode( field_user_name, field_email, 'tempat_lahir', 'tgl_lahir', 'alamat', 'rt',
colander.String(), missing=colander.drop, 'rw', 'kelurahan', 'kecamatan', 'kabupaten', 'provinsi']
widget=HiddenWidget(readonly=True)) my_edit_schema = SQLAlchemySchemaNode(User, includes=MY_FIELDS)
status = colander.SchemaNode(
colander.String(), widget=deferred_status, title=_('Status'))
# Custom field
field_groups = colander.SchemaNode(
colander.Set(), name='groups', widget=widget_group, title=_('Group'))
my_edit_schema.add(field_groups)
class MyEditSchema(AddSchema): # Tampilkan field status bila edit user lain
id = colander.SchemaNode( field_status = colander.SchemaNode(
colander.String(), missing=colander.drop, colander.Integer(), name='status', widget=widget_status)
widget=HiddenWidget(readonly=True)) FIELDS = MY_FIELDS + [field_status, field_groups]
edit_schema = SQLAlchemySchemaNode(User, includes=FIELDS)
def get_form(request, class_form, user=None): def get_form(request, schema, user=None):
status_list = ( status_list = (
(1, _('Active')), (1, _('Active')),
(0, _('Inactive'))) (0, _('Inactive')))
group_list = get_group_list() group_list = get_group_list()
schema = class_form()
schema = schema.bind( schema = schema.bind(
status_list=status_list, group_list=group_list, user=user) status_list=status_list, group_list=group_list, user=user)
btn_save = Button('save', _('Save')) btn_save = Button('save', _('Save'))
...@@ -276,11 +213,21 @@ def reduce_member_count(gid): ...@@ -276,11 +213,21 @@ def reduce_member_count(gid):
DBSession.add(group) DBSession.add(group)
def clean_values(values, fnames):
d = dict()
for key in fnames:
if not isinstance(key, str):
key = key.name
d[key] = values[key]
d['email'] = values['email'].lower()
d['user_name'] = values['user_name'].lower()
return d
def insert(request, values): def insert(request, values):
user = User() d = clean_values(values, MY_FIELDS)
user.email = values['email'].lower() d['security_code_date'] = create_now()
user.user_name = values['user_name'].lower() user = User(**d)
user.security_code_date = create_now()
remain = regenerate_security_code(user) remain = regenerate_security_code(user)
DBSession.add(user) DBSession.add(user)
DBSession.flush() DBSession.flush()
...@@ -295,7 +242,7 @@ def insert(request, values): ...@@ -295,7 +242,7 @@ def insert(request, values):
route_name='user-add', renderer='templates/user/add.pt', route_name='user-add', renderer='templates/user/add.pt',
permission='user-edit') permission='user-edit')
def view_add(request): def view_add(request):
form = get_form(request, AddSchema) form = get_form(request, my_edit_schema)
resp = dict(title=_('Add user')) resp = dict(title=_('Add user'))
if not request.POST: if not request.POST:
resp['form'] = form.render() resp['form'] = form.render()
...@@ -333,11 +280,21 @@ def user_group_set(user): ...@@ -333,11 +280,21 @@ def user_group_set(user):
def update(request, user, values): def update(request, user, values):
fnames = ['email', 'user_name'] if user.id == request.user.id:
user.email = values['email'].lower() fnames = MY_FIELDS
user.user_name = values['user_name'].lower() else:
fnames = FIELDS
d = dict()
for key in fnames:
if not isinstance(key, str):
key = key.name
d[key] = values[key]
d['email'] = values['email'].lower()
d['user_name'] = values['user_name'].lower()
if user.id != request.user.id: if user.id != request.user.id:
user.status = values['status'] d['status'] = values['status']
del d['groups']
user.from_dict(d)
DBSession.add(user) DBSession.add(user)
existing = user_group_set(user) existing = user_group_set(user)
unused = existing - values['groups'] unused = existing - values['groups']
...@@ -363,14 +320,15 @@ def view_edit(request): ...@@ -363,14 +320,15 @@ def view_edit(request):
if not user: if not user:
return HTTPNotFound() return HTTPNotFound()
if user.id == request.user.id: if user.id == request.user.id:
form = get_form(request, MyEditSchema, user) schema = my_edit_schema
else: else:
form = get_form(request, EditSchema, user) schema = edit_schema
form = get_form(request, schema, user)
resp = dict(title=_('Edit user')) resp = dict(title=_('Edit user'))
if not request.POST: if not request.POST:
d = user.to_dict() d = user.to_dict()
d['groups'] = user_group_set(user) d['groups'] = user_group_set(user)
resp['form'] = form.render(appstruct=d) resp['form'] = form.render(appstruct=none_to_null(d))
return resp return resp
if 'save' not in request.POST: if 'save' not in request.POST:
return HTTPFound(location=request.route_url('user')) return HTTPFound(location=request.route_url('user'))
...@@ -380,7 +338,7 @@ def view_edit(request): ...@@ -380,7 +338,7 @@ def view_edit(request):
except ValidationFailure: except ValidationFailure:
resp['form'] = form.render() resp['form'] = form.render()
return resp return resp
update(request, user, dict(c.items())) update(request, user, null_to_none(dict(c.items())))
data = dict(username=user.user_name) data = dict(username=user.user_name)
ts = _('user-updated', default='${username} profile updated', mapping=data) ts = _('user-updated', default='${username} profile updated', mapping=data)
request.session.flash(ts) request.session.flash(ts)
......
import logging
from sqlalchemy import (
func,
cast,
Text,
)
from pyramid.view import view_config
from pyramid.i18n import TranslationStringFactory
from datatables import (
ColumnDT,
DataTables,
)
from ..models import DBSession
from ..models.ziggurat import User
log = logging.getLogger(__name__)
_ = TranslationStringFactory('user')
columns = [
ColumnDT(User.id),
ColumnDT(User.user_name),
ColumnDT(User.email),
ColumnDT(User.status),
ColumnDT(cast(User.last_login_date, Text))]
@view_config(
route_name='user-grid', renderer='templates/user/grid.pt',
permission='user-edit')
def view_list(request):
return dict(title=_('Users'))
@view_config(route_name='user-act', renderer='json', permission='user-edit')
def view_act(request):
if request.matchdict['act'] == 'grid':
log.debug(f'{request.path}')
q = DBSession.query().select_from(User)
dt = DataTables(request.GET, q, columns)
return dt.output_result()
import re
import colander
from pyramid.i18n import TranslationStringFactory
from ..models import DBSession
from ..models.ziggurat import User
_ = TranslationStringFactory('user')
class Validator:
def __init__(self, user):
self.user = user
class EmailValidator(colander.Email, Validator):
def __init__(self, user):
colander.Email.__init__(self)
Validator.__init__(self, user)
def __call__(self, node, value):
if self.match_object.match(value) is None:
raise colander.Invalid(node, _('Invalid email format'))
email = value.lower()
if self.user and self.user.email == email:
return
q = DBSession.query(User).filter_by(email=email)
found = q.first()
if not found:
return
data = dict(email=email, uid=found.id)
ts = _(
'email-already-used',
default='Email ${email} already used by user ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
REGEX_ONLY_CONTAIN = re.compile('([a-z0-9-]*)')
REGEX_BEGIN_END_ALPHANUMERIC = re.compile('^[a-z0-9]+(?:[-][a-z0-9]+)*$')
class UsernameValidator(Validator):
def __call__(self, node, value):
username = value.lower()
if self.user and self.user.user_name == username:
return
match = REGEX_ONLY_CONTAIN.search(username)
if not match or match.group(1) != username or username != value:
ts = _(
'username-only-contain',
default='Only a-z, 0-9, and - characters are allowed')
raise colander.Invalid(node, ts)
match = REGEX_BEGIN_END_ALPHANUMERIC.search(username)
if not match:
ts = _(
'username-first-end-alphanumeric',
default='Only a-z or 0-9 at the start and end')
raise colander.Invalid(node, ts)
q = DBSession.query(User).filter_by(user_name=username)
found = q.first()
if not found:
return
data = dict(username=username, uid=found.id)
ts = _(
'username-already-used',
default='Username ${username} already used by ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
@colander.deferred
def email_validator(node, kw):
return EmailValidator(kw['user'])
@colander.deferred
def username_validator(node, kw):
return UsernameValidator(kw['user'])
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!