aagusti2 init

1 parent 3de1e0f6
...@@ -143,9 +143,13 @@ class Inquiry(Query): ...@@ -143,9 +143,13 @@ class Inquiry(Query):
def __init__( def __init__(
self, invoice_id=None, persen_denda=2, invoice=None, self, invoice_id=None, persen_denda=2, invoice=None,
tgl_bayar=None, debug=False): tgl_bayar=None, debug=False):
import time as _time
t0 = _time.time()
Query.__init__(self, invoice_id, invoice) Query.__init__(self, invoice_id, invoice)
t1 = _time.time()
self.debug_invoice = None self.debug_invoice = None
if not self.invoice: if not self.invoice:
self._init_timing = {'Query.__init__': t1-t0}
return return
if debug: if debug:
self.debug_invoice = self.invoice self.debug_invoice = self.invoice
...@@ -155,13 +159,28 @@ class Inquiry(Query): ...@@ -155,13 +159,28 @@ class Inquiry(Query):
self.tgl_bayar = tgl_bayar self.tgl_bayar = tgl_bayar
else: else:
self.tgl_bayar = date.today() self.tgl_bayar = date.today()
t2 = _time.time()
if not self.is_available(): if not self.is_available():
self.invoice = None self.invoice = None
self._init_timing = {'Query.__init__': t1-t0, 'is_available': t2-t1}
return return
self.persen_denda = self.get_persen_denda() self.persen_denda = self.get_persen_denda()
# Digunakan untuk ISO8583
self.tagihan = self.denda = self.discount = self.total = 0 self.tagihan = self.denda = self.discount = self.total = 0
t3 = _time.time()
self.hitung() self.hitung()
t4 = _time.time()
self._init_timing = {
'Query.__init__': t1-t0,
'is_available': t2-t1,
'get_persen_denda': t3-t2,
'hitung': t4-t3,
'total': t4-t0
}
try:
from iso8583_web.iso8583 import log
log.info(f"[TIMER] Inquiry.__init__: " + ", ".join(f"{k}={v*1000:.2f}ms" for k,v in self._init_timing.items()))
except Exception:
pass
def get_persen_denda(self): def get_persen_denda(self):
if self.get_jatuh_tempo().year < 2024: if self.get_jatuh_tempo().year < 2024:
......
No preview for this file type
File mode changed
No preview for this file type
import threading
import time
import os
import sys
def main():
print(f"Logical CPUs: {os.cpu_count()}")
print(f"Python version: {sys.version}")
print("Testing maximum number of threads...")
threads = []
max_threads = 0
try:
while True:
t = threading.Thread(target=time.sleep, args=(10,))
t.start()
threads.append(t)
max_threads += 1
if max_threads % 100 == 0:
print(f"Threads started: {max_threads}")
except Exception as e:
print(f"Max threads reached: {max_threads}")
print(f"Exception: {e}")
finally:
print("Cleaning up threads...")
for t in threads:
t.join(timeout=0.1)
print("Done.")
if __name__ == "__main__":
main()
from calendar import c
from datetime import datetime, timedelta
import socket
import time
from ISO8583.ISO8583 import ISO8583
import threading
import argparse
from sqlalchemy import false
def get_args():
parser = argparse.ArgumentParser(description='ISO8583 network test')
parser.add_argument('--host', default='localhost', help='Server host (default: localhost)')
parser.add_argument('--port', type=int, default=8585, help='Server port (default: 8583)')
parser.add_argument('--network-count', type=int, default=5, help='Number of network management messages to send (default: 5)')
parser.add_argument('--no-etx', dest='etx', action='store_false', help='Disable ETX trailer (default: enabled)')
parser.add_argument('--etx', dest='etx', action='store_true', help='Enable ETX trailer (default: enabled)')
parser.add_argument('--inv-file', type=str, default=None, help='File with lines for bit61 values (one per transaction)')
parser.set_defaults(etx=True)
return parser.parse_args()
def send_0200_transaction(sock, stan, bit61, use_etx=True):
payload = build_0200_transaction(stan, bit61=bit61)
if isinstance(payload, str):
payload_bytes = payload.encode('ascii')
else:
payload_bytes = payload
if payload_bytes and payload_bytes[0] == 0:
payload_bytes = payload_bytes[1:]
trailer = ETX if use_etx else b''
frame_len = len(payload_bytes) + len(trailer)
length = str(frame_len).zfill(4).encode('ascii')
sock.sendall(length + payload_bytes + trailer)
print(f"[SEND 0200] {payload_bytes}{trailer} (ETX={'on' if use_etx else 'off'})")
# Receive response
# Helper to build a simple 0800 network management request using ISO8583 module
def build_0800_network(stan: str) -> bytes:
iso = ISO8583()
iso.setMTI('0800')
iso.setBit(7, time.strftime('%m%d%H%M%S'))
iso.setBit(11, stan)
iso.setBit(70, '301')
return iso.getRawIso()
class ISO8583NetworkTest():
def __init__(self, sock: socket.socket, use_etx: bool=False):
self.sock = sock
self.use_etx = use_etx
def send_network_test(self, stan):
payload = build_0800_network(stan)
# Ensure payload is ASCII bytes
if isinstance(payload, str):
payload_bytes = payload.encode('ascii')
else:
payload_bytes = payload
# Remove leading null byte if present
if payload_bytes and payload_bytes[0] == 0:
payload_bytes = payload_bytes[1:]
trailer = ETX if self.use_etx else b''
frame_len = len(payload_bytes) + len(trailer)
length = str(frame_len).zfill(4).encode('ascii')
self.sock.sendall(length + payload_bytes + trailer)
ETX = b'\x03'
def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
dt = datetime.now()
iso = ISO8583()
iso.setMTI('0200')
iso.setBit(2, '622011444444444444') # PAN
iso.setBit(3, '341019') # Processing code
iso.setBit(4, '000000000000') # Amount
iso.setBit(7, dt.strftime('%m%d%H%M%S'))
iso.setBit(11, stan)
iso.setBit(12, dt.strftime('%H%M%S'))
iso.setBit(13, dt.strftime('%m%d'))
iso.setBit(15, (dt + timedelta(days=1)).strftime('%m%d'))
iso.setBit(18, '6010')
iso.setBit(22, '021')
iso.setBit(32, '110')
iso.setBit(33, '00110')
iso.setBit(35, '622011444444444444=9912')
iso.setBit(37, ('000000' + stan)[-12:])
iso.setBit(41, 'N703'.ljust(8))
iso.setBit(42, '02N703'.ljust(15))
iso.setBit(43, 'TLR N703'.ljust(40))
iso.setBit(49, '360')
iso.setBit(59, 'PAY')
iso.setBit(60, '120')
iso.setBit(61, bit61)
iso.setBit(63, '214')
iso.setBit(102, '0010823214360'.ljust(20))
iso.setBit(107, '0010')
return iso.getRawIso()
def recv_data(sock, use_etx=True):
while True:
try:
length_raw = sock.recv(4)
except Exception as e:
print(f"[RECV ERROR] Failed to read length: {e}")
continue
if not length_raw:
continue
resp_len = int(length_raw.decode('ascii'))
resp = b''
while len(resp) < resp_len:
chunk = sock.recv(resp_len - len(resp))
if not chunk:
raise ConnectionError('Socket closed while reading response')
resp += chunk
print(f"[RECV 0200] {resp}")
if use_etx and resp.endswith(ETX):
resp = resp[:-1]
iso_resp = ISO8583()
iso_resp.setIsoContent(resp)
print(f"[0200 RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)}")
def main():
args = get_args()
host = args.host
port = args.port
with socket.create_connection((host, port), timeout=10) as sock:
threads = []
for i in range(args.network_count):
stan = str(100000 + i)[-6:]
server = ISO8583NetworkTest(sock=sock, use_etx=args.etx)
t_send = threading.Thread(target=server.send_network_test, args=(stan,))
t_send.start()
threads.append(t_send)
for t_send in threads:
t_send.join()
def receiver(sock, use_etx):
recv_data(sock, use_etx=use_etx)
t_recv = threading.Thread(target=receiver, args=(sock, args.etx))
t_recv.start()
t_recv.join()
if args.inv_file:
# Each line in inv-file is sent in its own thread, each with its own socket
with open(args.inv_file, 'r') as f:
bit61_lines = [line.strip() for line in f if line.strip()]
threads = []
def send_0200_thread(stan, bit61, use_etx, host, port):
send_0200_transaction(sock, stan, bit61, use_etx)
for i, bit61 in enumerate(bit61_lines):
if i<2:
stan = str(100000 + i)[-6:]
t = threading.Thread(target=send_0200_thread, args=(stan, bit61, args.etx, args.host, args.port))
t.start()
threads.append(t)
for t in threads:
t.join()
if __name__ == '__main__':
main()
from ISO8583.ISO8583 import ISO8583
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!