Commit 1f158947 by aagusti

api

1 parent 1c9f3ec4
......@@ -66,29 +66,30 @@ titles = {}
# http://stackoverflow.com/questions/9845669/pyramid-inverse-to-add-notfound-viewappend-slash-true
class RemoveSlashNotFoundViewFactory(object):
def __init__(self, notfound_view=None):
if notfound_view is None:
notfound_view = default_exceptionresponse_view
self.notfound_view = notfound_view
def __call__(self, context, request):
if not isinstance(context, Exception):
# backwards compat for an append_notslash_view registered via
# config.set_notfound_view instead of as a proper exception view
context = getattr(request, 'exception', None) or context
path_req = request.path
registry = request.registry
mapper = registry.queryUtility(IRoutesMapper)
if mapper is not None and path_req.endswith('/'):
noslash_path = path_req.rstrip('/')
for route in mapper.get_routes():
if route.match(noslash_path) is not None:
qs = request.query_string
if qs:
noslash_path += '?' + qs
return HTTPFound(location=noslash_path)
return self.notfound_view(context, request)
# class RemoveSlashNotFoundViewFactory(object):
# diganti menggunakan @view_config(context=HTTPNotFound, renderer='templates/404.pt') pada base.views
# def __init__(self, notfound_view=None):
# if notfound_view is None:
# notfound_view = default_exceptionresponse_view
# self.notfound_view = notfound_view
#
# def __call__(self, context, request):
# if not isinstance(context, Exception):
# # backwards compat for an append_notslash_view registered via
# # config.set_notfound_view instead of as a proper exception view
# context = getattr(request, 'exception', None) or context
# path_req = request.path
# registry = request.registry
# mapper = registry.queryUtility(IRoutesMapper)
# if mapper is not None and path_req.endswith('/'):
# noslash_path = path_req.rstrip('/')
# for route in mapper.get_routes():
# if route.match(noslash_path) is not None:
# qs = request.query_string
# if qs:
# noslash_path += '?' + qs
# return HTTPFound(location=noslash_path)
# return self.notfound_view(context, request)
# https://groups.google.com/forum/#!topic/pylons-discuss/QIj4G82j04c
......@@ -312,11 +313,11 @@ def json_rpc():
return json_r
class MyAuthenticationPolicy(AuthTktAuthenticationPolicy):
def authenticated_userid(self, request):
user = request.user
if user is not None:
return user.id
# class MyAuthenticationPolicy(AuthTktAuthenticationPolicy):
# def authenticated_userid(self, request):
# user = request.user
# if user is not None:
# return user.id
def get_host(request):
......@@ -371,35 +372,21 @@ def main(global_config, **settings):
config = Configurator(settings=settings,
root_factory='opensipkd.base.models.RootFactory',
session_factory=session_factory)
from .models import RootFactory
modules = get_modules(settings)
# print(modules)
from importlib import import_module
for module in modules:
# compatibility
if module == 'admin':
continue
module = module.replace('/', '.')
mfile = module
print(">>Load Module:", mfile)
m = import_module(mfile)
cfg = m.main(config, **settings)
if cfg:
config = cfg
# todo apakah config bisa dikirim ke module?
# contoh:
# config = m.config(config)
# dipindahkan ke config pyramid.include
# config.include('pyramid_beaker')
# config.include('pyramid_chameleon')
# authn_policy = AuthTktAuthenticationPolicy(
# 'sosecret', callback=group_finder, hashalg='sha512')
#
# authz_policy = ACLAuthorizationPolicy()
config.set_security_policy(MySecurityPolicy(settings["session.secret"]))
# config.set_authentication_policy(authn_policy)
# config.set_security_policy(authz_policy)
# config.set_authorization_policy(authz_policy)
config.add_request_method(get_user, 'user', reify=True)
config.add_request_method(get_title, 'title', reify=True)
config.add_request_method(get_company, 'company', reify=True)
......@@ -421,70 +408,19 @@ def main(global_config, **settings):
config.add_request_method(allow_register, 'allow_register', reify=True)
config.add_request_method(disable_responsive, 'disable_responsive', reify=True)
config.add_request_method(get_params, 'get_params', reify=True)
# config.add_notfound_view(RemoveSlashNotFoundViewFactory())
config.add_static_view('static', 'opensipkd.base:static', cache_max_age=3600)
config.add_static_view('deform_static', 'deform:static')
# config.add_view('.views.api.echoGateway')
# config.add_static_view('files', get_params('static_files'))
# Captcha
captcha_files = get_params('captcha_files', settings=settings,alternate="/tmp/captcha")
captcha_files = get_params('captcha_files', settings=settings, alternate="/tmp/captcha")
if not os.path.exists(captcha_files):
os.makedirs(captcha_files)
config.add_static_view('captcha', captcha_files)
# config.add_static_view('tts', path=get_params('tts_files'))
config.add_static_view('captcha', captcha_files)
config.add_renderer('csv', 'opensipkd.tools.CSVRenderer')
config.add_renderer('json', json_renderer())
# dipindahkan ke config pyramid.include
# config.include('pyramid_rpc.jsonrpc')
config.add_renderer('json_rpc', json_rpc())
# q = DBSession.query(Route)
# for route in q:
# if route.type == 0:
# config.add_route(route.kode, route.path)
# if route.nama:
# titles[route.kode] = route.nama
# elif route.type == 1:
# config.add_jsonrpc_endpoint(route.kode, route.path,
# default_renderer="json_rpc")
set_routes(config)
###########################################
# MAP
# todo apabila config bosa di get dari module maka baris ini bisa hilang
# Sudah solve menggunakan includeme
###########################################
# if 'opensipkd.map.base' in modules:
# import papyrus
# from papyrus.renderers import GeoJSON, XSD
#
# config.add_request_method(get_gmap_key, 'gmap_key', reify=True)
# config.add_request_method(get_bing_key, 'bing_key', reify=True)
# config.add_request_method(get_extent, 'extent', reify=True)
#
# config.include(papyrus.includeme)
# config.add_renderer('geojson', GeoJSON())
# config.add_renderer('xsd', XSD())
# config.add_static_view('static_map', 'opensipkd.map.base:static', cache_max_age=3600)
# if 'opensipkd.map.aset' in modules:
# config.add_static_view('static_map_aset', 'opensipkd.map.aset:static', cache_max_age=3600)
#
# # if 'opensipkd.map.pbb' in modules:
# # config.add_static_view('static_map_pbb', 'opensipkd.map.pbb:static', cache_max_age=3600)
#
# if 'opensipkd.pasar.web' in modules:
# config.add_static_view('static_pasar', 'opensipkd.pasar.web:static', cache_max_age=3600)
#
# if 'opensipkd.pbb.master' in modules:
# config.add_static_view('static_pbb', 'opensipkd.pbb.master:static', cache_max_age=3600)
# if 'opensipkd.pos.pbb' in modules:
# config.add_static_view('static_pospbb', 'opensipkd.pos.pbb:static', cache_max_age=3600)
config.registry['mailer'] = mailer_factory_from_settings(settings)
# config.include()
config.scan()
for m in modules:
config.scan(m)
......
"""penambahan user device expired
Revision ID: 8e7057155823
Revises: 86c1b4a1da16
Create Date: 2022-07-08 14:58:39.378811
"""
# revision identifiers, used by Alembic.
revision = '8e7057155823'
down_revision = '86c1b4a1da16'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
context = op.get_context()
helpers = context.opts['helpers']
if not helpers.table_has_column('user_device', 'expired'):
op.add_column('user_device', sa.Column('expired', sa.DateTime(timezone=True)))
op.alter_column('user_device', sa.Column('expired', sa.DateTime(timezone=True)))
if not helpers.table_has_column('routes', 'create_uid'):
op.add_column('routes', sa.Column('create_uid', sa.Integer, default=0))
if not helpers.table_has_column('routes', 'update_uid'):
op.add_column('routes', sa.Column('update_uid', sa.Integer, default=0))
def downgrade():
pass
......@@ -38,8 +38,9 @@ class UserDeviceModel(Base, KodeModel):
kode = Column(String(256))
token = Column(String(256))
logged_in = Column(Integer)
las_login_date = Column(DateTime)
las_login_date = Column(DateTime(timezone=True))
expired = Column(DateTime(timezone=True))
user = relationship(User, backref=backref("devices"))
class ResCompany(Base, NamaModel):
__tablename__ = 'company'
......
......@@ -258,14 +258,16 @@ def alembic_run(ini_file, name=None):
if subprocess.call(command) != 0:
sys.exit()
def base_alembic_run(ini_file, name=None):
def alembic_run(ini_file, name='alembic_base'):
bin_path = os.path.split(sys.executable)[0]
alembic_bin = os.path.join(bin_path, 'alembic')
command = (alembic_bin, '-c', ini_file, '-n', 'alembic_base', 'upgrade', 'head')
command = (alembic_bin, '-c', ini_file, '-n', name, 'upgrade', 'head')
if subprocess.call(command) != 0:
sys.exit()
def base_alembic_run(ini_file):
alembic_run(ini_file)
def main(argv=sys.argv):
if len(argv) < 2:
......
import logging
from opensipkd.base.tools.api import rpc_auth
from opensipkd.tools.api import JsonRpcInvalidLoginError
from pyramid.renderers import render_to_response
from .models import (
User,
UserGroup,
......@@ -35,6 +40,7 @@ def get_user(request):
q = DBSession.query(User).filter_by(id=user_id)
return q.first()
# def get_user(request):
# user_id = request.unauthenticated_userid
# if user_id is not None:
......@@ -45,21 +51,18 @@ def get_user(request):
from pyramid.authentication import AuthTktCookieHelper
from pyramid.authorization import ACLHelper, Authenticated, Everyone
class MySecurityPolicy:
def __init__(self, secret):
self.helper = AuthTktCookieHelper(secret)
def identity(self, request):
# define our simple identity as None or a dict with userid and principals keys
identity = self.helper.identify(request)
if identity is None:
return None
userid = identity['userid'] # identical to the deprecated request.unauthenticated_userid
# verify the userid, just like we did before with groupfinder
userid = identity['userid']
principals = group_finder(userid, request)
# assuming the userid is valid, return a map with userid and principals
if principals is not None:
return {
'userid': userid,
......@@ -67,18 +70,14 @@ class MySecurityPolicy:
}
def authenticated_userid(self, request):
# defer to the identity logic to determine if the user id logged in
# and return None if they are not
identity = request.identity
if identity is not None:
return identity['userid']
def permits(self, request, context, permission):
# use the identity to build a list of principals, and pass them
# to the ACLHelper to determine allowed/denied
identity = request.identity
principals = set([Everyone])
if identity is not None:
principals.add(Authenticated)
principals.add(identity['userid'])
......@@ -90,4 +89,3 @@ class MySecurityPolicy:
def forget(self, request, **kw):
return self.helper.forget(request, **kw)
from .. import log
import logging
log=logging.getLogger(__name__)
log.warning("opensipkd.base.tools depreciated use opensipkd.tools")
from opensipkd.tools import *
import json
from datetime import timedelta, timezone, tzinfo
import requests
from opensipkd.tools import (
get_random_number, devel, get_random_string, get_settings)
get_random_number, devel, get_random_string, get_settings, DefaultTimeZone, get_params, get_timezone)
from opensipkd.tools.api import *
from .. import log
from ..models import (DBSession, User, GroupPermission, UserDeviceModel)
import logging
lima_menit = 300
log = logging.getLogger(__name__)
lima_menit = 300
#
def auth_from_rpc(request):
return auth_from(request)
def rpc_auth(request):
return auth_from(request)
def auth_from(request, field=None):
global lima_menit
......@@ -43,27 +50,44 @@ def auth_from(request, field=None):
return user
def auth_from_token(request):
return auth_from(request, "security_code")
def renew_token(user_device):
user_device.token = get_random_string(32)
DBSession.add(user_device)
DBSession.flush()
return user_device
def get_user_device(request, user):
# def auth_from_token(request):
# return auth_from(request, "security_code")
#
# def renew_token(user_device, logout=False):
# now = datetime.now(tz=get_timezone())
# tte = timedelta(minutes=10)
# if not user_device.expired or not user_device.token or \
# now - user_device.expired > tte:
# user_device.expired = now
# user_device.token = get_random_string(128)
# if logout:
# user_device.token=""
# DBSession.add(user_device)
# DBSession.flush()
# return user_device
# def token_auth(request, logout=False):
# if not request.environ["HTTP_TOKEN"]:
# raise JsonRpcInvalidLoginError
# user_device = UserDeviceModel.query() \
# .filter_by(kode=request.environ["HTTP_USER_AGENT"],
# token=request.environ["HTTP_TOKEN"]).first()
# if not user_device:
# raise JsonRpcInvalidLoginError
#
# return renew_token(user_device, logout=logout)
def get_user_device(request, user_id):
user_device = UserDeviceModel.query() \
.filter_by(user_id=user.id,
.filter_by(user_id=user_id,
kode=request.environ["HTTP_USER_AGENT"]).first()
if not user_device:
user_device = UserDeviceModel()
user_device.user_id = user.id
user_device.user_id = user_id
user_device.kode = request.environ["HTTP_USER_AGENT"]
user_device.token = get_random_string(32)
DBSession.add(user_device)
DBSession.flush()
# user_device = renew_token(user_device)
return user_device
......@@ -82,6 +106,7 @@ def validate_time(request):
return time_stamp
def auth_device(request):
env = request.environ
log.info(env)
......
......@@ -4,15 +4,20 @@ from datetime import timedelta
import colander
from deform import (
Form, ValidationFailure, widget, Button, )
from opensipkd.tools.api import JsonRpcInvalidLoginError
from pyramid.httpexceptions import (
HTTPFound, HTTPForbidden, HTTPNotFound, HTTPInternalServerError,
HTTPSeeOther)
from pyramid.i18n import TranslationStringFactory
from pyramid.interfaces import IRoutesMapper
from pyramid.renderers import render_to_response
from pyramid.response import Response
from pyramid.security import remember
from pyramid.view import view_config
from opensipkd.base import get_params
from opensipkd.base.tools.api import rpc_auth
from .base_views import BaseView
from ..models import (
DBSession, UserService, )
......@@ -74,18 +79,11 @@ class Home(BaseView):
@view_config(context=HTTPForbidden, renderer='templates/403.pt')
def http_forbidden(request):
# if request.authenticated_userid: # (request):
# request.session.flash('Hak Akses Terbatas', 'error')
# return HTTPFound(location=request.route_url('home'))
# return HTTPFound(location=request.route_url('login'))
if not request.is_authenticated:
next_url = request.route_url('login', _query={'next': request.url})
# next_url = f'{get_params("_host").strip()}/{next_url}'
# log.info(next_url)
return HTTPSeeOther(location=next_url)
request.response.status = 403
request.response.status = 403
return {"url": request.url}
......
......@@ -6,35 +6,7 @@ from ziggurat_foundations.models.services.user import UserService
from ..models import Partner, User
def get_profile_(user):
partner = Partner.query().filter_by(email=user.email).first()
if not partner:
raise JsonRpcInvalidDataError
result = dict(user_name=user.user_name,
nik=partner.kode,
email=partner.email,
mobile=partner.mobile,
nama=partner.nama, )
return dict(data=result)
@jsonrpc_method(method='get_profile', endpoint='rpc-user')
def get_profile(request, data):
"""
Digunakan untuk memperoleh profile user yang sedang login
parameter
@param request: Request
@param data: Dict(user_name=user_name/email, password=password)
@return:
"""
user = request.user
print("User", user)
data = type(data) == list and data[0] or data
user = User.get_by_identity(data["user_name"])
if not user or not UserService.check_password(user, data['password']):
raise JsonRpcInvalidLoginError
return get_profile_(user)
# services = {
......
......@@ -12,12 +12,13 @@ class NamaSchema(colander.Schema):
validator=colander.Length(max=32),
oid="kode",
title="Kode",
width="100pt")
width="100pt")
nama = colander.SchemaNode(
colander.String(),
validator=colander.Length(max=64),
oid="nama")
class PartnerSchema(NamaSchema):
alamat_1 = colander.SchemaNode(
colander.String(),
......
<html metal:use-macro="load: ./base3.1.pt">
<html metal:use-macro="load: ./base.pt">
<js metal:fill-slot="js_files">
<script src="${home}/static/v3/js/plugin/datatables/jquery.dataTables.min.js"></script>
<script src="${home}/static/v3/js/plugin/datatables/jquery.dataTables.min.js"></script>
<script src="${home}/static/v3/js/plugin/datatables/dataTables.colVis.min.js"></script>
<script src="${home}/static/v3/js/plugin/datatables/dataTables.tableTools.min.js"></script>
<script src="${home}/static/v3/js/plugin/datatables/dataTables.bootstrap.min.js"></script>
<script src="${home}/static/v3/js/plugin/datatable-responsive/datatables.responsive.min.js"></script>
</js>
</html>
</html>
\ No newline at end of file
......@@ -94,8 +94,8 @@ class Views(BaseView):
base_path=base_path)
filename = os.path.basename(filename)
resp = pdf_response(self.req, pdf, filename)
if resp.content_length<10:
resp.content_length=len(resp.body)
if resp.content_length < 10:
resp.content_length = len(resp.body)
return resp
return super(Views, self).view_act()
......@@ -238,21 +238,22 @@ class EmailValidator(colander.Email, Validator):
Validator.__init__(self, user)
def __call__(self, node, value):
def email_found():
data = dict(email=email, uid=found.id)
ts = _(
'email-already-used',
default='Email ${email} already used by user ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
if self.match_object.match(value) is None:
raise colander.Invalid(node, _('Invalid email format'))
email = value.lower()
if self.user and self.user.email == email:
return
q = DBSession.query(User).filter_by(email=email)
found = q.first()
if not found:
return
data = dict(email=email, uid=found.id)
ts = _(
'email-already-used',
default='Email ${email} already used by user ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
if found and (not self.user or self.user.email!=found.email):
email_found()
REGEX_ONLY_CONTAIN = re.compile('([A-Za-z0-9-]*)')
......
......@@ -29,13 +29,14 @@ from pyramid.httpexceptions import HTTPFound, HTTPNotFound
from pyramid.renderers import render_to_response
from pyramid.security import remember, forget
from pyramid.view import view_config
from ziggurat_foundations.models.services.external_identity import ExternalIdentityService
from ziggurat_foundations.models.services.external_identity import \
ExternalIdentityService
from ziggurat_foundations.models.services.user import UserService
from opensipkd.base import DBSession, get_params
from opensipkd.base.models import User, ExternalIdentity
from opensipkd.tools import create_now, set_user_log, get_settings
from opensipkd.base.views import _, one_hour, two_minutes
from opensipkd.base.views import _, one_hour, two_minutes, BaseView
from pyramid_mailer.message import Message
log = __import__("logging").getLogger(__name__)
......@@ -60,121 +61,120 @@ def get_login_headers(request, user):
return headers
@view_config(route_name='login', renderer='templates/login.pt')
def view_login(request):
request.session["login"]=True
next_url = request.params.get('next', request.referrer)
login_tpl = get_params('login_tpl', 'templates/login.pt')
if not next_url:
next_url = request.route_url('home') # get_params('_host')+
if request.authenticated_userid: # (request):
request.session.flash('Anda sudah login', 'error')
return HTTPFound(location=f"{request.route_url('home')}")
schema = Login(validator=login_validator)
form = Form(schema, buttons=('login',))
message = ""
if 'login' in request.POST:
identity = request.POST.get('username')
user = schema.user = User.get_by_identity(identity)
controls = request.POST.items()
try:
c = form.validate(controls)
except ValidationFailure as e:
msg = 'Login gagal'
set_user_log(msg, request, log, identity)
request.session.flash(msg, 'error')
return HTTPFound(location=request.route_url('login'))
values = dict(c)
# start cek external module
pckgs = get_params('external-uim')
if user:
external_user = DBSession.query(ExternalIdentity) \
.filter_by(local_user_id=user.id,
external_user_name=identity).first()
pckgs = external_user and pckgs or None
if pckgs:
# user_name = user and user.user_name or ""
m = import_module(pckgs)
class Views(BaseView):
@view_config(route_name='login', renderer='templates/login.pt')
def view_login(self):
request = self.req
request.session["login"] = True
next_url = request.params.get('next', request.referrer)
login_tpl = get_params('login_tpl', 'templates/login.pt')
if not next_url:
next_url = request.route_url('home')
if request.authenticated_userid: # (request):
request.session.flash('Anda sudah login', 'error')
return HTTPFound(location=f"{request.route_url('home')}")
schema = Login(validator=login_validator)
form = Form(schema, buttons=('login',))
message = ""
if 'login' in request.POST:
identity = request.POST.get('username')
user = schema.user = User.get_by_identity(identity)
controls = request.POST.items()
try:
user = m.login(identity, values['password'], user)
except Exception as e:
log.warn(str(e))
request.session.flash(str(e), "error")
return HTTPFound(location=request.route_url('login'))
else:
if not user or not UserService.check_password(user, values['password']):
msg = "Login Gagal"
c = form.validate(controls)
except ValidationFailure as e:
msg = 'Login gagal'
set_user_log(msg, request, log, identity)
request.session.flash(msg, "error")
next_url = f"{request.route_url('login')}?next={next_url}"
return HTTPFound(location=next_url)
return redirect_login(request, user)
request.session.flash(msg, 'error')
return HTTPFound(location=request.route_url('login'))
elif 'register' in request.POST:
register_form = get_params("register_form", 'register')
return HTTPFound(location=request.route_url(register_form))
values = dict(c)
# start cek external module
pckgs = get_params('external-uim')
if user:
external_user = DBSession.query(ExternalIdentity) \
.filter_by(local_user_id=user.id,
external_user_name=identity).first()
pckgs = external_user and pckgs or None
if pckgs:
# user_name = user and user.user_name or ""
m = import_module(pckgs)
try:
user = m.login(identity, values['password'], user)
except Exception as e:
log.warn(str(e))
request.session.flash(str(e), "error")
return HTTPFound(location=request.route_url('login'))
else:
if not user or not UserService.check_password(user, values[
'password']):
msg = "Login Gagal"
set_user_log(msg, request, log, identity)
request.session.flash(msg, "error")
next_url = f"{request.route_url('login')}?next={next_url}"
return HTTPFound(location=next_url)
elif 'login failed' in request.session:
r = dict(form=request.session['login failed'])
del request.session['login failed']
return r
return redirect_login(request, user)
elif "provider_name" in request.params and request.params["provider_name"]:
provider_name = request.params["provider_name"]
if provider_name == "google":
from .base_google import googlesignin
try:
id_info = googlesignin(request)
except Exception as e:
login = ""
request.session.flash(str(e), "error")
return render_to_response(login_tpl,
dict(form=form.render(),
message=message,
url=request.route_url('login'),
next_url=next_url,
login=login, ),
request=request)
request.session["id_info"] = id_info
else:
id_info = None
user = id_info and ExternalIdentityService. \
user_by_external_id_and_provider(id_info['sub'], id_info['iss'])
if id_info and not user:
request.session.flash('Silahkan Melakukan Registrasi')
elif 'register' in request.POST:
register_form = get_params("register_form", 'register')
return HTTPFound(location=request.route_url(register_form))
if user and user.status == 1:
return redirect_login(request, user)
else:
message = "User anda masih menunggu verifikasi atau lagi di blokir"
request.session.flash(message, "error")
# if "g_state" in request.cookies:
# requests.post("https://accounts.google.com/o/oauth2/revoke?token=" + ACCESS_TOKEN);
# headers = forget(request)
# request.session.delete()
# request.session["start"]="login"
login=""
return render_to_response(login_tpl,
dict(form=form.render(),
message=message,
url=request.route_url('login'),
next_url=next_url,
login=login, ),
request=request)
# return dict(
# )
elif 'login failed' in request.session:
r = dict(form=request.session['login failed'])
del request.session['login failed']
return r
elif "provider_name" in request.params and request.params[
"provider_name"]:
provider_name = request.params["provider_name"]
if provider_name == "google":
from .base_google import googlesignin
try:
id_info = googlesignin(request)
except Exception as e:
login = ""
request.session.flash(str(e), "error")
return render_to_response(login_tpl,
dict(form=form.render(),
message=message,
url=request.route_url(
'login'),
next_url=next_url,
login=login, ),
request=request)
request.session["id_info"] = id_info
else:
id_info = None
user = id_info and ExternalIdentityService. \
user_by_external_id_and_provider(id_info['sub'], id_info['iss'])
if id_info and not user:
request.session.flash('Silahkan Melakukan Registrasi')
register_form = get_params("register_form", 'register')
return HTTPFound(location=request.route_url(register_form))
if user and user.status == 1:
return redirect_login(request, user)
else:
message = "User anda masih menunggu verifikasi atau lagi di blokir"
request.session.flash(message, "error")
login = ""
return render_to_response(
renderer_name=login_tpl,
request=request,
value=dict(form=form.render(),
message=message,
url=request.route_url('login'),
next_url=next_url,
login=login, ),
)
def redirect_login(request, user):
......@@ -200,7 +200,12 @@ def view_logout(request):
request.session.delete()
if "g_state" in request.cookies:
del request.cookies["g_state"]
return HTTPFound(location=f"{request.route_url('home')}",
# if "g_state" in request.cookies:
# requests.post("https://accounts.google.com/o/oauth2/revoke?token=" + ACCESS_TOKEN);
# headers = forget(request)
# request.session.delete()
# request.session["start"]="login"
return HTTPFound(location=request.route_url('home'),
headers=headers)
return dict()
......@@ -210,14 +215,25 @@ class ChangePassword(colander.Schema):
colander.String(), widget=widget.PasswordWidget())
retype_password = colander.SchemaNode(
colander.String(), widget=widget.PasswordWidget())
password = colander.SchemaNode(colander.String(),
title=_("Old Password"))
def change_password_validator(form, value):
exc = colander.Invalid(form, '')
user = form.request.user
if not UserService.check_password(user, value["password"]):
exc["password"] = 'Login Failed'
raise exc
if value['new_password'] != value['retype_password']:
raise colander.Invalid(form, 'Retype mismatch.')
exc["new_password"] = 'Retype mismatch.'
exc["retype_password"] = 'Retype mismatch.'
raise exc
@view_config(route_name='change-password', renderer='templates/change-password.pt')
@view_config(route_name='change-password',
renderer='templates/change-password.pt')
def view_change_password(request):
if request.authenticated_userid:
request.session.flash('Anda sudah login', 'error')
......@@ -265,7 +281,8 @@ def generate_api_key():
@view_config(
route_name='recreate-api-key', renderer='templates/recreate-api-key.pt', permission='view')
route_name='recreate-api-key', renderer='templates/recreate-api-key.pt',
permission='view')
def view_recreate_api_key(request):
if not request.user.api_key:
return HTTPNotFound()
......@@ -374,7 +391,8 @@ def regenerate_security_code(user):
return one_hour
@view_config(route_name='reset-password', renderer='templates/reset-password.pt')
@view_config(route_name='reset-password',
renderer='templates/reset-password.pt')
def view_reset_password(request):
if request.authenticated_userid:
return HTTPFound(location=f"{request.route_url('home')}")
......@@ -394,7 +412,8 @@ def view_reset_password(request):
resp['form'] = form.render()
return resp
remain = regenerate_security_code(user)
set_user_log("Reset password to {}".format(user.email), request, log, user.user_name)
set_user_log("Reset password to {}".format(user.email), request, log,
user.user_name)
send_email_security_code(
request, user, remain, 'Reset password', 'reset-password-body',
'reset-password-body.tpl')
......@@ -404,6 +423,7 @@ def view_reset_password(request):
@view_config(
route_name='reset-password-sent', renderer='templates/reset-password-sent.pt')
route_name='reset-password-sent',
renderer='templates/reset-password-sent.pt')
def view_reset_password_sent(request):
return dict(title=_('Reset password'))
import re
from datetime import datetime
from email.utils import parseaddr
import transaction
##################
# RPC USER
##################
import colander
from deform import Form, ValidationFailure
from opensipkd.base.tools.api import (
get_user_device)
from opensipkd.base.views.partner_base import NamaSchema
from opensipkd.jsonrpc_auth import JsonRpcInvalidLogin
from opensipkd.tools import create_now
from opensipkd.tools.api import (
JsonRpcInvalidLoginError, JsonRpcInvalidDataError,
JsonRpcUserNotFoundError)
from pyramid.i18n import TranslationStringFactory
from pyramid.security import remember, forget
from pyramid_rpc.jsonrpc import jsonrpc_method
from opensipkd.tools.api import (
JsonRpcInvalidLoginError, JsonRpcInvalidNikError, JsonRpcInvalidMobileError,
JsonRpcInvalidEmailError, JsonRpcUserFoundError, JsonRpcEmailFoundError,
JsonRpcNikFoundError, JsonRpcRegisterFailError, JsonRpcInvalidDataError,
JsonRpcUserNotFoundError, JsonRpcProfileFailError, JsonRpcMobileFoundError)
from .base_google import googlesignin
from .user import add_member_count
from .user import save_user
from .user import EmailValidator as EmailValidatorBase
from .user_group import save as save_groups
from .. import get_params, log
from .user_login import (ChangePassword, change_password_validator,
regenerate_security_code, send_email_security_code)
from ..models import DBSession, UserService, Departemen
from ..models import (User, Partner, Group, UserGroup, PartnerDepartemen)
from opensipkd.tools import create_now, get_settings
from opensipkd.tools.api import custom_error
from opensipkd.base.tools.api import (
auth_from_rpc, check_token_rpc, update_token)
#todo save_partner
# from ..views.partner import save as save_partner
def insert_user(request, values):
q = DBSession.query(Group).filter(Group.group_name.ilike('web service'))
ws_group = q.first()
user = User()
user.email = values['email'].lower()
user.user_name = values['user_name'].lower()
user.password = values['password']
user.status = 1
DBSession.add(user)
DBSession.flush()
ug = UserGroup(user_id=user.id, group_id=ws_group.id)
DBSession.add(ug)
add_member_count(ws_group.id)
return user
_ = TranslationStringFactory('user')
def get_user(data):
user = 'user_name' in data and User.get_by_identity(data['user_name']) or None
user = 'user_name' in data and User.get_by_identity(
data['user_name']) or None
if not user:
user = 'email' in data and User.get_by_identity(data['email']) or None
if not user:
log.info("Get User Rpc Not Found")
raise JsonRpcUserNotFoundError
return user
def get_user_token(row):
user = 'token' in row and User.get_by_token(row['token']).first()
return user
# def validasi_nik(nik):
# return partner.query().filter_by(kode=nik).first()
def validasi_email(value):
name, email = parseaddr(value)
if not email or email.find('@') < 0:
return
return True
def validasi_user(data, row=None):
user = User.get_by_identity(data['user_name'])
if (not row and user) or (row and user and row.id != user.id):
log.info("Validasi User Found")
raise JsonRpcUserFoundError
user = User.get_by_identity(data['email'])
if (not row and user) or (row and user and row.id != user.id):
log.info("Validasi Email Found")
raise JsonRpcEmailFoundError
def validasi_partner(data, row=None):
# Cek di partner
partner = Partner.query_email(data['email']).first()
if (not row and partner) or (row and partner and row.id != partner.id):
log.info("Validasi Email Found")
raise JsonRpcEmailFoundError
partner = Partner.query_kode(data['nik']).first()
if (not row and partner) or (row and partner and row.id != partner.id):
log.info("Validasi NIK Found")
raise JsonRpcNikFoundError
partner = Partner.query().filter_by(mobile=data['mobile']).first()
if (not row and partner) or (row and partner and row.id != partner.id):
log.info("Validasi Mobile Found")
raise JsonRpcMobileFoundError
def validasi_data(dat):
nik = ""
if 'nik' in dat:
nik = re.sub('\D', '', dat['nik'])
if len(nik) != 16:
log.info("Validasi NIK Error")
raise JsonRpcInvalidNikError
mobile = re.sub('\D', '', dat['mobile'])
if len(mobile) < 9:
log.info("Validasi Mobile Error")
raise JsonRpcInvalidMobileError
email = dat['email']
if not validasi_email(email):
log.info("Validasi Email")
raise JsonRpcInvalidEmailError
dat['nik'] = nik and nik or mobile
dat['mobile'] = mobile
dat['email'] = email
return dat
def register_user_(data, user, groups=None):
is_list = isinstance(data, list)
if is_list:
data_list = data
else:
data_list = [data]
result = []
for data in data_list:
if not ('user_name' in data and 'password' in data and 'email' in data
and 'nama' in data and 'mobile' in data):
raise JsonRpcInvalidDataError
data = validasi_data(data)
validasi_user(data)
data['status'] = 1
row = save_user(data, user=user)
if not row:
raise JsonRpcRegisterFailError
# proses ke tabel partner
kode = 'nik' in data and data['nik'] or ""
if not kode:
kode = 'kode' in data and data['kode'] or ""
if not kode:
kode = 'email' in data and data['email'] or ""
data['kode'] = kode
data['user_id'] = row.id
data['is_customer'] = 1
user = row if not user else user
validasi_partner(data)
partner = save_partner(data, user)
if not partner:
transaction.abort()
raise JsonRpcRegisterFailError
##Untuk SIMKEL##
settings = get_settings()
default_group = get_params("default_group")
if default_group:
groups = settings['default_group'].split(',')
for group in groups:
group_data = Group.query_group_name(group).first()
if not group_data:
raise custom_error(-1, "Group Not Found.")
data['group_id'] = group_data.id
data['user_id'] = row.id
save_groups(data, None)
if not groups:
raise Exception("Groups Kosong")
ret_groups = []
if groups:
for group in groups.split(','):
group_data = Group.query_group_name(group).first()
if not group_data:
print(group)
raise Exception("Groups Data Kosong")
if group_data:
data['group_id'] = group_data.id
data['user_id'] = row.id
row = save_groups(data, None)
ret_groups.append(dict(group_name=group))
del data['group_id']
# del data['user_id']
data['groups']=ret_groups
result.append(data)
if not is_list:
result = result[0]
return result
# url /rpc/user , permission='web-service'
@jsonrpc_method(method='register', endpoint='rpc-user')
def register_user(request, data, groups=''):
# Digunakan untuk registrasi user via aplikasi lain
# parameter user_name, password, email, nama, mobile, nik
user = auth_from_rpc(request)
result = register_user_(data, user, groups)
return dict(message="Sukses Register User", data=result)
# 3 , permission='web-service'
def login_(request, data):
is_list = type(data) is list
data = is_list and data[0] or data
row = get_user(data)
if not row:
user = get_user(data)
if not user:
if 'external' in data:
from .base_google import googlesignin
row = googlesignin(request)
if not row:
user = googlesignin(request)
if not user:
result = dict(next="complete_user",
message="Silahkan Melakukan Registrasi")
raise JsonRpcInvalidLoginError(data=result)
raise JsonRpcInvalidLoginError
if not UserService.check_password(row, data['password']):
raise JsonRpcInvalidLoginError
row.last_login_date = create_now()
DBSession.add(row)
DBSession.flush()
partner = Partner.query().filter_by(user_id=row.id).first()
if not partner and row.id > 1:
raise JsonRpcInvalidDataError(message="Silahkan melengkapi Registrasi")
result = None
if row.id == 1:
is_pegawai = 1
else:
raise JsonRpcInvalidLoginError
else:
is_pegawai = partner and not partner.is_customer and not partner.is_vendor and 1 or 0
if not UserService.check_password(user, data['password']):
raise JsonRpcInvalidLoginError
##RETURNING GROUP##
user.last_login_date = create_now()
DBSession.add(user)
DBSession.flush()
headers = remember(request, user.id)
request.headers.update(headers)
response = request.response
response.headers.update(headers)
groups = DBSession.query(Group). \
join(UserGroup, UserGroup.group_id == Group.id). \
filter(UserGroup.user_id == row.id).all()
filter(UserGroup.user_id == user.id).all()
group_data = []
for group in groups:
group = group.to_dict()
group_data.append(dict(group_name=group['group_name']))
now = datetime.now().date()
partner_dep = Departemen.query() \
.join(PartnerDepartemen, Departemen.id == PartnerDepartemen.departemen_id) \
.join(Partner, Partner.id == PartnerDepartemen.partner_id) \
.filter(Partner.email == row.email,
PartnerDepartemen.mulai <= now,
PartnerDepartemen.selesai >= now).first()
if partner_dep:
departemen = dict(id=partner_dep.id,
kode=partner_dep.kode,
nama=partner_dep.nama)
else:
departemen = None
result = dict(user_name=row.user_name,
token=row.security_code,
nik=partner and partner.kode or '',
nama=partner and partner.nama or '',
is_pegawai=is_pegawai,
group=group_data,
departemens=departemen)
result = is_list and [result] or result
return dict(data=result)
now = datetime.now()
partner = Partner.query().filter_by(email=user.email).first()
departemen = None
if partner:
partner_dep = Departemen.query() \
.join(PartnerDepartemen,
Departemen.id == PartnerDepartemen.departemen_id) \
.join(Partner, Partner.id == PartnerDepartemen.partner_id) \
.filter(Partner.email == user.email,
PartnerDepartemen.mulai <= now,
PartnerDepartemen.selesai >= now).first()
if partner_dep:
departemen = dict(id=partner_dep.id,
kode=partner_dep.kode,
nama=partner_dep.nama)
return dict(id=user.id,
user_name=user.user_name,
nik=partner and partner.kode or '',
nama=partner and partner.nama or '',
group=group_data,
departemens=departemen)
@jsonrpc_method(method='login', endpoint='rpc-user')
def login(request, data):
# Digunakan untuk login dari aplikasi lain
# parameter user_name/email, user_password
ws_user = auth_from_rpc(request)
return login_(request, data)
# , permission='web-service'
def set_profile_(request, data):
"""
Digunakan untuk login pada aplikasi lain
:param request:
:param data:
{
user_name:
password:
device_id:
}
:return:{
"user_name": user_name,
"token": token,
"nik": nik,
"nama": nik,
"group": [group],
"departemens": departemen
}
"""
is_list = type(data) is list
data = is_list and data[0] or data
if not ('user_name' in data and 'password' in data and 'email' in data
and 'nama' in data and 'mobile' in data and 'nik' in data):
raise JsonRpcInvalidDataError
data = validasi_data(data)
if 'external' in data:
user = googlesignin(request)
if not user:
raise JsonRpcInvalidLoginError
resp = login_(request, data)
resp["token"] = get_user_device(request, resp["id"]).token
result = is_list and [resp] or resp
return result
@jsonrpc_method(method='logout', endpoint='rpc-user', permission="view")
def logout(request, data):
"""
Digunakan untuk login pada aplikasi lain
:param request:
:param data:
{
}
:return:{
}
"""
# token_auth(request, logout=True)
headers = forget(request)
request.session.delete()
request.response.headers.update(headers)
return dict(data="")
def get_profile_(user):
partner = Partner.query().filter_by(email=user.email).first()
if not partner:
return dict(user_name=user.user_name,
nik="",
email="",
mobile="",
nama="", )
return dict(user_name=user.user_name,
nik=partner.kode,
email=partner.email,
mobile=partner.mobile,
nama=partner.nama, )
@jsonrpc_method(method='get-profile', endpoint='rpc-user', permission="view")
@jsonrpc_method(method='get_profile', endpoint='rpc-user', permission="view")
def get_profile(request, data):
"""
Digunakan untuk memperoleh profile user yang sedang login
parameter
@param request: Request
@param data: Dict(password=password)
@return:
"""
user = request.user
is_list = type(data) == list
data = is_list and data[0] or data
print(data)
if not user or not UserService.check_password(user, data['password']):
raise JsonRpcInvalidLoginError
resp = get_profile_(user)
resp = is_list and [resp] or resp
return resp
class EmailValidator(EmailValidatorBase):
def __call__(self, node, value):
def email_found_partner():
data = dict(email=email, uid=found.id)
ts = _(
'email-already-used',
default='Email ${email} already used by partner ID ${uid}',
mapping=data)
raise colander.Invalid(node, ts)
super().__call__(node, value)
email = value.lower()
q = DBSession.query(Partner).filter_by(email=email)
found = q.first()
if found and (not self.user or self.user.email != found.email):
email_found_partner()
@colander.deferred
def email_validator(node, kw):
return EmailValidator(kw['user'])
class PartnerSchema(NamaSchema):
email = colander.SchemaNode(
colander.String(),
validator=email_validator
)
mobile = colander.SchemaNode(
colander.String()
)
def form_validator(form, values):
exc = colander.Invalid(form, "")
user = form.request.user
if user:
values["update_uid"] = user.id
values["updated"] = datetime.now()
else:
user = get_user(data)
if not UserService.check_password(user, data['password']):
raise JsonRpcInvalidLoginError
values["create_uid"] = user.id
values["created"] = datetime.now()
# if not user or UserService.check_password(user, data['password']):
# raise JsonRpcInvalidLoginError
values["is_customer"] = "is_customer" in values and values[
"is_customer"] or 1
values["is_vendor"] = "is_vendor" in values and values["is_vendor"] or 0
mobile = values["mobile"]
partner = Partner.query().filter_by(mobile=mobile).first()
if partner:
if not user or user and user.email != partner.email:
exc["mobile"] = "No Handphone sudah ada yang menggunakan"
raise exc
validasi_user(data, user)
partner = Partner.query().filter_by(user_id=user.id).first()
validasi_partner(data, partner)
if 'new_password' in data:
data['password'] = data['new_password']
def set_profile_(request, data):
schema = PartnerSchema(validator=form_validator)
schema = schema.bind(request=request, user=request.user)
schema.request = request
form = Form(schema)
data["kode"] = data["nik"]
controls = ((k, v) for k, v in data.items())
try:
controls = form.validate(controls)
except ValidationFailure as e:
print(e.error, type(e.error))
raise JsonRpcInvalidDataError(data=e.error.asdict())
values = dict(controls)
partner = Partner.query().filter_by(email=values["email"]).first()
if not partner:
partner = Partner()
partner.from_dict(values)
DBSession.add(partner)
DBSession.flush()
return values
row = save_user(data, user, user)
if not row:
raise JsonRpcProfileFailError
partner = save_partner(data, user, partner)
if not partner:
transaction.abort()
raise JsonRpcProfileFailError
@jsonrpc_method(method='set-profile', endpoint='rpc-user', permission="view")
@jsonrpc_method(method='set_profile', endpoint='rpc-user', permission="view")
def set_profile(request, data):
"""
Digunakan untuk menyimpan profile
:param request:
:param data:Dict(
nik="",
email="",
mobile="",
nama="",
)
:return:
"""
is_list = type(data) is list
data = is_list and data[0] or data
user = request.user
if not UserService.check_password(user, data['password']):
raise JsonRpcInvalidLoginError
result = data
result = is_list and [result] or result
return dict(message='Sukses Ubah', data=result)
old_email = user.email
values = set_profile_(request, data)
user.from_dict(values)
if old_email != data["email"]:
remain = regenerate_security_code(user)
send_email_security_code(
request, user, remain, 'Change email', 'change-email-body',
'change-email-body.tpl')
user.status = 0
headers = forget(request)
request.session.delete()
request.response.headers.update(headers)
return dict(message=f"Silahkan buka email {old_email}")
return dict(message="Sukses Ubah Profile")
@jsonrpc_method(method='set_profile', endpoint='rpc-user')
def set_profile(request, data):
# Digunakan untuk menyimpan profile kepada aplikasi lain
# parameter user_name / password
return set_profile_(request, data)
@jsonrpc_method(method='register', endpoint='rpc-user')
def register_user(request, data):
"""
Digunakan untuk registrasi user dan profile
:param request:
:param data:Dict(
user_name="",
nik="",
email="",
mobile="",
nama="",
)
:return:
"""
is_list = type(data) is list
data = is_list and data[0] or data
values = set_profile_(request, data)
user = User()
user.from_dict(values)
DBSession.add(user)
DBSession.flush()
groups = data["groups"]
for g in groups:
d = Group.query_group_name(g).first()
data['group_id'] = d.id
data['user_id'] = user.id
save_groups(data, None)
remain = regenerate_security_code(user)
send_email_security_code(
request, user, remain, 'Welcome new user', 'email-new-user',
'email-new-user.tpl')
ts = _(
'user-added',
default='${email} berhasil ditambahkan dan email untuk ubah ' \
'kata kunci sudah dikirim.',
mapping=data)
return dict(message=ts)
def get_password_(request, data):
......@@ -340,35 +330,59 @@ def get_password_(request, data):
if not user:
raise JsonRpcUserNotFoundError
from opensipkd.base.views.user_login import (
regenerate_security_code, send_email_security_code)
remain = regenerate_security_code(user)
send_email_security_code(
request, user, remain, 'Reset password', 'reset-password-body',
'reset-password-body.tpl')
return dict(data=dict(message='Email reset password sudah terkirim ke %s' % identity))
return dict(data=dict(
message='Email reset password sudah terkirim ke %s' % identity))
@jsonrpc_method(method='get-password', endpoint='rpc-user')
@jsonrpc_method(method='get_password', endpoint='rpc-user')
def get_password(request, data):
auth_from_rpc(request)
return get_password_(request, data)
def set_password_(token, data):
user = check_token_rpc(token)
if not UserService.check_password(user, data["password"]):
raise JsonRpcInvalidLoginError
def set_password_(user, data):
schema = ChangePassword(validator=change_password_validator)
form = Form(schema)
items = ((k, v) for k, v in data.items())
try:
c = form.validate(items)
except ValidationFailure as e:
raise JsonRpcInvalidLogin(data=e.error.asdict())
UserService.set_password(user, c['new_password'])
DBSession.add(user)
DBSession.flush()
if "new_password" in data and data["new_password"]:
UserService.set_password(User, data["new_password"])
result = dict(message="Sukses Ubah Password")
result.update(update_token(user))
return result
@jsonrpc_method(method='set_password', endpoint='rpc-user')
def set_password(request, token, data):
auth_from_rpc(request)
return set_password_(token, data)
@jsonrpc_method(method='set-password', endpoint='rpc-user', permission="view")
@jsonrpc_method(method='set_password', endpoint='rpc-user', permission="view")
def set_password(request, data):
"""
Digunakan untuk mengubah password
:param request:
:param data: {
"password": old_password,
"new_password": new_password,
"retype_password": conf_password,
}
:return:
success: {"result": {}}
error: {"error": {}}
"""
user = request.user
is_list = type(data) is list
data = is_list and data[0] or data
resp = set_password_(user, data)
headers = forget(request)
request.session.delete()
request.response.headers.update(headers)
return resp
import logging
import venusian
from pyramid.httpexceptions import HTTPFound
from pyramid.httpexceptions import HTTPForbidden
from pyramid.httpexceptions import HTTPNotFound
from pyramid.renderers import null_renderer
from pyramid.security import NO_PERMISSION_REQUIRED
from pyramid_rpc.jsonrpc import jsonrpc_method as json_rpc_base, MethodPredicate, BatchedRequestPredicate, \
EndpointPredicate, jsonrpc_renderer, DEFAULT_RENDERER, add_jsonrpc_endpoint, add_jsonrpc_method, JsonRpcError, \
exception_view, JsonRpcRequestInvalid, parse_request_GET, parse_request_POST
from opensipkd.base.tools.api import auth_from_rpc
from opensipkd.base.views.user_login import get_login_headers
from pyramid_rpc.jsonrpc import (JsonRpcError, JsonRpcMethodNotFound, JsonRpcParamsInvalid,
JsonRpcInternalError, make_error_response, MethodPredicate, BatchedRequestPredicate,
jsonrpc_renderer, add_jsonrpc_method,
DEFAULT_RENDERER,
batched_request_view, Endpoint, EndpointPredicate)
from pyramid_rpc.mapper import ViewMapperArgsInvalid, MapplyViewMapper
log = logging.getLogger(__name__)
def setup_request(endpoint, request):
""" Parse a JSON-RPC request body."""
if request.method == 'GET':
parse_request_GET(request)
elif request.method == 'POST':
parse_request_POST(request)
class JsonRpcRequestForbidden(JsonRpcError):
code = -32604
message = 'request forbidden'
class JsonRpcInvalidLogin(JsonRpcError):
code = -32605
message = "Invalid User/Password"
#
# class EndpointPredicate(BaseEndpointPredicate):
# def __call__(self, info, request):
# if self.val:
# # find the endpoint info
# key = info['route'].name
# endpoint = request.registry.jsonrpc_endpoints[key]
#
# # potentially setup either rpc v1 or v2 from the parsed body
# setup_request(endpoint, request)
#
# # update request with endpoint information
# request.rpc_endpoint = endpoint
#
# # Always return True so that even if it isn't a valid RPC it
# # will fall through to the notfound_view which will still
# # return a valid JSON-RPC response.
# return True
# def setup_request(endpoint, request):
# """ Parse a JSON-RPC request body."""
# if request.method == 'GET':
# parse_request_GET(request)
# elif request.method == 'POST':
# parse_request_POST(request)
# else:
# log.debug('unsupported request method "%s"', request.method)
# raise JsonRpcRequestInvalid
#
# if hasattr(request, 'batched_rpc_requests'):
# log.debug('handling batched rpc request')
# # the checks below will look at the subrequests
# return
#
# if request.rpc_version != '2.0':
# log.debug('id:%s invalid rpc version %s',
# request.rpc_id, request.rpc_version)
# raise JsonRpcRequestInvalid
#
# if request.rpc_method is None:
# log.debug('id:%s invalid rpc method', request.rpc_id)
# raise JsonRpcRequestInvalid
# env = request.environ
# if 'HTTP_TOKEN' in env:
# try:
# user_device = token_auth(request)
# user = user_device.user
# headers = remember(request, user.id)
# request.headers["Cookie"] = dict(headers)["Set-Cookie"]
# request.headers["token"]=user_device.token
# log.debug(request.headers["Cookie"])
# except JsonRpcInvalidLoginError as e:
# raise JsonRpcInvalidLogin
#
# elif ('HTTP_USERID' in env and 'HTTP_SIGNATURE' in env and
# 'HTTP_KEY' in env):
# try:
# user = rpc_auth(request)
# headers = remember(request, user.id)
# request.headers["Cookie"] = dict(headers)["Set-Cookie"]
# log.debug(request.headers["Cookie"])
# except JsonRpcInvalidLoginError as e:
# raise JsonRpcInvalidLogin
# log.debug('handling id:%s method:%s',
# request.rpc_id, request.rpc_method)
def exception_view(exc, request):
rpc_id = getattr(request, 'rpc_id', None)
if isinstance(exc, JsonRpcError):
fault = exc
log.debug('json-rpc error rpc_id:%s "%s"',
rpc_id, exc.message)
elif isinstance(exc, HTTPNotFound):
fault = JsonRpcMethodNotFound()
log.debug('json-rpc method not found rpc_id:%s "%s"',
rpc_id, request.rpc_method)
elif isinstance(exc, HTTPForbidden):
fault = JsonRpcRequestForbidden()
log.debug('json-rpc method forbidden rpc_id:%s "%s"',
rpc_id, request.rpc_method)
elif isinstance(exc, ViewMapperArgsInvalid):
fault = JsonRpcParamsInvalid()
log.debug('json-rpc invalid method params')
else:
log.debug('unsupported request method "%s"', request.method)
raise JsonRpcRequestInvalid
if hasattr(request, 'batched_rpc_requests'):
log.debug('handling batched rpc request')
# the checks below will look at the subrequests
return
if request.rpc_version != '2.0':
log.debug('id:%s invalid rpc version %s',
request.rpc_id, request.rpc_version)
raise JsonRpcRequestInvalid
if request.rpc_method is None:
log.debug('id:%s invalid rpc method', request.rpc_id)
raise JsonRpcRequestInvalid
log.debug('handling id:%s method:%s',
request.rpc_id, request.rpc_method)
class MethodPredicate(object):
def __init__(self, val, config):
self.method = val
def text(self):
return 'jsonrpc method = %s' % self.method
phash = text
def __call__(self, context, request):
user = auth_from_rpc(request)
headers = get_login_headers(request, user)
response = HTTPFound(location=request.route_url('home'), headers=headers)
# response = request.response
request.response.set_cookie('userid', value=str(user.id), max_age=31536000) # max_age = year
return getattr(request, 'rpc_method', None) == self.method
fault = JsonRpcInternalError()
log.exception('json-rpc exception rpc_id:%s "%s"', rpc_id, exc)
return make_error_response(request, fault, rpc_id)
def add_jsonrpc_endpoint(config, name, *args, **kw):
"""Add an endpoint for handling JSON-RPC.
``name``
The name of the endpoint.
``default_mapper``
A default view mapper that will be passed as the ``mapper``
argument to each of the endpoint's methods.
``default_renderer``
A default renderer that will be passed as the ``renderer``
argument to each of the endpoint's methods. This should be the
string name of the renderer, registered via
:meth:`pyramid.config.Configurator.add_renderer`.
A JSON-RPC method also accepts all of the arguments supplied to
:meth:`pyramid.config.Configurator.add_route`.
"""
default_mapper = kw.pop('default_mapper', MapplyViewMapper)
default_renderer = kw.pop('default_renderer', DEFAULT_RENDERER)
endpoint = Endpoint(
name,
default_mapper=default_mapper,
default_renderer=default_renderer,
)
config.registry.jsonrpc_endpoints[name] = endpoint
kw['jsonrpc_endpoint'] = True
config.add_route(name, *args, **kw)
kw = {}
kw['jsonrpc_batched'] = True
kw['renderer'] = null_renderer
config.add_view(batched_request_view, route_name=name,
permission=NO_PERMISSION_REQUIRED, **kw)
config.add_view(exception_view, route_name=name, context=Exception,
permission=NO_PERMISSION_REQUIRED)
def includeme(config):
""" Set up standard configurator registrations. Use via:
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!