init 1.0

0 parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
.vs/
.vscode/
# C extensions
*.so
# Distribution / packaging
.Python
bak/
env/
env*/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
inventori/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# bat file
*.bat
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
live.ini
.idea/
.project/
.DS_Store
opensipkd/base/static/img/*
opensipkd/base/static/icon/*
!opensipkd/base/static/icon/default.png
!opensipkd/base/static/img/pyramid.png
!opensipkd/base/static/img/opensipkd.png
!opensipkd/base/static/img/opensipkd_bg.png
!opensipkd/base/static/img/ajax-loader.gif
!opensipkd/base/static/img/line.png
!opensipkd/base/static/img/bootstrap.png
!opensipkd/base/static/img/postgree.png
alembic.ini
alembic_local.ini
.vs*
activate
4.0.0 17-11-2024
Penambahan Field pada tabel routes
Tabel Routes berfungsi juga sebagai menu generator
4.0.0 09-07-2024
Perubahan Colander>2.0
Perubahan SQLAlchemy>2.0
beaker>=1.12.1
3.0.2 13-07-2023
Perubahan Colander>2.0
Perubahan SQLAlchemy>2.0
beaker>=1.12.1
Ini sepertinya gak jadi
3.0.1 25-04-2022
Penambahan Feature ```detable```
3.0.0 26-08-2021
1. Penghapusan folder src
2. Perubahan ke base ciamis
3. Penambahan api sicantik
2.1.5 06-03-2021
Minor change
1. Perbaikan datatable sorting dengan inherite data di common.py
2. Penghapusan dukungan ke web-r authentication
3. Penambahan module External module login
2.1.4 06-01-2021
1. Penambahan function request.api_has_permission
2.0.3 25-12-2020
1. Penambahan tinymce
2. Perubahan MaskMoney
3. Pengurangan library static
2.0.2 22-11-2020
1. Penambahan field app_id pada tabel routes
2. Perubahan struktur csv di scripts/data/routes.csv mejadi
kode,path,nama,app_id,status,type
3. Functionality penggunakan config pyramid.includes
pyramid.includes =
pyramid_tm
pyramid_beaker
pyramid_chameleon
pyramid_rpc.jsonrpc
bjb.samsat
pyramid_debugtoolbar
4. Penggunaan Parameter modules depreceated
Setiap modules harus terdapat function includeme
def includeme():
app_id = 1
set_routes(app_id
config.scan('.')
5. function main() pada modules.__init__ depreceated
6. config(ini) depreceated
modules = module1, module2, ...
7. Penambahan Logging terhadap database::
[handlers]
keys = console, tabel, dblog
[handler_dblog]
class = opensipkd.base.handlers.SQLAlchemyHandler
args = ()
level = INFO
[logger_root]
level = INFO
handlers = console, filelog, dblog
[logger_opensipkd]
level = INFO
handlers = dblog
qualname = opensipkd
[handler_filelog]
class = FileHandler
args = ('file_location','a')
level = INFO
formatter = generic
8. Penambahan script import_log:
Digunakan untuk import data dari file log ke database
Sebaiknya diimport pada saat akan berpindah logging
$import_log config log_file
2.0.1 16-08-2020
Penambahan Error Rpc
2.0.0 16-08-2020
Pembuatan Branch Ciamis
Perbaikan Step PIP install
Perubahan setup.py
Perubahan ini file nama aplikasi opensipkd.base menjadi opensipkd_base
Perubahan penamaan folder src/opensipkd/base menjadi opensipkd/base
Penambahan Template Horizontal
Old
${structure form}
Label
Input
New
${structure form}
Label Input
0.1.4 08-11-2019
----------------
Pembuatan Branch Bandung
Penambahan method pada models user
Penambahan custom render
0.1.3 08-11-2019
----------------
- ZopeTransactionExtension diganti dengan register().
- README yang lebih baik.
0.1.2 10-02-2019
----------------
- Penambahan parameter 'db_session=DBSession' pada append_csv berfungsi apabila
caller mempunyai Session database yang berbeda
- Penambahan modules tools sub api.py, db.py, pbb.py
0.1.1 17-12-2018
----------------
- Form edit user bisa ubah password karena user non-manusia (host to host)
tidak punya email.
- .gitignore ditambah .DS_Store.
0.1 12-12-2018
--------------
- get_login_headers() menyimpan user.id pada cookie ketimbang user.user_email.
Akhirnya ini berdampak pada group_finder().
- RootFactory yang lebih efisien yaitu cukup berisi daftar hak akses, tidak
perlu melibatkan request.
- groups.py kini menyertakan hak akses dengan memanfaatkan tabel
groups_permissions bawaan ziggurat. Sebagai pendukung dibuatlah tabel
permissions yang berisi daftar hak akses sebagaimana yang tertera pada
function argument permission di setiap view function.
- Hak akses yang lebih ringkas dimana user.py dan groups.py menggunakan
satu hak akses saja yang bernama user-edit yang bermakna bisa SELECT, INSERT,
UPDATE, dan DELETE tabel users dan tabel groups. Jika nanti ada kebutuhan
grup yang hanya bisa SELECT maka bisa dibuat hak akses bernama user-view.
- initializedb.py saat INSERT data kini menggunakan format csv saja
dengan menggunakan dua fungsi:
- restore_csv() yang akan INSERT data hanya jika tabelnya masih kosong. Ini
cocok untuk tabel users.
- append_csv() yang akan INSERT data jika key pada tabel tidak ditemukan.
- Jika field pada csv merupakan foreign key maka nama field ditulis dengan
susunan nama-field-tabel-csv/nama-tabel-foreign.nama-field-foreign, contoh:
pemda_id/va_pemda.kode.
- Tabel user_ws diganti dengan field users.api_key.
- Admin bisa membuatkan API Key saat membuat user baru atau saat sedang edit.
- User bisa membuat ulang API Key jika memang terisi.
- Ada prosedur lupa password.
0.0.1 21-10-2017
----------------
- Penambahan Field type pada route default 0
- alter table routes add type not null default 0
- Field ini digunakan untuk menentukan jenis-renderer
- 0 Route Standar
- 1 Route JSON_RPC
0.0 24-9-2017 aa.gusti
----------------------
- Penambahan tabel parameters
0.0
---
- Initial version
\ No newline at end of file
Basis Aplikasi Tangsel
========================
Ini adalah basis dari seluruh aplikasi.
Pemasangan
----------
Pasang paket Debian yang dibutuhkan::
$ sudo apt install libxml2-dev libxslt1-dev libtiff5-dev libjpeg8-dev zlib1g-dev libfreetype6-dev liblcms2-dev libwebp-dev
Buat Python Virtual Environment::
$ python3.12 -m venv ~/env
$ ~/env/bin/pip install --upgrade pip setuptools wheel
Unduh source-nya::
$ git clone https://git.opensipkd.com/aa.gusti/tangsel-base -b v5.0
Pasang::
$ ~/env/bin/pip install tangsel-base/
Buat databasenya. Lalu salin file konfigurasi::
$ cp development.ini.tpl live.ini
Sesuaikan database profile dan log file. Lalu buat tabelnya::
$ ~/env/bin/initialize_tangsel_db live.ini
Jalankan web server::
$ ~/env/bin/pserve live.ini
Di web browser buka `http://localhost:6543/login <http://localhost:6543>`_
Bila Digunakan di Paket Lain
----------------------------
Di paket lainnya pada file ``pyproject.toml`` section ``[project]`` baris ``dependencies`` tambahkan::
dependencies = [
...
'tangsel-base @ git+https://git.opensipkd.com/aa.gusti/tangsel-base.git#beta-4.2',
]
Publikasi
---------
Saat dipublikasikan maka pastikan otomatis hidup saat komputer aktif. Silakan lihat
`dokumentasi ini <https://wiki.opensipkd.com/doku.php?id=panduan:python:pyramid-sebagai-daemon>`_
untuk penjelasan lebih lanjut, termasuk konfigurasi Nginx.
###
# app configuration
# http://docs.pylonsproject.org/projects/pyramid/en/latest/narr/environment.html
###
[app:main]
;[app:tangsel_base]
use = egg:tangsel_base
reload_templates = true
pyramid.debug_all = true
pyramid.debug_authorization = false
pyramid.debug_notfound = true
pyramid.debug_routematch = true
pyramid.debug_templates = true
default_locale_name = id
sqlalchemy.url = postgresql://aagusti:a@localhost:5432/demo2
session.url = postgresql://aagusti:a@localhost:5432/demo2
pyramid.includes =
pyramid_tm
pyramid_beaker
pyramid_chameleon
pyramid_rpc.jsonrpc
pyramid_debugtoolbar
tangsel.pbb.models
tangsel.pbb.esppt
tangsel.pbb.monitoring
session.type = ext:database
session.secret = s0s3cr3ts
session.cookie_expires = true
session.key = WhatEver
session.timeout = 3000
session.lock_dir = %(here)s/tmp
timezone = Asia/Jakarta
;localization = id_ID.UTF-8
#localization = Indonesian_indonesia.1252
localization = English_Australia.1252
# Base Configuration
temp_files = C:\tmp
partner_doc = C:\\tmp\\docs\\partner\\
;home_tpl = tangsel.samsat.jabar.views:templates/login.pt
;login_tpl = tangsel.samsat.jabar.views:templates/login.pt
login_captcha = 0
# Registrasi User
;reg_form =
allow_register = 1
reg_form =
reg_id_card = 1
reg_captcha = 1
reg_verify = 1
# MAIL
mail.host = smtp.gmail.com
mail.port = 465
mail.ssl = True
mail.tls = True
mail.username = tangsel@gmail.com
mail.password = ajmyoksxeiprmtyc
mail.sender_name = "tangsel"
;PBB
pbb.url = oracle://PBB:A@10.8.50.62/simpbb
pbbm.url =
lib_dir = C:\Users\aagus\Project\pbb\instantclient_11_2
pbb_esppt_files = C:\tmp\pbb_esppt_files
pbb_esppt_mirror.url =
pbb_esppt_bsre.url = bapenda:!&hfZZR6g@https://esign-service.cirebonkab.go.id/api/sign/pdf@3175101408750004:A4gusti08**&%%
pbb_pemda = 32.79
sig_url =
map_center =
;static_files = %(here)s/../files
;company = Tangsel
;ibukota = Bekasi
;departement = IT
;address_1 = Jalan....
;address_2 = Bekasi ...
;
;center.phone = 021123456789
;center.mobile = 081311045668
;center.email = aa.gustiana@gmail.com
;center.email_password =
;center.smtp_server =
;
;#_host = http://localhost:5433/demo2
;
;unoconv_py = C:\Program Files\LibreOffice\program\python.exe
;unoconv_bin = C:\product\venv-lates\Scripts\unoconv
;
;modules =
menus = login:Login
/pbb/esppt:ESPPT
/pbbm:PBB-MONITORING
;app_name = GAJI ASN
;change_unit = False
;departemen_chg_id = 3
# digunakan jika akan menggunakan form registrasi sendiri
; PROXY
;trusted_proxy_headers = "forwarded x-forwarded-for x-forwarded-host x-forwarded-proto x-forwarded-port"
;url_prefix='/wsgi'
;
;[composite:main]
;use = egg:rutter#urlmap
;/ = tangsel_base
;/wsgi/ = tangsel_base
;[filter:proxy-prefix]
;use = egg:PasteDeploy#prefix
;prefix = /wsgi
;
;[pipeline:main]
;pipeline =
; proxy-prefix
; tangsel_base
[server:main]
use = egg:waitress#main
host = 0.0.0.0
port = 6543
# Begin logging configuration
[loggers]
keys = root, tangsel, sqlalchemy
[handlers]
keys = console, filelog
#, tabel
[formatters]
keys = generic
[logger_root]
level = WARN
handlers =
#, tabel
[logger_tangsel]
level = DEBUG
handlers =console, filelog
qualname = tangsel
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
# "level = INFO" logs SQL queries.
# "level = DEBUG" logs SQL queries and results.
# "level = WARN" logs neither. (Recommended for production systems.)
[handler_filelog]
class = FileHandler
; args = ('log_file','a')
args = ('/tmp/logs/tangsel.log','a')
level = DEBUG
formatter = generic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[handler_tabel]
class = tangsel.base.handlers.SQLAlchemyHandler
args = ()
level = WARN
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)-5.5s [%(name)s][%(threadName)s] %(message)s
# End logging configuration
[alembic_ziggurat]
script_location = ziggurat_foundations:migrations
sqlalchemy.url = postgresql://aagusti:a@localhost:5432/demo2
[alembic_base]
script_location = tangsel.base.scripts:alembic
sqlalchemy.url = postgresql://aagusti:a@localhost:5432/demo2
[pytest]
filterwarnings =
error
ignore::UserWarning
ignore:function ham\(\) is deprecated:DeprecationWarning
\ No newline at end of file
[build-system]
requires = ['setuptools >= 64', 'wheel']
build-backend = 'setuptools.build_meta'
[tool.setuptools]
packages = ["tangsel", "tangsel.detable", "tangsel.base"]
[tool.mypy]
exclude = [
'bak',
]
show_error_codes = true
[project]
name = 'tangsel_base'
version = '0.1'
dependencies = [
'deform',
'colander',
'google',
'google-api-python-client',
'pyramid',
'pyramid_tm',
"pyramid_beaker",
"pyramid_mailer",
'pyramid_chameleon',
'pyramid_rpc',
'pytz',
'psycopg2-binary',
'requests',
'sqlalchemy',
'sqlalchemy-datatables',
'transaction',
'waitress',
'wheezy.captcha',
'ziggurat-foundations',
'zope.sqlalchemy',
]
requires-python = '>= 3.8'
authors = [
{name='Agus Gustiana', email='aa.gustiana@gmail.com'},
{email="tangsel@gmail.com" },]
maintainers = [
{ name = "Ari", email = "ariagungprasetiyo@gmail.com" },
{ name = "Owo Sugiana", email = "sugiana@gmail.com" }]
description = 'Framwork Aplikasi tangsel'
readme = 'README.rst'
# license = {text = 'Apache Software License'}
keywords = ["tangsel"]
classifiers = [
'Development Status :: 5 - Beta',
'Programming Language :: Python :: 3',
'Operating System :: OS Independent',
'Framework :: Pylons',
'Topic :: Internet :: WWW/HTTP',
'Topic :: Internet :: WWW/HTTP :: WSGI :: Application',
]
[project.optional-dependencies]
dev = [
'pyramid_debugtoolbar',
'pytest', 'webtest']
[project.entry-points."paste.app_factory"]
main = 'tangsel.base:main'
[project.scripts]
osipkd-db-init = 'tangsel.base.scripts.initializedb:main'
import locale
import logging
import os
import importlib
import csv
import re
import datetime
import decimal
from pyramid.renderers import JSON
from pyramid_beaker import session_factory_from_settings
from pyramid.config import Configurator
from pyramid.events import NewRequest, BeforeRender, subscriber
from pyramid_mailer import mailer_factory_from_settings
from opensipkd.tools import get_settings, DefaultTimeZone, dmy, dmyhms
from .security import MySecurityPolicy, get_user
from sqlalchemy import engine_from_config
from .models.base import DBSession
from .models.handlers import LogDBSession
from .models.meta import Base
from .models.users import init_model
from pkg_resources import resource_filename
# from deform import ZPTRendererFactory, Form
# from deform.widget import default_resource_registry
import deform
_logging = logging.getLogger(__name__)
# version 2.0.0 digunakan untuk change default template
deform_templates = resource_filename('deform', 'templates')
path = os.path.dirname(__file__)
path = os.path.join(path, 'widgets', 'templates')
search_path = (path, deform_templates) # ,
renderer = deform.ZPTRendererFactory(search_path)
deform.Form.set_zpt_renderer(search_path)
main_title = 'openSIPKD'
titles = {}
static_route = []
titles = {}
def get_params(params, alternate=None, settings=None):
"""
Digunakan untuk mengambil nilai dari konfigurasi sesuai params yang disebut
:param params: variable
:param alternate: default apabila tidak ditemukan data/params
:param settings: default settings
:return: value
contoh penggunaan:
get_params('devel', False)
"""
result = None
if not settings:
settings = get_settings()
if settings:
result = settings.get(params)
_logging.debug(
f"get_params: {params}, Alternate: {alternate} Settings: {not settings == None} Result: {result}")
return result and result.strip() or alternate
def add_cors_headers_response_callback(event):
def cors_headers(request, response):
origin = request.headers.get("Origin", None)
allowed_origin = get_params("allowed_origin", None)
if allowed_origin:
if origin not in allowed_origin.split('\n'):
origin = "null"
headers = {
'Access-Control-Allow-Methods': 'POST,GET,DELETE,PUT,OPTIONS',
'Access-Control-Allow-Headers': 'Origin, Content-Type, Accept, Authorization',
'Access-Control-Max-Age': '1728000',
}
# _logging.info(f"{origin} {request.is_xhr}")
# response.headers.update(
# {'Access-Control-Allow-Credential': 'true',
# 'Access-Control-Allow-Origin': "*"}
# )
if origin:
headers['Access-Control-Allow-Origin'] = origin
else:
headers['Access-Control-Allow-Origin'] = "*"
if 'Access-Control-Allow-Credentials' not in headers:
headers['Access-Control-Allow-Credentials'] = 'true'
_logging.debug(f"Headers: {headers}")
response.headers.update(headers)
event.request.add_response_callback(cors_headers)
def get_app_name(request):
return get_params('app_name', 'openSIPKD Application')
def get_menus(request):
"""
digunakan untuk mengambil daftar menu untuk setiap modul yg sudah diregistrasikan di config key "menus".
tadinaya akan dibuat didalam method get_modules,
dikarenakan method get_modules lebih cenderung digunakan untuk meng-load module yang diregistrasikan di config key "modules",
maka akan lebih aman dipisahkan antara pembacaan module dan menu.
tetapi jika key "menus" kosong, maka secara default mengambil dari daftar modul.
result sama dengan result module
result = {
url: title
}
"""
# settings = get_settings()
# menus = 'menus' in settings and settings['menus'] and settings['menus'].split(',') or []
menus = get_params('menus')
menus = menus and menus.split('\n') or []
# if not menus:
# return get_modules(get_settings())
result = {}
for menu in menus:
if menu.find(',') > -1:
key, val = menu.strip().split(',')
key = key.strip().strip('/')
val = re.sub('[//-]', ' ', val)
elif menu.find(':') > -1:
key, val = menu.strip().split(':')
key = key.strip().strip('/')
val = re.sub('[//-]', ' ', val)
else:
key = menu.strip()
val = key.replace('/', '-')
if key and val:
result[key] = val.upper()
return result
def get_home(request):
return request.route_url('base-home')[:-1]
def get_host(request):
host = get_params('_host', "")
return host and host or get_home(request)
def get_title(request):
route_name = request.matched_route.name
return titles[route_name]
def get_company(request):
return get_params('company', 'openSIPKD').upper()
def format_datetime(v):
if v.time() != datetime.time(0, 0):
return dmyhms(v)
else:
return dmy(v)
def json_renderer():
json_r = JSON()
json_r.add_adapter(datetime.datetime, lambda v,
request: format_datetime(v))
json_r.add_adapter(datetime.date, lambda v, request: dmy(v))
json_r.add_adapter(decimal.Decimal, lambda v, request: str(v))
return json_r
def json_rpc():
json_r = JSON()
json_r.add_adapter(datetime.datetime, lambda v, request: v.isoformat())
json_r.add_adapter(datetime.date, lambda v, request: v.isoformat())
json_r.add_adapter(decimal.Decimal, lambda v, request: str(v))
return json_r
def allow_register(request):
return BASE_CLASS.allow_register
def google_signin_client_ids(request):
ids = get_params('google-signin-client-id', '')
if ids:
return ids.split('\n')
else:
return []
def google_signin_client_id(request):
ids = google_signin_client_ids(request)
if ids:
return ids[0].strip()
return ''
def get_config(settings):
session_factory = session_factory_from_settings(settings)
config = Configurator(settings=settings,
root_factory='opensipkd.base.models.users.RootFactory',
session_factory=session_factory)
config.set_default_csrf_options(require_csrf=False)
config.set_security_policy(MySecurityPolicy(settings["session.secret"]))
config.add_subscriber(add_cors_headers_response_callback, NewRequest)
config.add_request_method(get_app_name, 'app_name', reify=True)
config.add_request_method(get_menus, 'menus', reify=True)
config.add_request_method(get_host, 'home', reify=True)
config.add_request_method(get_title, 'title', reify=True)
config.add_request_method(get_company, 'company', reify=True)
config.add_request_method(get_user, 'user', reify=True)
config.add_request_method(google_signin_client_id,
'google_signin_client_id', reify=True)
config.add_request_method(google_signin_client_ids,
'google_signin_client_ids', reify=True)
config.add_request_method(allow_register, 'allow_register', reify=True)
config.add_static_view('static', 'opensipkd.base:static',
cache_max_age=3600)
config.add_static_view('deform_static', 'deform:static',
cache_max_age=3600)
config.add_renderer('csv', 'opensipkd.tools.CSVRenderer')
config.add_renderer('json', json_renderer())
config.add_renderer('json_rpc', json_rpc())
config.registry['mailer'] = mailer_factory_from_settings(settings)
return config
# config.add_request_method(get_host, '_host', reify=True)
# config.add_request_method(get_departement, 'departement', reify=True)
# config.add_request_method(get_ibukota, 'ibukota', reify=True)
# config.add_request_method(get_address, 'address', reify=True)
# config.add_request_method(get_address2, 'address2', reify=True)
# config.add_request_method(get_modules, 'modules', reify=True)
# config.add_request_method(has_modules_, 'has_modules', reify=True)
# config.add_request_method(thousand, 'thousand', reify=True)
# config.add_request_method(is_devel, 'devel', reify=True)
# config.add_request_method(disable_responsive, 'disable_responsive',
# reify=True)
# config.add_request_method(_get_ini, 'get_ini', reify=True)
# config.add_request_method(get_params, 'get_params', reify=True)
# config.add_request_method(get_csrf_token, 'get_csrf_token', reify=True)
# # Penambahan Module Auto Generate Menu
# # config.add_request_method(get_module_menus, 'get_module_menus', reify=True)
# # config.add_request_method(get_module_submenus, 'get_module_submenus', reify=True)
# # config.add_translation_dirs('opensipkd.base:locale/')
# partner_files = get_params("partner_files", settings=settings,
# alternate="/tmp/partner")
# captcha_files = get_params('captcha_files', settings=settings,
# alternate="/tmp/captcha")
# if os.sep != '/':
# captcha_files = captcha_files.replace('/', '\\')
# partner_files = partner_files.replace('/', '\\')
# _logging.info(f"Captcha Files: {captcha_files}")
# _logging.info(f"Partner Files: {partner_files}")
# if not os.path.exists(captcha_files):
# os.makedirs(captcha_files)
# if not os.path.exists(partner_files):
# os.makedirs(partner_files)
# config.add_static_view(partner_idcard_url,
# get_id_card_folder("/", settings=settings),
# cache_max_age=3600)
# config.add_static_view('captcha', captcha_files)
# config.add_static_view('partner/files', partner_files)
def init_db(settings):
engine = engine_from_config(
settings, 'sqlalchemy.', client_encoding='utf8',
max_identifier_length=30) # , convert_unicode=True
DBSession.configure(bind=engine)
LogDBSession.configure(bind=engine)
Base.metadata.bind = engine
init_model()
def main(global_config, **settings):
""" This function returns a Pyramid WSGI application.
"""
if not settings.get('localization', ''):
settings['localization'] = 'id_ID.UTF-8'
locale.setlocale(locale.LC_ALL, settings['localization'])
if 'timezone' not in settings:
settings['timezone'] = DefaultTimeZone
init_db(settings=settings)
config = get_config(settings=settings)
BASE_CLASS.route_from_csv(config)
BASE_CLASS.route_from_list(config)
BASE_CLASS.static_view(config, settings=settings)
config.scan()
return config.make_wsgi_app()
def _add_route(config, route):
if int(route.get("typ", 0)) == 0:
config.add_route(route.get("kode"), route.get("path"))
elif int(route.get("typ")) == 1:
config.add_jsonrpc_endpoint(route.get("kode"), route.get("path"),
default_renderer="json_rpc")
if route.get("nama"):
titles[route.get("kode")] = route.get("nama")
def _add_view_config(config, paket, route):
_add_route(config, route)
if not route.get("func_name"):
func_name = "".join(route.get("kode").split('-')[-1:])
route["func_name"] = "_".join(["view", func_name])
file_name = f"{paket}.{route.get('file_name')}"
# _logging.debug(f"File Name: {file_name}")
attr = f"{route.get('func_name')}"
try:
_views = importlib.import_module(file_name)
class_name = route.get("class_name", None)
if not class_name:
class_name = "Views"
views = getattr(_views, class_name)
template = route.get("template", "form.pt")
if not template:
if route.get("func_name") == "view_list":
template = "list.pt"
elif route.get("func_name") == "view_act":
template = "json"
else:
template = "form.pt"
if template != "json":
template = "views/templates/" + template
params = dict(attr=f"{attr}",
route_name=route.get("kode"),
renderer=template)
if route.get("permission"):
params["permission"] = route.get("permission")
if route.get("crsf"):
params["require_csrf"] = True
config.add_view(views, **params)
except Exception as e:
_logging.error("Add View Config :{code} Kode {error}"\
.format(code=route["kode"], error=str(e)))
_logging.debug(f"Route: {route.get('kode')} {route.get('path')}")
class BaseApp():
def __init__(self):
self.menus = []
self.partner_doc = ""
self.temp_files = ""
self.allow_register = 0
self.reg_form = ""
self.reg_id_card = 0
self.reg_captcha = 0
self.captcha_files = ""
self.login_captcha = 0
self.base_dir = os.path.split(__file__)[0]
def get_route_file(self, filename="routes.csv"):
fullpath = os.path.join(self.base_dir, 'scripts', 'data', filename)
return open(fullpath)
def static_view(self, config, settings=None):
self.partner_doc = get_params(
"partner_doc", '/tmp/docs/partner', settings=settings)+os.sep
if not os.path.exists(self.partner_doc):
os.makedirs(self.partner_doc)
self.temp_files = get_params(
"temp_files", '/tmp', settings=settings)
if not os.path.exists(self.temp_files):
os.makedirs(self.temp_files)
# Registrasi
self.allow_register = get_params("allow_register", 0, settings=settings)
self.reg_form = get_params("reg_form", 'base-register')
self.reg_id_card = get_params("reg_id_card", 0, settings=settings)
self.reg_captcha = get_params("reg_captcha", 0, settings=settings)
self.login_tpl = get_params("login_tpl", "", settings=settings)
self.login_captcha = int(get_params("login_captcha", 0, settings=settings))
self.captcha_files = os.path.join(self.temp_files, "captcha")+os.sep
if not os.path.exists(self.captcha_files):
os.makedirs(self.captcha_files)
config.add_static_view(
'docs/partner', self.partner_doc, cache_max_age=0)
config.add_static_view('captcha', self.captcha_files, cache_max_age=0)
def add_menu(self, config, route_menus, parent=None, paket="opensipkd.base.views"):
route_names = []
for route in route_menus:
# if not int(route.get("status", 0)):
# continue
route["route_name"] = [route["kode"]]
route["permission"] = route.get("permission", "")
route["icon"] = route.get("icon", None)
route_typ = route.get("typ", 0)
if route_typ == "" or route_typ == None:
route_typ = 0
else:
route_typ = int(route_typ)
route["typ"] = route_typ
is_menu = route.get("is_menu", 0)
route["is_menu"] = is_menu and int(is_menu) or 0
url_path = route.get("path", None)
if not url_path:
path_split = route.get("kode").split("-")
path_last = path_split[len(path_split) - 1]
if path_last in ["edit", "view", "delete"]:
path = "/".join(path_split[:-1])
path += "/{id}/"+path_last
elif path_last in ["act"]:
path = "/".join(path_split[:-1])
path += "/{act}/"+path_last
else:
path = "/".join(path_split)
url_path = '/'+path
route["path"] = url_path
children = route.get("children", [])
route["children"] = []
if route.get("file_name"):
_add_view_config(config, paket, route)
elif route["path"] != "#":
_add_route(config, route)
if route.get("is_menu", None):
if not parent:
self.menus.append(route)
else:
parent["children"].append(route)
if children:
route["route_name"].extend(
self.add_menu(config, children, route, paket)
)
route_names.append(route["kode"])
return route_names
def route_children(self, parent, row):
for p in parent:
parent_id = row.get("parent_id") or row.get(
"parent_id/routes.kode")
if p["kode"] == parent_id:
p["children"].append(row)
else:
if p["children"]:
self.route_children(p["children"], row)
def route_from_csv(self, config, paket="opensipkd.base.views", filename="routes.csv"):
with self.get_route_file(filename) as f:
rows = csv.DictReader(f)
new_routes = []
for row in rows:
status = row.get("status", 0)
if not row["kode"] or not int(status):
continue
status = int(status)
row["children"] = []
parent_id = row.get("parent_id") or row.get("parent_id/routes.kode")
if parent_id:
self.route_children(new_routes, row)
else:
new_routes.append(row)
self.add_menu(config, new_routes, None, paket)
def route_from_list(self, config, routs=[], paket="opensipkd.base.views"):
new_routes = []
for route in routs:
d = {"kode": route[0],
"path": route[1],
"nama": route[2],
"typ": len(route) > 4 and route[4] or 0
}
new_routes.append(d)
self.add_menu(config, new_routes, paket)
def get_menus(self):
_logging.debug(f"Menus: {self.menus}")
return self.menus
BASE_CLASS = BaseApp()
# https://groups.google.com/forum/#!topic/pylons-discuss/QIj4G82j04c
def has_permission_(request, perm_names, context=None):
if not perm_names:
return False
if isinstance(perm_names, str):
perm_names = [perm_names]
for perm_name in perm_names:
if request.has_permission(perm_name, context):
return True
@subscriber(BeforeRender)
def add_global(event):
event['has_permission'] = has_permission_
event['get_base_menus'] = BASE_CLASS.get_menus
# event['has_modules'] = has_modules_
# event['urlencode'] = urlencode
# event['quote_plus'] = quote_plus
# event['quote'] = quote
# event['money'] = money
# event['should_int'] = should_int
# event['thousand'] = thousand
# event['as_timezone'] = as_timezone
# event['split'] = split
# event['allow_register'] = allow_register
# event['change_unit'] = change_unit
event['get_params'] = get_params_
# event['get_urls'] = get_urls
# event['get_csrf_token'] = get_csrf_token
# event['get_base_menus'] = BASE_CLASS.get_menus
# # event['get_params'] = get_params
# # event['get_module_menus'] = get_module_menus
# # event['get_module_submenus'] = get_module_submenus
def get_params_(params, alternate=None, settings=None):
return get_params(params, alternate, settings)
# def get_urls(url):
# home = get_params('_host', "")
# if home:
# urls = url.split(":")
# homes = home.split(":")
# if len(urls) > 1:
# if urls[0] != homes[0]:
# return ":".join([homes[0], ":".join(urls[1:])])
# else:
# return home + url
# return url
from .meta import *
from .base import *
from .users import *
# from .common import *
# from .wilayah import *
# from .partner import *
# from .targets import *
# from .user_area import *
import logging
from datetime import datetime
import ziggurat_foundations.models
from opensipkd.tools import as_timezone
from sqlalchemy import Column, String, SmallInteger, Integer, DateTime, func, Numeric
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import (scoped_session, sessionmaker, Session)
from zope.sqlalchemy import register
_logging = logging.getLogger(__name__)
class MySession(Session):
def execute(self, clause, params=None, mapper=None, **kw):
# Your magic with clause here
# print("Session:", clause, params, mapper, kw)
return Session.execute(self, clause, params) # , mapper
session_factory = sessionmaker(class_=MySession)
DBSession = scoped_session(session_factory)
register(DBSession)
ziggurat_foundations.models.DBSession = DBSession
TABLE_ARGS = dict(extend_existing=True, schema="public")
def flush(row, db_session=DBSession):
db_session.add(row)
db_session.flush()
class CommonModel(object):
def to_dict_hybrid(self):
values = {}
for item in sa_inspect(self.__class__).all_orm_descriptors:
if hybrid_property == type(item):
value = getattr(self, item.__name__)
print(item.__name__, value)
if value:
values[item.__name__] = value
return values
def to_dict(self, null=False, date_format="%d-%m-%Y"): # Elixir like
values = {}
for column in self.__table__.columns:
value = getattr(self, column.name)
if value:
if type(column.type) is DateTime and date_format:
if value:
values[column.name] = value.strftime(date_format)
else:
values[column.name] = value
else:
if Integer in type(column.type).__mro__ or Numeric in type(column.type).__mro__:
if value == 0:
values[column.name] = 0
return values
def to_dict_without_none(self):
values = {}
for column in self.__table__.columns:
value = getattr(self, column.name)
if value is not None:
values[column.name] = value
return values
def from_dict(self, values, date_format="%d-%m-%Y"):
for column in self.__table__.columns:
if column.name in values:
_logging.debug(f"{column.name}: {column.type}: {values[column.name]}")
if type(column.type) is DateTime and date_format:
if values[column.name] and type(values[column.name]) is String:
setattr(self, column.name,
datetime.strptime(values[column.name], date_format))
continue
setattr(self, column.name, values[column.name])
def as_timezone(self, fieldname):
date_ = getattr(self, fieldname)
return date_ and as_timezone(date_) or None
class DefaultModel(CommonModel):
id = Column(Integer, primary_key=True)
@classmethod
def save(cls, values, row=None, **kwargs):
if not row:
row = cls()
row.from_dict(values)
return row
@classmethod
def count(cls, db_session=DBSession):
return db_session.query(func.count('id')).scalar()
@classmethod
def query(cls, db_session=DBSession, filters=None):
query = db_session.query(cls)
if filters:
filter_expressions = []
for d in filters:
field = getattr(cls, d[0])
operator = d[1]
value = d[2]
filter_expressions.append(field.op(operator)(value))
query = query.filter(
*[e for i, e in enumerate(filter_expressions) if e is not None])
return query
@classmethod
def query_from(cls, db_session=DBSession, columns=[], filters=None):
query = db_session.query().select_from(cls)
for c in columns:
query = query.add_columns(c)
return query
@classmethod
def query_id(cls, row_id, db_session=DBSession):
return cls.query(db_session).filter_by(id=row_id)
@classmethod
def delete(cls, row_id, db_session=DBSession):
cls.query_id(row_id, db_session).delete()
@classmethod
def flush(cls, row, db_session=DBSession):
db_session.add(row)
db_session.flush()
class StandarModel(DefaultModel):
status = Column(SmallInteger, nullable=False, default=0)
created = Column(DateTime, nullable=True, default=datetime.utcnow)
updated = Column(DateTime, nullable=True)
create_uid = Column(Integer, nullable=True, default=1)
update_uid = Column(Integer, nullable=True)
# New Method
@classmethod
def query_status(cls, status=0, db_session=DBSession):
return cls.query(db_session).filter_by(status=status)
@classmethod
def disabled(cls):
return cls.query_status(status=0)
@classmethod
def active(cls):
return cls.query_status(status=1)
@classmethod
def draft(cls):
return cls.disabled()
@classmethod
def processed(cls):
return cls.query_status(status=1)
@classmethod
def canceled(cls):
return cls.query_status(status=9)
@classmethod
def get_active(cls):
return cls.query_status(status=1).all()
@classmethod
def get_disabled(cls):
return cls.query_status(status=0).all()
@classmethod
def get_archived(cls, db_session=DBSession):
return cls.query_status(status=0, db_session=db_session).all()
class KodeModel(StandarModel):
kode = Column(String(32), nullable=False)
@classmethod
def query_kode(cls, kode, db_session=DBSession):
return cls.query(db_session).filter_by(kode=kode)
@classmethod
def get_by_kode(cls, kode, db_session=DBSession):
return cls.query_kode(kode, db_session).first()
class UraianModel(StandarModel):
nama = Column(String(128))
@classmethod
def query_nama(cls, nama, db_session=DBSession):
return cls.query(db_session).filter_by(nama=nama)
@classmethod
def get_by_nama(cls, nama, db_session=DBSession):
return cls.query_nama(nama, db_session).first()
@classmethod
def get_list(cls):
return DBSession.query(cls.id, cls.nama).order_by(cls.nama).all()
class NamaModel(KodeModel):
nama = Column(String(128), nullable=False)
@classmethod
def query_nama(cls, nama, db_session=DBSession):
return cls.query(db_session).filter_by(nama=nama)
@classmethod
def get_by_nama(cls, nama, db_session=DBSession):
return cls.query_nama(nama, db_session).first()
@classmethod
def query_list(cls, db_session=DBSession):
return db_session.query(cls.id, cls.nama).order_by(cls.nama)
@classmethod
def get_list(cls):
return cls.query_list().all()
\ No newline at end of file
from sqlalchemy import (Column, Integer, String, DateTime, func, )
from sqlalchemy.orm import (scoped_session, sessionmaker, )
from .base import CommonModel
from .meta import Base
factory = sessionmaker(autoflush=True, autocommit=True)
LogDBSession = scoped_session(factory)
class Log(Base, CommonModel):
__tablename__ = 'logs'
id = Column(Integer, primary_key=True) # auto incrementing
line_id = Column(String(32), nullable=False, unique=True)
logger = Column(String) # the name of the logger. (e.g. myapp.views)
level = Column(String) # info, debug, or error?
trace = Column(String) # the full traceback printout
msg = Column(String, nullable=False)
created_at = Column(
DateTime(timezone=True),
nullable=False,
server_default=func.now()) # the current timestamp
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import MetaData
# Recommended naming convention used by Alembic, as various different database
# providers will autogenerate vastly different names making migrations more
# difficult. See: http://alembic.zzzcomputing.com/en/latest/naming.html
NAMING_CONVENTION = {
"ix": 'ix_%(column_0_label)s',
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
Base = declarative_base(metadata=metadata)
from sqlalchemy import (
Column,
Integer,
String,
SmallInteger,
DateTime, ForeignKey
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.orm import backref
from .base import NamaModel, StandarModel
from .meta import (Base)
from .wilayah import ResProvinsi, ResDesa, ResKecamatan, ResDati2
class PartnerModel(NamaModel):
idcard = Column(String(256))
status = Column(Integer, default=1)
alamat_1 = Column(String(128))
alamat_2 = Column(String(128))
email = Column(String(40))
phone = Column(String(16))
fax = Column(String(16))
mobile = Column(String(16))
website = Column(String(64))
# pic = Column(String(16))
# pic_mobile = Column(String(16))
# pic_email = Column(String(16))
# pic_jabatan = Column(String(16))
@classmethod
def query_email(cls, email):
return cls.query().filter_by(email=email)
@classmethod
def query_mobile(cls, mobile):
return cls.query().filter_by(mobile=mobile)
class Partner(Base, PartnerModel):
__tablename__ = 'partner'
kelurahan = Column(String(128))
kecamatan = Column(String(128))
kota = Column(String(128))
provinsi = Column(String(128))
is_vendor = Column(SmallInteger, nullable=False, )
is_customer = Column(SmallInteger, nullable=False, )
# bank = Column(String(16))
# bank_accnt = Column(String(16))
# user_id = Column(Integer, ForeignKey(User.id), nullable=True) # referensi ke login
# departemen_id = Column(Integer, ForeignKey(Departemen.id)) # referensi ke default skpd
# users = relationship("User", backref=backref('partner'))
# departemen = relationship('Departemen', backref=backref('partner'))
rt = Column(String(3))
rw = Column(String(3))
tempat_lahir = Column(String(128))
tgl_lahir = Column(DateTime(timezone=False))
jenis_kelamin = Column(String(1))
gol_darah = Column(String(2))
agama = Column(String(32))
perkawinan = Column(String(2))
pekerjaan = Column(String(32))
kewarganegaraan = Column(String(10))
provinsi_id = Column(Integer, ForeignKey(ResProvinsi.id))
dati2_id = Column(Integer, ForeignKey(ResDati2.id))
kecamatan_id = Column(Integer, ForeignKey(ResKecamatan.id))
desa_id = Column(Integer, ForeignKey(ResDesa.id))
company_id = Column(Integer)
nip = Column(String(32))
res_provinsi = relationship(
"ResProvinsi", backref=backref('partner'))
res_dati2 = relationship(
"ResDati2", backref=backref('partner'))
res_kecamatan = relationship(
"ResKecamatan", backref=backref('partner'))
res_desa = relationship(
"ResDesa", backref=backref('partner'))
partner_files: Mapped["PartnerFiles"] = relationship(back_populates="partner")
# npwp = Column(String(16))
# npwpd = Column(String(16))
#
# @classmethod
# def query_user_id(cls, user_id):
# return cls.query().filter_by(user_id=user_id)
#
# @classmethod
# def query_user(cls, user):
# return cls.query_user_id(user.id)
@classmethod
def query_identity(cls, ident):
row = cls.query().filter_by(kode=ident).first()
if not row:
row = cls.query().filter_by(email=ident).first()
if not row:
row = cls.query().filter_by(mobile=ident).first()
return row
@classmethod
def query_register(cls):
columns= [cls.kode, cls.nama, cls.mobile, cls.email, cls.status]
return cls.query_from(columns=columns)
class PartnerFiles(Base, StandarModel):
__tablename__ = 'partner_files'
partner_id: Mapped[int] = mapped_column(ForeignKey(Partner.id))
file_name: Mapped[str] = mapped_column(String(256))
description: Mapped[str] = mapped_column(String(256), nullable=True)
partner: Mapped["Partner"] = relationship(back_populates="partner_files")
# class PartnerUserModel(Base, DefaultModel):
# __tablename__ = 'partner_user'
# partner_id = Column(Integer, ForeignKey(Partner.id))
# user_id = Column(Integer, ForeignKey(User.id))
\ No newline at end of file
from datetime import datetime
import pytz
import sqlalchemy as sa
from opensipkd.tools import as_timezone
from pyramid.authorization import (Allow, Authenticated, ALL_PERMISSIONS)
from sqlalchemy import (
Column, Integer, DateTime, String)
from sqlalchemy.orm import (relationship, backref)
from ziggurat_foundations import ziggurat_model_init
from ziggurat_foundations.models.base import BaseModel
from ziggurat_foundations.models.external_identity import ExternalIdentityMixin
from ziggurat_foundations.models.group import GroupMixin
from ziggurat_foundations.models.group_permission import GroupPermissionMixin
from ziggurat_foundations.models.group_resource_permission import \
GroupResourcePermissionMixin
from ziggurat_foundations.models.resource import ResourceMixin
from ziggurat_foundations.models.services.user import UserService
from ziggurat_foundations.models.user import UserMixin
from ziggurat_foundations.models.user_group import UserGroupMixin
from ziggurat_foundations.models.user_permission import UserPermissionMixin
from ziggurat_foundations.models.user_resource_permission import \
UserResourcePermissionMixin
from .base import CommonModel, DBSession, DefaultModel
from .meta import Base
from .base import TABLE_ARGS
class GroupPermission(GroupPermissionMixin, Base):
pass
class UserGroup(UserGroupMixin, Base, CommonModel):
@classmethod
def _get_by_user(cls, user):
return DBSession.query(cls).filter_by(user_id=user.id).all()
@classmethod
def query(cls):
return DBSession.query(cls)
@classmethod
def get_by_user(cls, user):
groups = []
for g in cls._get_by_user(user):
groups.append(g.group_id)
return groups
class GroupResourcePermission(GroupResourcePermissionMixin, Base):
__table_args__ = (
sa.PrimaryKeyConstraint(
"group_id",
"resource_id",
"perm_name",
name="pk_group_resources_permissions ",
),
{"mysql_engine": "InnoDB", "mysql_charset": "utf8"},
)
class Resource(ResourceMixin, Base):
pass
class UserPermission(UserPermissionMixin, Base):
pass
class UserResourcePermission(UserResourcePermissionMixin, Base):
pass
class User(UserMixin, BaseModel, DefaultModel, Base):
last_login_date = Column(DateTime(timezone=True), nullable=True)
registered_date = Column(DateTime(timezone=True),
nullable=False,
default=datetime.utcnow)
security_code_date = Column(DateTime(timezone=True),
default=datetime(2000, 1, 1,
tzinfo=pytz.timezone(
'Asia/Jakarta')),
server_default="2000-01-01 01:01+7",
)
api_key = Column(String(256))
partner_id = Column(Integer) # , ForeignKey(Partner.id))
company_id = Column(Integer) # , ForeignKey(Partner.id))
# partners = relationship(Partner, backref=backref('users'))
def _get_password(self):
return self._password
def _set_password(self, password):
self._password = UserService.set_password(self, password)
password = property(_get_password, _set_password)
def get_groups(self):
return UserGroup.get_by_user(self)
def last_login_date_tz(self):
return as_timezone(self.last_login_date)
def registered_date_tz(self):
return as_timezone(self.registered_date)
def nice_username(self):
return self.user_name or self.email
def kode(self):
pass
@classmethod
def get_by_email(cls, email):
return DBSession.query(cls).filter_by(email=email).first()
@classmethod
def get_by_name(cls, name):
return DBSession.query(cls).filter_by(user_name=name).first()
@classmethod
def get_by_identity(cls, identity):
if identity.find('@') > -1:
return cls.get_by_email(identity)
return cls.get_by_name(identity)
@classmethod
def get_by_token(cls, token):
return DBSession.query(cls).filter_by(security_code=token)
@classmethod
def query_register(cls):
return cls.query_from(columns=[cls.email, cls.user_name, cls.registered_date,
cls.last_login_date])
@classmethod
def query_list(cls):
return DBSession.query(cls.id, cls.user_name).order_by(cls.user_name)
@classmethod
def get_list(cls):
qry = cls.query_list()
return qry.all()
class ExternalIdentity(ExternalIdentityMixin, CommonModel, Base):
user = relationship(User, backref=backref("external"))
@classmethod
def query(cls):
return DBSession.query(cls)
@classmethod
def query_user(cls, user):
return cls.query().filter_by(local_user_id=user.id)
@classmethod
def external(cls, user):
return cls.query_user(user).count() > 0
class Permission(Base, CommonModel):
__tablename__ = 'permissions'
__table_args__ = (TABLE_ARGS)
id = Column(Integer, primary_key=True)
perm_name = Column(String(64), nullable=False, unique=True)
description = Column(String(64), nullable=False, unique=True)
class Group(GroupMixin, Base, DefaultModel):
member_count = Column(Integer, nullable=True, default=0)
@classmethod
def query_group_name(cls, group_name):
return DBSession.query(cls).filter_by(group_name=group_name)
# It is used when there is a web request.
class RootFactory:
def __init__(self, request):
gr = DBSession.query(Group).filter_by(group_name="Superuser").first()
gr_id = gr and gr.id or 1
self.__acl__ = [
(Allow, f'group:{gr_id}', ALL_PERMISSIONS),
(Allow, Authenticated, 'view')]
for gp in DBSession.query(GroupPermission):
acl_name = 'group:{}'.format(gp.group_id)
self.__acl__.append((Allow, acl_name, gp.perm_name))
def init_model():
ziggurat_model_init(User, Group, UserGroup, GroupPermission, UserPermission,
UserResourcePermission, GroupResourcePermission,
Resource,
ExternalIdentity, passwordmanager=None)
\ No newline at end of file
import logging
from pyramid.security import remember, forget
from .models.users import (User, UserGroup, DBSession, )
from pyramid.authentication import AuthTktCookieHelper
from pyramid.authorization import ACLHelper, Authenticated, Everyone
log = logging.getLogger(__name__)
def group_finder(user_id, request):
if user_id is not None:
q = DBSession.query(User).filter_by(id=user_id)
user = q.first()
else:
user = None
if not user or not user.status:
log.debug(f"user_id {user_id} not found or archived")
return []
r = []
q = DBSession.query(UserGroup).filter_by(user_id=user.id)
for ug in q:
acl_name = 'group:{gid}'.format(gid=ug.group_id)
r.append(acl_name)
return r
def get_user(request):
user_id = request.authenticated_userid
if user_id is not None:
q = DBSession.query(User).filter_by(id=user_id)
row = q.first()
return row
class MySecurityPolicy:
def __init__(self, secret):
self.helper = AuthTktCookieHelper(secret)
def identity(self, request):
identity = self.helper.identify(request)
if identity is None:
return None
userid = identity['userid']
principals = group_finder(userid, request)
if principals is not None:
return {
'userid': userid,
'principals': principals,
}
def authenticated_userid(self, request):
identity = request.identity
if identity is not None:
return identity['userid']
def permits(self, request, context, permission):
identity = request.identity
principals = set([Everyone])
if identity is not None:
principals.add(Authenticated)
principals.add(identity['userid'])
principals.update(identity['principals'])
return ACLHelper().permits(context, principals, permission)
def remember(self, request, userid, **kw):
return self.helper.remember(request, userid, **kw)
def forget(self, request, **kw):
return self.helper.forget(request, **kw)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!