Skip to content
Toggle navigation
Projects
Groups
Snippets
Help
Owo Sugiana
/
sismiop-models
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Settings
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit 40bdd876
authored
Apr 25, 2026
by
aa.gustiana@gmail.com
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
test final
1 parent
27eea365
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
192 additions
and
68 deletions
tests/test-iso8583-v2.py
tests/test-iso8583-v2.py
View file @
40bdd87
from
calendar
import
c
import
logging
from
datetime
import
datetime
,
timedelta
from
datetime
import
datetime
,
timedelta
import
socket
import
socket
import
time
import
time
from
typing
import
List
from
ISO8583.ISO8583
import
ISO8583
from
ISO8583.ISO8583
import
ISO8583
import
threading
import
threading
# Global dictionary to track send times by STAN
stan_send_times
=
{}
stan_lock
=
threading
.
Lock
()
import
argparse
import
argparse
from
sqlalchemy
import
false
def
get_args
():
def
get_args
():
parser
=
argparse
.
ArgumentParser
(
description
=
'ISO8583 network test'
)
parser
=
argparse
.
ArgumentParser
(
description
=
'ISO8583 network test'
)
parser
.
add_argument
(
'--host'
,
default
=
'localhost'
,
help
=
'Server host (default: localhost)'
)
parser
.
add_argument
(
'--host'
,
default
=
'localhost'
,
parser
.
add_argument
(
'--port'
,
type
=
int
,
default
=
8585
,
help
=
'Server port (default: 8583)'
)
help
=
'Server host (default: localhost)'
)
parser
.
add_argument
(
'--network-count'
,
type
=
int
,
default
=
5
,
help
=
'Number of network management messages to send (default: 5)'
)
parser
.
add_argument
(
'--port'
,
type
=
int
,
default
=
8585
,
parser
.
add_argument
(
'--no-etx'
,
dest
=
'etx'
,
action
=
'store_false'
,
help
=
'Disable ETX trailer (default: enabled)'
)
help
=
'Server port (default: 8583)'
)
parser
.
add_argument
(
'--etx'
,
dest
=
'etx'
,
action
=
'store_true'
,
help
=
'Enable ETX trailer (default: enabled)'
)
parser
.
add_argument
(
'--network-count'
,
type
=
int
,
default
=
1
,
parser
.
add_argument
(
'--inv-file'
,
type
=
str
,
default
=
None
,
help
=
'File with lines for bit61 values (one per transaction)'
)
help
=
'Number of network management messages to send (default: 1)'
)
parser
.
add_argument
(
'--no-etx'
,
dest
=
'etx'
,
action
=
'store_false'
,
help
=
'Disable ETX trailer (default: enabled)'
)
parser
.
add_argument
(
'--etx'
,
dest
=
'etx'
,
action
=
'store_true'
,
help
=
'Enable ETX trailer (default: enabled)'
)
parser
.
add_argument
(
'--inv-file'
,
type
=
str
,
default
=
None
,
help
=
'File with lines for bit61 values (one per transaction)'
)
parser
.
add_argument
(
'--limit'
,
type
=
int
,
default
=
1
,
help
=
'Limit the number of transactions to send (default: 1)'
)
parser
.
set_defaults
(
etx
=
True
)
parser
.
set_defaults
(
etx
=
True
)
return
parser
.
parse_args
()
return
parser
.
parse_args
()
def
send_0200_transaction
(
sock
,
stan
,
bit61
,
use_etx
=
True
):
def
send_0200_transaction
(
sock
,
stan
,
bit61
,
use_etx
=
True
):
payload
=
build_0200_transaction
(
stan
,
bit61
=
bit61
)
payload
=
build_0200_transaction
(
stan
,
bit61
=
bit61
)
if
isinstance
(
payload
,
str
):
if
isinstance
(
payload
,
str
):
...
@@ -31,10 +47,14 @@ def send_0200_transaction(sock, stan, bit61, use_etx=True):
...
@@ -31,10 +47,14 @@ def send_0200_transaction(sock, stan, bit61, use_etx=True):
trailer
=
ETX
if
use_etx
else
b
''
trailer
=
ETX
if
use_etx
else
b
''
frame_len
=
len
(
payload_bytes
)
+
len
(
trailer
)
frame_len
=
len
(
payload_bytes
)
+
len
(
trailer
)
length
=
str
(
frame_len
)
.
zfill
(
4
)
.
encode
(
'ascii'
)
length
=
str
(
frame_len
)
.
zfill
(
4
)
.
encode
(
'ascii'
)
sock
.
sendall
(
length
+
payload_bytes
+
trailer
)
msg
=
length
+
payload_bytes
+
trailer
print
(
f
"[SEND 0200] {payload_bytes}{trailer} (ETX={'on' if use_etx else 'off'})"
)
# Record send time for this STAN
with
stan_lock
:
stan_send_times
[
stan
]
=
time
.
time
()
sock
.
sendall
(
msg
)
print
(
f
"[SEND 0200] {msg} (ETX={'on' if use_etx else 'off'})"
)
# Receive response
# Receive response
# Helper to build a simple 0800 network management request using ISO8583 module
# Helper to build a simple 0800 network management request using ISO8583 module
...
@@ -48,7 +68,7 @@ def build_0800_network(stan: str) -> bytes:
...
@@ -48,7 +68,7 @@ def build_0800_network(stan: str) -> bytes:
class
ISO8583NetworkTest
():
class
ISO8583NetworkTest
():
def
__init__
(
self
,
sock
:
socket
.
socket
,
use_etx
:
bool
=
False
):
def
__init__
(
self
,
sock
:
socket
.
socket
,
use_etx
:
bool
=
False
):
self
.
sock
=
sock
self
.
sock
=
sock
self
.
use_etx
=
use_etx
self
.
use_etx
=
use_etx
...
@@ -65,7 +85,18 @@ class ISO8583NetworkTest():
...
@@ -65,7 +85,18 @@ class ISO8583NetworkTest():
trailer
=
ETX
if
self
.
use_etx
else
b
''
trailer
=
ETX
if
self
.
use_etx
else
b
''
frame_len
=
len
(
payload_bytes
)
+
len
(
trailer
)
frame_len
=
len
(
payload_bytes
)
+
len
(
trailer
)
length
=
str
(
frame_len
)
.
zfill
(
4
)
.
encode
(
'ascii'
)
length
=
str
(
frame_len
)
.
zfill
(
4
)
.
encode
(
'ascii'
)
self
.
sock
.
sendall
(
length
+
payload_bytes
+
trailer
)
print
(
f
"[SEND 0800] {payload_bytes}{trailer} (ETX={'on' if self.use_etx else 'off'})"
)
# Record send time for this STAN
with
stan_lock
:
stan_send_times
[
stan
]
=
time
.
time
()
try
:
self
.
sock
.
sendall
(
length
+
payload_bytes
+
trailer
)
except
Exception
as
e
:
try
:
self
.
sock
.
close
()
except
Exception
:
pass
raise
Exception
(
f
"Failed to send network test message: {e}"
)
ETX
=
b
'
\x03
'
ETX
=
b
'
\x03
'
...
@@ -90,7 +121,7 @@ def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
...
@@ -90,7 +121,7 @@ def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
iso
.
setBit
(
35
,
'622011444444444444=9912'
)
iso
.
setBit
(
35
,
'622011444444444444=9912'
)
iso
.
setBit
(
37
,
(
'000000'
+
stan
)[
-
12
:])
iso
.
setBit
(
37
,
(
'000000'
+
stan
)[
-
12
:])
iso
.
setBit
(
41
,
'N703'
.
ljust
(
8
))
iso
.
setBit
(
41
,
'N703'
.
ljust
(
8
))
iso
.
setBit
(
42
,
'02N703'
.
ljust
(
15
))
iso
.
setBit
(
42
,
stan
.
ljust
(
15
))
iso
.
setBit
(
43
,
'TLR N703'
.
ljust
(
40
))
iso
.
setBit
(
43
,
'TLR N703'
.
ljust
(
40
))
iso
.
setBit
(
49
,
'360'
)
iso
.
setBit
(
49
,
'360'
)
iso
.
setBit
(
59
,
'PAY'
)
iso
.
setBit
(
59
,
'PAY'
)
...
@@ -99,72 +130,165 @@ def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
...
@@ -99,72 +130,165 @@ def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
iso
.
setBit
(
63
,
'214'
)
iso
.
setBit
(
63
,
'214'
)
iso
.
setBit
(
102
,
'0010823214360'
.
ljust
(
20
))
iso
.
setBit
(
102
,
'0010823214360'
.
ljust
(
20
))
iso
.
setBit
(
107
,
'0010'
)
iso
.
setBit
(
107
,
'0010'
)
# iso.showIsoBits()
return
iso
.
getRawIso
()
return
iso
.
getRawIso
()
def
recv_data
(
sock
,
use_etx
=
True
):
def
recv_data
(
sock
,
use_etx
=
True
):
timeout
=
None
buffer
=
b
''
while
True
:
while
True
:
try
:
try
:
length_raw
=
sock
.
recv
(
4
)
received
=
sock
.
recv
(
4096
)
except
Exception
as
e
:
if
not
received
:
print
(
f
"[RECV ERROR] Failed to read length: {e}"
)
continue
continue
print
(
f
"[RECEIVE] Received raw data: {received}"
)
if
not
length_raw
:
buffer
+=
received
while
True
:
length_raw
=
buffer
[:
4
]
resp_len
=
int
(
length_raw
.
decode
(
'ascii'
))
msg
=
buffer
[
4
:
resp_len
+
4
]
print
(
f
"[DEBUG] Parsed length: {resp_len}, Message: {msg}"
)
if
len
(
msg
)
<
resp_len
:
# Wait for more data to complete the message
break
# print(f"[RECV {resp[:4]}] {resp}")
if
use_etx
and
msg
.
endswith
(
ETX
):
msg
=
msg
[:
-
1
]
iso_resp
=
ISO8583
()
iso_resp
.
setIsoContent
(
msg
)
stan
=
iso_resp
.
getBit
(
11
)
print
(
f
"[RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)} STAN={stan}"
)
# Calculate and log round-trip time if STAN matches
if
stan
:
with
stan_lock
:
t_send
=
stan_send_times
.
pop
(
stan
,
None
)
if
t_send
:
elapsed
=
time
.
time
()
-
t_send
logging
.
info
(
f
"[RTT] STAN={stan} Round-trip time: {elapsed:.3f} seconds"
)
buffer
=
buffer
[
resp_len
+
4
:]
print
(
f
"[DEBUG] Remaining buffer: {buffer}"
)
if
not
buffer
:
buffer
=
b
''
# break
time
.
sleep
(
0.1
)
except
Exception
:
if
sock
.
_closed
:
print
(
"Socket is closed, receiver thread exiting..."
)
break
now
=
time
.
perf_counter
()
if
not
timeout
:
timeout
=
now
continue
else
:
elapsed
=
now
-
timeout
if
elapsed
>
30
:
stan
=
str
(
int
(
now
*
1000
))[
-
6
:]
print
(
"No data received for 30 seconds, echo test..."
)
try
:
server
=
ISO8583NetworkTest
(
sock
=
sock
,
use_etx
=
use_etx
)
t_send
=
threading
.
Thread
(
target
=
server
.
send_network_test
,
args
=
(
stan
,))
t_send
.
start
()
t_send
.
join
()
timeout
=
None
except
Exception
as
e
:
print
(
f
"Echo test failed: {e}"
)
break
continue
# breaked
try
:
sock
.
close
()
except
Exception
:
pass
def
load_invoices_from_file
(
path
:
str
)
->
List
[
str
]:
raw
=
open
(
path
,
"rb"
)
.
read
()
if
raw
.
startswith
(
b
"
\xff\xfe
"
)
or
raw
.
startswith
(
b
"
\xfe\xff
"
):
text
=
raw
.
decode
(
"utf-16"
)
elif
raw
.
startswith
(
b
"
\xef\xbb\xbf
"
):
text
=
raw
.
decode
(
"utf-8-sig"
)
else
:
try
:
text
=
raw
.
decode
(
"utf-8"
)
except
UnicodeDecodeError
:
text
=
raw
.
decode
(
"utf-16"
)
invoices
:
List
[
str
]
=
[]
for
line
in
text
.
splitlines
():
s
=
line
.
strip
()
if
not
s
or
s
.
startswith
(
"#"
):
continue
continue
invoices
.
append
(
s
)
resp_len
=
int
(
length_raw
.
decode
(
'ascii'
))
return
invoices
resp
=
b
''
while
len
(
resp
)
<
resp_len
:
def
start
(
args
,
sock
):
chunk
=
sock
.
recv
(
resp_len
-
len
(
resp
))
threads
=
[]
if
not
chunk
:
raise
ConnectionError
(
'Socket closed while reading response'
)
t_recv
=
threading
.
Thread
(
target
=
recv_data
,
args
=
(
sock
,
args
.
etx
))
resp
+=
chunk
t_recv
.
start
()
print
(
f
"[RECV 0200] {resp}"
)
# t_recv.join()
if
use_etx
and
resp
.
endswith
(
ETX
):
resp
=
resp
[:
-
1
]
for
i
in
range
(
args
.
network_count
):
iso_resp
=
ISO8583
()
stan
=
str
(
100000
+
i
)[
-
6
:]
iso_resp
.
setIsoContent
(
resp
)
server
=
ISO8583NetworkTest
(
sock
=
sock
,
use_etx
=
args
.
etx
)
print
(
f
"[0200 RESP] MTI={iso_resp.getMTI()} Bit39={iso_resp.getBit(39)}"
)
t_send
=
threading
.
Thread
(
target
=
server
.
send_network_test
,
args
=
(
stan
,))
t_send
.
start
()
threads
.
append
(
t_send
)
for
t
in
threads
:
t
.
join
()
if
args
.
inv_file
:
# Each line in inv-file is sent in its own thread, each with its own socket
bit61_lines
=
load_invoices_from_file
(
args
.
inv_file
)
threads
=
[]
def
send_0200_thread
(
stan
,
bit61
,
use_etx
,
host
,
port
):
send_0200_transaction
(
sock
,
stan
,
bit61
,
use_etx
)
for
i
,
bit61
in
enumerate
(
bit61_lines
):
if
i
<
args
.
limit
:
stan
=
str
(
100000
+
i
)[
-
6
:]
t
=
threading
.
Thread
(
target
=
send_0200_thread
,
args
=
(
stan
,
bit61
,
args
.
etx
,
args
.
host
,
args
.
port
))
threads
.
append
(
t
)
t
.
start
()
for
t
in
threads
:
t
.
join
()
def
main
():
def
main
():
logging
.
basicConfig
(
level
=
logging
.
INFO
,
format
=
'
%(asctime)
s
%(levelname)
s
%(message)
s'
)
args
=
get_args
()
args
=
get_args
()
host
=
args
.
host
host
=
args
.
host
port
=
args
.
port
port
=
args
.
port
with
socket
.
create_connection
((
host
,
port
),
timeout
=
10
)
as
sock
:
try
:
threads
=
[]
sock
=
socket
.
create_connection
((
host
,
port
),
timeout
=
10
)
for
i
in
range
(
args
.
network_count
):
start
(
args
,
sock
)
stan
=
str
(
100000
+
i
)[
-
6
:]
except
Exception
as
e
:
server
=
ISO8583NetworkTest
(
sock
=
sock
,
use_etx
=
args
.
etx
)
print
(
f
"Initial connection failed: {e}"
)
t_send
=
threading
.
Thread
(
target
=
server
.
send_network_test
,
args
=
(
stan
,))
sock
=
None
t_send
.
start
()
time
.
sleep
(
5
)
threads
.
append
(
t_send
)
try
:
for
t_send
in
threads
:
while
True
:
t_send
.
join
()
if
not
sock
or
sock
.
_closed
:
try
:
sock
=
socket
.
create_connection
((
host
,
port
),
timeout
=
10
)
def
receiver
(
sock
,
use_etx
):
start
(
args
,
sock
)
recv_data
(
sock
,
use_etx
=
use_etx
)
except
Exception
as
e
:
print
(
f
"Reconnection failed: {e}"
)
t_recv
=
threading
.
Thread
(
target
=
receiver
,
args
=
(
sock
,
args
.
etx
))
time
.
sleep
(
5
)
t_recv
.
start
()
except
KeyboardInterrupt
:
t_recv
.
join
()
try
:
sock
.
close
()
if
args
.
inv_file
:
except
Exception
:
# Each line in inv-file is sent in its own thread, each with its own socket
pass
with
open
(
args
.
inv_file
,
'r'
)
as
f
:
print
(
"Exiting..."
)
bit61_lines
=
[
line
.
strip
()
for
line
in
f
if
line
.
strip
()]
threads
=
[]
def
send_0200_thread
(
stan
,
bit61
,
use_etx
,
host
,
port
):
send_0200_transaction
(
sock
,
stan
,
bit61
,
use_etx
)
for
i
,
bit61
in
enumerate
(
bit61_lines
):
if
i
<
2
:
stan
=
str
(
100000
+
i
)[
-
6
:]
t
=
threading
.
Thread
(
target
=
send_0200_thread
,
args
=
(
stan
,
bit61
,
args
.
etx
,
args
.
host
,
args
.
port
))
t
.
start
()
threads
.
append
(
t
)
for
t
in
threads
:
t
.
join
()
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
main
()
main
()
Write
Preview
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment