__init__.py
8.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
from datetime import datetime
from sqlalchemy import (
Column,
Integer,
SmallInteger,
Text,
DateTime,
String,
ForeignKey,
UniqueConstraint
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import (
scoped_session,
sessionmaker,
relationship,
backref
)
from zope.sqlalchemy import ZopeTransactionExtension
import transaction
import ziggurat_foundations.models
from ziggurat_foundations.models import BaseModel, UserMixin, GroupMixin
from ziggurat_foundations.models import GroupPermissionMixin, UserGroupMixin
from ziggurat_foundations.models import GroupResourcePermissionMixin, ResourceMixin
from ziggurat_foundations.models import UserPermissionMixin, UserResourcePermissionMixin
from ziggurat_foundations.models import ExternalIdentityMixin
from ziggurat_foundations import ziggurat_model_init
from pyramid.security import (
Allow,
Authenticated,
Everyone,
ALL_PERMISSIONS
)
from pyramid.httpexceptions import (
HTTPFound, HTTPNotFound
)
from ..tools import as_timezone
# Scoped session registry used by every query helper in this module. The
# ZopeTransactionExtension ties the session to the `transaction` package.
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
# Declarative base class that the mapped models below inherit from.
Base = declarative_base()
##############
# Base model #
##############
class CommonModel(object):
    """Mixin with dict conversion and timezone helpers for mapped classes.

    Every method reads ``self.__table__.columns`` or plain attributes, so
    the class it is mixed into must provide ``__table__``.
    """

    def to_dict(self):  # Elixir like
        """Return a ``{column name: attribute value}`` dict of all columns."""
        return dict(
            (col.name, getattr(self, col.name))
            for col in self.__table__.columns
        )

    def from_dict(self, values):
        """Copy the entries of ``values`` that match column names onto self.

        Keys that are not column names are ignored. Falsy values (``''``,
        ``0``, ``False`` ...) are stored as ``None``.
        """
        for col in self.__table__.columns:
            name = col.name
            if name not in values:
                continue
            setattr(self, name, values[name] or None)

    def as_timezone(self, fieldname):
        """Return attribute ``fieldname`` converted by ``as_timezone``.

        ``None`` is returned when the attribute, or the converted result,
        is falsy.
        """
        raw = getattr(self, fieldname)
        if not raw:
            return None
        # This resolves to the module-level helper, not to this method.
        return as_timezone(raw) or None
class DefaultModel(CommonModel):
    """CommonModel with an integer primary key and ``DBSession`` shortcuts."""

    id = Column(Integer, primary_key=True)

    def save(self):
        """Put the instance into ``DBSession``: add when new, merge otherwise."""
        if not self.id:
            DBSession.add(self)
            return
        # Another user may have edited the row, hence merge() instead of add().
        DBSession.merge(self)

    @classmethod
    def query(cls):
        """Return a ``DBSession`` query over this class."""
        return DBSession.query(cls)

    @classmethod
    def query_id(cls, id):
        """Return a query filtered on the given primary key value."""
        base_query = cls.query()
        return base_query.filter_by(id=id)

    @classmethod
    def get_by_id(cls, id):
        """Return the first row with the given id, or ``None``."""
        matches = cls.query_id(id)
        return matches.first()

    @classmethod
    def delete(cls, id):
        """Issue a query-level delete for the row(s) with the given id."""
        matches = cls.query_id(id)
        matches.delete()
class KodeModel(DefaultModel):
    """DefaultModel extended with a ``kode`` column and lookups on it."""

    kode = Column(String(64))

    @classmethod
    def query_kode(cls, kode):
        """Return a query filtered on ``kode``."""
        base_query = cls.query()
        return base_query.filter_by(kode=kode)

    @classmethod
    def get_by_kode(cls, kode):
        """Return the first row whose ``kode`` matches, or ``None``."""
        matches = cls.query_kode(kode)
        return matches.first()
class NamaModel(KodeModel):
    """KodeModel extended with a ``nama`` column and lookups on it."""

    nama = Column(String(128))

    @classmethod
    def query_nama(cls, nama):
        """Return a query filtered on ``nama``."""
        base_query = cls.query()
        return base_query.filter_by(nama=nama)

    @classmethod
    def get_by_nama(cls, nama):
        """Return the first row whose ``nama`` matches, or ``None``."""
        matches = cls.query_nama(nama)
        return matches.first()
class Group(GroupMixin, Base, CommonModel):
    """Group model: ``GroupMixin`` on the declarative base, plus CommonModel helpers."""
class GroupPermission(GroupPermissionMixin, Base):
    """Group permission model: ``GroupPermissionMixin`` on the declarative base."""
class UserGroup(UserGroupMixin, Base):
    """User-to-group membership rows with lookup and bulk-update helpers.

    All helpers run against the module-level ``DBSession``. Writes are
    committed through ``transaction.commit()``.
    """

    @classmethod
    def get_by_email(cls, email):
        """Return the group ids of the user registered under ``email``.

        An empty list is returned when no user has that email address.
        """
        user = User.get_by_email(email)
        if user is None:
            return []
        return cls.get_by_user(user)

    @classmethod
    def _get_by_user(cls, user):
        """Return every membership row whose ``user_id`` equals ``user.id``."""
        return DBSession.query(cls).filter_by(user_id=user.id).all()

    @classmethod
    def get_by_user(cls, user):
        """Return the list of ``group_id`` values ``user`` is a member of."""
        return [member.group_id for member in cls._get_by_user(user)]

    @classmethod
    def _ensure_member(cls, user_id, group_id):
        """Return the membership row for the id pair, adding it if missing.

        Nothing is committed here; the caller decides when to commit.
        """
        query = DBSession.query(cls).filter_by(user_id=user_id,
                                               group_id=group_id)
        try:
            return query.one()
        except NoResultFound:
            member = cls(user_id=user_id, group_id=group_id)
            DBSession.add(member)
            return member

    @classmethod
    def set_one(cls, session, user, group):
        """Make ``user`` a member of ``group`` and commit the transaction.

        ``session`` is kept for backward compatibility only; the
        module-level ``DBSession`` is always used.
        """
        cls._ensure_member(user.id, group.id)
        transaction.commit()

    @classmethod
    def set_all(cls, user, group_ids=None):
        """Make the memberships of ``user`` match ``group_ids`` exactly.

        Missing memberships are added, memberships for groups outside
        ``group_ids`` are deleted, and the transaction is committed once at
        the end.

        :param user: a ``User`` instance or the user's email address.
        :param group_ids: iterable of group ids; ``None`` or empty removes
            every membership.
        :raises ValueError: when the email or one of the group ids does not
            match an existing row.
        """
        # type('') / type(u'') covers str and unicode on Python 2 and
        # collapses to plain str on Python 3.
        if isinstance(user, (type(''), type(u''))):
            email = user
            user = User.get_by_email(email)
            if user is None:
                raise ValueError('No user with email %r' % (email,))
        user_id = user.id
        olds = cls._get_by_user(user)
        wanted = set()
        for group_id in (group_ids or ()):
            group = DBSession.query(Group).get(group_id)
            if group is None:
                raise ValueError('No group with id %r' % (group_id,))
            cls._ensure_member(user_id, group.id)
            wanted.add(group.id)
        # Compare by group id: `olds` holds membership rows, not Group objects.
        for old in olds:
            if old.group_id not in wanted:
                DBSession.delete(old)
        # Commit through the transaction manager, as set_one does.
        transaction.commit()

    @classmethod
    def get_by_group(cls, group):
        """Return the ``user`` attribute of every membership row of ``group``."""
        return [member.user
                for member in DBSession.query(cls).filter_by(group=group)]
class GroupResourcePermission(GroupResourcePermissionMixin, Base):
    """Group resource permission model: the ziggurat mixin on the declarative base."""
class Resource(ResourceMixin, Base):
    """Resource model: ``ResourceMixin`` on the declarative base."""
class UserPermission(UserPermissionMixin, Base):
    """User permission model: ``UserPermissionMixin`` on the declarative base."""
class UserResourcePermission(UserResourcePermissionMixin, Base):
    """User resource permission model: the ziggurat mixin on the declarative base."""
class User(UserMixin, BaseModel, CommonModel, Base):
    """User model with login/registration timestamps and lookup helpers."""

    last_login_date = Column(DateTime(timezone=True), nullable=True)
    # NOTE(review): datetime.utcnow yields a naive value for a
    # timezone-aware column -- confirm the database session runs in UTC.
    registered_date = Column(DateTime(timezone=True),
                             nullable=False,
                             default=datetime.utcnow)

    def _get_password(self):
        """Return the value stored in ``_password``."""
        return self._password

    def _set_password(self, password):
        """Store in ``_password`` whatever the inherited ``set_password`` returns."""
        self._password = self.set_password(password)

    password = property(_get_password, _set_password)

    def get_groups(self):
        """Return the ids of the groups this user is a member of."""
        return UserGroup.get_by_user(self)

    def last_login_date_tz(self):
        """Return ``last_login_date`` converted by ``as_timezone``.

        The column is nullable, so the guarded ``CommonModel.as_timezone``
        is used: ``None`` is returned when the user has never logged in.
        """
        return self.as_timezone('last_login_date')

    def registered_date_tz(self):
        """Return ``registered_date`` converted by ``as_timezone``.

        ``None`` is returned when the attribute is not set yet.
        """
        return self.as_timezone('registered_date')

    def nice_username(self):
        """Return ``user_name``, falling back to ``email`` when it is empty."""
        return self.user_name or self.email

    @classmethod
    def get_by_email(cls, email):
        """Return the first user with this email address, or ``None``."""
        return DBSession.query(cls).filter_by(email=email).first()

    @classmethod
    def get_by_name(cls, name):
        """Return the first user with this ``user_name``, or ``None``."""
        return DBSession.query(cls).filter_by(user_name=name).first()

    @classmethod
    def get_by_identity(cls, identity):
        """Look a user up by email when ``identity`` contains ``@``, else by name."""
        if '@' in identity:
            return cls.get_by_email(identity)
        return cls.get_by_name(identity)
class ExternalIdentity(ExternalIdentityMixin, Base):
    """External identity model: ``ExternalIdentityMixin`` on the declarative base."""
class GroupRoutePermission(Base, CommonModel):
    """Association row pairing a route with a group (composite primary key)."""

    __tablename__ = 'groups_routes_permissions'
    __table_args__ = dict(extend_existing=True)

    route_id = Column(
        Integer,
        ForeignKey("routes.id"),
        nullable=False,
        primary_key=True,
    )
    group_id = Column(
        Integer,
        ForeignKey("groups.id"),
        nullable=False,
        primary_key=True,
    )

    # Each relationship also installs a backref on the target class.
    routes = relationship("Route", backref=backref('routepermission'))
    groups = relationship("Group", backref=backref('grouppermission'))
class Route(Base, DefaultModel):
    """Route definition row: path, factory and the permission guarding it."""

    __tablename__ = 'routes'
    # The unique constraint has to live in __table_args__ to take effect; a
    # bare UniqueConstraint(...) expression in the class body is discarded.
    # The options dict must stay the last element of the tuple.
    __table_args__ = (
        UniqueConstraint('kode', name='route_kode_uq'),
        {'extend_existing': True},
    )

    kode = Column(String(256))
    nama = Column(String(256))
    path = Column(String(256), nullable=False)
    factory = Column(String(256))
    perm_name = Column(String(16))
    disabled = Column(SmallInteger, nullable=False, default=0,
                      server_default='0')
    created = Column(DateTime, nullable=False, default=datetime.now,
                     server_default='now()')
    updated = Column(DateTime)
    create_uid = Column(Integer, nullable=False, default=1,
                        server_default='1')
    # The trailing comma that used to follow this line turned the value into
    # a tuple, so the column was never mapped.
    # NOTE(review): confirm the existing `routes` table has an `update_uid`
    # column and the `route_kode_uq` constraint (or add a migration).
    update_uid = Column(Integer)
class RootFactory(object):
    """Root resource whose ``__acl__`` is assembled per request.

    The ACL always allows ``'Admin'`` everything and authenticated users
    ``'view'``. When the request has both a user and a matched route, one
    extra ``Allow`` entry is appended per (group, route permission) row
    found for that user and route.
    """

    def __init__(self, request):
        self.__name__ = None
        self.request = request
        self.__acl__ = [
            (Allow, 'Admin', ALL_PERMISSIONS),
            (Allow, Authenticated, 'view'),
        ]
        if not (request.user and request.matched_route):
            return

        # Groups of the current user joined to the permissions they hold on
        # the route whose `kode` equals the matched route name.
        query = DBSession.query(Group.group_name, Route.perm_name)
        query = query.join(UserGroup).join(GroupRoutePermission).join(Route)
        query = query.filter(
            UserGroup.user_id == request.user.id,
            Route.kode == request.matched_route.name,
        )
        self.__acl__.extend(
            (Allow, 'g:' + row.group_name, row.perm_name)
            for row in query.all()
        )
def init_model():
    """Pass this module's model classes to ``ziggurat_model_init``."""
    models = (
        User,
        Group,
        UserGroup,
        GroupPermission,
        UserPermission,
        UserResourcePermission,
        GroupResourcePermission,
        Resource,
        ExternalIdentity,
    )
    ziggurat_model_init(*models, passwordmanager=None)