stress-test

1 parent 71175408
......@@ -3,5 +3,5 @@ build
dist
__pycache__
*.pyc
test*
# test*
pip-wheel-metadata
# if "lib_dir" in settings and settings["lib_dir"]:
import cx_Oracle
# lib_dir = rf"{settings['lib_dir']}"
lib_dir = r"C:\\xampp\\instantclient_11_2"
try:
cx_Oracle.init_oracle_client(lib_dir=lib_dir)
except Exception as e:
print("Error initializing Oracle client: %s", e)
3275010008024014902026
3275010008024015002026
3275010008024015102026
3275010008024015202026
3275010008024015302026
3275010008024015402026
3275010008024015502026
3275010008024015602026
3275010008024015802026
3275010008024015902026
3275010008024016002026
3275010008024016202026
3275010008024016302026
3275010008024016502026
3275010008024016602026
3275010008024016802026
3275010008024016902026
3275010008024017002026
3275010008024017102026
3275010008024017302026
3275010008024017402026
3275010008024017702026
3275010008024017802026
3275010008024017902026
3275010008024018002026
3275010008024018202026
3275010008025015802026
3275010008025015902026
3275010008025016002026
3275010008025016202026
3275010008025016302026
3275010008025016502026
3275010008025016602026
3275010008025016802026
3275010008025016902026
3275010008025017002026
3275010008025017102026
3275010008025017202026
3275010008025017302026
3275010008025017402026
3275010008025017502026
3275010008025017602026
3275010008025017702026
3275010008025017802026
3275010008025017902026
3275010008025018002026
3275010008025018102026
3275010008025018302026
3275010008025018402026
3275010008025018502026
3275010008025018602026
3275010008025018702026
3275010008025018802026
3275010008025019102026
3275010008025019202026
3275010008025019502026
3275010008025019602026
3275010008025019702026
3275010008025081802026
3275010008025081902026
3275010008025082002026
3275010008025082102026
3275010008025082202026
3275010008025082302026
3275010008025082402026
3275010008025082502026
3275010008025082602026
3275010008025082702026
3275010008025082802026
3275010008025082902026
3275010008025083002026
3275010008025083102026
3275010008025083202026
3275010008025083302026
3275010008025083402026
3275010008025083502026
3275010008025083602026
3275010008025083702026
3275010008025083802026
3275010008025083902026
3275010008025084002026
3275010008025084102026
3275010008025084202026
3275010008025084302026
3275010008025084402026
3275010008025084502026
3275010008025084602026
3275010008025084702026
3275010008025130502026
3275010008025130602026
3275010008025130702026
3275010008025130802026
3275010008025130902026
3275010008025131002026
3275010008025131102026
3275010008025131202026
3275010008025131302026
3275010008025131402026
3275010008025131502026
3275010008025131602026
\ No newline at end of file
"""Threaded ISO8583 socket tester.
Usage (Windows/Linux):
python test-iso8583.py --host 10.31.224.200 --port 10002 --threads 10 --count 100
What it does:
- Opens one TCP connection.
- Sends one optional sign-on (0800, bit70=301).
- Then sends N transaction tests (0200).
Framing matches the logs you shared:
- 4 ASCII digits length prefix (payload length)
- ISO payload as ASCII
- Trailing ETX byte 0x03
"""
from __future__ import annotations
import argparse
import queue
import socket
import statistics
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
ASCII_FIELD_SPECS: Dict[int, Tuple[str, int]] = {
2: ("llvar", 2),
3: ("fixed", 6),
4: ("fixed", 12),
7: ("fixed", 10),
11: ("fixed", 6),
12: ("fixed", 6),
13: ("fixed", 4),
15: ("fixed", 4),
18: ("fixed", 4),
22: ("fixed", 3),
32: ("llvar", 2),
33: ("llvar", 2),
35: ("llvar", 2),
37: ("fixed", 12),
39: ("fixed", 2),
41: ("fixed", 8),
42: ("fixed", 15),
43: ("fixed", 40),
49: ("fixed", 3),
59: ("lllvar", 3),
60: ("lllvar", 3),
61: ("lllvar", 3),
62: ("lllvar", 3),
63: ("lllvar", 3),
70: ("fixed", 3),
102: ("llvar", 2),
107: ("lllvar", 3),
}
ETX = b"\x03"
def load_invoices_from_file(path: str) -> List[str]:
raw = open(path, "rb").read()
# BOM-aware decoding: supports common encodings seen on Windows.
if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
text = raw.decode("utf-16")
elif raw.startswith(b"\xef\xbb\xbf"):
text = raw.decode("utf-8-sig")
else:
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
# Many editors save plain "Unicode" as UTF-16 without an obvious BOM.
text = raw.decode("utf-16")
invoices: List[str] = []
for line in text.splitlines():
s = line.strip()
if not s or s.startswith("#"):
continue
invoices.append(s)
return invoices
class InvoicePicker:
def __init__(self, invoices: List[str], *, mode: str):
if not invoices:
raise ValueError("invoices must not be empty")
self._invoices = invoices
self._mode = mode
self._lock = threading.Lock()
self._rr_index = 0
def pick(self, *, thread_id: int) -> str:
if self._mode == "single":
return self._invoices[0]
if self._mode == "thread":
return self._invoices[thread_id % len(self._invoices)]
if self._mode == "roundrobin":
with self._lock:
invoice = self._invoices[self._rr_index % len(self._invoices)]
self._rr_index += 1
return invoice
raise ValueError(f"unknown inv mode: {self._mode}")
def now_bit7(dt: Optional[datetime] = None) -> str:
if dt is None:
dt = datetime.now()
return dt.strftime("%m%d%H%M%S")
class StanCounter:
def __init__(self, start: Optional[int] = None):
self._lock = threading.Lock()
if start is None:
start = int(datetime.now().strftime("%H%M%S"))
self._value = start % 1_000_000
def next(self) -> str:
with self._lock:
value = self._value
self._value = (self._value + 1) % 1_000_000
return str(value).zfill(6)
def recv_exact(sock: socket.socket, n: int) -> bytes:
chunks: List[bytes] = []
remaining = n
while remaining > 0:
chunk = sock.recv(remaining)
if not chunk:
raise ConnectionError("socket closed while reading")
chunks.append(chunk)
remaining -= len(chunk)
return b"".join(chunks)
def send_frame(
sock: socket.socket,
payload: bytes,
*,
use_etx: bool = True,
debug: bool = False,
) -> None:
# Some endpoints use a 4-digit ASCII length prefix + ETX trailer (0x03).
# Others use only the length prefix with no ETX. Make this selectable.
trailer = ETX if use_etx else b""
frame_len = len(payload) + len(trailer)
length = str(frame_len).zfill(4).encode("ascii")
if debug:
try:
mode = "etx" if use_etx else "no-etx"
# print(f"SEND[{mode}] len={length.decode('ascii')} payload={payload!r}")
except Exception:
pass
try:
parsed = parse_network_0810(payload)
mti = parsed.get(0)
bit7 = parsed.get(7)
bit11 = parsed.get(11)
bit41 = parsed.get(41)
bit61 = parsed.get(61)
bit70 = parsed.get(70)
info = []
if mti:
info.append(f"mti={mti}")
if bit7:
info.append(f"bit7={bit7}")
# if bit11:
# info.append(f"bit11={bit11}")
# # if bit41:
# info.append(f"bit41={bit41.rstrip()}")
if bit61:
info.append(f"bit61={bit61}")
# if bit70:
# info.append(f"bit70={bit70}")
if info:
print("SEND INFO " + " ".join(info))
except Exception:
pass
sock.sendall(length + payload + trailer)
def recv_frame(sock: socket.socket, *, expect_etx: bool = True, debug: bool = False) -> bytes:
length_raw = recv_exact(sock, 4)
try:
length = int(length_raw.decode("ascii"))
except ValueError as e:
raise ValueError(f"invalid length prefix: {length_raw!r}") from e
data = recv_exact(sock, length)
if expect_etx:
if not data.endswith(ETX):
raise ValueError(
f"invalid frame trailer (expected ETX 0x03): {data[-1:]!r}"
)
payload = data[:-1]
else:
# No ETX expected: payload is the whole body.
payload = data
if debug:
try:
mode = "etx" if expect_etx else "no-etx"
# print(f"RECV[{mode}] len={str(length).zfill(4)} payload={payload!r}")
except Exception:
pass
try:
parsed = parse_network_0810(payload)
mti = parsed.get(0)
bit7 = parsed.get(7)
bit11 = parsed.get(11)
rc39 = parsed.get(39)
info = []
if mti:
info.append(f"mti={mti}")
if bit7:
info.append(f"bit7={bit7}")
# if bit11:
# info.append(f"bit11={bit11}")
if rc39:
info.append(f"rc39={rc39}")
if info:
print("RECV INFO " + " ".join(info))
except Exception:
pass
return payload
def bits_from_hex_bitmap(hex16: str, offset: int) -> List[int]:
"""Return ISO bit numbers set in a 64-bit bitmap represented as 16 hex chars."""
b = bytes.fromhex(hex16)
out: List[int] = []
for byte_index, byte_value in enumerate(b):
for bit_index in range(8):
mask = 1 << (7 - bit_index)
if byte_value & mask:
out.append(offset + byte_index * 8 + bit_index + 1)
return out
def parse_network_0810(payload: bytes) -> Dict[int, str]:
"""Parse a minimal subset of fields from an ASCII ISO8583 response.
Uses bitmap + a small field spec table so we can reliably reach bit 39 for 0210.
"""
s = payload.decode("ascii", errors="replace")
if len(s) < 4 + 16:
raise ValueError(f"payload too short: {payload!r}")
mti = s[:4]
pmap = s[4:20]
bits = bits_from_hex_bitmap(pmap, 0)
index = 20
if 1 in bits:
smap = s[index: index + 16]
bits += bits_from_hex_bitmap(smap, 64)
index += 16
values: Dict[int, str] = {0: mti}
for bit in sorted(bits):
if bit == 1:
continue
spec = ASCII_FIELD_SPECS.get(bit)
if spec is None:
break
kind, n = spec
if kind == "fixed":
values[bit] = s[index: index + n]
index += n
elif kind == "llvar":
length = int(s[index: index + n])
index += n
values[bit] = s[index: index + length].rstrip()
index += length
elif kind == "lllvar":
length = int(s[index: index + n])
index += n
values[bit] = s[index: index + length].rstrip()
index += length
else:
break
if bit == 39:
break
return values
def build_0800_network(stan: str, code70: str) -> bytes:
"""Build an ASCII 0800 network management message with bits 7, 11, 70."""
mti = "0800"
# Primary bitmap: bit 1,7,11 set => 0x8220000000000000
# Secondary bitmap: bit 70 set => 0x0400000000000000
primary = "8220000000000000"
secondary = "0400000000000000"
bit7 = now_bit7()
return f"{mti}{primary}{secondary}{bit7}{stan}{code70}".encode("ascii")
def build_0200_transaction(
stan: str,
amount12: str = "000000000000",
bit61: str = "3275050002008001801990",
) -> bytes:
"""Build an ASCII 0200 transaction (BJB-style) matching the reference log."""
dt = datetime.now()
mti = "0200"
bitmap = "F23A4401A8E0803A0000000004200000"
pan = "622011444444444444"
proc_code = "341019"
bit7 = dt.strftime("%m%d%H%M%S")
bit11 = stan
bit12 = dt.strftime("%H%M%S")
bit13 = dt.strftime("%m%d")
bit15 = (dt + timedelta(days=1)).strftime("%m%d")
bit18 = "6010"
bit22 = "021"
bit32 = "110"
bit33 = "00110"
bit35 = "622011444444444444=9912"
bit37 = ("000000" + stan)[-12:]
bit41 = "N703".ljust(8)
bit42 = "02N703".ljust(15)
bit43 = "TLR N703".ljust(40)
bit49 = "360"
bit59 = "PAY"
bit60 = "120"
bit63 = "214"
bit102 = "0010823214360".ljust(20)
bit107 = "0010"
parts = [
mti,
bitmap,
f"{len(pan):02d}{pan}",
proc_code,
amount12,
bit7,
bit11,
bit12,
bit13,
bit15,
bit18,
bit22,
f"{len(bit32):02d}{bit32}",
f"{len(bit33):02d}{bit33}",
f"{len(bit35):02d}{bit35}",
bit37,
bit41,
bit42,
bit43,
bit49,
f"{len(bit59):03d}{bit59}",
f"{len(bit60):03d}{bit60}",
f"{len(bit61):03d}{bit61}",
f"{len(bit63):03d}{bit63}",
f"{len(bit102):02d}{bit102}",
f"{len(bit107):03d}{bit107}",
]
return "".join(parts).encode("ascii")
@dataclass
class WorkerResult:
thread_id: int
ok: int = 0
fail: int = 0
latencies_ms: List[float] = field(default_factory=list)
errors: Dict[str, int] = field(default_factory=dict)
def record_error(self, msg: str) -> None:
self.fail += 1
self.errors[msg] = self.errors.get(msg, 0) + 1
class ResponseDispatcher:
def __init__(self) -> None:
self._lock = threading.Lock()
self._pending: Dict[str, Tuple[int, "queue.Queue[bytes]"]] = {}
self._dead = threading.Event()
self._dead_exc: Optional[BaseException] = None
def register(self, stan: str, *, thread_id: int) -> "queue.Queue[bytes]":
q: "queue.Queue[bytes]" = queue.Queue(maxsize=1)
with self._lock:
self._pending[stan] = (thread_id, q)
return q
def unregister(self, stan: str) -> None:
with self._lock:
self._pending.pop(stan, None)
def mark_dead(self, exc: BaseException) -> None:
with self._lock:
self._dead_exc = exc
self._dead.set()
def deliver(self, stan: str, payload: bytes) -> bool:
with self._lock:
item = self._pending.get(stan)
if item is None:
return False
_, q = item
try:
q.put_nowait(payload)
except queue.Full:
return True
return True
def thread_id_for(self, stan: str) -> Optional[int]:
with self._lock:
item = self._pending.get(stan)
if item is None:
return None
return item[0]
def wait(self, q: "queue.Queue[bytes]", timeout: float) -> bytes:
if self._dead.is_set():
exc = self._dead_exc
raise ConnectionError(f"receiver stopped: {exc}")
try:
return q.get(timeout=timeout)
except queue.Empty as e:
if self._dead.is_set():
exc = self._dead_exc
raise ConnectionError(f"receiver stopped: {exc}") from e
raise TimeoutError("timed out waiting for response") from e
def receiver_loop(
s1socket: socket.socket,
dispatcher: ResponseDispatcher,
*,
expect_etx: bool,
debug_raw: bool,
stop_event: threading.Event,
) -> None:
payload_queue: "queue.Queue[Optional[bytes]]" = queue.Queue()
def process_loop() -> None:
try:
while True:
payload = payload_queue.get()
if payload is None:
break
try:
parsed = parse_network_0810(payload)
stan = parsed.get(11)
except Exception:
parsed = {}
stan = None
if stan:
if debug_raw:
tid = dispatcher.thread_id_for(stan)
mti = parsed.get(0)
bit7 = parsed.get(7)
rc39 = parsed.get(39)
tlabel = f"T{tid}" if tid is not None and tid >= 0 else "SIGNON"
mode = "etx" if expect_etx else "no-etx"
# print(
# f"RECV DATA[{mode}] {tlabel} bit11={stan} len={len(payload)} payload={payload!r}"
# )
info = []
if mti:
info.append(f"mti={mti}")
if bit7:
info.append(f"bit7={bit7}")
if rc39:
info.append(f"rc39={rc39}")
print(
f"RECV {tlabel} bit11={stan}"
+ (" " + " ".join(info) if info else "")
)
ok = dispatcher.deliver(stan, payload)
if not ok and debug_raw:
print(f"RECV WARN unregistered stan={stan}")
else:
if debug_raw:
print(f"RECV WARN no bit11/STAN in response payload={payload!r}")
except Exception as e:
dispatcher.mark_dead(e)
stop_event.set()
proc_thread = threading.Thread(target=process_loop, name="receiver_process", daemon=True)
proc_thread.start()
try:
while not stop_event.is_set():
payload = recv_frame(s1socket, expect_etx=expect_etx, debug=False)
payload_queue.put(payload)
except Exception as e:
dispatcher.mark_dead(e)
finally:
try:
payload_queue.put(None)
except Exception:
pass
def worker(
thread_id: int,
s1socket: socket.socket,
send_lock: threading.Lock,
count: int,
stan_counter: StanCounter,
result: WorkerResult,
debug_raw: bool,
use_etx: bool,
invoice_picker: InvoicePicker,
dispatcher: ResponseDispatcher,
timeout: float,
stop_event: threading.Event,
stop_at: Optional[float],
) -> None:
try:
sent = 0
while not stop_event.is_set():
if stop_at is not None and time.perf_counter() >= stop_at:
break
if count > 0 and sent >= count:
break
stan = stan_counter.next()
bit61 = invoice_picker.pick(thread_id=thread_id)
payload = build_0200_transaction(stan, bit61=bit61)
q = dispatcher.register(stan, thread_id=thread_id)
try:
t0 = time.perf_counter()
with send_lock:
if debug_raw:
print(f"SEND T{thread_id} bit11={stan}")
send_frame(s1socket, payload, use_etx=use_etx, debug=debug_raw)
resp = dispatcher.wait(q, timeout=timeout)
dt_ms = (time.perf_counter() - t0) * 1000.0
finally:
dispatcher.unregister(stan)
sent += 1
parsed = parse_network_0810(resp)
if parsed.get(0) == "0210" and parsed.get(39) == "00":
result.ok += 1
result.latencies_ms.append(dt_ms)
else:
result.record_error(f"bad_resp:{parsed}")
except Exception as e:
if debug_raw:
result.record_error(f"{type(e).__name__}:{e}")
else:
result.record_error(type(e).__name__)
def percentile(sorted_values: List[float], p: float) -> float:
if not sorted_values:
return 0.0
if p <= 0:
return sorted_values[0]
if p >= 100:
return sorted_values[-1]
k = (len(sorted_values) - 1) * (p / 100.0)
f = int(k)
c = min(f + 1, len(sorted_values) - 1)
if f == c:
return sorted_values[f]
d0 = sorted_values[f] * (c - k)
d1 = sorted_values[c] * (k - f)
return d0 + d1
def main(argv: Optional[List[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Threaded ISO8583 (BJB-style) socket tester")
parser.add_argument("--host", required=True)
parser.add_argument("--port", required=True, type=int)
parser.add_argument("--threads", type=int, default=1)
parser.add_argument("--count", type=int, default=10,
help="test count per thread")
parser.add_argument(
"--loop",
action="store_true",
help="loop forever (ignores --count unless --duration is set)",
)
parser.add_argument(
"--duration",
type=float,
default=None,
help="run for N seconds then stop (works with --loop)",
)
parser.add_argument("--timeout", type=float, default=5.0)
parser.add_argument(
"--inv",
"--bit61",
dest="bit61",
action="append",
default=[],
help=(
"field 61 (bit61) / invoice profile string. "
"Can be provided multiple times to spread requests across invoices."
),
)
parser.add_argument(
"--inv-file",
default=None,
help=(
"path to a text file with one invoice per line (blank lines and lines starting with # are ignored)"
),
)
parser.add_argument(
"--inv-mode",
choices=("single", "thread", "roundrobin"),
default=None,
help=(
"how to choose invoice when multiple are provided: "
"single=always first, thread=one invoice per thread, roundrobin=cycle globally"
),
)
parser.add_argument(
"--no-etx",
action="store_true",
help="use ASCII length prefix only (no trailing ETX byte)",
)
parser.add_argument("--no-signon", action="store_true",
help="skip initial 0800/301")
parser.add_argument("--debug-raw", action="store_true", help="print raw frames sent")
args = parser.parse_args(argv)
invoices: List[str] = []
if args.bit61:
invoices.extend(args.bit61)
if args.inv_file:
invoices.extend(load_invoices_from_file(args.inv_file))
if not invoices:
invoices = ["3275050002008001801990"]
inv_mode = args.inv_mode
if inv_mode is None:
inv_mode = "roundrobin" if len(invoices) > 1 else "single"
invoice_picker = InvoicePicker(invoices, mode=inv_mode)
stop_at: Optional[float] = None
if args.duration is not None:
if args.duration <= 0:
raise SystemExit("--duration must be > 0")
stop_at = time.perf_counter() + args.duration
effective_count = 0 if args.loop else args.count
total_planned = (
None
if args.loop
else (args.threads * effective_count + (0 if args.no_signon else 1))
)
frame_mode = "no-etx" if args.no_etx else "etx"
if total_planned is None:
planned_str = "total_requests~=unbounded"
else:
planned_str = f"total_requests~={total_planned}"
run_mode = "loop" if args.loop else "fixed"
dur_str = "" if args.duration is None else f" duration={args.duration}s"
print(
f"Target {args.host}:{args.port} mode={frame_mode} run={run_mode}{dur_str} "
f"threads={args.threads} count/thread={effective_count} {planned_str}"
)
stan_counter = StanCounter()
results: List[WorkerResult] = [WorkerResult(
thread_id=i) for i in range(args.threads)]
threads: List[threading.Thread] = []
s1socket = socket.create_connection((args.host, args.port), timeout=args.timeout)
s1socket.settimeout(args.timeout)
send_lock = threading.Lock()
dispatcher = ResponseDispatcher()
stop_event = threading.Event()
recv_thread = threading.Thread(
target=receiver_loop,
args=(
s1socket,
dispatcher,
),
kwargs={
"expect_etx": not args.no_etx,
"debug_raw": args.debug_raw,
"stop_event": stop_event,
},
daemon=True,
)
recv_thread.start()
if not args.no_signon:
stan = stan_counter.next()
payload = build_0800_network(stan, "301")
q = dispatcher.register(stan, thread_id=-1)
try:
with send_lock:
if args.debug_raw:
print(f"SEND SIGNON bit11={stan}")
send_frame(
s1socket,
payload,
use_etx=not args.no_etx,
debug=args.debug_raw,
)
resp = dispatcher.wait(q, timeout=args.timeout)
parsed = parse_network_0810(resp)
except Exception as e:
print(f"Sign-on error: {type(e).__name__}: {e}")
try:
dispatcher.unregister(stan)
stop_event.set()
s1socket.close()
finally:
return 2
finally:
dispatcher.unregister(stan)
if not (parsed.get(0) == "0810" and parsed.get(39) == "00"):
print(f"Sign-on failed: {parsed}")
try:
stop_event.set()
s1socket.close()
finally:
return 2
t_start = time.perf_counter()
try:
for i in range(args.threads):
t = threading.Thread(
target=worker,
args=(
i,
s1socket,
send_lock,
effective_count,
stan_counter,
results[i],
args.debug_raw,
not args.no_etx,
invoice_picker,
dispatcher,
args.timeout,
stop_event,
stop_at,
),
daemon=True,
)
threads.append(t)
t.start()
try:
for t in threads:
t.join()
except KeyboardInterrupt:
print("\nInterrupted: stopping threads...")
stop_event.set()
try:
s1socket.close()
except Exception:
pass
for t in threads:
t.join(timeout=1.0)
elapsed = time.perf_counter() - t_start
finally:
try:
stop_event.set()
s1socket.close()
except Exception:
pass
ok = sum(r.ok for r in results)
fail = sum(r.fail for r in results)
latencies = [x for r in results for x in r.latencies_ms]
latencies_sorted = sorted(latencies)
rps = ok / elapsed if elapsed > 0 else 0.0
print("\nSummary")
print(f" elapsed: {elapsed:.3f}s")
print(f" ok: {ok}")
print(f" fail: {fail}")
print(f" rps(ok): {rps:.2f}/s")
if latencies_sorted:
print(
f" latency_ms: avg={statistics.mean(latencies_sorted):.2f} p50={percentile(latencies_sorted, 50):.2f} p95={percentile(latencies_sorted, 95):.2f} max={latencies_sorted[-1]:.2f}")
merged_errors: Dict[str, int] = {}
for r in results:
for msg, cnt in r.errors.items():
merged_errors[msg] = merged_errors.get(msg, 0) + cnt
if merged_errors:
print("\nErrors")
for msg, cnt in sorted(merged_errors.items(), key=lambda kv: (-kv[1], kv[0])):
print(f" {cnt}x {msg}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
REFERENCE_LOG = """ Test ISO8583 BJB
Open connection
2026-04-18 01:57:33,883 INFO ISO8583 10.31.224.200 bjb 139792218643952 connect to port 10002
signon
2026-04-18 01:57:38,885 INFO ISO8583 10.31.224.200 bjb 139792218643952 Encode MTI 0800 Data {7: '0418015738', 11: '015730', 70: '301'}
2026-04-18 01:57:38,886 INFO ISO8583 10.31.224.200 bjb 139792218643952 ISO8583 to raw [b'0800822000000000000004000000000000000418015738015730301']
2026-04-18 01:57:38,886 INFO ISO8583 10.31.224.200 bjb 139792218643952 Send [b'00560800822000000000000004000000000000000418015738015730301\x03']
2026-04-18 01:57:38,920 INFO ISO8583 10.31.224.200 bjb 139792218643952 Receive [b'0058081082200000020000000400000000000000041718573901573000301\x03']
2026-04-18 01:57:38,920 INFO ISO8583 10.31.224.200 bjb 139792218643952 Raw to ISO8583 [b'081082200000020000000400000000000000041718573901573000301']
2026-04-18 01:57:38,923 INFO ISO8583 10.31.224.200 bjb 139792218643952 -> 139792218855264 Decode MTI 0810 Data {7: '0417185739', 11: '015730', 39: '00', 70: '301'}
network test
2026-04-18 01:57:38,924 INFO ISO8583 10.31.224.200 bjb 139792218643952 Encode MTI 0800 Data {7: '0418015738', 11: '015732', 70: '001'}
2026-04-18 01:57:38,924 INFO ISO8583 10.31.224.200 bjb 139792218643952 ISO8583 to raw [b'0800822000000000000004000000000000000418015738015732001']
2026-04-18 01:57:38,925 INFO ISO8583 10.31.224.200 bjb 139792218643952 Send [b'00560800822000000000000004000000000000000418015738015732001\x03']
2026-04-18 01:57:39,923 INFO ISO8583 10.31.224.200 bjb 139792218643952 Receive [b'0058081082200000020000000400000000000000041718573901573200001\x03']
2026-04-18 01:57:39,924 INFO ISO8583 10.31.224.200 bjb 139792218643952 Raw to ISO8583 [b'081082200000020000000400000000000000041718573901573200001']
2026-04-18 01:57:39,926 INFO ISO8583 10.31.224.200 bjb 139792218643952 -> 139792218855264 Decode MTI 0810 Data {7: '0417185739', 11: '015732', 39: '00', 70: '301'}
transaction
2026-04-17 10:52:40,576 INFO ISO8583 10.31.224.200 bjb 139781348025040 -> 139781375402896 Encode MTI 0200 Data {2: '622011444444444444', 3: '341019', 4: '000000000000', 7: '0417105240', 11: '398086', 12: '105240', 13: '0417', 15: '0418', 18: '6010', 22: '021', 32: '110', 33: '00110', 35: '622011444444444444=9912', 37: '000000358002', 41: 'N703', 42: '02N703', 43: 'TLR N703', 49: '360', 59: 'PAY', 60: '120', 61: '3275010003031084002000', 63: '214', 102: '0010823214360', 107: '0010'}
2026-04-17 10:52:40,569 INFO ISO8583 10.31.224.200 bjb 139781348025040 ISO8583 to raw [b'0200F23A4401A8E0803A000000000420000018622011444444444444341019000000000000041710524039808610524004170418601002103110050011023622011444444444444=9912000000358002N703 02N703 TLR N703 360003PAY0031200223275010003031084002000003214200010823214360 0040010']
2026-04-17 10:52:40,569 INFO ISO8583 10.31.224.200 bjb 139781348025040 Send [b'02990200F23A4401A8E0803A000000000420000018622011444444444444341019000000000000041710524039808610524004170418601002103110050011023622011444444444444=9912000000358002N703 02N703 TLR N703 360003PAY0031200223275010003031084002000003214200010823214360 0040010\x03']
2026-04-17 10:52:40,625 INFO ISO8583 10.31.224.200 bjb 139781348025040 Receipt [b'05810210f23a4401aae0803e000000000420000018622011444444444444341019000000010233041710524039808610524004170418601002103110050011023622011444444444444=991200000035800200N703 02N703 TLR N703 360003PAY00312002232750100030310840020002773275010003031084002000DRS S KRISTIONO HARYANTO JATIWARINGIN ASRI II,T NO.18,RT 001JATIWARINGIN PONDOK GEDE JAWA BARAT 00000000018800000000013220000930000000078708000000037780000000116488000000106255003214200010823214360 0040010\x03']
2026-04-17 10:52:40,625 INFO ISO8583 10.31.224.200 bjb 139781348025040 Raw to ISO8583 [b'0210f23a4401aae0803e000000000420000018622011444444444444341019000000010233041710524039808610524004170418601002103110050011023622011444444444444=991200000035800200N703 02N703 TLR N703 360003PAY00312002232750100030310840020002773275010003031084002000DRS S KRISTIONO HARYANTO JATIWARINGIN ASRI II,T NO.18,RT 001JATIWARINGIN PONDOK GEDE JAWA BARAT 00000000018800000000013220000930000000078708000000037780000000116488000000106255003214200010823214360 0040010']
2026-04-17 10:52:40,624 INFO ISO8583 10.31.224.200 bjb 139781348025040 Decode MTI 0210 Data {2: '622011444444444444', 3: '341019', 4: '000000010233', 7: '0417105240', 11: '398086', 12: '105240', 13: '0417', 15: '0418', 18: '6010', 22: '021', 32: '110', 33: '00110', 35: '622011444444444444=9912', 37: '000000358002', 39: '00', 41: 'N703', 42: '02N703', 43: 'TLR N703', 49: '360', 59: 'PAY', 60: '120', 61: '3275010003031084002000', 62: '3275010003031084002000DRS S KRISTIONO HARYANTO JATIWARINGIN ASRI II,T NO.18,RT 001JATIWARINGIN PONDOK GEDE JAWA BARAT 00000000018800000000013220000930000000078708000000037780000000116488000000106255', 63: '214', 102: '0010823214360', 107: '0010'}
"""
File mode changed
"""Minimal ISO8583 sender.
Sends one optional sign-on (0800/301) then one transaction (0200) using a single invoice
profile (field 61 / bit61).
Example:
python test-iso8583_send.py --host localhost --port 8592 --inv-id 3275050002008001801990
"""
from __future__ import annotations
import argparse
import socket
import threading
import time
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
ETX = b"\x03"
ASCII_FIELD_SPECS: Dict[int, Tuple[str, int]] = {
2: ("llvar", 2),
3: ("fixed", 6),
4: ("fixed", 12),
7: ("fixed", 10),
11: ("fixed", 6),
12: ("fixed", 6),
13: ("fixed", 4),
15: ("fixed", 4),
18: ("fixed", 4),
22: ("fixed", 3),
32: ("llvar", 2),
33: ("llvar", 2),
35: ("llvar", 2),
37: ("fixed", 12),
39: ("fixed", 2),
41: ("fixed", 8),
42: ("fixed", 15),
43: ("fixed", 40),
49: ("fixed", 3),
59: ("lllvar", 3),
60: ("lllvar", 3),
61: ("lllvar", 3),
62: ("lllvar", 3),
63: ("lllvar", 3),
70: ("fixed", 3),
102: ("llvar", 2),
107: ("lllvar", 3),
}
def load_invoices_from_file(path: str) -> List[str]:
raw = open(path, "rb").read()
if raw.startswith(b"\xff\xfe") or raw.startswith(b"\xfe\xff"):
text = raw.decode("utf-16")
elif raw.startswith(b"\xef\xbb\xbf"):
text = raw.decode("utf-8-sig")
else:
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
text = raw.decode("utf-16")
invoices: List[str] = []
for line in text.splitlines():
s = line.strip()
if not s or s.startswith("#"):
continue
invoices.append(s)
return invoices
def recv_exact(sock: socket.socket, n: int) -> bytes:
chunks: List[bytes] = []
remaining = n
while remaining > 0:
chunk = sock.recv(remaining)
if not chunk:
raise ConnectionError("socket closed while reading")
chunks.append(chunk)
remaining -= len(chunk)
return b"".join(chunks)
def send_frame(sock: socket.socket, payload: bytes, *, use_etx: bool) -> None:
trailer = ETX if use_etx else b""
frame_len = len(payload) + len(trailer)
length = str(frame_len).zfill(4).encode("ascii")
sock.sendall(length + payload + trailer)
def recv_frame(sock: socket.socket, *, expect_etx: bool) -> bytes:
length_raw = recv_exact(sock, 4)
try:
length = int(length_raw.decode("ascii"))
except ValueError as e:
raise ValueError(f"invalid length prefix: {length_raw!r}") from e
data = recv_exact(sock, length)
if expect_etx:
if not data.endswith(ETX):
raise ValueError(f"invalid frame trailer (expected ETX 0x03): {data[-1:]!r}")
return data[:-1]
return data
def bits_from_hex_bitmap(hex16: str, offset: int) -> List[int]:
b = bytes.fromhex(hex16)
out: List[int] = []
for byte_index, byte_value in enumerate(b):
for bit_index in range(8):
mask = 1 << (7 - bit_index)
if byte_value & mask:
out.append(offset + byte_index * 8 + bit_index + 1)
return out
def parse_until_39(payload: bytes) -> Dict[int, str]:
"""Parse MTI + fields up to (and including) bit 39."""
s = payload.decode("ascii", errors="replace")
if len(s) < 4 + 16:
raise ValueError(f"payload too short: {payload!r}")
mti = s[:4]
pmap = s[4:20]
bits = bits_from_hex_bitmap(pmap, 0)
index = 20
if 1 in bits:
if len(s) < index + 16:
raise ValueError("payload too short for secondary bitmap")
smap = s[index : index + 16]
bits += bits_from_hex_bitmap(smap, 64)
index += 16
values: Dict[int, str] = {0: mti}
for bit in sorted(bits):
if bit == 1:
continue
spec = ASCII_FIELD_SPECS.get(bit)
if spec is None:
raise ValueError(f"unsupported field bit={bit}; add spec to ASCII_FIELD_SPECS")
kind, n = spec
if kind == "fixed":
values[bit] = s[index : index + n]
index += n
elif kind == "llvar":
length = int(s[index : index + n])
index += n
values[bit] = s[index : index + length].rstrip()
index += length
elif kind == "lllvar":
length = int(s[index : index + n])
index += n
values[bit] = s[index : index + length].rstrip()
index += length
else:
raise ValueError(f"unsupported field kind: {kind}")
if bit == 39:
break
return values
def now_bit7(dt: Optional[datetime] = None) -> str:
if dt is None:
dt = datetime.now()
return dt.strftime("%m%d%H%M%S")
def ts() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
_print_lock = threading.Lock()
def log_line(line: str) -> None:
with _print_lock:
print(line, flush=True)
def format_ok_line(*, kind: str, inv_id: Optional[str], rc39: Optional[str], send_ts: str, t0: float) -> str:
dt_ms = (time.perf_counter() - t0) * 1000.0
inv_part = "" if inv_id is None else f" inv_id={inv_id}"
return f"{send_ts} {kind}{inv_part} rc39={rc39} duration_ms={dt_ms:.2f}"
def format_err_line(*, kind: str, inv_id: Optional[str], exc: BaseException, send_ts: str, t0: float) -> str:
dt_ms = (time.perf_counter() - t0) * 1000.0
inv_part = "" if inv_id is None else f" inv_id={inv_id}"
return f"{send_ts} {kind}{inv_part} ERROR {type(exc).__name__}:{exc} duration_ms={dt_ms:.2f}"
@dataclass
class TxResult:
inv_id: str
ok: bool
rc39: Optional[str]
duration_ms: Optional[float]
error: Optional[str] = None
@dataclass
class _Pending:
kind: str # "TX" or "SIGNON"
inv_id: Optional[str]
stan: str
send_ts: str
t0: float
event: threading.Event
result: Optional[TxResult] = None
ok: Optional[bool] = None
error: Optional[str] = None
class StanDispatcher:
"""Single TCP connection dispatcher keyed by STAN (field 11).
- Many threads may call .tx() concurrently.
- Only one receiver thread reads from socket.
- Responses are routed to the right waiter using bit 11 (STAN).
"""
def __init__(self, sock: socket.socket, *, use_etx: bool, timeout_s: float):
self._sock = sock
self._use_etx = use_etx
self._timeout_s = timeout_s
self._closed = threading.Event()
self._send_lock = threading.Lock()
self._pending_lock = threading.Lock()
self._pending: Dict[str, _Pending] = {}
self._stan_lock = threading.Lock()
self._stan_counter = StanCounter()
# Receiver loop uses a short timeout to notice close.
self._sock.settimeout(1.0)
self._rx_thread = threading.Thread(target=self._rx_loop, daemon=True)
self._rx_thread.start()
def close(self) -> None:
self._closed.set()
try:
self._sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
try:
self._sock.close()
except OSError:
pass
# Unblock any waiters.
with self._pending_lock:
pendings = list(self._pending.values())
self._pending.clear()
for p in pendings:
p.error = p.error or "closed"
if p.kind == "TX":
p.result = TxResult(inv_id=p.inv_id or "", ok=False, rc39=None, duration_ms=None, error=p.error)
else:
p.ok = False
p.event.set()
def _next_stan(self) -> str:
with self._stan_lock:
return self._stan_counter.next()
def _fail_all(self, err: BaseException) -> None:
with self._pending_lock:
pendings = list(self._pending.values())
self._pending.clear()
for p in pendings:
p.error = type(err).__name__
if p.kind == "TX":
p.result = TxResult(inv_id=p.inv_id or "", ok=False, rc39=None, duration_ms=None, error=p.error)
else:
p.ok = False
p.event.set()
def _rx_loop(self) -> None:
while not self._closed.is_set():
try:
resp = recv_frame(self._sock, expect_etx=self._use_etx)
except socket.timeout:
continue
except (OSError, ValueError) as e:
self._fail_all(e)
return
try:
parsed = parse_until_39(resp)
except (ValueError, UnicodeError) as e:
self._fail_all(e)
return
stan = parsed.get(11)
if not stan:
continue
with self._pending_lock:
p = self._pending.pop(stan, None)
if p is None:
continue
rc = parsed.get(39)
dt_ms = (time.perf_counter() - p.t0) * 1000.0
if p.kind == "SIGNON":
ok = parsed.get(0) == "0810" and rc == "00"
log_line(format_ok_line(kind="SIGNON", inv_id=None, rc39=rc, send_ts=p.send_ts, t0=p.t0))
p.ok = ok
else:
ok = parsed.get(0) == "0210" and rc == "00"
log_line(format_ok_line(kind="TX", inv_id=p.inv_id, rc39=rc, send_ts=p.send_ts, t0=p.t0))
p.result = TxResult(inv_id=p.inv_id or "", ok=ok, rc39=rc, duration_ms=dt_ms)
p.event.set()
def signon(self) -> bool:
stan = self._next_stan()
payload = build_0800_network(stan, "301")
send_ts = ts()
t0 = time.perf_counter()
p = _Pending(kind="SIGNON", inv_id=None, stan=stan, send_ts=send_ts, t0=t0, event=threading.Event())
with self._pending_lock:
self._pending[stan] = p
try:
with self._send_lock:
send_frame(self._sock, payload, use_etx=self._use_etx)
except (OSError, ValueError) as e:
with self._pending_lock:
self._pending.pop(stan, None)
log_line(format_err_line(kind="SIGNON", inv_id=None, exc=e, send_ts=send_ts, t0=t0))
return False
if not p.event.wait(self._timeout_s):
with self._pending_lock:
self._pending.pop(stan, None)
log_line(format_err_line(kind="SIGNON", inv_id=None, exc=TimeoutError("timeout"), send_ts=send_ts, t0=t0))
return False
return bool(p.ok)
def tx(self, inv_id: str) -> TxResult:
stan = self._next_stan()
payload = build_0200_transaction(stan, bit61=inv_id)
send_ts = ts()
t0 = time.perf_counter()
p = _Pending(kind="TX", inv_id=inv_id, stan=stan, send_ts=send_ts, t0=t0, event=threading.Event())
with self._pending_lock:
self._pending[stan] = p
try:
with self._send_lock:
send_frame(self._sock, payload, use_etx=self._use_etx)
except (OSError, ValueError) as e:
with self._pending_lock:
self._pending.pop(stan, None)
log_line(format_err_line(kind="TX", inv_id=inv_id, exc=e, send_ts=send_ts, t0=t0))
return TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=None, error=type(e).__name__)
if not p.event.wait(self._timeout_s):
with self._pending_lock:
self._pending.pop(stan, None)
log_line(format_err_line(kind="TX", inv_id=inv_id, exc=TimeoutError("timeout"), send_ts=send_ts, t0=t0))
return TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=None, error="timeout")
return p.result or TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=None, error=p.error or "unknown")
def send_signon(sock: socket.socket, *, use_etx: bool, stan_counter: "StanCounter") -> bool:
stan = stan_counter.next()
payload = build_0800_network(stan, "301")
send_ts = ts()
t0 = time.perf_counter()
try:
send_frame(sock, payload, use_etx=use_etx)
resp = recv_frame(sock, expect_etx=use_etx)
parsed = parse_until_39(resp)
rc = parsed.get(39)
log_line(format_ok_line(kind="SIGNON", inv_id=None, rc39=rc, send_ts=send_ts, t0=t0))
return parsed.get(0) == "0810" and rc == "00"
except (OSError, ValueError) as e:
log_line(format_err_line(kind="SIGNON", inv_id=None, exc=e, send_ts=send_ts, t0=t0))
return False
def send_tx(sock: socket.socket, *, use_etx: bool, stan_counter: "StanCounter", inv_id: str) -> TxResult:
stan = stan_counter.next()
payload = build_0200_transaction(stan, bit61=inv_id)
send_ts = ts()
t0 = time.perf_counter()
try:
send_frame(sock, payload, use_etx=use_etx)
resp = recv_frame(sock, expect_etx=use_etx)
dt_ms = (time.perf_counter() - t0) * 1000.0
parsed = parse_until_39(resp)
rc = parsed.get(39)
ok = parsed.get(0) == "0210" and rc == "00"
log_line(format_ok_line(kind="TX", inv_id=inv_id, rc39=rc, send_ts=send_ts, t0=t0))
return TxResult(inv_id=inv_id, ok=ok, rc39=rc, duration_ms=dt_ms)
except (OSError, ValueError) as e:
dt_ms = (time.perf_counter() - t0) * 1000.0
log_line(format_err_line(kind="TX", inv_id=inv_id, exc=e, send_ts=send_ts, t0=t0))
return TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=dt_ms, error=type(e).__name__)
def percentile(sorted_values: List[float], p: float) -> float:
if not sorted_values:
return 0.0
if p <= 0:
return sorted_values[0]
if p >= 100:
return sorted_values[-1]
k = (len(sorted_values) - 1) * (p / 100.0)
f = int(k)
c = min(f + 1, len(sorted_values) - 1)
if f == c:
return sorted_values[f]
d0 = sorted_values[f] * (c - k)
d1 = sorted_values[c] * (k - f)
return d0 + d1
def print_summary(results: List[TxResult]) -> None:
total = len(results)
ok = sum(1 for r in results if r.ok)
fail = total - ok
rc_counts: Dict[str, int] = {}
for r in results:
rc = r.rc39 or "NA"
rc_counts[rc] = rc_counts.get(rc, 0) + 1
durations = [r.duration_ms for r in results if r.duration_ms is not None]
durations_f = [float(x) for x in durations]
durations_sorted = sorted(durations_f)
log_line("\nSummary")
log_line(f" total: {total}")
log_line(f" ok: {ok}")
log_line(f" fail: {fail}")
if durations_sorted:
avg = sum(durations_sorted) / len(durations_sorted)
log_line(
" duration_ms: "
f"avg={avg:.2f} p50={percentile(durations_sorted, 50):.2f} "
f"p95={percentile(durations_sorted, 95):.2f} max={durations_sorted[-1]:.2f}"
)
log_line(" rc39_counts:")
for rc, cnt in sorted(rc_counts.items(), key=lambda kv: (-kv[1], kv[0])):
log_line(f" {rc}: {cnt}")
def print_run_start(
*,
start_ts: str,
host: str,
port: int,
use_etx: bool,
do_signon: bool,
invoices_count: int,
threads: int,
timeout: float,
) -> None:
mode = "single-connection"
frame = "etx" if use_etx else "no-etx"
signon = "on" if do_signon else "off"
log_line(
f"Start ts={start_ts} mode={mode} frame={frame} signon={signon} "
f"invoices={invoices_count} threads={threads} timeout={timeout}s target={host}:{port}"
)
def print_run_end(*, end_ts: str, elapsed_s: float) -> None:
log_line(f"End ts={end_ts} elapsed_s={elapsed_s:.3f}")
class StanCounter:
def __init__(self) -> None:
self._value = int(datetime.now().strftime("%H%M%S")) % 1_000_000
def next(self) -> str:
self._value = (self._value + 1) % 1_000_000
return str(self._value).zfill(6)
def build_0800_network(stan: str, code70: str) -> bytes:
mti = "0800"
primary = "8220000000000000" # bits 1,7,11
secondary = "0400000000000000" # bit 70
bit7 = now_bit7()
return f"{mti}{primary}{secondary}{bit7}{stan}{code70}".encode("ascii")
def build_0200_transaction(stan: str, *, bit61: str) -> bytes:
dt = datetime.now()
mti = "0200"
bitmap = "F23A4401A8E0803A0000000004200000"
pan = "622011444444444444"
proc_code = "341019"
bit7 = dt.strftime("%m%d%H%M%S")
bit11 = stan
bit12 = dt.strftime("%H%M%S")
bit13 = dt.strftime("%m%d")
bit15 = (dt + timedelta(days=1)).strftime("%m%d")
bit18 = "6010"
bit22 = "021"
bit32 = "110"
bit33 = "00110"
bit35 = "622011444444444444=9912"
bit37 = ("000000" + stan)[-12:]
bit41 = "N703".ljust(8)
bit42 = "02N703".ljust(15)
bit43 = "TLR N703".ljust(40)
bit49 = "360"
bit59 = "PAY"
bit60 = "120"
bit63 = "214"
bit102 = "0010823214360".ljust(20)
bit107 = "0010"
amount12 = "000000000000"
parts = [
mti,
bitmap,
f"{len(pan):02d}{pan}",
proc_code,
amount12,
bit7,
bit11,
bit12,
bit13,
bit15,
bit18,
bit22,
f"{len(bit32):02d}{bit32}",
f"{len(bit33):02d}{bit33}",
f"{len(bit35):02d}{bit35}",
bit37,
bit41,
bit42,
bit43,
bit49,
f"{len(bit59):03d}{bit59}",
f"{len(bit60):03d}{bit60}",
f"{len(bit61):03d}{bit61}",
f"{len(bit63):03d}{bit63}",
f"{len(bit102):02d}{bit102}",
f"{len(bit107):03d}{bit107}",
]
return "".join(parts).encode("ascii")
def main(argv: Optional[List[str]] = None) -> int:
p = argparse.ArgumentParser(description="Minimal ISO8583 sender")
p.add_argument("--host", required=True)
p.add_argument("--port", required=True, type=int)
p.add_argument("--inv-id", dest="bit61", default=None, help="invoice profile string for bit 61")
p.add_argument(
"--inv-file",
default=None,
help="path to invoices.txt (one invoice per line; blank/# ignored)",
)
p.add_argument(
"--limit",
type=int,
default=None,
help="optional max number of invoices to send from --inv-file",
)
p.add_argument(
"--threads",
type=int,
default=1,
help="max concurrent invoice threads (still uses one TCP connection)",
)
p.add_argument("--timeout", type=float, default=25.0)
p.add_argument("--no-etx", action="store_true", help="length prefix only (no trailing ETX)")
p.add_argument("--no-signon", action="store_true", help="skip initial 0800/301")
args = p.parse_args(argv)
invoices: List[str] = []
if args.inv_file:
invoices = load_invoices_from_file(args.inv_file)
if args.limit is not None:
if args.limit <= 0:
raise SystemExit("--limit must be > 0")
invoices = invoices[: args.limit]
elif args.bit61:
invoices = [args.bit61]
else:
raise SystemExit("provide --inv-id or --inv-file")
if not invoices:
raise SystemExit("no invoices loaded")
if args.threads <= 0:
raise SystemExit("--threads must be > 0")
use_etx = not args.no_etx
do_signon = not args.no_signon
run_start_ts = ts()
run_t0 = time.perf_counter()
print_run_start(
start_ts=run_start_ts,
host=args.host,
port=args.port,
use_etx=use_etx,
do_signon=do_signon,
invoices_count=len(invoices),
threads=args.threads,
timeout=args.timeout,
)
# Server closes old connections when a new connection is opened from the same
# configured client key. So keep exactly one TCP connection for the whole run.
results: List[TxResult] = []
any_fail = False
try:
with socket.create_connection((args.host, args.port), timeout=args.timeout) as s:
dispatcher = StanDispatcher(s, use_etx=use_etx, timeout_s=args.timeout)
if do_signon:
ok_signon = dispatcher.signon()
if not ok_signon:
any_fail = True
# Still continue to print summary/run end.
# Prepare result slots to preserve invoice ordering in summary.
results = [
TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=None, error="not_processed")
for inv_id in invoices
]
sem = threading.Semaphore(args.threads)
def run_one(idx: int, inv_id: str) -> None:
nonlocal any_fail
with sem:
r = dispatcher.tx(inv_id)
results[idx] = r
if not r.ok:
any_fail = True
threads: List[threading.Thread] = []
for i, inv_id in enumerate(invoices):
t = threading.Thread(target=run_one, args=(i, inv_id), daemon=True)
threads.append(t)
t.start()
for t in threads:
t.join()
dispatcher.close()
except (OSError, ValueError) as e:
log_line(format_err_line(kind="CONNECT", inv_id=None, exc=e, send_ts=ts(), t0=time.perf_counter()))
# Mark all as failed if we never managed to connect.
if not results:
results = [TxResult(inv_id=inv_id, ok=False, rc39=None, duration_ms=None, error=type(e).__name__) for inv_id in invoices]
any_fail = True
print_summary(results)
print_run_end(end_ts=ts(), elapsed_s=time.perf_counter() - run_t0)
return 1 if any_fail else 0
if __name__ == "__main__":
raise SystemExit(main())
import datetime
import logging
from operator import inv
from opensipkd.pbb.models.views.api_invoice import SismiopInvoice
settings = {"module": "bekasi_kota"}
log = logging.getLogger(__name__)
def main():
invoice = {'kd_propinsi': '32', 'kd_dati2': '79', 'kd_kecamatan': '010',
'kd_kelurahan': '001', 'kd_blok': '001', 'no_urut': '0002',
'kd_jns_op': '0', 'thn_pajak_sppt': '2020', 'siklus_sppt': 1,
'kd_tp': '11', 'nm_wp_sppt': 'WP_3279035807770001',
'jln_wp_sppt': 'LINGK.CISAUHEUN', 'blok_kav_no_wp_sppt': 'CISAUHEUN',
'rw_wp_sppt': '07', 'rt_wp_sppt': '022',
'kelurahan_wp_sppt': 'KELURAHAN SITUBATU',
'kota_wp_sppt': 'KOTA BANJAR',
'kd_pos_wp_sppt': None, 'npwp_sppt': '-',
'tgl_jatuh_tempo_sppt': datetime.datetime(2020, 9, 30, 0, 0),
'luas_bumi_sppt': 1350, 'luas_bng_sppt': 0, 'njop_bumi_sppt': 64800000,
'njop_bng_sppt': 0, 'njop_sppt': 64800000,
'pbb_yg_harus_dibayar_sppt': 81000, 'status_pembayaran_sppt': '0',
'kelurahan': 'KELURAHAN SITUBATU', 'kecamatan': 'KECAMATAN BANJAR',
'kota': 'KOTA BANJAR', 'nop': '32.79-010.001-001.0002-0',
"faktor_pengurang_sppt": 0,
"tagihan":81000
}
import importlib
# settings = get_settings()
module = settings.get('module', 'default')
try:
sismiop = importlib.import_module(f'sismiop.services.{module}')
except ImportError as e:
log.error("sismiop_models module not found: %s", e)
return
invoice["tgl_jatuh_tempo_sppt"] = invoice["tgl_jatuh_tempo_sppt"].date()
inq = sismiop.Inquiry(invoice=SismiopInvoice(**invoice), hitung=False)
inq.tagihan = invoice['pbb_yg_harus_dibayar_sppt']
inq.hitung_denda()
inq.hitung_discount()
print(inq.discount_pokok)
print(inq.discount_denda)
if __name__ == "__main__":
main()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!