Commit b67f0bc0 by Owo Sugiana

Parser.run() bisa forward dengan sumber dan target ISO8583

1 parent d99457b1
0.3 2020-08-09
--------------
- Di forwarder.py Parser.run() usai job.process() bisa mengirim dokumen ISO8583
ke host lain. Ini berguna saat sebagai bank yang mendapat request dari
aggregator berupa dokumen ISO8583 juga.
- Bug fixed saat konfigurasi ada nama section memuat karakter underscore lebih
dari 1.
- linkaja mengenal makna bit 39 bernilai 91 yaitu koneksi terputus.
0.2.1 2020-07-19
----------------
- Konfigurasi web yang lebih ringkas
......
......@@ -4,48 +4,72 @@
###
[loggers]
keys = root, iso8583_web
keys = root, iso8583_web, jsonrpc
[handlers]
keys = console
keys = console, file
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
level = DEBUG
handlers = console, file
[logger_iso8583_web]
level = DEBUG
handlers =
qualname = iso8583_web
[logger_jsonrpc]
level = DEBUG
handlers = console, file
qualname = pyramid_rpc.jsonrpc
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
level = DEBUG
formatter = generic
[handler_file]
class = FileHandler
args = ('/home/sugiana/log/agratek.log', 'a')
level = DEBUG
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
# Aktifkan web server jika ingin inquiry dll melalui web client.
[web]
port = 7001
port = 7000
threads = 12
[web_host_linkaja]
ip = 127.0.0.1
[web_linkaja]
route_path = /linkaja
module = iso8583_web.scripts.views.linkaja
host = bjb
[module_iso8583_web.scripts.views.linkaja]
route_path = /linkaja
db_url = postgresql://user:pass@localhost/agratek
timeout = 30
db_url = postgresql://user:pass@localhost/db
[host_bjb]
ip = 127.0.0.1
port = 10003
listen = false
streamer = bjb_with_suffix
port = 10001
listen = false
timeout = 15
streamer = bjb
module = opensipkd.iso8583.bjb.pbb.agratek
request_bits =
2:622011888888888888
18:6025
22:010
32:015
35:622011888888888888=9912?
37:000000000000
41:AGRATEK
42:LINKAJA
43:AGRATEK-LINKAJA
49:360
59:PAY
60:123
63:214
......@@ -4,7 +4,7 @@
###
[loggers]
keys = root, iso8583_web
keys = root, iso8583_web, jsonrpc
[handlers]
keys = console, file
......@@ -13,7 +13,7 @@ keys = console, file
keys = generic
[logger_root]
level = INFO
level = DEBUG
handlers = console, file
[logger_iso8583_web]
......@@ -21,10 +21,15 @@ level = DEBUG
handlers =
qualname = iso8583_web
[logger_jsonrpc]
level = DEBUG
handlers = console, file
qualname = pyramid_rpc.jsonrpc
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
level = DEBUG
formatter = generic
[handler_file]
......@@ -36,20 +41,33 @@ formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
[host_pemda]
[host_agratek]
ip = 127.0.0.1
port = 10001
listen = true
streamer = bjb
module = opensipkd.iso8583.bjb.pbb.test_aggregator
db_url = postgresql://user:pass@localhost/db
# Routing dari 4 digit awal InvoiceID
host =
3271:kota_bogor
3275:kota_bekasi
[host_kota_bogor]
ip = 127.0.0.1
port = 10002
listen = true
streamer = bjb_with_suffix
timeout = 60
module = opensipkd.iso8583.bjb.pbb.test
# Aktifkan web server dimana inquiry dkk bisa dilakukan melalui web client.
[web]
port = 7000
threads = 12
[host_kota_bekasi]
ip = 10.8.20.134
port = 10002
listen = true
streamer = bjb_with_suffix
module = opensipkd.iso8583.bjb.pbb.test
[web_rpc]
route_path = /rpc
host = pemda
module = iso8583_web.scripts.views.jsonrpc
[module_opensipkd.iso8583.bjb.pbb.test]
request_bits =
18:6025
33:00110
......@@ -13,7 +13,7 @@ keys = console, file
keys = generic
[logger_root]
level = INFO
level = DEBUG
handlers = console, file
[logger_iso8583_web]
......@@ -30,7 +30,7 @@ formatter = generic
[handler_file]
class = FileHandler
args = ('/home/sugiana/log/pemda.log', 'a')
level = DEBUG
level = NOTSET
formatter = generic
[formatter_generic]
......@@ -42,7 +42,10 @@ db_pool_size = 50
db_max_overflow = 100
persen_denda = 2
nip_pencatat = 999999999
# Tempat pembayaran
#####################
# Tempat pembayaran #
#####################
kd_kanwil = 01
kd_kantor = 01
# Penerjemahan nilai bit NN menjadi kd_tp
......@@ -67,13 +70,14 @@ ip = 127.0.0.1
port = 10002
listen = false
streamer = bjb_with_suffix
timeout = 60
timeout = 60
module = opensipkd.iso8583.bjb.pbb.bogor_kota
[host_mitracomm]
active = false
ip = 127.0.0.1
port = 8583
listen = true
streamer = mitracomm
timeout = 60
listen = true
timeout = 60
module = opensipkd.iso8583.bjb.pbb.bogor_kota
###
# logging configuration
# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/logging.html
###
[loggers]
keys = root, iso8583_web
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = INFO
handlers = console
[logger_iso8583_web]
level = DEBUG
handlers =
qualname = iso8583_web
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(asctime)s %(levelname)s %(message)s
# Aktifkan web server agar inquiry dll bisa dilakukan melalui web client.
[web]
port = 7000
threads = 12
[web_host_teller]
ip = 127.0.0.1
module = iso8583_web.scripts.views.jsonrpc
host = pemda
[module_iso8583_web.scripts.views.jsonrpc]
route_path = /rpc
[host_pemda]
ip = 127.0.0.1
port = 10002
listen = true
streamer = bjb_with_suffix
......@@ -102,7 +102,8 @@ def read_web_conf(conf, section):
if section.find('web_') != 0:
return
cfg = dict()
cfg['name'] = section.split('web_')[-1]
cfg['name'] = section[4:]
cfg['timeout'] = get_int(conf, section, 'timeout', 30)
cfg['allowed_ip'] = get_list(conf, section, 'allowed_ip', [])
append_others(cfg, conf, section)
cfg['module_obj'] = get_module_object(cfg['module'])
......@@ -123,7 +124,7 @@ def read_host_conf(conf, section):
cfg['ip'] = conf.get(section, 'ip')
cfg['port'] = conf.getint(section, 'port')
ip_port = validate_ip_port(cfg)
cfg['name'] = section.split('_')[-1]
cfg['name'] = section[5:]
cfg['streamer'] = get_str(conf, section, 'streamer', 'none')
cfg['streamer_cls'] = get_streamer_class(cfg['streamer'])
cfg['ip'] = conf.get(section, 'ip')
......
......@@ -31,7 +31,6 @@ from .connection import conn_mgr
from .views import (
web_request,
web_process,
web_response,
append_web_process,
append_web_response,
)
......@@ -46,6 +45,41 @@ from .logger import (
)
# Jika kosong berarti perintah untuk mengakhiri aplikasi ini
running = []
# Daftar server ISO8583, key: port
servers = {}
# Daftar client ISO8583, key: ip:port
clients = {}
# Daftar penerjemah dokumen ISO8583
parser_threads = []
# Web server profile
web_server = {}
# Daftar antrian saat sebagai ISO8583 forwarder
# key: ip:port
# value: dict of (
# key: STAN forwarder
# value: list of [
# ip_port aggregator,
# iso response for aggregator])
iso_process = {}
def append_iso_process(
ip_port_target, iso_response, iso_for_host, ip_port_source):
stan = iso_for_host.get_stan()
data = [ip_port_source, iso_response]
if ip_port_target in iso_process:
iso_process[ip_port_target][stan] = data
else:
iso_process[ip_port_target] = {stan: data}
def to_str(s):
if sys.version_info.major > 2:
return s
......@@ -88,9 +122,6 @@ class Log:
self.log_info('Encode MTI {} Data {}'.format(iso.getMTI(), data))
parser_threads = []
class CommonConnection(Log):
def log_receive_raw(self, raw):
raw = to_str(raw)
......@@ -194,9 +225,6 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
self.log_unknown()
servers = {}
def start_servers():
for listen_port in listen_ports:
listen_address = ('0.0.0.0', listen_port)
......@@ -272,9 +300,6 @@ def start_client(conf):
conn_mgr.add(client)
clients = {}
def stop_connections(reason):
for ip_port, connection in conn_mgr:
connection.log_close(reason)
......@@ -317,28 +342,95 @@ class Parser(Log):
self.running = False
return
if iso:
self.log_ack(iso)
self.log_encode(iso)
raw = iso.getRawIso()
self.connection.send(raw)
else: # dapat response
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
if ip_port in web_process:
stan_list = web_process[ip_port]
stan = from_iso.get_stan()
if stan in stan_list:
i = stan_list.index(stan)
del stan_list[i]
append_web_response(ip_port, stan, from_iso)
try:
for_host = getattr(iso, 'for_host')
except AttributeError:
for_host = None
if for_host:
try:
self.send_iso_for_host(iso)
except:
self.log_unknown()
else:
self.send(iso)
else:
self.on_response(from_iso)
self.running = False
def on_response(self, from_iso):
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
if ip_port in web_process:
stan_list = web_process[ip_port]
stan = from_iso.get_stan()
if stan in stan_list:
i = stan_list.index(stan)
del stan_list[i]
append_web_response(ip_port, stan, from_iso)
# as forwarder
if ip_port not in iso_process:
return
stan = from_iso.get_stan()
if stan not in iso_process[ip_port]:
return
ip_port_source, iso = iso_process[ip_port][stan]
try:
iso.continue_response(from_iso)
ok = True
except:
self.log_unknown()
ok = False
if ok:
conn = get_connection_from_ip_port(ip_port_source)
self.log_ack(iso)
self.send_by_conn(iso, conn)
del iso_process[ip_port][stan]
def send_by_conn(self, iso, conn):
conn.log_encode(iso)
raw = iso.getRawIso()
conn.send(raw)
def send(self, iso):
self.log_ack(iso)
self.send_by_conn(iso, self.connection)
def send_iso_for_host(self, iso):
conn = get_connection_from_name(iso.for_host)
if not conn:
iso.ack_link_conf()
self.send(iso)
return
if not conn.running:
iso.ack_link_down()
self.send(iso)
return
iso_for_host = iso.create_iso_for_host(conn.job.conf)
ip_port_source = join_ip_port(self.conf['ip'], self.conf['port'])
ip_port_target = join_ip_port(conn.conf['ip'], conn.conf['port'])
append_iso_process(ip_port_target, iso, iso_for_host, ip_port_source)
self.send_by_conn(iso_for_host, conn)
def get_connection_from_name(name):
conf = name_conf[name]
for ip_port, conn in conn_mgr:
ip, port = ip_port.split(':')
if conf['ip'] != ip:
continue
port = int(port)
if conf['port'] == port:
return conn
def get_connection_from_ip_port(ip_port):
for this_ip_port, conn in conn_mgr:
if ip_port == this_ip_port:
return conn
#######
# Web #
#######
web_server = {}
def start_web_server():
port = web_conf.get('port')
if not port:
......@@ -468,9 +560,6 @@ def check_parser():
i -= 1
running = []
def main(argv=sys.argv):
if len(argv) != 2:
usage(argv)
......
......@@ -43,10 +43,11 @@ def append_web_response(ip_port, stan, iso):
web_response[ip_port] = {stan: iso}
class WebJob(object):
def __init__(self, conn, iso):
class WebJob:
def __init__(self, conn, iso, conf=dict()):
self.conn = conn
self.iso = iso
self.conf = conf
def get_response(self):
ip_port = join_ip_port(self.conn.conf['ip'], self.conn.conf['port'])
......@@ -58,7 +59,7 @@ class WebJob(object):
awal = time()
while True:
sleep(0.001)
if time() - awal > 5:
if time() - awal > self.conf.get('timeout', 30):
raise self.timeout_error()
if ip_port not in web_response:
continue
......@@ -72,7 +73,7 @@ class WebJob(object):
return Exception('Timeout')
class View(object):
class View:
def __init__(self, request):
self.request = request
......@@ -148,5 +149,6 @@ class View(object):
return WebJob
def web_job(self, conn, iso):
job = WebJob(conn, iso)
conf = self.get_web_conf()
job = WebJob(conn, iso, conf)
return job.get_response()
......@@ -45,6 +45,7 @@ from .exceptions import (
AmountError,
BillRefNotFound,
PaymentNotFound,
LinkError,
)
from .structure import (
DataRequest,
......@@ -249,6 +250,8 @@ class View(BaseView):
err = AlreadyPaidError()
elif iso_data[39] == '51':
err = AmountError()
elif iso_data[39] == '91':
err = LinkError()
else:
err = BaseError()
if err:
......
......@@ -55,3 +55,8 @@ class BillRefNotFound(BaseError):
class PaymentNotFound(BaseError):
code = '48'
message = 'Belum ada pembayaran'
class LinkError(BaseError):
code = '49'
message = 'Koneksi terputus'
......@@ -92,6 +92,10 @@ def send(thread_id, p):
durations[thread_id] = time() - start
log_info('Response: {}'.format(e))
csv_responses[thread_id] = dict(fatal=e)
except requests.exceptions.ReadTimeout as e:
durations[thread_id] = time() - start
log_info('Response: {}'.format(e))
csv_responses[thread_id] = dict(fatal=e)
finally:
end_threads.append(thread_id)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!