test_inquiry.py
3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from datetime import datetime
from pprint import pprint
from time import sleep
from optparse import OptionParser
from opensipkd.forwarder.bjb.samsat.structure import INQUIRY_CODE
from .DbTransaction import DbTransaction
import os
import imp
def inquiry_request(iso, invoice_id, bank_id=None):
bank_id = bank_id and bank_id or '110'
kini = datetime.now()
iso.setBit(2, kini.strftime('%Y%m%d%H%M%S'))
iso.set_transaction_code(INQUIRY_CODE)
iso.setBit(12, kini.strftime('%H%M%S'))
iso.setBit(13, kini.strftime('%m%d'))
iso.setBit(15, kini.strftime('%m%d'))
iso.setBit(18, '6010')
iso.setBit(22, '021')
iso.setBit(32, bank_id)
iso.setBit(33, '00110')
# iso.setBit(35, '')
iso.setBit(37, kini.strftime('%H%M%S'))
iso.setBit(41, '000')
iso.setBit(42, '000000000000000')
iso.setBit(43, 'Nama Bank')
iso.setBit(49, '390')
# iso.setBit(59, 'PAY')
# iso.setBit(60, '142')
iso.setBit(61, invoice_id)
# iso.setBit(63, '')
# iso.setBit(102, '')
# iso.setBit(107, '')
# test_not_found = False
# name = '.'.join(['bppt', 'test'])
# try:
# module = __import__(name)
# except ImportError, test_not_found:
# name = '.'.join(['bppt', conf.module_name])
# module = __import__(name)
# area_module = getattr(module, conf.module_name)
# DbTransaction = area_module.DbTransaction
#
# if test_not_found:
# inquiry_request = default_inquiry_request
# else:
# inquiry_request = area_module.test.inquiry_request
class TestInquiry(object):
def __init__(self, argv):
self.option = get_option(argv)
if not self.option:
return
self.invoice_id = self.option.invoice_id.replace('-', '')
conf_file = self.option.conf_file
self.host = self.option.host
conf = imp.load_source('conf', conf_file)
self.conf = conf.host[self.host]
# dict(name=streamer_name, ip='127.0.0.1', bank_id=bank_id)
def run(self):
if not self.option:
return
print('\nBank kirim inquiry request')
req_iso = DbTransaction(conf=self.conf)
req_iso.set_transaction_request()
inquiry_request(req_iso, self.invoice_id)
raw = self.get_raw(req_iso)
print('\nPemda terima inquiry request')
from_iso = DbTransaction(conf=self.conf)
from_iso.setIsoContent(raw)
print('\nPemda kirim inquiry response')
resp_iso = DbTransaction(from_iso=from_iso, conf=self.conf)
func = getattr(resp_iso, from_iso.get_func_name())
func()
self.get_raw(resp_iso)
return resp_iso # Untuk test_payment.py
def get_raw(self, iso):
msg = 'MTI {mti}'.format(mti=iso.getMTI())
print(msg)
pprint(iso.getBitsAndValues())
raw = iso.getRawIso()
sleep(1)
print([raw])
return raw
def get_option(argv):
# bank = conf.host['streamer'] #'bjb'
pars = OptionParser()
conf_file = os.path.join('conf', 'forwarder.py')
host_name = 'bjb'
pars.add_option('-o', '--host', default=host_name, help='default ' + host_name)
pars.add_option('-i', '--invoice-id')
pars.add_option('-c', '--conf-file', default=conf_file, help='default ' + conf_file)
option, remain = pars.parse_args(argv)
if not option.invoice_id:
print('--invoice-id harus diisi.')
return
return option
def split_bank(s):
t = s.split(',')
if t[1:]:
streamer_name = t[0]
bank_id = int(t[1])
else:
streamer_name = s
bank_id = 0
return streamer_name, bank_id
def main(argv):
test = TestInquiry(argv)
test.run()