# test-iso8583-v2.py 6.03 KB
from calendar import c
from datetime import datetime, timedelta
import socket
import time
from ISO8583.ISO8583 import ISO8583
import threading

import argparse

from sqlalchemy import false

def get_args():
    """Parse the command-line options of the ISO8583 network test.

    Options are read from ``sys.argv``.

    Returns:
        argparse.Namespace: with attributes ``host``, ``port``,
        ``network_count``, ``etx`` and ``inv_file``.
    """
    parser = argparse.ArgumentParser(description='ISO8583 network test')
    # Help texts take the default from argparse itself (%(default)s) so the
    # documented value cannot drift from the real one; the port help used to
    # advertise 8583 while the actual default is 8585.
    parser.add_argument('--host', default='localhost', help='Server host (default: %(default)s)')
    parser.add_argument('--port', type=int, default=8585, help='Server port (default: %(default)s)')
    parser.add_argument('--network-count', type=int, default=5, help='Number of network management messages to send (default: %(default)s)')
    # --etx / --no-etx write to the same destination; the last one given wins.
    parser.add_argument('--no-etx', dest='etx', action='store_false', help='Disable ETX trailer (default: enabled)')
    parser.add_argument('--etx', dest='etx', action='store_true', help='Enable ETX trailer (default: enabled)')
    parser.add_argument('--inv-file', type=str, default=None, help='File with lines for bit61 values (one per transaction)')
    parser.set_defaults(etx=True)
    return parser.parse_args()

def send_0200_transaction(sock, stan, bit61, use_etx=True):
    """Frame one 0200 transaction request and write it to ``sock``.

    The frame is a 4-digit, zero-padded ASCII length, then the message body,
    then an optional ETX trailer; the length covers body plus trailer.
    Nothing is read back from the socket in this function.

    Args:
        sock: connected socket-like object providing ``sendall``.
        stan: value passed to the message builder for bit 11.
        bit61: value passed to the message builder for bit 61.
        use_etx: append the ETX trailer when true.
    """
    body = build_0200_transaction(stan, bit61=bit61)
    # The builder's result may be text or bytes; send bytes either way.
    if isinstance(body, str):
        body = body.encode('ascii')
    # Drop a leading NUL byte if one is present.
    if body and body[0] == 0:
        body = body[1:]
    tail = b'' if not use_etx else ETX
    header = str(len(body) + len(tail)).zfill(4).encode('ascii')
    sock.sendall(header + body + tail)
    etx_state = 'on' if use_etx else 'off'
    print(f"[SEND 0200] {body}{tail} (ETX={etx_state})")
    
# Helper to build a simple 0800 network management request using ISO8583 module


def build_0800_network(stan: str) -> bytes:
    """Build an 0800 network-management request.

    Bit 7 is set to the current local time formatted as MMDDhhmmss, bit 11
    to ``stan`` and bit 70 to '301'.

    Returns:
        Whatever ``ISO8583.getRawIso()`` produces for the message; callers in
        this file accept either ``str`` or ``bytes``.
    """
    message = ISO8583()
    message.setMTI('0800')
    transmission_time = time.strftime('%m%d%H%M%S')
    for bit, value in ((7, transmission_time), (11, stan), (70, '301')):
        message.setBit(bit, value)
    return message.getRawIso()


class ISO8583NetworkTest():
    """Sends 0800 network-management requests over an existing socket."""

    def __init__(self, sock: socket.socket, use_etx: bool=False):
        """Remember the socket to write to and whether frames carry ETX."""
        self.sock, self.use_etx = sock, use_etx

    def send_network_test(self, stan):
        """Frame one 0800 request for ``stan`` and write it to the socket.

        The frame is a 4-digit, zero-padded ASCII length, then the message
        body, then the ETX trailer when ``use_etx`` is set; the length covers
        body plus trailer.
        """
        body = build_0800_network(stan)
        # The builder's result may be text or bytes; send bytes either way.
        if isinstance(body, str):
            body = body.encode('ascii')
        # Drop a leading NUL byte if one is present.
        if body and body[0] == 0:
            body = body[1:]
        tail = ETX if self.use_etx else b''
        prefix = f"{len(body) + len(tail):04d}".encode('ascii')
        self.sock.sendall(prefix + body + tail)


# Single-byte frame trailer (ASCII ETX, 0x03): appended to outgoing frames and
# stripped from received ones when ETX framing is enabled.
ETX = b'\x03'


def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
    """Build an 0200 transaction request with fixed test data.

    All date/time fields come from a single ``datetime.now()`` reading, so
    they agree with each other. Only bits 11/37 (from ``stan``) and bit 61
    (from ``bit61``) vary between calls; every other field is a constant.

    Args:
        stan: value for bit 11; also used, left-padded with zeros, for bit 37.
        bit61: value for bit 61 (keyword-only).

    Returns:
        Whatever ``ISO8583.getRawIso()`` produces for the message; callers in
        this file accept either ``str`` or ``bytes``.
    """
    now = datetime.now()
    stamp = now.strftime('%m%d%H%M%S')
    clock = now.strftime('%H%M%S')
    today = now.strftime('%m%d')
    tomorrow = (now + timedelta(days=1)).strftime('%m%d')

    message = ISO8583()
    message.setMTI('0200')
    put = message.setBit

    put(2, '622011444444444444')  # PAN
    put(3, '341019')              # Processing code
    put(4, '000000000000')        # Amount

    # Date/time fields; bit 15 is one day after bit 13.
    put(7, stamp)
    put(11, stan)
    put(12, clock)
    put(13, today)
    put(15, tomorrow)

    put(18, '6010')
    put(22, '021')
    put(32, '110')
    put(33, '00110')
    put(35, '622011444444444444=9912')
    # Zero-prefix the STAN and keep at most the last 12 characters.
    put(37, ('000000' + stan)[-12:])

    # Space-padded fixed-width fields.
    put(41, 'N703'.ljust(8))
    put(42, '02N703'.ljust(15))
    put(43, 'TLR     N703'.ljust(40))

    put(49, '360')
    put(59, 'PAY')
    put(60, '120')
    put(61, bit61)
    put(63, '214')
    put(102, '0010823214360'.ljust(20))
    put(107, '0010')
    return message.getRawIso()


def recv_data(sock, use_etx=True):
    """Read and print length-prefixed ISO8583 responses until the peer closes.

    Each frame is a 4-digit ASCII length followed by that many bytes. When
    ``use_etx`` is true, a trailing ETX byte is removed before parsing. For
    every frame the raw bytes, the MTI and bit 39 are printed.

    The function returns when the peer closes the connection between frames
    or when reading the length prefix fails with a non-timeout socket error.
    A timeout while waiting for a prefix is reported and the wait continues.

    Args:
        sock: connected socket-like object providing ``recv``.
        use_etx: strip a trailing ETX byte from each response when true.

    Raises:
        ConnectionError: the socket closed in the middle of a length prefix
            or a response body.
        ValueError: the length prefix is not a decimal ASCII number.
    """
    header = b''
    while True:
        try:
            # recv() may deliver fewer bytes than requested, so the prefix is
            # accumulated until all 4 digits have arrived.
            chunk = sock.recv(4 - len(header))
        except socket.timeout as e:
            # Idle connection: keep listening. Bytes of a partially received
            # prefix stay in `header` for the next attempt.
            print(f"[RECV ERROR] Failed to read length: {e}")
            continue
        except OSError as e:
            # Hard socket error (reset, closed descriptor, ...): retrying
            # would only spin on the same failure.
            print(f"[RECV ERROR] Failed to read length: {e}")
            return
        if not chunk:
            # An empty read means the peer closed the connection; looping
            # again would busy-wait forever.
            if header:
                raise ConnectionError('Socket closed while reading length')
            return
        header += chunk
        if len(header) < 4:
            continue

        resp_len = int(header.decode('ascii'))
        header = b''
        resp = b''
        while len(resp) < resp_len:
            chunk = sock.recv(resp_len - len(resp))
            if not chunk:
                raise ConnectionError('Socket closed while reading response')
            resp += chunk
        print(f"[RECV 0200] {resp}")
        if use_etx and resp.endswith(ETX):
            resp = resp[:-1]
        iso_resp = ISO8583()
        iso_resp.setIsoContent(resp)
        print(f"[0200 RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)}")

def main():
    """Run the network test against the configured server.

    Steps, all over one shared connection:
      1. Start a background reader that prints every response.
      2. Send ``--network-count`` 0800 requests, one thread per request.
      3. If ``--inv-file`` is given, send 0200 requests built from its
         non-empty lines, one thread per request.
      4. Wait a bounded time for outstanding responses, then close.
    """
    args = get_args()
    timeout = 10  # seconds; socket timeout and final wait for responses
    with socket.create_connection((args.host, args.port), timeout=timeout) as sock:
        # The reader is started first and runs as a daemon: recv_data keeps
        # listening on an idle connection, so joining it without a limit
        # (as was done before) blocked forever and the inv-file phase below
        # was never reached.
        t_recv = threading.Thread(target=recv_data, args=(sock, args.etx), daemon=True)
        t_recv.start()

        # NOTE(review): all sender threads write to the same socket; frames
        # are small, but confirm the server tolerates concurrent writers.
        network = ISO8583NetworkTest(sock=sock, use_etx=args.etx)
        threads = []
        for i in range(args.network_count):
            stan = str(100000 + i)[-6:]
            t_send = threading.Thread(target=network.send_network_test, args=(stan,))
            t_send.start()
            threads.append(t_send)
        for t_send in threads:
            t_send.join()

        if args.inv_file:
            # Each non-empty line of the file becomes the bit61 value of one
            # 0200 request, sent in its own thread.
            with open(args.inv_file, 'r') as f:
                bit61_lines = [line.strip() for line in f if line.strip()]
            threads = []
            # NOTE(review): only the first two lines are sent, as before;
            # confirm whether this cap is intentional.
            for i, bit61 in enumerate(bit61_lines[:2]):
                stan = str(100000 + i)[-6:]
                t = threading.Thread(target=send_0200_transaction, args=(sock, stan, bit61, args.etx))
                t.start()
                threads.append(t)
            # Join only after every sender has been started, so the sends
            # actually overlap.
            for t in threads:
                t.join()

        # Give outstanding responses a bounded amount of time to arrive
        # before the socket is closed.
        t_recv.join(timeout=timeout)
    

# Run the test only when executed as a script, not when imported.
if __name__ == '__main__':
    main()