Implement Google OAuth2 user validation and upload logo functionality

1 parent fe13e5ef
......@@ -5,14 +5,18 @@ import importlib
import csv
import re
import datetime
import deform
import decimal
import xlsx_dict_reader
from openpyxl import load_workbook
from opensipkd.tools import get_settings, DefaultTimeZone, dmy, dmyhms, get_ext
from pkg_resources import resource_filename
from pyramid.renderers import JSON
from pyramid_beaker import session_factory_from_settings
from pyramid.config import Configurator
from pyramid.events import NewRequest, BeforeRender, subscriber
from pyramid_mailer import mailer_factory_from_settings
from opensipkd.tools import get_settings, DefaultTimeZone, dmy, dmyhms
from .security import MySecurityPolicy, get_user
from sqlalchemy import engine_from_config
from .models.base import DBSession
......@@ -20,11 +24,9 @@ from .models.handlers import LogDBSession
from .models.meta import Base
from .models.users import init_model
from pkg_resources import resource_filename
# from deform import ZPTRendererFactory, Form
# from deform.widget import default_resource_registry
import deform
_logging = logging.getLogger(__name__)
# version 2.0.0 digunakan untuk change default template
......@@ -320,7 +322,7 @@ def main(global_config, **settings):
init_db(settings=settings)
config = get_config(settings=settings)
BASE_CLASS.route_from_csv(config)
BASE_CLASS.route_from_csv(config, filename="routes.xlsx")
BASE_CLASS.route_from_list(config)
BASE_CLASS.static_view(config, settings=settings)
config.scan()
......@@ -497,24 +499,57 @@ class BaseApp():
if p["children"]:
self.route_children(p["children"], row)
def route_from_csv(self, config, paket="opensipkd.base.views", filename="routes.csv"):
with self.get_route_file(filename) as f:
rows = csv.DictReader(f)
new_routes = []
for row in rows:
status = row.get("status", 0)
if not row["kode"] or not int(status):
continue
status = int(status)
row["children"] = []
parent_id = row.get("parent_id") or row.get("parent_id/routes.kode")
if parent_id:
self.route_children(new_routes, row)
else:
new_routes.append(row)
self.add_menu(config, new_routes, None, paket)
def route_from_csv_(self, config, paket="tangsel.base.views", rows=[]):
new_routes = []
for row in rows:
status = row.get("status", 0)
if not row["kode"] or not int(status):
continue
status = int(status)
row["children"] = []
parent_id = row.get("parent_id") or row.get(
"parent_id/routes.kode")
if parent_id:
self.route_children(new_routes, row)
else:
new_routes.append(row)
self.add_menu(config, new_routes, None, paket)
def route_from_csv(self, config, paket="opensipkd.base.views", filename="routes.csv"):
fullpath = os.path.join(self.base_dir, 'scripts', 'data', filename)
if get_ext(filename) == ".csv":
with open(fullpath) as f:
rows = csv.DictReader(f, skipinitialspace=True)
self.route_from_csv_(config, paket, rows=rows)
else:
wb= load_workbook(fullpath, data_only=True)
ws = wb.active
rows = xlsx_dict_reader.DictReader(ws) #skip_blank=True
self.route_from_csv_(config, paket, rows=rows)
# with self.get_route_file(filename) as f:
# rows = csv.DictReader(f)
# new_routes = []
# for row in rows:
# status = row.get("status", 0)
# if not row["kode"] or not int(status):
# continue
# status = int(status)
# row["children"] = []
# parent_id = row.get("parent_id") or row.get("parent_id/routes.kode")
# if parent_id:
# self.route_children(new_routes, row)
# else:
# new_routes.append(row)
# self.add_menu(config, new_routes, None, paket)
def route_from_list(self, config, routs=[], paket="opensipkd.base.views"):
new_routes = []
......
import logging
from google.auth.transport import requests
from google.oauth2 import id_token
from opensipkd.base import get_params
from pyramid.view import (view_config, )
from opensipkd.models import User
from opensipkd.tools import get_settings
import json
_logging = logging.getLogger(__name__)
def validate_user(request, idinfo):
"""
Digunakan untuk memvalidasi token google dalam password
Langkah yang dilakukan query email dan simpan sebagai session
:param idinfo:
:return:
{
// These six fields are included in all Google ID Tokens.
"iss": "https://accounts.google.com",
"sub": "110169484474386276334",
"azp": "1008719970978-hb24n2dstb40o45d4feuo2ukqmcc6381.apps.googleusercontent.com",
"aud": "1008719970978-hb24n2dstb40o45d4feuo2ukqmcc6381.apps.googleusercontent.com",
"iat": "1433978353",
"exp": "1433981953",
// These seven fields are only included when the user has granted the "profile" and
// "email" OAuth scopes to the application.
"email": "testuser@gmail.com",
"email_verified": "true",
"name" : "Test User",
"picture": "https://lh4.googleusercontent.com/-kYgzyAWpZzJ/ABCDEFGHI/AAAJKLMNOP/tIXL9Ir44LE/s99-c/photo.jpg",
"given_name": "Test",
"family_name": "User",
"locale": "en"
}
"""
email = 'email' in idinfo and idinfo['email']
if not email:
raise ValueError('Mail tidak ditemukan')
user = User.get_by_identity(email)
return user
@view_config(route_name='googleOauth2', renderer='json')
def google_oauth2(request):
return dict()
@view_config(route_name='googlesignin', renderer='json')
def googlesignin(request, data=None):
# (Receive token by HTTPS POST)
# ...
CLIENT_IDS = request.google_signin_client_ids
# CLIENT_IDS = get_params('google-signin-client-id')
KEY = get_params('google-signin-client-secret')
# Specify the CLIENT_ID of the app that accesses the backend:
# idinfo = id_token.verify_oauth2_token(token, requests.Request(), CLIENT_ID)
# Or, if multiple clients access the backend server:
id_token = "id_token" in request.params and request.params[
'id_token'] or ""
gtoken = None
if id_token:
gtoken = json.loads(id_token)
else:
if data and "id_token" in data:
gtoken = data["id_token"]
_logging.debug(gtoken)
if not gtoken:
raise Exception("Gtoken not found")
# idinfo = id_token.verify_oauth2_token(gtoken, requests.Request())
# test
import jwt
idinfo = jwt.decode(gtoken["credential"], options={
"verify_signature": False}) # KEY, algorithms=["RS256"]) #
_logging.debug(CLIENT_IDS)
_logging.debug(idinfo)
if idinfo['aud'] not in CLIENT_IDS or idinfo['azp'] not in CLIENT_IDS:
raise ValueError('Could not verify audience.')
if idinfo['iss'] not in ['accounts.google.com',
'https://accounts.google.com']:
raise ValueError('Wrong issuer.')
return idinfo
import os
import colander
from deform import (Form, widget, FileData, )
from deform.interfaces import FileUploadTempStore
from pyramid.httpexceptions import HTTPFound
from pyramid.view import view_config
from opensipkd.tools import (get_ext, dict_to_str, )
from .base_views import CSRFSchema
# from .. import get_urls
def route_list(request, p={}):
q = dict_to_str(p)
return HTTPFound(location=request.route_url('base-upload-logo', _query=q))
##########
# Unggah #
##########
# class UploadLogo(SaveFile):
# def save(self, fs):
# input_file = fs.file
# ext = get_ext(fs.filename)
# fullpath = self.create_fullpath(ext)
# output_file = open(fullpath, 'wb')
# input_file.seek(0)
# while True:
# data = input_file.read(2 << 16)
# if not data:
# break
# output_file.write(data)
# output_file.close()
# return fullpath
tmpstore = FileUploadTempStore()
class AddSchema(CSRFSchema):
upload = colander.SchemaNode(
FileData(),
widget=widget.FileUploadWidget(tmpstore),
title='Unggah')
image_for = colander.SchemaNode(
colander.String(),
widget=widget.SelectWidget(values=(('oth', "Other"), ('logo', "Logo"),
('bg', "Background"))),
title='Peruntukan')
def get_form(request, schema_cls):
schema = schema_cls()
schema = schema.bind(request=request)
return Form(schema, buttons=('simpan', 'batal'))
@view_config(route_name='base-upload-logo',
renderer='templates/form8.pt',
permission='upload-logo', require_csrf=True)
def view_file(request):
form = get_form(request, AddSchema)
if request.POST:
if 'simpan' in request.POST:
input_file = request.POST['upload'].file
filename = request.POST['upload'].filename.lower()
ext = get_ext(filename).lower()
if ext.lower() not in ['.png', '.ico']:
request.session.flash("File harus format 'png' atau 'ico'", 'error')
return dict(form=form.render())
_here = os.path.dirname(__file__)
static_path = os.path.join(os.path.dirname(_here), 'static')
fname = filename
if request.POST["image_for"] == "logo":
fname = f"logo{ext}"
elif request.POST["image_for"] == "bg":
fname = f"background{ext}"
typ = ext == '.png' and "img" or 'icon'
folder = os.path.join(static_path, typ)
if not os.path.exists(folder):
os.makedirs(folder)
fullpath = os.path.join(folder, fname)
output_file = open(fullpath, 'wb')
input_file.seek(0)
while True:
data = input_file.read(2 << 16)
if not data:
break
output_file.write(data)
request.session.flash(f"Sukses upload {fname}")
return route_list(request)
return dict(form=form.render(), scripts="")
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!