Commit c1cfbdd9 by Owo Sugiana

Pencegahan payment ganda dengan menerapkan antrian berdasarkan Invoice ID

1 parent f6b47668
0.5 2021-5-10
-------------
- Pencegahan payment ganda yaitu menerapkan antrian berdasarkan Invoice ID.
0.4.1 2020-11-26
----------------
- Aspek security yang lebih rapi.
......
......@@ -58,6 +58,13 @@ parser_threads = []
# iso response for aggregator])
iso_process = {}
# Daftar antrian Invoice ID untuk menghindari payment ganda
# key: ip:port
# value: dict of (
# key: Invoice ID
# value: list of Thread ID)
invoice_ids = {}
def append_iso_process(
ip_port_target, iso_response, iso_for_host, ip_port_source):
......@@ -69,6 +76,23 @@ def append_iso_process(
iso_process[ip_port_target] = {stan: data}
def append_invoice_id(ip_port, inv_id, mem_id):
if ip_port in invoice_ids:
if inv_id in invoice_ids[ip_port]:
invoice_ids[ip_port][inv_id].append(mem_id)
else:
invoice_ids[ip_port][inv_id] = [mem_id]
else:
invoice_ids[ip_port] = {inv_id: [mem_id]}
def remove_invoice_id(ip_port, inv_id):
mem_id = invoice_ids[ip_port][inv_id][0]
invoice_ids[ip_port][inv_id].remove(mem_id)
if not invoice_ids[ip_port][inv_id]:
del invoice_ids[ip_port][inv_id]
def to_str(s):
if sys.version_info.major > 2:
return s
......@@ -308,8 +332,10 @@ class Parser(Log):
self.connection = connection
self.raw = raw
self.conf = connection.conf
self.ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
self.conn_id = id(connection)
self.parser_id = id(self)
self.begin_time = time()
self.running = True
# Override
......@@ -318,14 +344,45 @@ class Parser(Log):
ip=self.conf['ip'], name=self.conf['name'], conn_id=self.conn_id,
parser_id=self.parser_id, msg=msg)
def append_invoice_id(self, inv_id):
append_invoice_id(self.ip_port, inv_id, self.parser_id)
def remove_invoice_id(self, inv_id):
remove_invoice_id(self.ip_port, inv_id)
def is_my_turn(self, inv_id):
mem_id = invoice_ids[self.ip_port][inv_id][0]
return mem_id == self.parser_id
def wait_for_same_invoice_id(self, inv_id):
self.log_debug(f'Antrian {invoice_ids[self.ip_port][inv_id]}')
sleep(1)
duration = time() - self.begin_time
if duration < 10:
return True
self.log_error('Timeout')
self.running = False
def run(self):
from_iso = self.connection.job.raw_to_iso(self.raw)
self.log_decode(from_iso)
is_trx = from_iso.is_transaction_request()
if is_trx:
inv_id = from_iso.get_invoice_id()
self.append_invoice_id(inv_id)
while True:
if self.is_my_turn(inv_id):
break
if not self.wait_for_same_invoice_id(inv_id):
return
try:
iso = self.connection.job.process(from_iso)
except Exception:
self.log_unknown()
self.running = False
if is_trx:
self.remove_invoice_id(inv_id)
if not self.running:
return
if iso:
try:
......@@ -344,21 +401,20 @@ class Parser(Log):
self.running = False
def on_response(self, from_iso):
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
if ip_port in web_process:
stan_list = web_process[ip_port]
if self.ip_port in web_process:
stan_list = web_process[self.ip_port]
stan = from_iso.get_stan()
if stan in stan_list:
i = stan_list.index(stan)
del stan_list[i]
append_web_response(ip_port, stan, from_iso)
append_web_response(self.ip_port, stan, from_iso)
# as forwarder
if ip_port not in iso_process:
if self.ip_port not in iso_process:
return
stan = from_iso.get_stan()
if stan not in iso_process[ip_port]:
if stan not in iso_process[self.ip_port]:
return
ip_port_source, iso = iso_process[ip_port][stan]
ip_port_source, iso = iso_process[self.ip_port][stan]
try:
iso.continue_response(from_iso)
ok = True
......@@ -369,7 +425,7 @@ class Parser(Log):
conn = get_connection_from_ip_port(ip_port_source)
self.log_ack(iso)
self.send_by_conn(iso, conn)
del iso_process[ip_port][stan]
del iso_process[self.ip_port][stan]
def send_by_conn(self, iso, conn):
conn.log_encode(iso)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!