Commit 60ed2adc by Owo Sugiana

Tambah threads pada section web di konfigurasi

1 parent 47bf07df
0.1.8 2020-06-24
----------------
- Konfigurasi section web ditambah threads (integer)
0.1.7 2020-05-12 0.1.7 2020-05-12
---------------- ----------------
- Bug fixed append_others() - Bug fixed append_others()
......
...@@ -34,6 +34,7 @@ format = %(asctime)s %(levelname)s %(message)s ...@@ -34,6 +34,7 @@ format = %(asctime)s %(levelname)s %(message)s
# bisa dilakukan melalui JsonRPC client. # bisa dilakukan melalui JsonRPC client.
# [web] # [web]
# port = 7000 # port = 7000
# threads = 12
[host_bjb] [host_bjb]
ip = 127.0.0.1 ip = 127.0.0.1
......
...@@ -23,10 +23,6 @@ def get_conf(ip, port): ...@@ -23,10 +23,6 @@ def get_conf(ip, port):
return ip_conf[key] return ip_conf[key]
def get_web_port():
return 'port' in web and web['port']
def get_str(conf, section, option, default): def get_str(conf, section, option, default):
try: try:
return conf.get(section, option) return conf.get(section, option)
...@@ -89,10 +85,12 @@ def append_others(cfg, conf, section): ...@@ -89,10 +85,12 @@ def append_others(cfg, conf, section):
def read_conf(conf_file): def read_conf(conf_file):
conf = ConfigParser() conf = ConfigParser()
conf['DEFAULT'] = dict(threads=12)
conf.read(conf_file) conf.read(conf_file)
for section in conf.sections(): for section in conf.sections():
if section == 'web': if section == 'web':
web['port'] = conf.getint(section, 'port') web['port'] = conf.getint(section, 'port')
web['threads'] = conf.getint(section, 'threads')
continue continue
if section.find('host_') < 0: if section.find('host_') < 0:
continue continue
......
...@@ -33,7 +33,7 @@ from opensipkd.jsonrpc.exc import ( ...@@ -33,7 +33,7 @@ from opensipkd.jsonrpc.exc import (
from ..read_conf import ( from ..read_conf import (
read_conf, read_conf,
ip_conf, ip_conf,
get_web_port, web as web_conf,
listen_ports, listen_ports,
allowed_ips, allowed_ips,
get_conf, get_conf,
...@@ -444,7 +444,7 @@ def web_job(conn, iso): ...@@ -444,7 +444,7 @@ def web_job(conn, iso):
stan = iso.get_stan() stan = iso.get_stan()
awal = time() awal = time()
while True: while True:
sleep(0.1) sleep(0.001)
if time() - awal > 5: if time() - awal > 5:
raise JsonRpcBillerNetwork(message='Timeout') raise JsonRpcBillerNetwork(message='Timeout')
if ip_port not in web_response: if ip_port not in web_response:
...@@ -516,9 +516,10 @@ web_server = {} ...@@ -516,9 +516,10 @@ web_server = {}
def start_web_server(): def start_web_server():
port = get_web_port() port = web_conf.get('port')
if not port: if not port:
return return
thread_count = web_conf.get('threads')
host = '0.0.0.0' host = '0.0.0.0'
with Configurator() as config: with Configurator() as config:
config.include('pyramid_tm') config.include('pyramid_tm')
...@@ -526,10 +527,12 @@ def start_web_server(): ...@@ -526,10 +527,12 @@ def start_web_server():
config.add_jsonrpc_endpoint('rpc', '/rpc') config.add_jsonrpc_endpoint('rpc', '/rpc')
config.scan(__name__) config.scan(__name__)
app = config.make_wsgi_app() app = config.make_wsgi_app()
web_server['listener'] = server = create_server(app, host=host, port=port) web_server['listener'] = server = create_server(
app, host=host, port=port, threads=thread_count)
web_server['thread'] = create_thread(server.run) web_server['thread'] = create_thread(server.run)
web_server['thread'].start() web_server['thread'].start()
log_web_info('listen at {}:{}'.format(host, port)) log_web_info('listen at {}:{} with {} workers'.format(
host, port, thread_count))
def stop_web_server(reason): def stop_web_server(reason):
......
import sys import sys
import os
import requests import requests
import json import json
from datetime import datetime from datetime import datetime
...@@ -11,10 +12,11 @@ from argparse import ArgumentParser ...@@ -11,10 +12,11 @@ from argparse import ArgumentParser
headers = {'content-type': 'application/json'} headers = {'content-type': 'application/json'}
threads = {} threads = dict()
end_threads = [] end_threads = list()
durations = {} durations = dict()
json_responses = {} json_responses = dict()
server_info = dict()
default_url = 'http://localhost:7000/rpc' default_url = 'http://localhost:7000/rpc'
default_host = 'pemda' default_host = 'pemda'
...@@ -36,12 +38,6 @@ def error(s): ...@@ -36,12 +38,6 @@ def error(s):
sys.exit() sys.exit()
def required(name, default=None):
value = getattr(option, name)
if not value and not default:
error('--{} harus diisi'.format(name))
p[name] = value or default
def log_info(s): def log_info(s):
t = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') t = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
...@@ -67,12 +63,93 @@ def get_option(argv): ...@@ -67,12 +63,93 @@ def get_option(argv):
return parser.parse_args(argv) return parser.parse_args(argv)
def send(p):
url = server_info['url']
key = p['id']
log_info('Request: {}'.format(p))
start = time()
try:
resp = requests.post(url, data=json.dumps(p), headers=headers)
durations[key] = time() - start
json_resp = resp.json()
log_info('Response: {}'.format(json_resp))
json_responses[key] = json_resp
finally:
end_threads.append(key)
def show_durations():
key_fastest = None
key_slowest = None
total_duration = 0
messages = dict()
for key in durations:
duration = durations[key]
resp = json_responses[key]
if 'error' in resp:
break
result = resp['result']
if result['code'] == 0:
stan = result['data']['11']
else:
stan = '-'
msg = 'thread {} stan {} {} detik'.format(key, stan, duration)
log_info(msg)
messages[key] = msg
if key_fastest:
if duration < durations[key_fastest]:
key_fastest = key
else:
key_fastest = key
if key_slowest:
if duration > durations[key_slowest]:
key_slowest = key
else:
key_slowest = key
total_duration += duration
log_info('Tercepat {}'.format(messages[key_fastest]))
log_info('Terlama {}'.format(messages[key_slowest]))
log_info('Rerata {} detik / request'.format(total_duration/len(durations)))
class App: class App:
def __init__(self, argv): def __init__(self, argv):
self.option = get_option(argv) self.option = get_option(argv)
server_info['url'] = self.option.url
def set_transaction(self, p): def create_thread(self, data):
p['invoice_id'] = self.option.invoice_id thread = Thread(target=send, args=[data])
# Exit the server thread when the main thread terminates
thread.daemon = True
thread_id = data['id']
threads[thread_id] = thread
thread.start()
def get_invoice_ids(self):
if not os.path.exists(self.option.invoice_id):
return [self.option.invoice_id]
r = []
with open(self.option.invoice_id) as f:
for line in f.readlines():
invoice_id = line.rstrip()
r += [invoice_id]
return r
def get_method(self):
if self.option.payment:
return 'payment'
if self.option.reversal:
return 'reversal'
return 'inquiry'
def get_transaction(self, invoice_id):
def required(name, default=None):
value = getattr(self.option, name)
if not value and not default:
error('--{} harus diisi'.format(name))
p[name] = value or default
p = dict(host=self.option.host, invoice_id=invoice_id)
if self.option.payment or self.option.reversal: if self.option.payment or self.option.reversal:
required('amount') required('amount')
if self.option.method == 'payment': if self.option.method == 'payment':
...@@ -93,50 +170,33 @@ class App: ...@@ -93,50 +170,33 @@ class App:
key, val = t.split(':') key, val = t.split(':')
conf[key] = val conf[key] = val
p['conf'] = conf p['conf'] = conf
return p
def send(self, p): def run_transaction(self):
key = p['id'] method = self.get_method()
log_info('Request: {}'.format(p)) thread_id = 0
start = time() for i in range(self.option.count):
try: for invoice_id in self.get_invoice_ids():
resp = requests.post( thread_id += 1
self.option.url, data=json.dumps(p), headers=headers) p = self.get_transaction(invoice_id)
durations[key] = time() - start data = dict(
json_resp = resp.json() id=thread_id, method=method, params=[p],
log_info('Response: {}'.format(json_resp)) jsonrpc='2.0')
json_responses[key] = json_resp self.create_thread(data)
finally:
end_threads.append(key) def run_echo(self):
for thread_id in range(1, self.option.count+1):
def create_thread(self, args=[]): p = dict(host=self.option.host, id=thread_id)
thread = Thread(target=self.send, args=args) data = dict(id=thread_id, method='echo', params=[p], jsonrpc='2.0')
# Exit the server thread when the main thread terminates self.create_thread(dict(data))
thread.daemon = True
return thread
def run(self): def run(self):
p = dict(host=self.option.host) p = dict(host=self.option.host)
if self.option.invoice_id: if self.option.invoice_id:
self.set_transaction(p) self.run_transaction()
if self.option.payment:
method = 'payment'
elif self.option.reversal:
method = 'reversal'
else:
method = 'inquiry'
else: else:
method = 'echo' self.run_echo()
data = dict(method=method, params=[p], jsonrpc='2.0')
for i in range(self.option.count):
data['id'] = i
thread = self.create_thread([dict(data)])
threads[i] = thread
for key in threads:
thread = threads[key]
thread.start()
sleep(0.2)
while threads: while threads:
sleep(1)
if not end_threads: if not end_threads:
continue continue
i = end_threads[0] i = end_threads[0]
...@@ -146,18 +206,7 @@ class App: ...@@ -146,18 +206,7 @@ class App:
del threads[i] del threads[i]
index = end_threads.index(i) index = end_threads.index(i)
del end_threads[index] del end_threads[index]
for key in durations: show_durations()
val = durations[key]
resp = json_responses[key]
if 'error' in resp:
break
result = resp['result']
if result['code'] == 0:
stan = result['data']['11']
else:
stan = '-'
msg = 'thread {} stan {} {} detik'.format(key, stan, val)
log_info(msg)
def main(argv=sys.argv[1:]): def main(argv=sys.argv[1:]):
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!