Commit 7eb0c0d3 by Owo Sugiana

Daemon forwarder bisa dihentikan dengan baik

1 parent db249bb3
......@@ -4,3 +4,4 @@ __pycache__
*pytest_cache
test-*
test.ini
dist
0.1.2 2019-07-10
----------------
- Web server berhasil dihentikan.
- Web log saat inquiry.
0.1.1 2019-04-22
----------------
- Web server bisa multi-thread
- Web client ditambah opsi --count=int untuk stress test
- Waktu tunggu diturunkan dari 5 detik menjadi 0,5 detik
- Penambahan log_debug()
0.1 2019-02-04
--------------
- Kali pertama
......@@ -34,7 +34,6 @@ Jalankan daemon-nya, anggap sebagai pemda (biller)::
Anda akan mendapat pesan seperti ini::
2019-02-07 20:41:30,179 INFO Web server listen at 0.0.0.0:6543
2019-02-07 20:41:30,180 INFO Connect to 127.0.0.1 port 10002
2019-02-07 20:41:30,180 ERROR [Errno 111] Connection refused
......@@ -101,18 +100,16 @@ membuat aplikasi teller bank. Pada ``test-bjb.ini`` aktifkan section
Kemudian restart daemon-nya. Setelah *echo established* dengan daemon pemda
lakukan *echo request* dengan cara::
$ ../env/bin/python contrib/bank-teller.py
$ ../env/bin/python contrib/web-client.py
Hasilnya menjadi seperti ini::
{'id': 0,
'jsonrpc': '2.0',
'result': {'code': 0,
'data': {'11': '163105',
'39': '00',
'7': '0221163106',
'70': '301'},
'message': 'OK'}}
2019-04-22 10:47:03.625 Request: {'method': 'echo', 'params': [{'host': 'pemda'}], 'jsonrpc': '2.0', 'id': 0}
2019-04-22 10:47:03.938 Response: {'jsonrpc': '2.0', 'id': 0, 'result': {'code': 0, 'message': 'OK', 'data': {'7': '0422104703', '11': '104703', '39': '00', '70': '301'}}}
thread 0 0.3129305839538574 detik
Jika ingin *stress test* silakan gunakan opsi ``-c 10`` yang berarti 10 kali
bersamaan.
JsonRpc Log File
----------------
......
import sys
import requests
import json
from optparse import OptionParser
from pprint import pprint
url = 'http://localhost:7000/rpc'
default_url = 'http://localhost:7000/rpc'
default_host = 'pemda'
help_url = 'default ' + default_url
help_host = 'default ' + default_host
parser = OptionParser()
parser.add_option('', '--url', default=default_url, help=help_url)
parser.add_option('', '--host', default=default_host, help=help_host)
option, args = parser.parse_args(sys.argv[1:])
url = option.url
headers = {'content-type': 'application/json'}
p = {'host': 'pemda'}
p = {'host': option.host}
data = {
'method': 'echo',
'params': [p],
......
class CommonIso:
def get_label(self):
return (self.is_echo_request() and 'Echo Request') or \
(self.is_echo_response() and 'Echo Response') or \
(self.is_sign_on_request() or \
self.is_sign_on_response() or \
self.is_sign_off_request() or \
self.is_sign_off_response() or \
self.is_inquiry_request() or \
self.is_inquiry_response() or \
self.is_payment_request() or \
self.is_payment_response() or \
self.is_reversal_request() or \
self.is_reversal_response()
import sys
from iso8583_web.read_conf import (
read_conf,
get_conf,
)
def show(iso):
flow = iso.is_request() and 'Request' or 'Response'
msg = '{} {} MTI {} Data {}'.format(
iso.get_name(), flow, iso.getMTI(), iso.get_values())
print(msg)
conf_file = sys.argv[1]
read_conf(conf_file)
ip = '127.0.0.1'
port = 10002
conf = get_conf(ip, port)
cls = conf['module_obj'].doc.Doc
iso_req = cls()
iso_req.echo_request()
show(iso_req)
iso_resp = cls(from_iso=iso_req)
iso_resp.process()
show(iso_resp)
iso_req = cls()
iso_req.sign_on_request()
show(iso_req)
iso_resp = cls(from_iso=iso_req)
iso_resp.process()
show(iso_resp)
iso_req = cls()
iso_req.sign_off_request()
show(iso_req)
iso_resp = cls(from_iso=iso_req)
iso_resp.process()
show(iso_resp)
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from iso8583_web.read_conf import (
read_conf,
get_conf,
)
import sismiop.services
def show(iso):
flow = iso.is_request() and 'Request' or 'Response'
msg = '{} {} MTI {} Data {}'.format(
iso.get_name(), flow, iso.getMTI(), iso.get_values())
print(msg)
conf_file = sys.argv[1]
invoice_id = sys.argv[2]
read_conf(conf_file)
ip = '127.0.0.1'
port = 10002
conf = get_conf(ip, port)
engine = create_engine(conf['db_url'])
session_factory = sessionmaker(bind=engine)
sismiop.services.DBSession = session_factory()
cls = conf['module_obj'].doc.Doc
iso_req = cls(conf=conf)
iso_req.set_inquiry_request()
iso_req.set_invoice_id(invoice_id)
show(iso_req)
iso_resp = cls(from_iso=iso_req, conf=conf)
iso_resp.process()
show(iso_resp)
import sys
import requests
import json
from datetime import datetime
from time import (
sleep,
time,
)
from threading import Thread
from optparse import OptionParser
threads = {}
end_threads = []
durations = {}
json_responses = {}
def create_thread(func, args=[]):
thread = Thread(target=func, args=args)
# Exit the server thread when the main thread terminates
thread.daemon = True
return thread
def send(p):
key = p['id']
log_info('Request: {}'.format(p))
start = time()
try:
resp = requests.post(url, data=json.dumps(p), headers=headers)
durations[key] = time() - start
json_resp = resp.json()
log_info('Response: {}'.format(json_resp))
json_responses[key] = json_resp
finally:
end_threads.append(key)
def log_info(s):
t = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
t = t[:-3]
msg = '{} {}'.format(t, s)
print(msg)
default_url = 'http://localhost:7000/rpc'
default_host = 'pemda'
default_count = 1
help_url = 'default ' + default_url
help_host = 'default ' + default_host
help_count = 'default {}'.format(default_count)
parser = OptionParser()
parser.add_option('', '--url', default=default_url, help=help_url)
parser.add_option('', '--host', default=default_host, help=help_host)
parser.add_option(
'-c', '--count', type='int', default=default_count, help=help_count)
option, args = parser.parse_args(sys.argv[1:])
url = option.url
count = option.count
headers = {'content-type': 'application/json'}
p = {'host': option.host}
data = {
'method': 'echo',
'params': [p],
'jsonrpc': '2.0',
}
for i in range(count):
data['id'] = i
thread = create_thread(send, [dict(data)])
threads[i] = thread
for key in threads:
thread = threads[key]
thread.start()
sleep(0.2)
while threads:
sleep(1)
if not end_threads:
continue
i = end_threads[0]
if i in threads:
thread = threads[i]
thread.join()
del threads[i]
index = end_threads.index(i)
del end_threads[index]
for key in durations:
val = durations[key]
resp = json_responses[key]
if 'error' in resp:
break
result = resp['result']
if result['code'] == 0:
stan = result['data']['11']
else:
stan = '-'
print('thread {} stan {} {} detik'.format(key, stan, val))
......@@ -60,6 +60,7 @@ def main(global_config, **settings):
config.include('pyramid_tm')
config.include('pyramid_beaker')
config.include('pyramid_chameleon')
config.include('pyramid_rpc.jsonrpc')
authn_policy = AuthTktAuthenticationPolicy(
'sosecret', callback=group_finder, hashalg='sha512')
......
......@@ -72,6 +72,20 @@ def get_streamer_class(name):
obj = get_module_object('opensipkd.streamer.' + name)
return obj.Streamer
def append_others(cfg, conf, section):
# Konfigurasi tambahan terkait host
for key, val in conf.items(section):
if key in cfg:
continue
cfg[key] = val
# Konfigurasi tambahan terkait modul
section = 'module_' + cfg['module']
if not conf.has_section(section):
return
for key, val in conf.items(section):
cfg[key] = val
def read_conf(conf_file):
conf = ConfigParser()
......@@ -105,6 +119,11 @@ def read_conf(conf_file):
cfg['listen'] = get_boolean(conf, section, 'listen', True)
cfg['echo'] = get_boolean(conf, section, 'echo', not cfg['listen'])
cfg['timeout'] = get_int(conf, section, 'timeout', 60)
module_section = 'module_' + cfg['module']
if conf.has_section(module_section):
for key in conf.options(module_section):
cfg[key] = conf.get(module_section, key)
append_others(cfg, conf, section)
if cfg['listen']:
if port not in listen_ports:
listen_ports.append(port)
......
......@@ -7,7 +7,8 @@ from time import (
time,
)
from threading import Thread
from wsgiref.simple_server import make_server
from configparser import ConfigParser
from waitress.server import create_server
from pyramid.config import Configurator
from pyramid.paster import setup_logging
from pyramid_rpc.jsonrpc import jsonrpc_method
......@@ -40,11 +41,6 @@ from ..read_conf import (
)
def debug(s):
msg = 'DEBUG {}'.format(s)
print(msg)
def get_log():
return logs[0]
......@@ -59,6 +55,11 @@ def log_error(s):
log.error(s)
def log_debug(s):
log = get_log()
log.debug(s)
def iso_to_dict(iso):
data = iso.get_values()
return dict_to_str(data)
......@@ -85,6 +86,10 @@ class Log:
msg = self.log_message(msg)
log_error(msg)
def log_debug(self, msg):
msg = self.log_message(msg)
log_debug(msg)
def log_unknown(self):
msg = exception_message()
self.log_error(msg)
......@@ -159,7 +164,8 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
ip = self.client_address[0]
port = self.server.server_address[1]
self.conf = get_conf(ip, port)
self.running = False
self.running = False
self.created_time = time()
conn_mgr.add(self)
BaseRequestHandler.handle(self)
......@@ -167,10 +173,21 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
self.log_receive_raw(raw)
BaseRequestHandler.on_receive_raw(self, raw)
# Override BaseRequestHandler.process()
# Override
def process(self, raw):
CommonConnection.process(self, raw)
# Override
def set_timeout(self):
if self.conf['echo']:
BaseRequestHandler.set_timeout(self)
# Override
def is_timeout(self):
if not self.conf['echo']:
return False
return BaseRequestHandler.is_timeout(self)
def close_because_timeout(self):
self.log_timeout()
BaseRequestHandler.close_because_timeout(self)
......@@ -181,9 +198,11 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
def raw_for_send(self, raw):
self.log_iso_to_raw(raw)
raw = BaseRequestHandler.raw_for_send(self, raw)
return BaseRequestHandler.raw_for_send(self, raw)
def just_send(self, raw):
self.log_send(raw)
return raw
BaseRequestHandler.just_send(self, raw)
def run(self):
try:
......@@ -201,15 +220,18 @@ def start_servers():
log_info('ISO8583 server listen at {}:{}'.format(*listen_address))
server = Server(listen_address, RequestHandler)
thread = create_thread(server.serve_forever)
servers[listen_port] = (server, thread)
servers[listen_port] = (server, thread)
thread.start()
def stop_servers(reason):
for listen_port in listen_ports:
server, thread = servers[listen_port]
log_debug('ISO8583 server shutdown')
server.shutdown()
log_debug('ISO8583 server thread join')
thread.join()
log_debug('ISO8583 server thread joined')
sleep(1)
......@@ -245,9 +267,11 @@ class Client(BaseClient, CommonConnection):
def raw_for_send(self, raw):
self.log_iso_to_raw(raw)
raw = BaseClient.raw_for_send(self, raw)
return BaseClient.raw_for_send(self, raw)
def just_send(self, raw):
self.log_send(raw)
return raw
BaseClient.just_send(self, raw)
def run(self):
try:
......@@ -275,8 +299,12 @@ def stop_connections(reason):
sleep(1)
for ip_port in clients:
client, thread = clients[ip_port]
msg = 'Thread {} join'.format(ip_port)
log_debug(msg)
thread.join()
msg = 'Thread {} joined'.format(ip_port)
log_debug(msg)
#######################
# Raw ISO 8583 parser #
......@@ -308,7 +336,7 @@ class Parser(Log):
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
if ip_port in web_process:
stan_list = web_process[ip_port]
stan = from_iso.get_stan()
stan = from_iso.get_stan()
if stan in stan_list:
i = stan_list.index(stan)
del stan_list[i]
......@@ -326,7 +354,7 @@ class ConnectionManager(BaseConnectionManager):
conn_mgr = ConnectionManager()
#######
# Web #
......@@ -347,6 +375,12 @@ def log_web_error(s):
log.error(msg)
def log_web_debug(s):
msg = log_web_msg(s)
log = get_log()
log.debug(msg)
def conn_by_name(name):
conf = name_conf[name]
found_conn = None
......@@ -370,7 +404,7 @@ def conn_by_name(name):
web_request = {}
# Daftar job yang sedang diproses, yaitu menunggu iso response
# key: ip:port, value: list of stan (bit 11)
# key: ip:port, value: list of stan (bit 11)
web_process = {}
# Daftar job yang sudah selesai, berisi iso response
......@@ -402,7 +436,7 @@ def web_job(conn, iso):
stan = iso.get_stan()
awal = time()
while True:
sleep(1)
sleep(0.1)
if time() - awal > 5:
raise JsonRpcBillerNetwork(message='Timeout')
if ip_port not in web_response:
......@@ -410,7 +444,7 @@ def web_job(conn, iso):
result = web_response[ip_port]
if stan in result:
iso = result[stan]
del result[stan]
del result[stan]
data = iso_to_dict(iso)
return dict(code=0, message='OK', data=data)
......@@ -442,9 +476,12 @@ def echo(request, p):
@jsonrpc_method(endpoint='rpc')
def inquiry(request, p):
log_web_receive(request, 'inquiry', p)
conn = validate_rpc(p)
iso = conn.job.inquiry(p)
return web_job(conn, iso)
r = web_job(conn, iso)
log_web_send(request, 'inquiry', r)
return r
@jsonrpc_method(endpoint='rpc')
......@@ -468,17 +505,17 @@ def start_web_server():
port = get_web_port()
if not port:
return
host = '0.0.0.0'
with Configurator() as config:
config.include('pyramid_tm')
config.include('pyramid_rpc.jsonrpc')
config.add_jsonrpc_endpoint('rpc', '/rpc')
config.scan(__name__)
app = config.make_wsgi_app()
web_server['listener'] = server = make_server('0.0.0.0', port, app)
web_server['thread'] = create_thread(server.serve_forever)
web_server['listener'] = server = create_server(app, host=host, port=port)
web_server['thread'] = create_thread(server.run)
web_server['thread'].start()
ip_port = web_server['listener'].server_address
log_web_info('listen at {}:{}'.format(*ip_port))
log_web_info('listen at {}:{}'.format(host, port))
def stop_web_server(reason):
......@@ -486,10 +523,17 @@ def stop_web_server(reason):
return
msg = 'stop because {}'.format(reason)
log_web_info(msg)
# shutdown() ini kadang tidak segera mengakhiri web server. Akan cepat
# berakhir bila ada client yang akses.
web_server['listener'].shutdown()
srv = web_server['listener']
srv.close()
while srv._map:
triggers = list(srv._map.values())
for trigger in triggers:
trigger.handle_close()
srv.maintenance(0)
srv.task_dispatcher.shutdown()
log_web_debug('thread join')
web_server['thread'].join()
log_web_debug('thread joined')
MSG_KILL_BY_SIGNAL = 'kill by signal {}'
......@@ -502,7 +546,7 @@ def out(sig=None, func=None):
if sig:
reason = MSG_KILL_BY_SIGNAL.format(sig)
else:
reason = MSG_KILL_BY_KEYBOARD
reason = MSG_KILL_BY_KEYBOARD
stop_servers(reason)
stop_connections(reason)
stop_web_server(reason)
......@@ -528,7 +572,7 @@ def check_connection():
continue
if conn.running:
continue
conn_mgr.remove(index)
conn_mgr.remove_if_old(index)
break
continue
cfg = ip_conf[ip_port]
......@@ -545,18 +589,28 @@ def check_job():
if not connection.is_connected():
continue
iso = connection.job.get_iso()
if not iso:
if iso:
iso_list = [iso]
else:
if ip_port not in web_request:
continue
jobs = web_request[ip_port]
if not jobs:
web_iso_list = web_request[ip_port]
if not web_iso_list:
continue
iso = jobs[0]
del jobs[0]
append_web_process(ip_port, iso)
connection.log_encode(iso)
raw = iso.getRawIso()
connection.send(raw)
iso_list = []
while web_iso_list:
iso = web_iso_list[0]
del web_iso_list[0]
append_web_process(ip_port, iso)
iso_list.append(iso)
with_headers = []
for iso in iso_list:
connection.log_encode(iso)
raw = iso.getRawIso()
with_header = connection.raw_for_send(raw)
with_headers.append(with_header)
raw = ''.join(with_headers)
connection.just_send(raw)
def check_parser():
......@@ -569,9 +623,17 @@ def check_parser():
if not parser.running:
thread.join()
del parser_threads[i]
i -= 1
i -= 1
def set_log(conf_file):
conf = ConfigParser()
conf.read(conf_file)
log = logging.getLogger(__file__)
log.setLevel(conf.get('logger_root', 'level'))
logs.append(log)
logs = []
running = []
......@@ -582,8 +644,7 @@ def main(argv=sys.argv):
config_uri = argv[1]
setup_logging(config_uri)
read_conf(config_uri)
log = logging.getLogger(__file__)
logs.append(log)
set_log(config_uri)
running.append(True)
start_web_server()
start_servers()
......@@ -594,6 +655,6 @@ def main(argv=sys.argv):
check_connection()
check_job()
check_parser()
sleep(5)
sleep(0.5)
except KeyboardInterrupt:
out()
......@@ -36,7 +36,7 @@ requires = [
customs_require = [
'http://repo.opensipkd.com/pip/opensipkd-base-0.2.tar.gz',
'http://repo.opensipkd.com/pip/opensipkd-hitung-0.1.tar.gz',
'http://repo.opensipkd.com/pip/opensipkd-iso8583-0.1.tar.gz',
'git+https://git.opensipkd.com/sugiana/opensipkd-iso8583',
'http://repo.opensipkd.com/pip/opensipkd-jsonrpc-0.1.tar.gz',
]
......@@ -64,6 +64,7 @@ if sys.argv[1:] and sys.argv[1] == 'develop-use-pip':
pip = os.path.join(bin_, 'pip')
pip_install('pip', True)
pip_install('setuptools', True)
pip_install('ebcdic', True)
requires_ = requires + customs_require
for package in requires_:
if sys.argv[2:]:
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!