# import inspect
import logging
# from opensipkd.tools import get_params
from .models.users import (User, UserGroup, DBSession, )
# Module logger: group_finder() reports unknown/archived users at DEBUG, and the
# security policy reports failed RPC authentication at WARNING through it.
log = logging.getLogger(__name__)
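def group_finder(user_id, request):
    """Return the ACL group principals of ``user_id``, or ``None`` when there is no usable user.

    Follows the Pyramid "groupfinder" convention that ``MySecurityPolicy.identity``
    relies on (it only issues an identity when this returns a list):

    * ``None`` - no user with that id, or the user exists but ``status`` is falsy
      (archived).  The caller must treat this as *unauthenticated*; returning an
      empty list here would let an archived account keep a valid session.
    * ``[]`` - a live user that belongs to no group.
    * otherwise one ``'group:<group_id>'`` string per UserGroup row.

    Args:
        user_id: the user id taken from the auth ticket / RPC identity.  ``None``
            and the string ``'None'`` (what a stringified missing id looks like
            in a cookie) are both treated as "no user".
        request: the current request; unused, kept for the groupfinder signature.
    """
    if user_id is None or user_id == 'None':
        user = None
    else:
        user = DBSession.query(User).filter_by(id=user_id).first()
    if not user or not user.status:
        # Lazy %-formatting: the message is only rendered when DEBUG is enabled.
        log.debug("user_id %s not found or archived", user_id)
        return None
    groups = DBSession.query(UserGroup).filter_by(user_id=user.id)
    return ['group:{gid}'.format(gid=ug.group_id) for ug in groups]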
def get_user(request):
    """Return the ``User`` row behind ``request.authenticated_userid``, or ``None``.

    ``None`` is returned both when nobody is authenticated and when the id no
    longer matches a row.  Exposed as the ``request.user`` style helper.

    TODO: restrict multi-browser sessions - the intended design compares
    ``row.security_code`` against the session token and, on mismatch, flashes a
    "log in again" message, forgets the ticket, clears the session and the
    ``g_state`` cookie.  Not implemented yet.
    """
    user_id = request.authenticated_userid
    if not user_id:
        return None
    return DBSession.query(User).filter_by(id=user_id).first()
# def get_user(request):
# user_id = request.unauthenticated_userid
# if user_id is not None:
# user = DBSession.query(User).get(user_id)
# return user
from pyramid.authentication import AuthTktCookieHelper
from pyramid.authorization import ACLHelper, Authenticated, Everyone
from .tools.api import auth_from_rpc
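class MySecurityPolicy:
    """Pyramid security policy: auth-ticket cookie first, signed RPC headers as fallback.

    ``identity`` is the only method that talks to the database (via
    ``group_finder``); ``authenticated_userid`` and ``permits`` work purely off
    the cached ``request.identity``.
    """

    # Request headers that must all be present before the RPC path is attempted.
    _RPC_HEADERS = ('HTTP_USERID', 'HTTP_SIGNATURE', 'HTTP_KEY')

    def __init__(self, secret):
        # AuthTktCookieHelper signs/validates the auth ticket cookie with ``secret``.
        self.helper = AuthTktCookieHelper(secret)

    def identity(self, request):
        """Return ``{'userid': ..., 'principals': [...]}`` or ``None`` if unauthenticated.

        Resolution order:
        1. the auth-ticket cookie;
        2. unless this is the ``login`` route, the signed RPC headers
           (``auth_from_rpc``), if all of them are present;
        3. in both cases ``group_finder`` must return a principal list - a
           ``None`` from it (no usable user) means unauthenticated, an empty
           list is a real user without groups.

        Args:
            request: the current Pyramid request.
        """
        identity = self.helper.identify(request)
        if identity is None:
            # ``request.matched_route`` is a Route object (or None when nothing
            # matched), so it has to be compared by name; comparing the object
            # itself with the string 'login' is always unequal and would attempt
            # RPC authentication on the login page too.
            route_name = getattr(request.matched_route, 'name', None)
            if route_name == 'login':
                return None
            env = request.environ
            if not all(header in env for header in self._RPC_HEADERS):
                return None
            try:
                user = auth_from_rpc(request)
            except Exception as e:
                # Signature/key verification failure is an expected condition:
                # log it and stay unauthenticated rather than surfacing a 500.
                log.warning("Failed to authenticate from RPC: %s", e)
                return None
            identity = {'userid': user.id}
        userid = identity['userid']
        principals = group_finder(userid, request)
        if principals is None:
            return None
        return {
            'userid': userid,
            'principals': principals,
        }

    def authenticated_userid(self, request):
        """Return the userid of the current identity, or ``None``."""
        identity = request.identity
        if identity is None:
            return None
        return identity['userid']

    def permits(self, request, context, permission):
        """Evaluate ``context.__acl__`` for ``permission`` with the request's principals.

        Everyone always gets ``Everyone``; an authenticated request additionally
        gets ``Authenticated``, its own userid and the ``group:<id>`` principals
        collected by ``identity``.
        """
        identity = request.identity
        principals = {Everyone}
        if identity is not None:
            principals.add(Authenticated)
            principals.add(identity['userid'])
            principals.update(identity['principals'])
        return ACLHelper().permits(context, principals, permission)

    def remember(self, request, userid, **kw):
        """Return the Set-Cookie headers that log ``userid`` in (auth ticket)."""
        return self.helper.remember(request, userid, **kw)

    def forget(self, request, **kw):
        """Return the headers that clear the auth ticket cookie."""
        return self.helper.forget(request, **kw)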