Commit 741afb2f by aa.gusti

api

1 parent c4c38dfc
......@@ -4,12 +4,15 @@ from opensipkd.tools import (
get_random_number, devel, get_random_string, get_settings)
from opensipkd.tools.api import *
from .. import log
from ..models import (DBSession, User, GroupPermission)
from ..models import (DBSession, User, GroupPermission, UserDeviceModel)
lima_menit = 300
def auth_from_rpc(request):
return auth_from(request)
def auth_from(request, field=None):
global lima_menit
env = request.environ
......@@ -28,7 +31,44 @@ def auth_from(request, field=None):
# bypass cek authentication for development
if http_userid == 'admin' and request.devel:
return user
time_stamp = validate_time(request)
if field:
header = json_rpc_header(http_userid, user.security_code, time_stamp)
else:
header = json_rpc_header(http_userid, user.api_key, time_stamp)
if header['signature'] != env['HTTP_SIGNATURE']:
raise JsonRpcInvalidLoginError
return user
def auth_from_token(request):
return auth_from(request, "security_code")
def renew_token(user_device):
user_device.token = get_random_string(32)
DBSession.add(user_device)
DBSession.flush()
return user_device
def get_user_device(request, user):
user_device = UserDeviceModel.query() \
.filter_by(user_id=user.id,
kode=request.headers.environ["HTTP_USER_AGENT"]).first()
if not user_device:
user_device = UserDeviceModel()
user_device.user_id = user.id
user_device.kode = request.headers.environ["HTTP_USER_AGENT"]
user_device.token = get_random_string(32)
DBSession.add(user_device)
DBSession.flush()
return user_device
def validate_time(request):
global lima_menit
env = request.environ
time_stamp = int(env['HTTP_KEY'])
now = get_seconds()
settings = get_settings()
......@@ -36,20 +76,37 @@ def auth_from(request, field=None):
lima_menit = int(settings["diff_server_time"])
if not request.devel and abs(now - time_stamp) > lima_menit:
log.info(f"req time {time_stamp} server time {now}")
raise JsonRpcInvalidTimeError
if field:
header = json_rpc_header(http_userid, user.security_code, time_stamp)
else:
header = json_rpc_header(http_userid, user.api_key, time_stamp)
if header['signature'] != env['HTTP_SIGNATURE']:
return time_stamp
def auth_device(request):
env = request.environ
log.info(env)
if not ('HTTP_USERID' in env and 'HTTP_SIGNATURE' in env and
'HTTP_KEY' in env):
raise JsonRpcInvalidLoginError
http_userid = env['HTTP_USERID']
q = DBSession.query(User).filter_by(user_name=http_userid)
user = q.first()
if not user or user.status == 0:
raise JsonRpcInvalidLoginError
if http_userid == 'admin' and request.devel:
return user
user_device = get_user_device(request, user)
time_stamp = validate_time(request)
header = json_rpc_header(http_userid, user_device.token, time_stamp)
if header['signature'] != env['HTTP_SIGNATURE']:
log.info(f"{http_userid}, {user_device.token}, {time_stamp}")
log.info(f"{header['signature']} != {env['HTTP_SIGNATURE']}")
raise JsonRpcInvalidLoginError
def auth_from_token(request):
return auth_from(request, "security_code")
return user
def get_jsonrpc(method, params):
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!