security.py
import logging
from pyramid.security import remember, forget
from .models.users import (User, UserGroup, DBSession, )
from pyramid.authentication import AuthTktCookieHelper
from pyramid.authorization import ACLHelper, Authenticated, Everyone
# Module-level logger; used below to record rejected userids at debug level.
log = logging.getLogger(__name__)
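def group_finder(user_id, request):
    """Return the group principals for *user_id*, or ``None`` if the user is rejected.

    Args:
        user_id: The userid recovered from the auth ticket; ``None`` when the
            request carries no ticket.
        request: The current request. Unused here; kept for the callback
            signature.

    Returns:
        A list of ``'group:<group_id>'`` principal strings (possibly empty)
        for an existing user whose ``status`` is truthy, or ``None`` when no
        such user exists or the account is archived. Returning ``None``
        instead of ``[]`` lets the caller tell "valid user without groups"
        apart from "ticket names a user that no longer exists or is
        archived", so a stale ticket does not produce an identity.
    """
    user = None
    if user_id is not None:
        user = DBSession.query(User).filter_by(id=user_id).first()
    # A missing row or a falsy ``status`` (archived account) is a rejection.
    if not user or not user.status:
        # Lazy %-formatting: user_id comes straight from the client ticket.
        log.debug("user_id %s not found or archived", user_id)
        return None
    memberships = DBSession.query(UserGroup).filter_by(user_id=user.id)
    return ['group:{gid}'.format(gid=ug.group_id) for ug in memberships]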
def get_user(request):
    """Return the ``User`` row for the request's authenticated userid, or ``None``.

    An anonymous request (``request.authenticated_userid`` is ``None``)
    yields ``None`` without touching the database.
    """
    user_id = request.authenticated_userid
    if user_id is None:
        return None
    return DBSession.query(User).filter_by(id=user_id).first()
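class MySecurityPolicy:
    """Pyramid ``ISecurityPolicy`` implementation backed by an auth-ticket cookie.

    The identity is read from the ``AuthTktCookieHelper`` ticket, the
    principals come from :func:`group_finder`, and authorization is
    delegated to ``ACLHelper`` against the context's ACL.
    """

    def __init__(self, secret):
        # ``secret`` is the key the cookie helper uses to sign the ticket.
        self.helper = AuthTktCookieHelper(secret)

    def identity(self, request):
        """Return ``{'userid': ..., 'principals': [...]}`` for a valid ticket, else ``None``.

        A ticket is ignored when the cookie helper finds none, or when
        ``group_finder`` returns ``None`` for its userid; an empty
        principals list still counts as a valid identity.
        """
        ticket = self.helper.identify(request)
        if ticket is None:
            return None
        userid = ticket['userid']
        principals = group_finder(userid, request)
        if principals is None:
            return None
        return {
            'userid': userid,
            'principals': principals,
        }

    def authenticated_userid(self, request):
        """Return the userid of the current identity, or ``None`` if anonymous."""
        identity = request.identity
        if identity is None:
            return None
        return identity['userid']

    def permits(self, request, context, permission):
        """Check *permission* on *context* against the effective principals.

        ``Everyone`` always applies; ``Authenticated``, the userid and the
        group principals are added only when an identity was established.
        """
        principals = {Everyone}
        identity = request.identity
        if identity is not None:
            principals.add(Authenticated)
            principals.add(identity['userid'])
            principals.update(identity['principals'])
        return ACLHelper().permits(context, principals, permission)

    def remember(self, request, userid, **kw):
        """Return response headers that set the auth ticket for *userid*."""
        return self.helper.remember(request, userid, **kw)

    def forget(self, request, **kw):
        """Return response headers that clear the auth ticket."""
        return self.helper.forget(request, **kw)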