Enhance CSRF protection: add CSRF headers to responses, refactor CSRF handling i…

…n forms, and improve captcha integration in widgets
1 parent f695f603
......@@ -14,7 +14,8 @@ from pkg_resources import resource_filename
from pyramid.renderers import JSON
from pyramid_beaker import session_factory_from_settings
from pyramid.config import Configurator
from pyramid.events import NewRequest, BeforeRender, subscriber
from pyramid.events import NewRequest, BeforeRender, subscriber, NewResponse
from pyramid.csrf import new_csrf_token, get_csrf_token
from pyramid_mailer import mailer_factory_from_settings
from sqlalchemy import engine_from_config, or_
......@@ -452,7 +453,7 @@ def _add_view_config(config, paket, route, template_path="views/templates/"):
if route.get("permission"):
params["permission"] = route.get("permission")
if route.get("crsf"):
if route.get("csrf"):
params["require_csrf"] = True
if route.get("request_method"):
params["request_method"] = route.get("request_method")
......@@ -470,6 +471,14 @@ def _add_view_config(config, paket, route, template_path="views/templates/"):
# _logging.debug(f"Route: {route.get('kode')} {route.get('path')}")
@subscriber(NewResponse)
def add_csrf_headers(event):
request = event.request
csrf = get_csrf_token(request)
if not csrf:
csrf = new_csrf_token(request)
event.response.headers['X-CSRF-Token'] = csrf
class BaseApp():
def __init__(self):
self.menus = []
......@@ -675,6 +684,7 @@ BASE_CLASS = BaseApp()
def has_permission_(request, perm_names, context=None):
_logging.debug(f"Has Permission: {perm_names} Context: {context}")
if not perm_names:
return True
if isinstance(perm_names, str):
......
......@@ -3,17 +3,17 @@ import logging
from decimal import Decimal
from deform import Form, ValidationFailure
from pyramid.response import Response
from pyramid.exceptions import HTTPNotFound, HTTPBadRequest
from opensipkd.base.models import DBSession
from opensipkd.tools.buttons import btn_save, btn_cancel
from opensipkd.tools import get_settings
from opensipkd.base.views.common import DataTables, ColumnDT
from deform.widget import SelectWidget
from pyramid.response import Response
from pyramid.httpexceptions import *
import colander
from . import api_messages
from ..tools import obj2json
from pyramid.response import Response
from pyramid_restful.views import APIView
import json
......@@ -40,6 +40,11 @@ class ApiViews(APIView):
self.psize = 25
self.settings = get_settings()
self.page = 1
self.http_bad_request = HTTPBadRequest
self.http_not_found = HTTPNotFound
self.http_forbidden = HTTPForbidden
self.http_not_acceptable = HTTPNotAcceptable
self.response = Response
def get_params(self, key, default=None):
return self.settings.get(key, default)
......
from email.utils import parseaddr
import logging
import os
from datetime import datetime
from cgi import FieldStorage
from cgi import FieldStorage
from webob.multidict import MultiDict
from opensipkd.tools.captcha import img_captcha
......@@ -23,6 +24,7 @@ from opensipkd.tools.report import csv_response, file_response
from pyramid.request import Response
from .common import DataTables
from ..models import DBSession, Partner, Base
from ..widgets import widget_os
# , get_params, get_urls
from ..scripts.initializedb import append_csv
from ..tools import obj2json
......@@ -41,16 +43,10 @@ class UploadSchema(colander.Schema):
class CSRFSchema(colander.Schema):
def after_bind(self, schema, kwargs):
request = kwargs["request"]
csrf_token = get_csrf_token(request)
if not csrf_token:
csrf_token = new_csrf_token(request)
self["csrf_token"] = colander.SchemaNode(
colander.String(), widget=widget.HiddenWidget(),
default=csrf_token
)
csrf_token = colander.SchemaNode(
colander.String(),
widget=widget_os.CSRFWidget(),
)
class BaseView(object):
......@@ -89,7 +85,7 @@ class BaseView(object):
self.allow_delete = True
self.allow_post = False
self.allow_unpost = False
self.allow_check = False
self.allow_check = False
self.check_field = ""
self.state_save = False
self.server_side = True
......@@ -221,6 +217,7 @@ class BaseView(object):
'jenis'] or self.jenis
self.ses['jenis'] = self.jenis
"""
def form2dict(self, field):
children = []
for c in field.children:
......@@ -246,6 +243,7 @@ class BaseView(object):
def query_register(self, **kwargs):
pass
"""
def get_routes(self):
"""
Digunakan untuk mendapatkan default url apabila list_url tidak ada
......@@ -299,6 +297,7 @@ class BaseView(object):
def get_params(self, params, default=None):
return get_params(params, default)
"""
def get_form(self, class_form, row=None, buttons=(btn_save, btn_cancel),
**kwargs):
buttons = self.buttons and self.buttons or buttons
......@@ -308,7 +307,7 @@ class BaseView(object):
bindings = self.bindings
else:
bindings = self.get_bindings(row)
form_params = {}
form_params = {"request": self.req}
# form_params["after_bind"] = after_bind
if "validator" in kwargs and kwargs["validator"]:
form_params["validator"] = kwargs["validator"]
......@@ -337,6 +336,7 @@ class BaseView(object):
del self.req.session[session_name]
return r
"""
def view_list(self, **kwargs):
"""
custom:
......@@ -409,7 +409,7 @@ class BaseView(object):
bindings = self.bindings
else:
bindings = self.get_bindings()
schema = schema.bind(request=self.req, **bindings)
if not new_buttons:
......@@ -601,8 +601,8 @@ class BaseView(object):
def returned_form(self, form, **kwargs):
table = kwargs.get("table", None)
if self.req.is_xhr and self.req.params.get("html","false")=="false":
data = form.cstruct
if self.req.is_xhr and self.req.params.get("html", "false") == "false":
data = form.cstruct
if "captcha" in form:
kode_captcha, file_name = img_captcha(self.req)
self.req.session["captcha"] = kode_captcha
......@@ -855,7 +855,7 @@ class BaseView(object):
# e.cstruct[f.name] = self.get_captcha_url()
value = self.update_value(value, e.cstruct)
form.set_appstruct(value)
kwargs["table"]=table
kwargs["table"] = table
return self.returned_form(form, **kwargs)
values = dict(c)
......@@ -966,20 +966,19 @@ class BaseView(object):
def edit_restrict(self, row):
return False
def resp_xhr(self, values):
if values.get("data"):
data = []
if values and type(values["data"]) is not list:
values["data"] = [values["data"]]
for val in values["data"]:
for val in values["data"]:
data.append(obj2json(val))
values["data"] = data
else:
values["error"] = obj2json(values.get("error", {}))
return Response(json=values)
def view_edit(self, **kwargs):
request = self.req
self.ses["readonly"] = False
......@@ -1004,10 +1003,11 @@ class BaseView(object):
controls = request.POST.items()
if self.req.is_xhr:
cloned = request.POST.items()
controls=[]
controls = []
for ctrl in cloned:
if isinstance(ctrl[1], FieldStorage):
controls.append(("__start__", f"{ctrl[0]}:mapping"))
controls.append(
("__start__", f"{ctrl[0]}:mapping"))
controls.append(("upload", ctrl[1]))
controls.append(("uid", ""))
controls.append(("__end__", f"{ctrl[0]}:mapping"))
......@@ -1018,7 +1018,7 @@ class BaseView(object):
controls = items.items()
log.debug(controls)
try:
controls = form.validate(controls)
controls = form.validate(controls)
except ValidationFailure as e:
log.error(f"Edit Error: {str(e.error)}")
if self.req.is_xhr:
......@@ -1039,7 +1039,6 @@ class BaseView(object):
if self.req.is_xhr:
return self.resp_xhr({"data": [c]})
return self.after_edit(row=row, **kwargs)
return self.next_edit(form, row=row)
......@@ -1048,7 +1047,6 @@ class BaseView(object):
# return self.resp_xhr({"data": [form.cstruct]})
form.set_appstruct(values)
form = self.before_edit(form)
return self.returned_form(form, **kwargs)
......@@ -1102,7 +1100,7 @@ class BaseView(object):
def query_id(self, id_=None):
id_ = id_ or self.req.matchdict['id']
return self.table.query_id(id_)
# if self.req.user:
# if hasattr(self.table, 'company_id') and self.req.user.company_id:
# q = q.filter_by(company_id=self.req.user.company_id)
......@@ -1201,18 +1199,19 @@ class BaseView(object):
def get_partner(self):
return Partner.query_email(self.req.user.email).first()
@colander.deferred
def deferred_status(node, kw):
values = kw.get('daftar_status', [])
return widget.SelectWidget(values=values)
from email.utils import parseaddr
def email_validator(node, value):
name, email = parseaddr(value)
if not email or email.find('@') < 0:
raise colander.Invalid(node, 'Invalid email format')
"""
......
......@@ -65,14 +65,8 @@ class Login(CSRFSchema):
colander.String(), widget=widget.PasswordWidget())
def after_bind(self, schema, kwargs):
super().after_bind(schema, kwargs)
# super().after_bind(schema, kwargs)
request = kwargs["request"]
# csrf_token = new_csrf_token(request)
# log.debug(csrf_token)
# self["csrf_token"] = colander.SchemaNode(
# colander.String(), widget=widget.HiddenWidget(),
# default=csrf_token
# )
if BASE_CLASS.login_captcha:
self["captcha"] = colander.SchemaNode(
colander.String(),
......@@ -210,7 +204,7 @@ class ViewAuth(BaseView):
request.session.flash('Anda sudah login', 'error')
return HTTPFound(location=f"{request.home}")
schema = Login()
schema = Login(request=request)
schema = schema.bind(request=self.req)
buttons = (Button('login', _('Login')),)
if BASE_CLASS.allow_register:
......
import os
from pyramid.csrf import new_csrf_token, get_csrf_token
from iso8601.iso8601 import ISO8601_REGEX
from deform.widget import string_types
import json
......@@ -14,7 +14,6 @@ from deform.widget import (
Widget, _StrippedString, Select2Widget, _normalize_choices, OptGroup,
DateInputWidget as WidgetDateInputWidget, AutocompleteInputWidget)
from opensipkd.tools.captcha import img_captcha
from opensipkd.tools import get_settings
_logging = logging.getLogger(__name__)
......@@ -379,10 +378,11 @@ class CaptchaWidget(Widget):
def serialize(self, field, cstruct, **kw):
file_name = ""
# if not cstruct:
kode_captcha, file_name = img_captcha(self.request)
self.request.session["captcha_code"] = kode_captcha
_logging.error(f"Generated captcha code: {kode_captcha}")
_logging.error(self.request.session.items())
request = field.parent.schema.request
kode_captcha, file_name = img_captcha(request)
request.session["captcha_code"] = kode_captcha
_logging.debug("Generated captcha code: %s", kode_captcha)
_logging.debug(self.request.session.items())
# cstruct = cstruct or self.url+file_name
cstruct = self.url+file_name
......@@ -985,3 +985,25 @@ class FilterWidget(Widget):
# template = readonly and self.readonly_template or self.template
# return field.renderer(template, **tmpl_values)
#
class CSRFWidget(widget.HiddenWidget):
def serialize(self, field, cstruct, **kw):
request = field.parent.schema.request
cstruct = get_csrf_token(request)
if not cstruct:
cstruct = new_csrf_token(request)
values = self.get_template_values(field, cstruct, kw)
return field.renderer(self.template, **values)
def deserialize(self, field, pstruct):
if pstruct is null:
return null
elif not isinstance(pstruct, string_types):
raise Invalid(field.schema, "Pstruct is not a string")
if not pstruct:
return null
return pstruct
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!