Commit cbbddf07 by Owo Sugiana

Jalur web bisa multi modul, tidak hanya jsonrpc

1 parent 60ed2adc
0.2 2020-06-30
--------------
- Jalur web juga bisa multi modul
- Tambah linkaja sebagai modul web
0.1.8 2020-06-24
----------------
- Konfigurasi section web ditambah threads (integer)
......
###
# logging configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html
###
[loggers]
keys = root, iso8583_web
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_iso8583_web]
level = DEBUG
handlers =
qualname = iso8583_web
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
# Aktifkan web server jika ingin inquiry dll melalui web client.
[web]
port = 7001
threads = 12
[web_host_linkaja]
ip = 127.0.0.1
module = iso8583_web.scripts.views.linkaja
host = bjb
[module_iso8583_web.scripts.views.linkaja]
route_path = /linkaja
db_url = postgresql://user:pass@localhost/agratek
[host_bjb]
ip = 127.0.0.1
port = 10003
listen = false
streamer = bjb_with_suffix
###
# logging configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html
###
[loggers]
keys = root, iso8583_web
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_iso8583_web]
level = DEBUG
handlers =
qualname = iso8583_web
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
# Aktifkan web server agar inquiry dll bisa dilakukan melalui web client.
[web]
port = 7000
threads = 12
[web_host_teller]
ip = 127.0.0.1
module = iso8583_web.scripts.views.jsonrpc
host = pemda
[module_iso8583_web.scripts.views.jsonrpc]
route_path = /rpc
[host_pemda]
ip = 127.0.0.1
port = 10002
listen = true
streamer = bjb_with_suffix
......@@ -30,12 +30,23 @@ formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
# Aktifkan JsonRPC server jika ingin simulasi sebagai bank dimana inquiry dkk
# bisa dilakukan melalui JsonRPC client.
# Aktifkan web server jika ingin simulasi sebagai bank dimana inquiry dkk
# bisa dilakukan melalui web client.
# [web]
# port = 7000
# threads = 12
#[web_host_linkaja]
#ip = 127.0.0.1
#module = iso8583_web.scripts.views.linkaja
#host = pemda
#[module_iso8583_web.scripts.views.linkaja]
#route_path = /linkaja
#db_url = postgresql://sugiana:a@localhost/agratek
[host_bjb]
ip = 127.0.0.1
port = 10002
......
from opensipkd.tcp.connection import join_ip_port
try:
from configparser import (
ConfigParser,
NoOptionError,
)
except ImportError:
from ConfigParser import (
ConfigParser,
NoOptionError,
)
from configparser import (
ConfigParser,
NoOptionError,
)
listen_ports = []
ip_conf = {}
name_conf = {}
allowed_ips = []
web = {}
listen_ports = list()
ip_conf = dict()
name_conf = dict()
allowed_ips = list()
web = dict()
web_ip_conf = dict()
def get_conf(ip, port):
......@@ -46,11 +42,13 @@ def get_int(conf, section, option, default):
MSG_DUPLICATE = 'IP {ip} port {port} ganda. Perbaiki konfigurasi.'
def validate_ip_port(ip_port):
if ip_port in ip_conf:
cfg = ip_conf[ip_port]
msg = MSG_DUPLICATE.format(ip=cfg['ip'], port=cfg['port'])
raise Exception(msg)
def validate_ip_port(cfg):
ip_port = join_ip_port(cfg['ip'], cfg['port'])
if ip_port not in ip_conf:
return ip_port
msg = MSG_DUPLICATE.format(ip=cfg['ip'], port=cfg['port'])
raise Exception(msg)
def get_module_object(name):
......@@ -70,66 +68,71 @@ def get_streamer_class(name):
def append_others(cfg, conf, section):
# Konfigurasi tambahan terkait host
for key, val in conf.items(section):
if key in cfg:
continue
cfg[key] = val
# Konfigurasi tambahan terkait modul
#section = 'module_' + cfg['module']
#if not conf.has_section(section):
# return
#for key, val in conf.items(section):
# cfg[key] = val
def load_module(cfg, conf, section, default):
cfg['module'] = get_str(conf, section, 'module', default)
cfg['module_obj'] = get_module_object(cfg['module'])
module_section = 'module_' + cfg['module']
if conf.has_section(module_section):
append_others(cfg, conf, module_section)
try:
f_init = getattr(cfg['module_obj'], 'init')
f_init(cfg)
except AttributeError:
pass
def read_web_conf(conf, section):
if section == 'web':
web['port'] = conf.getint(section, 'port')
web['threads'] = conf.getint(section, 'threads')
return
if section.find('web_host_') != 0:
return
cfg = dict()
cfg['name'] = section.split('_')[-1]
cfg['ip'] = conf.get(section, 'ip')
cfg['host'] = conf.get(section, 'host')
load_module(cfg, conf, section, 'iso8583_web.scripts.views.jsonrpc')
web_ip_conf[cfg['ip']] = dict(cfg)
def read_host_conf(conf, section):
if section.find('host_') != 0:
return
if not get_boolean(conf, section, 'active', True):
return
cfg = dict()
cfg['ip'] = conf.get(section, 'ip')
cfg['port'] = conf.getint(section, 'port')
ip_port = validate_ip_port(cfg)
cfg['name'] = section.split('_')[-1]
cfg['streamer'] = get_str(conf, section, 'streamer', 'none')
cfg['streamer_cls'] = get_streamer_class(cfg['streamer'])
cfg['ip'] = conf.get(section, 'ip')
cfg['port'] = conf.getint(section, 'port')
cfg['listen'] = get_boolean(conf, section, 'listen', True)
cfg['echo'] = get_boolean(conf, section, 'echo', not cfg['listen'])
cfg['timeout'] = get_int(conf, section, 'timeout', 60)
append_others(cfg, conf, section)
load_module(cfg, conf, section, 'opensipkd.iso8583.network')
if cfg['listen']:
if cfg['port'] not in listen_ports:
listen_ports.append(cfg['port'])
if cfg['ip'] not in allowed_ips:
allowed_ips.append(cfg['ip'])
ip_conf[ip_port] = name_conf[cfg['name']] = dict(cfg)
def read_conf(conf_file):
conf = ConfigParser()
conf['DEFAULT'] = dict(threads=12)
conf.read(conf_file)
for section in conf.sections():
if section == 'web':
web['port'] = conf.getint(section, 'port')
web['threads'] = conf.getint(section, 'threads')
continue
if section.find('host_') < 0:
continue
try:
active = conf.getboolean(section, 'active')
except NoOptionError:
active = True
if not active:
continue
ip = conf.get(section, 'ip')
port = conf.getint(section, 'port')
ip_port = join_ip_port(ip, port)
validate_ip_port(ip_port)
cfg = dict()
cfg['ip'] = ip
cfg['port'] = port
cfg['name'] = name = section.split('_')[1]
cfg['streamer'] = get_str(conf, section, 'streamer', 'none')
cfg['streamer_cls'] = get_streamer_class(cfg['streamer'])
cfg['module'] = get_str(conf, section, 'module', 'opensipkd.iso8583.network')
cfg['module_obj'] = get_module_object(cfg['module'])
cfg['ip'] = conf.get(section, 'ip')
cfg['port'] = conf.getint(section, 'port')
cfg['listen'] = get_boolean(conf, section, 'listen', True)
cfg['echo'] = get_boolean(conf, section, 'echo', not cfg['listen'])
cfg['timeout'] = get_int(conf, section, 'timeout', 60)
module_section = 'module_' + cfg['module']
if conf.has_section(module_section):
for key in conf.options(module_section):
cfg[key] = conf.get(module_section, key)
try:
f_init = getattr(cfg['module_obj'], 'init')
f_init(cfg)
except AttributeError:
pass
append_others(cfg, conf, section)
if cfg['listen']:
if port not in listen_ports:
listen_ports.append(port)
if ip not in allowed_ips:
allowed_ips.append(ip)
ip_conf[ip_port] = name_conf[name] = dict(cfg)
read_web_conf(conf, section)
read_host_conf(conf, section)
class CSVRenderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
''' value bertipe LinkAjaDataResponse '''
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
return str(value)
from opensipkd.tcp.connection import ConnectionManager as BaseConnectionManager
class ConnectionManager(BaseConnectionManager):
def close_old_connection(self, old_conn):
old_conn.log_close('new connection found')
BaseConnectionManager.close_old_connection(self, old_conn)
conn_mgr = ConnectionManager()
import os
import sys
import logging
import signal
from time import (
sleep,
time,
)
from threading import Thread
from configparser import ConfigParser
from waitress.server import create_server
from pyramid.config import Configurator
from pyramid.paster import setup_logging
from pyramid_rpc.jsonrpc import jsonrpc_method
from opensipkd.tcp.connection import (
ConnectionManager as BaseConnectionManager,
join_ip_port,
)
from opensipkd.string import (
exception_message,
dict_to_str,
)
from opensipkd.tcp.connection import join_ip_port
from opensipkd.string import exception_message
from opensipkd.tcp.server import (
Server as BaseServer,
RequestHandler as BaseRequestHandler,
)
from opensipkd.tcp.client import Client as BaseClient
from opensipkd.jsonrpc.exc import (
JsonRpcInvalidParams,
JsonRpcBankNotFound,
JsonRpcBillerNetwork,
)
from ..read_conf import (
read_conf,
ip_conf,
web as web_conf,
web_ip_conf,
listen_ports,
allowed_ips,
get_conf,
name_conf,
)
def get_log():
return logs[0]
def log_info(s):
log = get_log()
log.info(s)
def log_error(s):
log = get_log()
log.error(s)
def log_debug(s):
log = get_log()
log.debug(s)
def iso_to_dict(iso):
data = iso.get_values()
return dict_to_str(data)
from .tools import iso_to_dict
from .connection import conn_mgr
from .views import (
web_request,
web_process,
web_response,
append_web_process,
append_web_response,
)
from .logger import (
get_log,
set_log,
log_info,
log_error,
log_debug,
log_web_info,
log_web_debug,
)
def to_str(s):
......@@ -352,166 +333,9 @@ class Parser(Log):
self.running = False
######################
# Connection Manager #
######################
class ConnectionManager(BaseConnectionManager):
def close_old_connection(self, old_conn):
old_conn.log_close('new connection found')
BaseConnectionManager.close_old_connection(self, old_conn)
conn_mgr = ConnectionManager()
#######
# Web #
#######
def log_web_msg(s):
return 'Web server {}'.format(s)
def log_web_info(s):
msg = log_web_msg(s)
log = get_log()
log.info(msg)
def log_web_error(s):
msg = log_web_msg(s)
log = get_log()
log.error(msg)
def log_web_debug(s):
msg = log_web_msg(s)
log = get_log()
log.debug(msg)
def conn_by_name(name):
conf = name_conf[name]
found_conn = None
for ip_port, conn in conn_mgr:
ip, port = ip_port.split(':')
if conf['ip'] != ip:
continue
port = int(port)
if conf['port'] != port:
continue
found_conn = conn
if not found_conn:
raise JsonRpcBankNotFound()
if not found_conn.running:
raise JsonRpcBankNotFound(message='Disconnected')
return found_conn
# Daftar job dari web request, berisi iso request
# key: ip:port, value: list of iso
web_request = {}
# Daftar job yang sedang diproses, yaitu menunggu iso response
# key: ip:port, value: list of stan (bit 11)
web_process = {}
# Daftar job yang sudah selesai, berisi iso response
# key: ip:port, value: dict of (key: stan, value: iso)
web_response = {}
def append_web_process(ip_port, iso):
stan = iso.get_stan()
if ip_port in web_process:
web_process[ip_port].append(stan)
else:
web_process[ip_port] = [stan]
def append_web_response(ip_port, stan, iso):
if ip_port in web_response:
web_response[ip_port][stan] = iso
else:
web_response[ip_port] = {stan: iso}
def web_job(conn, iso):
ip_port = join_ip_port(conn.conf['ip'], conn.conf['port'])
if ip_port in web_request:
web_request[ip_port].append(iso)
else:
web_request[ip_port] = [iso]
stan = iso.get_stan()
awal = time()
while True:
sleep(0.001)
if time() - awal > 5:
raise JsonRpcBillerNetwork(message='Timeout')
if ip_port not in web_response:
continue
result = web_response[ip_port]
if stan in result:
iso = result[stan]
del result[stan]
data = iso_to_dict(iso)
return dict(code=0, message='OK', data=data)
def validate_rpc(p):
if 'host' not in p:
raise JsonRpcInvalidParams()
return conn_by_name(p['host'])
def log_web_receive(request, method, p, flow='Receive'):
msg = '{} {} {} {}'.format(request.client_addr, flow, method, p)
log_web_info(msg)
def log_web_send(request, method, p):
log_web_receive(request, method, p, 'Send')
@jsonrpc_method(endpoint='rpc')
def echo(request, p):
log_web_receive(request, 'echo', p)
conn = validate_rpc(p)
iso = conn.job.echo_request()
r = web_job(conn, iso)
log_web_send(request, 'echo', r)
return r
@jsonrpc_method(endpoint='rpc')
def inquiry(request, p):
log_web_receive(request, 'inquiry', p)
conn = validate_rpc(p)
iso = conn.job.inquiry(p)
r = web_job(conn, iso)
log_web_send(request, 'inquiry', r)
return r
@jsonrpc_method(endpoint='rpc')
def payment(request, p):
log_web_receive(request, 'payment', p)
conn = validate_rpc(p)
iso = conn.job.payment(p)
r = web_job(conn, iso)
log_web_send(request, 'payment', r)
return r
@jsonrpc_method(endpoint='rpc')
def reversal(request, p):
log_web_receive(request, 'reversal', p)
conn = validate_rpc(p)
iso = conn.job.reversal(p)
r = web_job(conn, iso)
log_web_send(request, 'reversal', r)
return r
web_server = {}
......@@ -523,8 +347,9 @@ def start_web_server():
host = '0.0.0.0'
with Configurator() as config:
config.include('pyramid_tm')
config.include('pyramid_rpc.jsonrpc')
config.add_jsonrpc_endpoint('rpc', '/rpc')
for ip, cfg in web_ip_conf.items():
cfg['module_obj'].pyramid_init(config)
config.scan(cfg['module'])
config.scan(__name__)
app = config.make_wsgi_app()
web_server['listener'] = server = create_server(
......@@ -643,15 +468,6 @@ def check_parser():
i -= 1
def set_log(conf_file):
conf = ConfigParser()
conf.read(conf_file)
log = logging.getLogger(__file__)
log.setLevel(conf.get('logger_root', 'level'))
logs.append(log)
logs = []
running = []
......@@ -661,7 +477,7 @@ def main(argv=sys.argv):
config_uri = argv[1]
setup_logging(config_uri)
read_conf(config_uri)
set_log(config_uri)
set_log(config_uri, __file__)
running.append(True)
start_web_server()
start_servers()
......
import sys
from configparser import ConfigParser
from sqlalchemy import create_engine
from .views.linkaja.models import Base
def main(argv=sys.argv):
conf_file = argv[1]
conf = ConfigParser()
conf.read(conf_file)
cf = conf['module_iso8583_web.scripts.views.linkaja']
engine = create_engine(cf['db_url'])
engine.echot = True
Base.metadata.create_all(engine)
import logging
from configparser import ConfigParser
logs = []
def set_log(conf_file, name=None):
if not name:
name = __file__
conf = ConfigParser()
conf.read(conf_file)
log = logging.getLogger(name)
log.setLevel(conf.get('logger_root', 'level'))
logs.append(log)
def get_log():
return logs[0]
def log_info(s):
log = get_log()
log.info(s)
def log_error(s):
log = get_log()
log.error(s)
def log_debug(s):
log = get_log()
log.debug(s)
def log_web_msg(s):
return 'Web server {}'.format(s)
def log_web_info(s):
msg = log_web_msg(s)
log = get_log()
log.info(msg)
def log_web_error(s):
msg = log_web_msg(s)
log = get_log()
log.error(msg)
def log_web_debug(s):
msg = log_web_msg(s)
log = get_log()
log.debug(msg)
from opensipkd.string import dict_to_str
def iso_to_dict(iso):
data = iso.get_values()
return dict_to_str(data)
from time import (
time,
sleep,
)
from opensipkd.tcp.connection import join_ip_port
from iso8583_web.read_conf import (
name_conf,
web_ip_conf,
)
from iso8583_web.scripts.logger import (
log_web_info,
log_web_error,
)
from ..connection import conn_mgr
from ..logger import log_web_info
# Daftar job dari web request, berisi iso request
# key: ip:port, value: list of iso
web_request = {}
# Daftar job yang sedang diproses, yaitu menunggu iso response
# key: ip:port, value: list of stan (bit 11)
web_process = {}
# Daftar job yang sudah selesai, berisi iso response
# key: ip:port, value: dict of (key: stan, value: iso)
web_response = {}
def append_web_process(ip_port, iso):
stan = iso.get_stan()
if ip_port in web_process:
web_process[ip_port].append(stan)
else:
web_process[ip_port] = [stan]
def append_web_response(ip_port, stan, iso):
if ip_port in web_response:
web_response[ip_port][stan] = iso
else:
web_response[ip_port] = {stan: iso}
class WebJob(object):
def __init__(self, conn, iso):
self.conn = conn
self.iso = iso
def get_response(self):
ip_port = join_ip_port(self.conn.conf['ip'], self.conn.conf['port'])
if ip_port in web_request:
web_request[ip_port].append(self.iso)
else:
web_request[ip_port] = [self.iso]
stan = self.iso.get_stan()
awal = time()
while True:
sleep(0.001)
if time() - awal > 5:
raise self.timeout_error()
if ip_port not in web_response:
continue
result = web_response[ip_port]
if stan in result:
iso = result[stan]
del result[stan]
return iso
def timeout_error(self):
return Exception('Timeout')
class View(object):
def __init__(self, request):
self.request = request
def log_prefix(self):
web_conf = self.get_web_conf()
if web_conf:
name = web_conf['name']
else:
name = 'unknown'
return '{} {} {}'.format(
self.request.client_addr, name, id(self.request))
def log_receive(self, msg, error=False):
msg = '{} {} {}'.format(self.log_prefix(), 'Receive', msg)
if error:
log_web_error(msg)
else:
log_web_info(msg)
def log_send(self, msg, error=False):
msg = '{} {} {}'.format(self.log_prefix(), 'Send', msg)
if error:
log_web_error(msg)
else:
log_web_info(msg)
# Get ISO8583 connection
def get_connection(self):
web_conf = self.get_web_conf()
if not web_conf:
raise self.not_found_error(self.request.client_addr)
name = web_conf['host']
conf = name_conf[name]
found_conn = None
for ip_port, conn in conn_mgr:
ip, port = ip_port.split(':')
if conf['ip'] != ip:
continue
port = int(port)
if conf['port'] != port:
continue
found_conn = conn
if not found_conn:
raise Exception(self.not_found_error(name))
if not found_conn.running:
raise Exception(self.not_running_error(name))
return found_conn
def get_web_conf(self):
return web_ip_conf.get(self.request.client_addr)
def get_iso_conf(self):
web_conf = self.get_web_conf()
name = web_conf['host']
return name_conf[name]
def not_found_error(self, hostname):
msg = 'Host {} tidak ditemukan di konfigurasi'.format(hostname)
return Exception(msg)
def not_running_error(self, hostname):
msg = 'Host {} belum terhubung'.format(hostname)
return Exception(msg)
def get_web_job_cls(self):
return WebJob
def web_job(self, conn, iso):
job = WebJob(conn, iso)
return job.get_response()
from pyramid_rpc.jsonrpc import jsonrpc_method
from opensipkd.jsonrpc.exc import (
JsonRpcInvalidParams,
JsonRpcBankNotFound,
JsonRpcBillerNetwork,
)
from ..tools import iso_to_dict
from ..logger import log_web_info
from . import (
WebJob as BaseWebJob,
View as BaseView,
)
conf = dict()
class Webjob(BaseWebJob):
def timeout_error(self): # override
raise JsonRpcBillerNetwork(message='Timeout')
class View(BaseView):
def get_web_job_cls(self): # Override
return WebJob
def get_response(self, rpc_method, p=dict(), iso_method=None):
self.log_receive(rpc_method, p)
conn = self.get_connection()
iso_func = getattr(conn.job, iso_method or rpc_method)
iso_req = p and iso_func(p) or iso_func()
iso_resp = self.web_job(conn, iso_req)
data = iso_to_dict(iso_resp)
r = dict(code=0, message='OK', data=data)
self.log_send(r)
return r
def not_found_error(self, hostname): # Override
msg = 'Host {} tidak ditemukan di konfigurasi'.format(hostname)
return JsonRpcBankNotFound(message=msg)
def not_running_error(self, hostname): # Override
msg = 'Host {} belum terhubung'.format(hostname)
return JsonRpcBankNotFound(message=msg)
def log_receive(self, method, p):
msg = '{} {}'.format(method, p)
BaseView.log_receive(self, msg)
def log_send(self, p):
BaseView.log_send(self, p)
@jsonrpc_method(endpoint='rpc')
def echo(self, p):
return self.get_response('echo', 'echo_request')
@jsonrpc_method(endpoint='rpc')
def inquiry(self, p):
return self.get_response('inquiry', p)
@jsonrpc_method(endpoint='rpc')
def payment(self, p):
return self.get_response('payment', p)
@jsonrpc_method(endpoint='rpc')
def reversal(self, p):
return self.get_response('reversal', p)
# Dipanggil read_conf.py
def init(cfg):
conf.update(cfg)
# Dipanggil forwarder.py
def pyramid_init(config):
config.include('pyramid_rpc.jsonrpc')
config.add_jsonrpc_endpoint('rpc', conf['route_path'])
import transaction
import venusian
from ISO8583.ISOErrors import BitNotSet
from sqlalchemy import create_engine
from sqlalchemy.orm import (
sessionmaker,
scoped_session,
)
from zope.sqlalchemy import register
from pyramid.view import (
view_config,
notfound_view_config,
)
from deform import (
Form,
Button,
ValidationFailure,
)
import colander
from opensipkd.string import (
FixLength,
FullDateTimeVar,
)
from opensipkd.iso8583.bjb.pbb.structure import INVOICE_PROFILE
from iso8583_web.scripts.tools import iso_to_dict
from .. import (
WebJob as BaseWebJob,
View as BaseView,
)
from .exceptions import (
InvoiceIdError,
NeedPostError,
InternalError,
TrxTypeError,
HostError,
AlreadyPaidError,
TimeoutError,
BaseError,
AmountError,
BillRefNotFound,
)
from .structure import (
DataRequest,
InquiryResponse,
PaymentResponse,
)
from .renderer import Renderer
from .models import (
Rpc,
Log,
)
ROUTE = 'rpc'
RENDERER = 'csv'
METHOD = {
'021': 'inquiry',
'022': 'payment',
'023': 'reversal'}
METHOD_CODE = dict()
for key, value in METHOD.items():
METHOD_CODE[value] = key
conf = dict()
def get_db_session():
return conf['db_session']
class Webjob(BaseWebJob):
def timeout_error(self): # override
return TimeoutError()
def form_validator(form, value):
if value['trx_type'] not in METHOD:
raise TrxTypeError()
def get_form():
schema = DataRequest(validator=form_validator)
return Form(schema)
def date_from_str(s):
t = FullDateTimeVar()
t.set_raw(s)
return t.get_value()
def get_method(data):
return METHOD[data['trx_type']]
def get_inquiry(data):
DBSession = get_db_session()
bill_ref = int(data['bill_ref'])
q = DBSession.query(Rpc).filter_by(id=bill_ref)
return q.first()
def get_payment(data):
DBSession = get_db_session()
bill_ref = int(data['bill_ref'])
q = DBSession.query(Rpc).filter_by(
inquiry_id=bill_ref, trx_type=data['trx_type'])
q = q.order_by(Rpc.id.desc())
return q.first()
def is_inquiry(data):
return data['trx_type'] == METHOD_CODE['inquiry']
def is_payment(data):
return data['trx_type'] == METHOD_CODE['payment']
def is_reversal(data):
return data['trx_type'] == METHOD_CODE['reversal']
def get_template_response(data):
if is_inquiry(data):
return InquiryResponse()
d = PaymentResponse()
d['Bill Ref'] = data['bill_ref']
return d
# view decorator
class csv_method(object):
def __init__(self, **kw):
self.kw = kw
def __call__(self, wrapped):
kw = self.kw.copy()
depth = kw.pop('_depth', 0)
def callback(context, name, ob):
config = context.config.with_package(info.module)
config.add_view(view=ob, renderer=RENDERER, **kw)
info = venusian.attach(
wrapped, callback, category='pyramid', depth=depth + 1)
if info.scope == 'class':
# ensure that attr is set if decorating a class method
kw.setdefault('attr', wrapped.__name__)
kw['_info'] = info.codeinfo # fbo action_method
print('DEBUG wrapped {}'.format(wrapped))
return wrapped
class View(BaseView):
def get_web_job_cls(self): # Override
return WebJob
def not_found_error(self, hostname): # Override
return HostError(hostname)
def not_running_error(self, hostname): # Override
msg = 'Host {} belum terhubung'.format(hostname)
return InternalError(msg, 'Sedang offline')
def create_iso_log(self, iso, rpc):
conf = self.get_iso_conf()
iso_log = Log(
mti=iso.getMTI(),
rpc_id=rpc.id,
ip=conf['ip'],
conf_name=conf['name'])
for bit in iso.get_bit_definition():
try:
value = iso.getBit(bit)
except BitNotSet:
continue
field = 'bit_{}'.format(str(bit).zfill(3))
setattr(iso_log, field, value)
try:
data = iso.getBit(62)
profile = FixLength(INVOICE_PROFILE)
profile.set_raw(data)
iso_log.bit_062_data = profile.to_dict()
except BitNotSet:
pass
return iso_log
def before_send_iso(self, data, inq, pay, iso_req):
DBSession = get_db_session()
web_conf = self.get_web_conf()
row = Rpc(
ip=self.request.client_addr,
conf_name=web_conf['name'],
merchant=data['merchant'],
terminal=data['terminal'],
trx_type=data['trx_type'],
msisdn=data['msisdn'],
acc_no=data['acc_no'],
stan=iso_req.get_stan())
row.trx_date = date_from_str(data['trx_date']),
if data.get('msg'):
row.msg = data['msg']
if data.get('amount'):
row.amount = int(data['amount'])
if not is_inquiry(data):
row.inquiry_id = inq and inq.id or pay.inquiry_id
row.ntb = data['trx_id']
with transaction.manager:
DBSession.add(row)
DBSession.flush()
DBSession.expunge_all() # Agar dapat row.id
iso_log = self.create_iso_log(iso_req, row)
DBSession.add(iso_log)
return row
def after_send_iso(self, data, inq, pay, row, iso_resp):
DBSession = get_db_session()
iso_log = self.create_iso_log(iso_resp, row)
iso_data = iso_to_dict(iso_resp)
web_data = get_template_response(data)
if iso_data[39] == '00':
web_data['Response Code'] = '00'
if is_inquiry(data):
web_data['Bill Ref'] = str(row.id)
if iso_data.get(62):
profile = FixLength(INVOICE_PROFILE)
profile.set_raw(iso_data[62])
iso_log.bit_062_data = profile.to_dict()
web_data['Biller Name'] = row.biller_name = \
profile['Nama'].strip()
web_data['Bill Amount'] = iso_data[4].lstrip('0')
if iso_data.get(47):
web_data['Transaction ID'] = row.ntp = iso_data[47] # NTP
err = None
elif iso_data[39] in ['33', '55']:
err = InvoiceIdError()
elif iso_data[39] == '54':
err = AlreadyPaidError()
elif iso_data[39] == '51':
err = AmountError()
else:
err = BaseError()
if err:
web_data.from_err(err)
row.resp_msg = web_data['Notification Message']
self.log_send(web_data.values)
row.resp_code = web_data['Response Code']
row.resp_msg = web_data['Notification Message']
with transaction.manager:
DBSession.add(row)
DBSession.add(iso_log)
return web_data
def get_response(self, data):
p = dict(invoice_id=data['acc_no'])
inq = pay = None
if not is_inquiry(data):
p['amount'] = data['amount']
p['ntb'] = data['trx_id']
if is_payment(data):
inq = get_inquiry(data)
if not inq:
raise BillRefNotFound()
else:
pay = get_payment(data)
if not pay:
raise BillRefNotFound()
p['stan'] = pay.stan
conn = self.get_connection()
method = get_method(data)
iso_func = getattr(conn.job, method)
iso_req = iso_func(p)
row = self.before_send_iso(data, inq, pay, iso_req)
iso_resp = self.web_job(conn, iso_req)
return self.after_send_iso(data, inq, pay, row, iso_resp)
@csv_method(route_name=ROUTE)
def view_trx(self):
if not self.request.POST:
self.log_receive('GET {}'.format(self.request.GET))
raise NeedPostError()
items = self.request.POST.items()
self.log_receive('POST {}'.format(dict(items)))
items = self.request.POST.items()
form = get_form()
try:
c = form.validate(items)
except ValidationFailure as e:
d = e.error.asdict()
for field, err in d.items():
msg = '{} {}'.format(field, err)
break
raise InternalError(msg)
data = dict(c)
try:
r = self.get_response(data)
self.log_send(r)
return r
except BaseError as e:
r = get_template_response(data)
r['Response Code'] = e.code
r['Notification Message'] = e.message
return r
@notfound_view_config()
def view_not_found(self):
msg = 'Path {} tidak ada'.format(self.request.path)
self.log_receive(msg, True)
return self.request.exception
# Dipanggil read_conf.py
def init(cfg):
conf.update(cfg)
# Dipanggil forwarder.py
def pyramid_init(config):
config.add_renderer(RENDERER, 'iso8583_web.scripts.views.linkaja.Renderer')
config.add_route(ROUTE, conf['route_path'])
pool_size = int(conf.get('db_pool_size', 50))
max_overflow = int(conf.get('db_max_overflow', 100))
engine = create_engine(
conf['db_url'], pool_size=pool_size, max_overflow=max_overflow)
session_factory = sessionmaker(bind=engine)
conf['db_session'] = scoped_session(session_factory)
register(conf['db_session'])
class BaseError(Exception):
code = '49'
message = 'Ada kesalahan yang belum dipahami'
class NeedPostError(BaseError):
code = '40'
message = 'HTTP Request harus POST'
class TrxTypeError(BaseError):
code = '41'
message = 'trx_type tidak dikenal'
class InternalError(BaseError):
def __init__(self, orig_msg, msg='Ada kesalahan internal'):
self.orig_msg = orig_msg
self.message = msg
class TimeoutError(BaseError):
code = '42'
message = 'Timeout'
class InvoiceIdError(BaseError):
code = '43'
message = 'acc_no tidak ditemukan'
class HostError(BaseError):
code = '44'
def __init__(self, hostname):
self.hostname = hostname
self.message = 'Host {} tidak terdaftar'.format(hostname)
class AlreadyPaidError(BaseError):
code = '45'
message = 'Memang sudah lunas'
class AmountError(BaseError):
code = '46'
message = 'Jumlah pembayaran tidak sesuai tagihan'
class BillRefNotFound(BaseError):
code = '47'
message = 'bill_ref tidak ditemukan'
from sqlalchemy import (
Column,
Integer,
Float,
DateTime,
String,
ForeignKey,
func,
JSON,
)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Rpc(Base):
__tablename__ = 'linkaja_trx'
id = Column(Integer, primary_key=True) # Bill Ref
created = Column(
DateTime(timezone=True), nullable=False, server_default=func.now())
ip = Column(String(15), nullable=False)
conf_name = Column(String(16), nullable=False)
merchant = Column(String(100), nullable=False)
terminal = Column(String(100), nullable=False)
trx_type = Column(String(3), nullable=False)
msisdn = Column(String(20), nullable=False)
# Invoice ID = SPPT ID di PBB
acc_no = Column(String(64), nullable=False)
msg = Column(String(100))
trx_date = Column(DateTime(timezone=True), nullable=False)
# Dari inquiry response
amount = Column(Float)
# Dari inquiry response (Bill Ref),
# diisi saat payment request (bill_ref)
inquiry_id = Column(Integer, ForeignKey('linkaja_trx.id'))
# Dari payment request (trx_id)
ntb = Column(String(32))
# Penerjemahan bit 39
resp_code = Column(String(2))
# Penjelasan resp_code untuk pelanggan
resp_msg = Column(String(100))
# Penjelasan resp_code untuk audit sistem
resp_orig_msg = Column(String(100))
# Nama wajib pajak
biller_name = Column(String(100))
# Dari payment response (Transaction ID)
ntp = Column(String(32))
# Dari bit 11, dibutuhkan untuk reversal
stan = Column(String(6))
class Log(Base):
__tablename__ = 'log_iso'
id = Column(Integer, primary_key=True)
created = Column(
DateTime(timezone=True), nullable=False, server_default=func.now())
rpc_id = Column(Integer, ForeignKey(Rpc.id), nullable=False)
ip = Column(String(15))
conf_name = Column(String(16), nullable=False)
mti = Column(String(4), nullable=False)
bit_002 = Column(String(99))
bit_003 = Column(String(6))
bit_004 = Column(String(12))
bit_007 = Column(String(10))
bit_011 = Column(String(6))
bit_012 = Column(String(6))
bit_013 = Column(String(4))
bit_015 = Column(String(4))
bit_018 = Column(String(4))
bit_022 = Column(String(3))
bit_032 = Column(String(4))
bit_033 = Column(String(10))
bit_035 = Column(String(99))
bit_037 = Column(String(12))
bit_039 = Column(String(2))
bit_041 = Column(String(8))
bit_042 = Column(String(15))
bit_043 = Column(String(40))
bit_047 = Column(String(99))
bit_048 = Column(String(99))
bit_049 = Column(String(3))
bit_059 = Column(String(16))
bit_060 = Column(String(3))
bit_061 = Column(String(22))
bit_062 = Column(String(512))
bit_063 = Column(String(255))
bit_102 = Column(String(32))
bit_107 = Column(String(8))
bit_062_data = Column(JSON)
class Renderer(object):
def __init__(self, info):
pass
def __call__(self, value, system):
''' value bertipe DataResponse '''
request = system.get('request')
if request is not None:
response = request.response
ct = response.content_type
if ct == response.default_content_type:
response.content_type = 'text/csv'
return str(value)
import colander
from opensipkd.string.row import Row
from .exceptions import InternalError
INQUIRY_RESP_FIELDS = (
'Response Code',
'Biller Name',
'Bill Amount',
'Bill Ref', # STAN
'Notification Message')
PAYMENT_RESP_FIELDS = (
'Response Code',
'Transaction ID', # NTP
'Bill Ref', # STAN
'Notification Message')
class DataRequest(colander.Schema):
merchant = colander.SchemaNode(colander.String())
terminal = colander.SchemaNode(colander.String())
pwd = colander.SchemaNode(colander.String())
trx_type = colander.SchemaNode(colander.String())
msisdn = colander.SchemaNode(colander.String())
acc_no = colander.SchemaNode(colander.String())
msg = colander.SchemaNode(colander.String(), missing=colander.drop)
trx_date = colander.SchemaNode(colander.String())
amount = colander.SchemaNode(colander.String(), missing=colander.drop)
# Saat payment dan reversal, diperoleh dari inquiry response
bill_ref = colander.SchemaNode(colander.String(), missing=colander.drop)
# Saat payment dan reversal, dibuat saat payment request (NTB)
trx_id = colander.SchemaNode(colander.String(), missing=colander.drop)
class InquiryResponse(Row):
def __init__(self):
super().__init__(INQUIRY_RESP_FIELDS)
def __setitem__(self, name, value):
if value.find(':') > -1:
msg = 'Ada titik dua pada {} yaitu {}'.format(name, value)
raise InternalError(msg)
Row.__setitem__(self, name, value)
def __str__(self):
return ':'.join(list(self))
def from_err(self, err):
self.values['Response Code'] = err.code
self.values['Notification Message'] = err.message
def from_raw(self, raw):
t = raw.split(':')
i = -1
for fieldname in self.fieldnames:
i += 1
if not t[i:i+1]:
return
self.values[fieldname] = t[i]
class PaymentResponse(InquiryResponse):
def __init__(self):
Row.__init__(self, PAYMENT_RESP_FIELDS)
......@@ -19,11 +19,9 @@ json_responses = dict()
server_info = dict()
default_url = 'http://localhost:7000/rpc'
default_host = 'pemda'
default_count = 1
help_url = 'default ' + default_url
help_host = 'default ' + default_host
help_count = 'default {}'.format(default_count)
help_invoice_id = 'wajib saat --payment dan --reversal'
help_amount = 'wajib saat --payment dan --reversal'
......@@ -49,7 +47,6 @@ def log_info(s):
def get_option(argv):
parser = ArgumentParser()
parser.add_argument('--url', default=default_url, help=help_url)
parser.add_argument('--host', default=default_host, help=help_host)
parser.add_argument(
'--count', type=int, default=default_count, help=help_count)
parser.add_argument('--invoice-id', help=help_invoice_id)
......@@ -74,42 +71,70 @@ def send(p):
json_resp = resp.json()
log_info('Response: {}'.format(json_resp))
json_responses[key] = json_resp
except requests.exceptions.ConnectionError as e:
durations[key] = time() - start
log_info('Response: {}'.format(e))
json_responses[key] = dict(fatal=e)
finally:
end_threads.append(key)
def stan_from_result(result):
if result['code'] == 0:
return result['data']['11']
return '-'
def show_errors(errors):
if errors:
for err, count in errors.items():
log_info('{} {}'.format(err, count))
else:
log_info('Tidak ada yang gagal')
def show_durations():
key_fastest = None
key_slowest = None
total_duration = 0
messages = dict()
errors = dict()
for key in durations:
duration = durations[key]
msg = 'thread {} {} detik'.format(key, duration)
resp = json_responses[key]
if 'error' in resp:
break
result = resp['result']
if result['code'] == 0:
stan = result['data']['11']
if 'fatal' in resp:
errors['fatal'] = resp['fatal']
elif 'error' in resp:
result = resp['error']
msg = '{} {}'.format(msg, result['message'])
err = result['message']
if err in errors:
errors[err] += 1
else:
errors[err] = 1
else:
stan = '-'
msg = 'thread {} stan {} {} detik'.format(key, stan, duration)
log_info(msg)
messages[key] = msg
if key_fastest:
if duration < durations[key_fastest]:
result = resp['result']
stan = stan_from_result(result)
msg = '{} stan {}'.format(msg, stan)
messages[key] = msg
if key_fastest:
if duration < durations[key_fastest]:
key_fastest = key
else:
key_fastest = key
else:
key_fastest = key
if key_slowest:
if duration > durations[key_slowest]:
if key_slowest:
if duration > durations[key_slowest]:
key_slowest = key
else:
key_slowest = key
else:
key_slowest = key
total_duration += duration
log_info('Tercepat {}'.format(messages[key_fastest]))
log_info('Terlama {}'.format(messages[key_slowest]))
log_info('Rerata {} detik / request'.format(total_duration/len(durations)))
total_duration += duration
log_info(msg)
if key_fastest:
log_info('Tercepat {}'.format(messages[key_fastest]))
log_info('Terlama {}'.format(messages[key_slowest]))
log_info('Rerata {} detik / request'.format(total_duration/len(durations)))
show_errors(errors)
class App:
......@@ -149,11 +174,11 @@ class App:
error('--{} harus diisi'.format(name))
p[name] = value or default
p = dict(host=self.option.host, invoice_id=invoice_id)
p = dict(invoice_id=invoice_id)
if self.option.payment or self.option.reversal:
required('amount')
if self.option.method == 'payment':
required('ntb', datetime.now().strftime('%m%d%H%m%s'))
if self.option.payment:
required('ntb', datetime.now().strftime('%m%d%H%M%S'))
required('stan', datetime.now().strftime('%H%M%S'))
else:
required('ntb')
......@@ -186,12 +211,12 @@ class App:
def run_echo(self):
for thread_id in range(1, self.option.count+1):
p = dict(host=self.option.host, id=thread_id)
p = dict(id=thread_id)
data = dict(id=thread_id, method='echo', params=[p], jsonrpc='2.0')
self.create_thread(dict(data))
def run(self):
p = dict(host=self.option.host)
p = dict()
if self.option.invoice_id:
self.run_transaction()
else:
......
import sys
import os
import requests
from datetime import datetime
from time import (
sleep,
time,
)
from threading import Thread
from argparse import ArgumentParser
from .views.linkaja.structure import (
InquiryResponse,
PaymentResponse,
)
headers = {'content-type': 'application/x-www-form-urlencoded'}
threads = dict()
end_threads = list()
durations = dict()
csv_responses = dict()
server_info = dict()
default_url = 'http://localhost:7000/linkaja'
default_count = 1
default_merchant = 'ldmjakarta1'
default_terminal = 'Terminal Name'
default_pwd = 'ldmjkt1pass'
default_msisdn = '628111234567'
help_url = 'default ' + default_url
help_count = 'default {}'.format(default_count)
help_amount = 'wajib saat --payment dan --reversal'
help_bill_ref = 'wajib saat payment dan reversal, '\
'diperoleh dari inquiry response'
help_trx_id = 'Nomor Transaksi Bank, '\
'opsional saat --payment, wajib saat --reversal'
help_merchant = 'default {}'.format(default_merchant)
ERRORS = [
'Connection refused',
]
def error(s):
print('ERROR: {}'.format(s))
sys.exit()
def log_info(s):
t = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
t = t[:-3]
msg = '{} {}'.format(t, s)
print(msg)
def get_option(argv):
parser = ArgumentParser()
parser.add_argument('--url', default=default_url, help=help_url)
parser.add_argument(
'--count', type=int, default=default_count, help=help_count)
parser.add_argument('--invoice-id', required=True)
parser.add_argument('--payment', action='store_true')
parser.add_argument('--reversal', action='store_true')
parser.add_argument('--amount', type=int, help=help_amount)
parser.add_argument('--bill-ref', help=help_bill_ref)
parser.add_argument('--trx-id', help=help_trx_id)
parser.add_argument(
'--merchant', default=default_merchant, help=help_merchant)
parser.add_argument('--terminal', default=default_terminal)
parser.add_argument('--pwd', default=default_pwd)
parser.add_argument('--msisdn', default=default_msisdn)
return parser.parse_args(argv)
def send(thread_id, p):
url = server_info['url']
log_info('Request: {}'.format(p))
start = time()
try:
resp = requests.post(url, data=p, headers=headers, timeout=10)
durations[thread_id] = time() - start
data = p['trx_type'] == '021' and InquiryResponse() or \
PaymentResponse()
if resp.status_code == 200:
data.from_raw(resp.text)
log_info('Response {}: {} -> {}'.format(
resp.status_code, [resp.text], data.values))
csv_responses[thread_id] = resp
except requests.exceptions.ConnectionError as e:
durations[thread_id] = time() - start
log_info('Response: {}'.format(e))
csv_responses[thread_id] = dict(fatal=e)
finally:
end_threads.append(thread_id)
def show_errors(errors):
if errors:
for err, count in errors.items():
log_info('{} {}'.format(err, count))
else:
log_info('Tidak ada yang gagal')
def nice_error(s):
for msg in ERRORS:
if s.find(msg) > -1:
return msg
return s
def show_durations():
tid_fastest = tid_slowest = None
total_duration = 0
messages = dict()
errors = dict()
for tid in durations:
duration = durations[tid]
resp = csv_responses.get(tid)
if resp:
err = None
if 'fatal' in resp:
err = msg = nice_error(str(resp['fatal']))
elif resp.status_code == 200:
messages[tid] = msg = resp.text.strip()
if tid_fastest:
if duration < durations[tid_fastest]:
tid_fastest = tid
else:
tid_fastest = tid
if tid_slowest:
if duration > durations[tid_slowest]:
tid_slowest = tid
else:
tid_slowest = tid
total_duration += duration
else:
err = msg = resp.text.split('\n')[0].strip()
else:
err = msg = 'KOSONG'
if err:
if err in errors:
errors[err] += 1
else:
errors[err] = 1
log_info('thread {} {} detik {}'.format(tid, duration, msg))
if tid_fastest:
log_info('Tercepat {}'.format(messages[tid_fastest]))
log_info('Terlama {}'.format(messages[tid_slowest]))
log_info('Rerata {} detik / request'.format(total_duration/len(durations)))
show_errors(errors)
class App:
def __init__(self, argv):
self.option = get_option(argv)
server_info['url'] = self.option.url
def create_thread(self, thread_id, data):
thread = Thread(target=send, args=[thread_id, data])
# Exit the server thread when the main thread terminates
thread.daemon = True
threads[thread_id] = thread
thread.start()
def get_invoice_ids(self):
if not os.path.exists(self.option.invoice_id):
return [self.option.invoice_id]
r = []
with open(self.option.invoice_id) as f:
for line in f.readlines():
invoice_id = line.rstrip()
r += [invoice_id]
return r
def get_method(self):
if self.option.payment:
return '022'
if self.option.reversal:
return '023'
return '021'
def get_transaction(self, invoice_id):
def required(name, default=None):
value = getattr(self.option, name)
if not value and not default:
error('--{} harus diisi'.format(name.replace('_', '-')))
p[name] = value or default
p = dict(
merchant=self.option.merchant,
terminal=self.option.terminal,
pwd=self.option.pwd,
msisdn=self.option.msisdn,
acc_no=self.option.invoice_id,
trx_date=datetime.now().strftime('%Y%m%d%H%M%S'))
p['trx_type'] = self.get_method()
if self.option.payment or self.option.reversal:
required('amount')
required('bill_ref')
if self.option.payment:
required('trx_id', datetime.now().strftime('%m%d%H%M%S'))
else:
required('trx_id')
return p
def run_transaction(self):
thread_id = 0
for i in range(self.option.count):
for invoice_id in self.get_invoice_ids():
thread_id += 1
data = self.get_transaction(invoice_id)
self.create_thread(thread_id, data)
def run(self):
self.run_transaction()
while threads:
if not end_threads:
continue
i = end_threads[0]
if i in threads:
thread = threads[i]
thread.join()
del threads[i]
index = end_threads.index(i)
del end_threads[index]
show_durations()
def main(argv=sys.argv[1:]):
app = App(argv)
app.run()
......@@ -73,7 +73,9 @@ setup(
'console_scripts': [
'iso8583 = iso8583_web.scripts.forwarder:main',
'iso8583_web_client = iso8583_web.scripts.web_client:main',
'iso8583_web_client_linkaja = iso8583_web.scripts.web_client_linkaja:main',
'initialize_iso8583_web_db = iso8583_web.scripts.initialize_db:main',
'init_db_linkaja = iso8583_web.scripts.init_db_linkaja:main',
]
},
)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!