test-iso8583-v2.py
6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from calendar import c
from datetime import datetime, timedelta
import socket
import time
from ISO8583.ISO8583 import ISO8583
import threading
import argparse
from sqlalchemy import false
def get_args():
parser = argparse.ArgumentParser(description='ISO8583 network test')
parser.add_argument('--host', default='localhost', help='Server host (default: localhost)')
parser.add_argument('--port', type=int, default=8585, help='Server port (default: 8583)')
parser.add_argument('--network-count', type=int, default=5, help='Number of network management messages to send (default: 5)')
parser.add_argument('--no-etx', dest='etx', action='store_false', help='Disable ETX trailer (default: enabled)')
parser.add_argument('--etx', dest='etx', action='store_true', help='Enable ETX trailer (default: enabled)')
parser.add_argument('--inv-file', type=str, default=None, help='File with lines for bit61 values (one per transaction)')
parser.set_defaults(etx=True)
return parser.parse_args()
def send_0200_transaction(sock, stan, bit61, use_etx=True):
payload = build_0200_transaction(stan, bit61=bit61)
if isinstance(payload, str):
payload_bytes = payload.encode('ascii')
else:
payload_bytes = payload
if payload_bytes and payload_bytes[0] == 0:
payload_bytes = payload_bytes[1:]
trailer = ETX if use_etx else b''
frame_len = len(payload_bytes) + len(trailer)
length = str(frame_len).zfill(4).encode('ascii')
sock.sendall(length + payload_bytes + trailer)
print(f"[SEND 0200] {payload_bytes}{trailer} (ETX={'on' if use_etx else 'off'})")
# Receive response
# Helper to build a simple 0800 network management request using ISO8583 module
def build_0800_network(stan: str) -> bytes:
iso = ISO8583()
iso.setMTI('0800')
iso.setBit(7, time.strftime('%m%d%H%M%S'))
iso.setBit(11, stan)
iso.setBit(70, '301')
return iso.getRawIso()
class ISO8583NetworkTest():
def __init__(self, sock: socket.socket, use_etx: bool=False):
self.sock = sock
self.use_etx = use_etx
def send_network_test(self, stan):
payload = build_0800_network(stan)
# Ensure payload is ASCII bytes
if isinstance(payload, str):
payload_bytes = payload.encode('ascii')
else:
payload_bytes = payload
# Remove leading null byte if present
if payload_bytes and payload_bytes[0] == 0:
payload_bytes = payload_bytes[1:]
trailer = ETX if self.use_etx else b''
frame_len = len(payload_bytes) + len(trailer)
length = str(frame_len).zfill(4).encode('ascii')
self.sock.sendall(length + payload_bytes + trailer)
ETX = b'\x03'
def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
dt = datetime.now()
iso = ISO8583()
iso.setMTI('0200')
iso.setBit(2, '622011444444444444') # PAN
iso.setBit(3, '341019') # Processing code
iso.setBit(4, '000000000000') # Amount
iso.setBit(7, dt.strftime('%m%d%H%M%S'))
iso.setBit(11, stan)
iso.setBit(12, dt.strftime('%H%M%S'))
iso.setBit(13, dt.strftime('%m%d'))
iso.setBit(15, (dt + timedelta(days=1)).strftime('%m%d'))
iso.setBit(18, '6010')
iso.setBit(22, '021')
iso.setBit(32, '110')
iso.setBit(33, '00110')
iso.setBit(35, '622011444444444444=9912')
iso.setBit(37, ('000000' + stan)[-12:])
iso.setBit(41, 'N703'.ljust(8))
iso.setBit(42, '02N703'.ljust(15))
iso.setBit(43, 'TLR N703'.ljust(40))
iso.setBit(49, '360')
iso.setBit(59, 'PAY')
iso.setBit(60, '120')
iso.setBit(61, bit61)
iso.setBit(63, '214')
iso.setBit(102, '0010823214360'.ljust(20))
iso.setBit(107, '0010')
return iso.getRawIso()
def recv_data(sock, use_etx=True):
while True:
try:
length_raw = sock.recv(4)
except Exception as e:
print(f"[RECV ERROR] Failed to read length: {e}")
continue
if not length_raw:
continue
resp_len = int(length_raw.decode('ascii'))
resp = b''
while len(resp) < resp_len:
chunk = sock.recv(resp_len - len(resp))
if not chunk:
raise ConnectionError('Socket closed while reading response')
resp += chunk
print(f"[RECV 0200] {resp}")
if use_etx and resp.endswith(ETX):
resp = resp[:-1]
iso_resp = ISO8583()
iso_resp.setIsoContent(resp)
print(f"[0200 RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)}")
def main():
args = get_args()
host = args.host
port = args.port
with socket.create_connection((host, port), timeout=10) as sock:
threads = []
for i in range(args.network_count):
stan = str(100000 + i)[-6:]
server = ISO8583NetworkTest(sock=sock, use_etx=args.etx)
t_send = threading.Thread(target=server.send_network_test, args=(stan,))
t_send.start()
threads.append(t_send)
for t_send in threads:
t_send.join()
def receiver(sock, use_etx):
recv_data(sock, use_etx=use_etx)
t_recv = threading.Thread(target=receiver, args=(sock, args.etx))
t_recv.start()
t_recv.join()
if args.inv_file:
# Each line in inv-file is sent in its own thread, each with its own socket
with open(args.inv_file, 'r') as f:
bit61_lines = [line.strip() for line in f if line.strip()]
threads = []
def send_0200_thread(stan, bit61, use_etx, host, port):
send_0200_transaction(sock, stan, bit61, use_etx)
for i, bit61 in enumerate(bit61_lines):
if i<2:
stan = str(100000 + i)[-6:]
t = threading.Thread(target=send_0200_thread, args=(stan, bit61, args.etx, args.host, args.port))
t.start()
threads.append(t)
for t in threads:
t.join()
if __name__ == '__main__':
main()