test-iso8583-v2.py 9.86 KB

import logging
from datetime import datetime, timedelta
import socket
import time
from typing import List

from ISO8583.ISO8583 import ISO8583
import threading

# Global dictionary to track send times by STAN
stan_send_times = {}
stan_lock = threading.Lock()

import argparse



def get_args():
    parser = argparse.ArgumentParser(description='ISO8583 network test')
    parser.add_argument('--host', default='localhost',
                        help='Server host (default: localhost)')
    parser.add_argument('--port', type=int, default=8585,
                        help='Server port (default: 8583)')
    parser.add_argument('--network-count', type=int, default=1,
                        help='Number of network management messages to send (default: 1)')
    parser.add_argument('--no-etx', dest='etx', action='store_false',
                        help='Disable ETX trailer (default: enabled)')
    parser.add_argument('--etx', dest='etx', action='store_true',
                        help='Enable ETX trailer (default: enabled)')
    parser.add_argument('--inv-file', type=str, default=None,
                        help='File with lines for bit61 values (one per transaction)')
    parser.add_argument('--limit', type=int, default=1,
                        help='Limit the number of transactions to send (default: 1)')
    parser.set_defaults(etx=True)
    return parser.parse_args()


def send_0200_transaction(sock, stan, bit61, use_etx=True):
    payload = build_0200_transaction(stan, bit61=bit61)
    if isinstance(payload, str):
        payload_bytes = payload.encode('ascii')
    else:
        payload_bytes = payload
    if payload_bytes and payload_bytes[0] == 0:
        payload_bytes = payload_bytes[1:]
    trailer = ETX if use_etx else b''
    frame_len = len(payload_bytes) + len(trailer)
    length = str(frame_len).zfill(4).encode('ascii')
    msg = length + payload_bytes + trailer
    # Record send time for this STAN
    with stan_lock:
        stan_send_times[stan] = time.time()
    sock.sendall(msg)
    print(f"[SEND 0200] {msg} (ETX={'on' if use_etx else 'off'})")
    # Receive response

# Helper to build a simple 0800 network management request using ISO8583 module


def build_0800_network(stan: str) -> bytes:
    iso = ISO8583()
    iso.setMTI('0800')
    iso.setBit(7, time.strftime('%m%d%H%M%S'))
    iso.setBit(11, stan)
    iso.setBit(70, '301')
    return iso.getRawIso()


class ISO8583NetworkTest():
    def __init__(self, sock: socket.socket, use_etx: bool = False):
        self.sock = sock
        self.use_etx = use_etx

    def send_network_test(self, stan):
        payload = build_0800_network(stan)
        # Ensure payload is ASCII bytes
        if isinstance(payload, str):
            payload_bytes = payload.encode('ascii')
        else:
            payload_bytes = payload
        # Remove leading null byte if present
        if payload_bytes and payload_bytes[0] == 0:
            payload_bytes = payload_bytes[1:]
        trailer = ETX if self.use_etx else b''
        frame_len = len(payload_bytes) + len(trailer)
        length = str(frame_len).zfill(4).encode('ascii')
        print(f"[SEND 0800] {payload_bytes}{trailer} (ETX={'on' if self.use_etx else 'off'})")
        # Record send time for this STAN
        with stan_lock:
            stan_send_times[stan] = time.time()
        try:
            self.sock.sendall(length + payload_bytes + trailer)
        except Exception as e:
            try:
                self.sock.close()
            except Exception:
                pass
            raise Exception(f"Failed to send network test message: {e}")


ETX = b'\x03'


def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
    dt = datetime.now()
    iso = ISO8583()
    iso.setMTI('0200')
    iso.setBit(2, '622011444444444444')  # PAN
    iso.setBit(3, '341019')              # Processing code
    iso.setBit(4, '000000000000')        # Amount
    iso.setBit(7, dt.strftime('%m%d%H%M%S'))
    iso.setBit(11, stan)
    iso.setBit(12, dt.strftime('%H%M%S'))
    iso.setBit(13, dt.strftime('%m%d'))
    iso.setBit(15, (dt + timedelta(days=1)).strftime('%m%d'))
    iso.setBit(18, '6010')
    iso.setBit(22, '021')
    iso.setBit(32, '110')
    iso.setBit(33, '00110')
    iso.setBit(35, '622011444444444444=9912')
    iso.setBit(37, ('000000' + stan)[-12:])
    iso.setBit(41, 'N703'.ljust(8))
    iso.setBit(42, stan.ljust(15))
    iso.setBit(43, 'TLR     N703'.ljust(40))
    iso.setBit(49, '360')
    iso.setBit(59, 'PAY')
    iso.setBit(60, '120')
    iso.setBit(61, bit61)
    iso.setBit(63, '214')
    iso.setBit(102, '0010823214360'.ljust(20))
    iso.setBit(107, '0010')
    # iso.showIsoBits()
    return iso.getRawIso()


def recv_data(sock, use_etx=True):
    timeout = None
    buffer = b''
    while True:
        try:
            received = sock.recv(4096)
            if not received:
                continue
            print(f"[RECEIVE] Received raw data: {received}")
            buffer += received
            while True:
                length_raw = buffer[:4]
                resp_len = int(length_raw.decode('ascii'))
                msg = buffer[4:resp_len+4]
                print(f"[DEBUG] Parsed length: {resp_len}, Message: {msg}")
                if len(msg) < resp_len:
                    # Wait for more data to complete the message
                    break
                
                # print(f"[RECV {resp[:4]}] {resp}")
                if use_etx and msg.endswith(ETX):
                    msg = msg[:-1]
                iso_resp = ISO8583()
                iso_resp.setIsoContent(msg)
                stan = iso_resp.getBit(11)
                print(f"[RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)} STAN={stan}")
                # Calculate and log round-trip time if STAN matches
                if stan:
                    with stan_lock:
                        t_send = stan_send_times.pop(stan, None)
                    if t_send:
                        elapsed = time.time() - t_send
                        logging.info(f"[RTT] STAN={stan} Round-trip time: {elapsed:.3f} seconds")
                buffer = buffer[resp_len+4:]
                print(f"[DEBUG] Remaining buffer: {buffer}")

                if not buffer:
                    buffer = b''
                    # break
            time.sleep(0.1)
        except Exception:
            if sock._closed:
                print("Socket is closed, receiver thread exiting...")
                break
            now = time.perf_counter()
            if not timeout:
                timeout = now
                continue
            else:
                elapsed = now - timeout
                if elapsed > 30:
                    stan = str(int(now*1000))[-6:]
                    print("No data received for 30 seconds, echo test...")
                    try:
                        server = ISO8583NetworkTest(sock=sock, use_etx=use_etx)
                        t_send = threading.Thread(
                            target=server.send_network_test, args=(stan,))
                        t_send.start()
                        t_send.join()
                        timeout = None
                    except Exception as e:
                        print(f"Echo test failed: {e}")
                        break
                continue
    # breaked
    try:
        sock.close()
    except Exception:
        pass


def load_invoices_from_file(path: str) -> List[str]:
    raw = open(path, "rb").read()

    if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
        text = raw.decode("utf-16")
    elif raw.startswith(b"\xef\xbb\xbf"):
        text = raw.decode("utf-8-sig")
    else:
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            text = raw.decode("utf-16")

    invoices: List[str] = []
    for line in text.splitlines():
        s = line.strip()
        if not s or s.startswith("#"):
            continue
        invoices.append(s)
    return invoices

def start(args, sock):
    threads = []

    t_recv = threading.Thread(target=recv_data, args=(sock, args.etx))
    t_recv.start()
    # t_recv.join()

    for i in range(args.network_count):
        stan = str(100000 + i)[-6:]
        server = ISO8583NetworkTest(sock=sock, use_etx=args.etx)
        t_send = threading.Thread(
            target=server.send_network_test, args=(stan,))
        t_send.start()
        threads.append(t_send)

    for t in threads:
        t.join()

    if args.inv_file:
        # Each line in inv-file is sent in its own thread, each with its own socket
        bit61_lines = load_invoices_from_file(args.inv_file)
        threads = []

        def send_0200_thread(stan, bit61, use_etx, host, port):
            send_0200_transaction(sock, stan, bit61, use_etx)
        for i, bit61 in enumerate(bit61_lines):
            if i < args.limit:
                stan = str(100000 + i)[-6:]
                t = threading.Thread(target=send_0200_thread, args=(
                    stan, bit61, args.etx, args.host, args.port))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

def main():
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
    args = get_args()
    host = args.host
    port = args.port
    try:
        sock = socket.create_connection((host, port), timeout=10)
        start(args, sock)
    except Exception as e:
        print(f"Initial connection failed: {e}")
        sock = None
        time.sleep(5)
    try:
        while True:
            if not sock or sock._closed:
                try:
                    sock = socket.create_connection((host, port), timeout=10)
                    start(args, sock)
                except Exception as e:
                    print(f"Reconnection failed: {e}")
                    time.sleep(5)
    except KeyboardInterrupt:
        try:
            sock.close()
        except Exception:
            pass
        print("Exiting...")


if __name__ == '__main__':
    main()