perbaikan validasi login untuk mencegah percobaan login berlebihan

1 parent febbaac4
......@@ -19,10 +19,13 @@ Perubahan Mendasar dari fungsi login adalah:
result object dari fungsi tersebut harus berupa class User()
"""
from operator import ge
from google.oauth2 import id_token
from google.auth.transport import requests
import os
from random import Random
import re
from datetime import timedelta, datetime
from datetime import date, timedelta, datetime
from importlib import import_module
from pyramid.request import Response
......@@ -42,7 +45,7 @@ from . import one_hour, two_minutes
from ..models.users import User, ExternalIdentity
from ..models import Partner
# , Partner
from opensipkd.tools import create_now, set_user_log, get_settings
from opensipkd.tools import create_now, set_user_log, get_settings, dmyhms
from opensipkd.tools.buttons import btn_cancel
# from .. import get_urls
from .base_views import CSRFSchema, BaseView
......@@ -77,7 +80,18 @@ class Login(CSRFSchema):
# http://deformdemo.repoze.org/interfield/
def login_validator(form, value):
pass
exc = colander.Invalid(form, 'Terlalu banyak percobaan')
request = form.request
if request.session.get("login_failed", 0) > 3:
# message = "Login Gagal, terlalu banyak percobaan"
login_blocked = request.session.ses["login_blocked"]
if login_blocked and login_blocked > datetime.now():
exc = colander.Invalid(
form,
'Login Gagal, terlalu banyak percobaan, silahkan coba lagi setelah {}'
.format(dmyhms(login_blocked))
)
raise exc
def get_login_headers(request, user):
......@@ -98,14 +112,25 @@ class LoginUser(object):
# self.identity=identity
self.message = "Sukses Login"
self.user = None
self.ses = request.session
self.ses["login_failed"] = self.ses.get("login_failed", 0)
self.ses["login_blocked"] = self.ses.get("login_blocked")
def login(self, values, user=None):
settings = get_settings()
self.user = user and user or User.get_by_identity(values["username"])
if not self.user or not UserService.check_password(
self.user, values["password"]):
self.message = "Login Gagal"
set_user_log(self.message, self.request, log, values["username"])
self.ses["login_failed"] += self.ses.get("login_failed", 0) + 1
if self.ses["login_failed"] > 3:
self.ses["login_blocked"] = datetime.now() + \
timedelta(minutes=settings.get("login_blocked_minutes", 1))
return
self.ses["login_failed"] = 0
self.ses["login_blocked"] = None
for g in self.user.groups:
log.debug(f"Group: {g.id} as {g.group_name}")
......@@ -123,13 +148,11 @@ class Oauth2ParseExc(Exception):
class Oauth2UserExc(Exception):
"""Error User Found"""
from google.oauth2 import id_token
from google.auth.transport import requests
def verify_android_token(token, web_client_id):
"""
Verifies a Google ID token from an Android client on the backend.
Args:
token (str): The ID token string received from the Android app.
web_client_id (str): The Client ID for your *web application* in
......@@ -144,11 +167,11 @@ def verify_android_token(token, web_client_id):
request = requests.Request()
# Verify the token against Google's public keys
# The function automatically checks the token's signature, expiry,
# The function automatically checks the token's signature, expiry,
# and if it was issued by accounts.google.com.
id_info = id_token.verify_oauth2_token(
token,
request,
token,
request,
web_client_id
)
......@@ -159,7 +182,7 @@ def verify_android_token(token, web_client_id):
# Extract user information
user_id = id_info['sub']
email = id_info.get('email')
print(f"Token verified for User ID: {user_id}, Email: {email}")
return id_info
......@@ -167,12 +190,13 @@ def verify_android_token(token, web_client_id):
# Invalid token (e.g., signature mismatch, expired, wrong audience)
print(f"Invalid token: {e}")
raise
def oauth2_login(request, params=None):
provider_name = params and params["provider_name"] or request.params["provider_name"]
if provider_name == "google":
client_platform = params and params.get("platform") or request.params.get("platform") or "web"
client_platform = params and params.get(
"platform") or request.params.get("platform") or "web"
web_client_id = request.google_signin_client_id
if client_platform == "android":
id_info = verify_android_token(
......@@ -188,7 +212,7 @@ def oauth2_login(request, params=None):
raise Oauth2ParseExc(str(e))
request.session["id_info"] = id_info
else:
id_info = None
......@@ -252,7 +276,7 @@ class ViewAuth(BaseView):
user = request.user
headers = get_login_headers(request, user)
return xhr_response(user, headers)
# return Response(json={"error":
# return Response(json={"error":
# {"code": "0000",
# "msg": message},
# "data":[]})
......@@ -267,7 +291,7 @@ class ViewAuth(BaseView):
buttons += (Button('register', _('Register')),)
buttons += (Button('reset', _('Reset')), btn_cancel,)
form = Form(schema, buttons=buttons)
form = Form(schema, buttons=buttons, validator=login_validator)
message = ""
if 'cancel' in request.POST:
return HTTPFound(location=request.home)
......@@ -315,7 +339,7 @@ class ViewAuth(BaseView):
if self.req.is_xhr:
return Response(json={"error": {"code": -1,
"msg": login.message},
"data":[]})
"data": []})
next_url = f"{request.route_url('base-login')}?next={next_url}"
return HTTPFound(location=next_url)
......@@ -361,7 +385,7 @@ class ViewAuth(BaseView):
# url=get_urls(request.route_url('login')),
# next_url=next_url,
# login=login, )
if self.req.is_xhr:
form.set_appstruct({})
struct = form.cstruct
......@@ -370,7 +394,7 @@ class ViewAuth(BaseView):
if not csrf_token:
csrf_token = new_csrf_token(request)
struct["csrf_token"] = csrf_token
struct["csrf_token"] = csrf_token
log.debug("CSRF Token: %s", csrf_token)
log.info("Form Struct: %s", struct)
return self.resp_xhr({"data": struct})
......@@ -420,12 +444,13 @@ class ViewAuth(BaseView):
return dict(form=form.render())
def xhr_response(user, headers):
partner = Partner.query_email(user.email).first()
mobile = partner and partner.mobile or ""
nama = partner and partner.nama or ""
data = {
"data":
"data":
[{
"user_id": user.user_name,
"permission": user.get_permissions(),
......@@ -434,11 +459,12 @@ def xhr_response(user, headers):
"email": user.email,
"nama": nama,
}]
}
return Response(json=data, headerlist=headers)
def redirect_login(request, user):
set_user_log("Login Sukses", request, log, user.user_name)
for g in user.groups:
......@@ -618,7 +644,7 @@ class ViewPassword(BaseView):
def get_passcode(self):
if not self.req.authenticated_userid:
return HTTPNotFound("Anda harus login dahulu")
if "mail.sender_name" not in self.settings or 'mail.username' not in self.settings:
return HTTPNotAcceptable("Anda harus login dahulu")
......@@ -638,6 +664,7 @@ class ViewPassword(BaseView):
sending_mail(self.req, user, subject, body)
return dict(data={"message": "Passcode sudah dikirim ke email Anda"})
class ChangePasswordRequest(colander.Schema):
new_password = colander.SchemaNode(
colander.String(), widget=widget.CheckedPasswordWidget())
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!