penambahan feature mpp

1 parent d46036b9
from calendar import c
import configparser
import datetime
import multiprocessing
# Global manager for all managed objects
manager = None
import os import os
from sqlite3 import connect
import sys import sys
import signal import signal
import logging import logging
...@@ -7,6 +15,7 @@ from time import ( ...@@ -7,6 +15,7 @@ from time import (
time, time,
) )
from threading import Thread from threading import Thread
from .sender import ISO8583Sender
from waitress.server import create_server from waitress.server import create_server
from pyramid.paster import setup_logging from pyramid.paster import setup_logging
from opensipkd.tcp.connection import join_ip_port from opensipkd.tcp.connection import join_ip_port
...@@ -32,7 +41,8 @@ from .read_conf import ( ...@@ -32,7 +41,8 @@ from .read_conf import (
) )
from .tools import iso_to_dict from .tools import iso_to_dict
from .connection import conn_mgr from .connection import conn_mgr
from .mpp import mpp_connections, request_queue, response_queue
# connections["mgr"] = conn_mgr
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
...@@ -136,6 +146,9 @@ class Log: ...@@ -136,6 +146,9 @@ class Log:
class CommonConnection(Log): class CommonConnection(Log):
def __init__(self):
super().__init__()
self.conf = {}
def log_receive_raw(self, raw): def log_receive_raw(self, raw):
raw = to_str(raw) raw = to_str(raw)
self.log_info('Receive {}'.format([raw])) self.log_info('Receive {}'.format([raw]))
...@@ -160,9 +173,13 @@ class CommonConnection(Log): ...@@ -160,9 +173,13 @@ class CommonConnection(Log):
def process(self, raw): def process(self, raw):
self.log_raw_to_iso(raw) self.log_raw_to_iso(raw)
parser = Parser(self, raw) parser = Parser(self, raw)
thread = create_thread(parser.run) # diubah jadi multiprocessing
parser_threads.append((parser, thread)) # thread = create_thread(parser.run)
thread.start() # parser_threads.append((parser, thread))
# thread.start()
now = datetime.datetime.now().strftime('%Y%m%d%H%M%S%f')
flag = self.conf.get("ip")+":"+str(self.conf.get("port"))+"_" + now
request_queue.put((flag, raw))
def create_thread(func): def create_thread(func):
...@@ -193,6 +210,20 @@ class RequestHandler(BaseRequestHandler, CommonConnection): ...@@ -193,6 +210,20 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
self.running = False self.running = False
self.created_time = time() self.created_time = time()
conn_mgr.add(self) conn_mgr.add(self)
# connections[join_ip_port(ip, port)] = self
ip_port = join_ip_port(ip, port)
if ip_port not in mpp_connections:
entry = manager.dict()
entry["socket"] = None
entry["etx"] = self.conf.get("streamer")=="bjb_with_suffix"
entry["client_sock"] = self.request
entry["running"] = True
mpp_connections[ip_port] = entry
else:
mpp_connections[ip_port]["client_sock"] = self.request
mpp_connections[ip_port]["running"] = True
log.info('Connection from {}:{}'.format(ip, port))
BaseRequestHandler.handle(self) BaseRequestHandler.handle(self)
def on_receive_raw(self, raw): def on_receive_raw(self, raw):
...@@ -216,10 +247,16 @@ class RequestHandler(BaseRequestHandler, CommonConnection): ...@@ -216,10 +247,16 @@ class RequestHandler(BaseRequestHandler, CommonConnection):
def close_because_timeout(self): def close_because_timeout(self):
self.log_timeout() self.log_timeout()
# Cleanup mpp_connections on disconnect
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
mpp_connections.pop(ip_port, None)
BaseRequestHandler.close_because_timeout(self) BaseRequestHandler.close_because_timeout(self)
def on_socket_error(self, err): def on_socket_error(self, err):
self.log_error(err) self.log_error(err)
# Cleanup mpp_connections on error
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
mpp_connections.pop(ip_port, None)
BaseRequestHandler.on_socket_error(self, err) BaseRequestHandler.on_socket_error(self, err)
def raw_for_send(self, raw): def raw_for_send(self, raw):
...@@ -266,6 +303,19 @@ class Client(BaseClient, CommonConnection): ...@@ -266,6 +303,19 @@ class Client(BaseClient, CommonConnection):
ip, port = self.address ip, port = self.address
self.log_info('connect to port {}'.format(port)) self.log_info('connect to port {}'.format(port))
BaseClient.connect(self) BaseClient.connect(self)
# Update mpp_connections on successful connect
ip_port = join_ip_port(ip, port)
if ip_port not in mpp_connections:
entry = manager.dict()
entry["socket"] = None
entry["etx"] = cfg.get("streamer")=="bjb_with_suffix"
entry["client_sock"] = self.request
entry["running"] = True
mpp_connections[ip_port] = entry
if ip_port in mpp_connections:
mpp_connections[ip_port]["client_sock"] = self.request
mpp_connections[ip_port]["running"] = True
def on_receive_raw(self, raw): def on_receive_raw(self, raw):
self.log_receive_raw(raw) self.log_receive_raw(raw)
...@@ -277,6 +327,11 @@ class Client(BaseClient, CommonConnection): ...@@ -277,6 +327,11 @@ class Client(BaseClient, CommonConnection):
def close_because_timeout(self): def close_because_timeout(self):
self.log_timeout() self.log_timeout()
# Cleanup mpp_connections on disconnect
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
if ip_port in mpp_connections:
mpp_connections[ip_port]["client_sock"] = None
mpp_connections[ip_port]["running"] = False
BaseClient.close_because_timeout(self) BaseClient.close_because_timeout(self)
def on_refused(self, err): def on_refused(self, err):
...@@ -286,6 +341,9 @@ class Client(BaseClient, CommonConnection): ...@@ -286,6 +341,9 @@ class Client(BaseClient, CommonConnection):
def on_socket_error(self, err): def on_socket_error(self, err):
self.log_error(err) self.log_error(err)
# Cleanup mpp_connections on error
ip_port = join_ip_port(self.conf['ip'], self.conf['port'])
mpp_connections.pop(ip_port, None)
BaseClient.on_socket_error(self, err) BaseClient.on_socket_error(self, err)
def raw_for_send(self, raw): def raw_for_send(self, raw):
...@@ -327,6 +385,7 @@ def stop_connections(reason): ...@@ -327,6 +385,7 @@ def stop_connections(reason):
####################### #######################
# Raw ISO 8583 parser # # Raw ISO 8583 parser #
####################### #######################
class Parser(Log): class Parser(Log):
def __init__(self, connection, raw): def __init__(self, connection, raw):
self.connection = connection self.connection = connection
...@@ -495,7 +554,9 @@ def out(sig=None, func=None): ...@@ -495,7 +554,9 @@ def out(sig=None, func=None):
def check_connection(): def check_connection():
for ip_port in ip_conf: global manager
for ip_port, cfg in ip_conf.items(): # pylint: disable=consider-iterating-dictionary
global mpp_connections
if ip_port in conn_mgr: if ip_port in conn_mgr:
index = -1 index = -1
while True: while True:
...@@ -505,6 +566,10 @@ def check_connection(): ...@@ -505,6 +566,10 @@ def check_connection():
this_ip_port, conn = conn_mgr[index] this_ip_port, conn = conn_mgr[index]
if this_ip_port != ip_port: if this_ip_port != ip_port:
continue continue
# if mpp_connections[ip_port].get("running", False) != conn.running:
# mpp_connections[ip_port]["running"] = conn.running
# mpp_connections[ip_port]["client_sock"] = conn.request
# print(f"[DEBUG] Set client_sock for {ip_port}: {mpp_connections[ip_port]['client_sock']}")
if conn.running: if conn.running:
continue continue
conn_mgr.remove_if_old(index) conn_mgr.remove_if_old(index)
...@@ -561,10 +626,49 @@ def check_parser(): ...@@ -561,10 +626,49 @@ def check_parser():
i -= 1 i -= 1
import multiprocessing
from .processor import ISO8583Processor
def init_mpp():
global request_queue, response_queue, mpp_connections, sender_lock, manager # pylint: disable=global-statement
manager = multiprocessing.Manager()
request_queue = multiprocessing.Queue()
response_queue = multiprocessing.Queue()
mpp_connections = manager.dict()
sender_lock = manager.Lock()
def start_processor(conf_file):
# todo: Add argument for processor pool size
# pool_size = getattr(args, 'processor_pool', 8)
# Start a pool of processor workers
config = configparser.ConfigParser()
config.read(conf_file)
db_config = dict(config['main'])
pool_size = int(db_config.get("pool_size", 8))
processor_pool = []
for i in range(pool_size):
processor = ISO8583Processor(
request_queue, response_queue, db_config=db_config)
p = multiprocessing.Process(target=processor.start, name=f"Processor-{i+1}")
p.start()
processor_pool.append(p)
def start_sender():
import threading
sender = ISO8583Sender(response_queue, sender_lock, mpp_connections)
sender_thread = threading.Thread(target=sender.start, name="SenderThread", daemon=True)
sender_thread.start()
def main_loop(conf_file): def main_loop(conf_file):
setup_logging(conf_file) setup_logging(conf_file)
running.append(True) running.append(True)
start_servers() start_servers()
init_mpp()
start_processor(conf_file)
start_sender()
try: try:
while running: while running:
check_connection() check_connection()
...@@ -580,3 +684,8 @@ def main(argv=sys.argv): ...@@ -580,3 +684,8 @@ def main(argv=sys.argv):
if read_conf(conf_file): if read_conf(conf_file):
signal.signal(signal.SIGTERM, out) signal.signal(signal.SIGTERM, out)
main_loop(conf_file) main_loop(conf_file)
if __name__ == '__main__':
main()
\ No newline at end of file \ No newline at end of file
from . import main
if __name__ == "__main__":
import sys
main(sys.argv)
\ No newline at end of file \ No newline at end of file
import sismiop.services.base
from zope.sqlalchemy import register
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import engine_from_config
import logging
# Will be set by main process using multiprocessing.Manager().dict()
mpp_connections = None
request_queue = None
response_queue = None
DBSession = scoped_session(sessionmaker())
register(DBSession)
def get_connection(config):
# print(f"[get_connection] Received config: {config}")
db_url = config.get('other_db_url', None)
if not db_url:
print(
"Database URL not found in ini file under [main] section with key 'db_url'.")
raise ValueError(
"Database URL not found in ini file under [main] section with key 'db_url'.")
module_name = config.get('module', None)
if not module_name:
print(
"Module name not found in ini file under [main] section with key 'module'.")
raise ValueError(
"Module name not found in ini file under [main] section with key 'module'.")
engine = engine_from_config(config, prefix='other_db_')
DBSession.configure(bind=engine)
# print(f"[get_connection] DBSession.bind after configure: {DBSession.bind}")
sismiop.services.base.DBSession = DBSession
import importlib
import logging
import multiprocessing
import re
from ISO8583.ISO8583 import ISO8583
from opensipkd.iso8583.bjb.pbb.structure import (
INQUIRY_CODE, INVOICE_PROFILE, PAYMENT_CODE,
RC_ALREADY_PAID, RC_NOT_AVAILABLE, RC_INSUFFICIENT_FUND, RC_LINK_DOWN,
RC_INVALID_NUMBER, RC_ALREADY_PAID, RC_NOT_AVAILABLE, RC_INSUFFICIENT_FUND)
# from .models import DBSession, get_connection
# from opensipkd.tools import FixLength
from .mpp import get_connection
log = logging.getLogger(__name__)
from opensipkd.string import FixLength
class ISO8583Processor:
def __init__(self, req_queue, resp_queue, db_config=None):
self.request_queue = req_queue
self.response_queue = resp_queue
self.db_config = db_config
def start(self):
while True:
filename, data = self.request_queue.get()
response = self.process_file(filename, data, self.db_config)
self.response_queue.put((filename, response))
def get_bit7(self):
from datetime import datetime
now = datetime.now()
return now.strftime('%m%d%H%M%S') # MMDDhhmmss format
def process_file(self, filename, data, db_config):
import time
start_time = time.time()
# Ensure DBSession is bound in this process
# print(f"[process_file] db_config: {db_config}")
iso = ISO8583(iso=data)
print(iso.showBitmap())
bit39 = '00'
mti = iso.getMTI() if iso.getMTI() else '0000'
log.info(f"MTI detected: {mti}")
response = b''
if mti == '0800':
log.info(f"Handling network management MTI: {mti}")
iso.setMTI('0810')
elif mti in ('0200', '0210'):
logging.info(f"Handling transaction MTI: {mti}")
func = iso.getBit(3) if iso.getBit(3) else '000000'
invid = iso.getBit(61) if iso.getBit(37) else None
invoice_id = re.sub(r'\D', '', invid) if invid else '000000'
iso.setMTI('0210')
if len(invoice_id) < 22:
bit39 = '76'
else:
if db_config:
get_connection(db_config)
modu = importlib.import_module(
f"sismiop.services.{db_config.get('module')}")
inq = modu.Inquiry(invoice_id=invoice_id)
if not inq.invoice:
bit39 = RC_NOT_AVAILABLE
else:
inq.hitung()
if inq.total < 1:
bit39 = RC_ALREADY_PAID
elif func == PAYMENT_CODE:
amount = iso.getBit(4) if iso.getBit(4) else '0'
if int(amount) != inq.total:
bit39 = RC_INSUFFICIENT_FUND
elif func == INQUIRY_CODE:
iso.setBit(4, inq.total)
inv = inq.invoice
invoice_profile = FixLength(INVOICE_PROFILE)
invoice_profile.set('Propinsi', inv.kd_propinsi)
invoice_profile.set('Kabupaten', inv.kd_dati2)
invoice_profile.set('Kecamatan', inv.kd_kecamatan)
invoice_profile.set('Kelurahan', inv.kd_kelurahan
if inv.kd_kelurahan else '000')
invoice_profile.set('Blok', inv.kd_blok)
invoice_profile.set('Urut', inv.no_urut)
invoice_profile.set('Jenis', inv.kd_jns_op)
invoice_profile.set('Tahun', inv.thn_pajak_sppt)
invoice_profile.set('Nama', inv.nm_wp_sppt)
invoice_profile.set('Lokasi', inq.get_alamat_op())
invoice_profile.set('Nama Kelurahan', inq.get_kelurahan_op())
invoice_profile.set('Nama Kecamatan', inq.get_kecamatan_op())
invoice_profile.set('Nama Propinsi', inq.get_propinsi_op())
invoice_profile.set('Luas Tanah', inv.luas_bumi_sppt)
invoice_profile.set('Luas Bangunan', inv.luas_bng_sppt)
invoice_profile.set(
'Jatuh Tempo', inv.tgl_jatuh_tempo_sppt.strftime('%Y%m%d'))
invoice_profile.set('Tagihan', inq.tagihan)
invoice_profile.set('Denda', inq.denda)
invoice_profile.set('Total Bayar', inq.total)
invoice_profile.set('Discount', inq.discount)
iso.setBit(61, invoice_profile.get_raw())
else:
bit39 = '76'
elif mti in ('0400', '0420'):
logging.info(f"Handling reversal MTI: {mti}")
response = b'RESP:0400/0420' # Add bit 7, 4, 62, 39 as needed
else:
logging.warning(f"Unknown MTI: {mti}")
response = b'UNKNOWN MTI'
if response == b'UNKNOWN MTI':
logging.info(f"Removed unprocessable file: {filename}")
return response
else:
iso.setBit(7, self.get_bit7())
iso.setBit(39, bit39)
response = iso.getRawIso()
log.info(
f"[WRITE] Prepared response for {filename}, data: {response.hex() if isinstance(response, bytes) else response}")
return response
end_time = time.time()
elapsed = end_time - start_time
log.info(
f"[PERF] Processed {filename} in {elapsed:.3f} seconds (MTI={mti})")
import logging
log = logging.getLogger(__name__)
class ISO8583Sender:
def __init__(self, response_queue, sender_lock, mpp_connections):
self.response_queue = response_queue
self.sender_lock = sender_lock
self.mpp_connections = mpp_connections
def start(self):
count = 0
print(f"[SENDER INIT] mpp_connections type: {type(self.mpp_connections)}, value: {self.mpp_connections}")
if self.mpp_connections is None:
raise RuntimeError("mpp_connections is None in sender process. Ensure init_mpp() is called in main process before starting sender.")
while True:
filename, response = self.response_queue.get()
conn_name = filename.rsplit('_', 3)[0]
with self.sender_lock:
count += 1
print(f"[SENDER DEBUG] Got from queue: filename={filename}")
print(f"[SENDER] mpp_connections keys: {list(self.mpp_connections.keys())}")
item = self.mpp_connections.get(conn_name)
if not item or not item.get("running", False):
# print(f"[SENDER] No connection for {conn_name}, skipping.")
continue
# Wait for client_sock to be set, retry if None
retries = 0
while True:
sock = item.get('client_sock')
# print(f"[SENDER] client_sock for {conn_name}: {sock}")
if sock:
break
retries += 1
if retries > 10:
# print(f"[SENDER] No client socket for {conn_name} after {retries} retries, skipping.")
break
import time
time.sleep(0.2)
if not sock:
continue
etx = item.get('etx', False)
response_data = response
if etx and not response_data.endswith(b'\x03'):
response_data += b'\x03'
frame_len = len(response_data)
length = str(frame_len).zfill(4).encode('ascii')
new_data = length + response_data
try:
sock.sendall(new_data)
log.info(f"[SENDER] Sending response to {conn_name}: {response_data}, frame_len={frame_len}, count={count}")
except Exception as send_err:
log.error(f"[SENDER ERROR] sendall failed for {conn_name}: {send_err}")
# except Exception as send_err:
# print(f"[SENDER ERROR] sendall failed for {conn_name}: {send_err}")
# def start(self, interval=0.5):
# import time
# while True:
# self.send_responses()
# time.sleep(interval)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!