Commit ffcd6afc by Owo Sugiana

Parser untuk log file dari Cartenz

1 parent 7e027e2f
2.0 2022-07-20
2.0 2022-07-24
--------------
- Perbedaan utama dari versi 0.x adalah kini berangkat dari tabel aslinya dan
bukan dari tabel ISO8583 karena sudah memperhatikan channel MANUAL (input
manual pembayaran)
- Memahami channel VA dan QRIS
- Menyimpan log file dari Cartenz ke tabel log_file
- Menerjemahkan log_file menjadi log_iso
import sys
import transaction
from opensipkd.iso8583.bjb.pbb.models import Log
from opensipkd.iso8583.bjb.pbb import Doc
from payment_report.scripts.pbb import App as BaseApp
from payment_report.scripts.common import (
get_iso,
get_keys,
)
class App(BaseApp):
def __run_payment(self):
last = self.get_last_id('pbb payment last id')
q_iso = self.__get_query_iso(last.as_int())
found = False
for row_pay, row_inq in q_iso.limit(1000):
iso = get_iso(row_pay.iso_request, Doc, self.option.debug)
stan = iso.getBit(11)
ntb = iso.getBit(48)
d = get_keys(iso)
s = f"STAN {stan} NTB {ntb} Channel {d['channel']}"
self.log.info(s)
q = self.rpt_session.query(Log).filter_by(
bit_011=stan, bit_048=ntb)
iso_log = q.first()
if not iso_log:
iso_values = iso.get_values()
bits = dict(mti='0200')
for bit in iso_values:
value = iso_values[bit]
field = 'bit_{}'.format(str(bit).zfill(3))
bits[field] = value
iso_log = Log(**bits)
iso_log.error = d['channel'], # Numpang di field error
last.nilai = str(row_inq.id)
with transaction.manager:
self.rpt_session.add(iso_log)
self.rpt_session.add(last)
found = True
return found
def run_reversal(self): # Override
pass
app = App(sys.argv[1:])
app.run()
[main]
db_url = postgresql://cartenz@172.17.1.117:5432/h2h_pbb
log_dir = E:\ISO BPHTB\PRODUCTION\SYSTEM_LOG
[main]
models = opensipkd.iso8583.bjb.pbb.models
service = opensipkd.iso8583.bjb.pbb.doc
db_url = postgresql://cartenz:pass@localhost/h2h_pbb
report_db_url = postgresql://pcpd:pass@localhost/pcpd
pid_file = /home/sugiana/tmp/pbb-log.pid
log_file = /home/sugiana/log/pbb-log.log
from sqlalchemy import (
Column,
Integer,
String,
Text,
)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class LogFile(Base):
__tablename__ = 'log_file'
id = Column(Integer, primary_key=True)
line_id = Column(String(64), nullable=False, unique=True)
line = Column(Text, nullable=False)
......@@ -265,13 +265,19 @@ def make_pid_file(filename):
return write_pid_file(filename)
def create_session(db_url, debug=False):
engine = create_engine(db_url, echo=debug)
factory = sessionmaker(bind=engine)
return factory()
class BaseApp:
conf_name = None # Override, please
report_orm = None # Override, please
va_product_code = '' # Override, please
def __init__(self, argv):
self.option = get_option(argv)
self.option = self.get_option(argv)
cp = ConfigParser()
cp.read(self.option.conf)
self.conf = cp['main']
......@@ -299,6 +305,9 @@ class BaseApp:
else:
self.va_session = None
def get_option(self, argv):
return get_option(argv)
def get_factory(self, name):
db_url = self.conf[name]
engine = create_engine(db_url, echo=self.option.debug_sql)
......@@ -315,9 +324,22 @@ class BaseApp:
pass
def get_report(self, pay):
q = self.rpt_session.query(self.report_orm).filter_by(id=pay.id)
session = self.get_session_for_save()
q = session.query(self.report_orm).filter_by(id=pay.id)
return q.first()
def get_prefix_log(self):
return f'Invoice ID {self.invoice_id}'
def get_estimate(self, no):
duration = time() - self.start_time
speed = duration / no
remain_row = self.count - no
return humanize_time(speed * remain_row)
def get_session_for_save(self):
return self.rpt_session
def update_from_date(self):
q = self.get_payment_query()
no = self.offset
......@@ -339,23 +361,25 @@ class BaseApp:
else:
msg = f'ALREADY SAME {d}'
rpt = None
if self.count == 1 and self.last: # Hemat log
print(msg)
print('Log yang sama, abaikan.')
return
else:
msg = f'INSERT {d}'
rpt = self.report_orm(**source)
msg = f'Invoice ID {self.invoice_id} {msg}'
msg = f'{self.get_prefix_log()} {msg}'
log_method = self.log.info
except InvalidSource as e:
msg = str(e)
log_method = self.log.warning
rpt = None
duration = time() - self.start_time
speed = duration / no
remain_row = self.count - no
e = humanize_time(speed * remain_row)
e = self.get_estimate(no)
log_method(f'#{no}/{self.count} {msg}, estimate {e}')
if rpt:
session = self.get_session_for_save()
with transaction.manager:
self.rpt_session.add(rpt)
session.add(rpt)
self.last_pay = pay
self.offset += row_limit
return found
......@@ -373,10 +397,10 @@ class BaseApp:
def run_payment(self):
self.last_pay = None
self.last = None
self.offset = 0
self.tgl_akhir = date.today()
if self.option.update_from_date:
self.last = None
try:
days_ago = int(self.option.update_from_date)
self.tgl_awal = date.today() + timedelta(days_ago)
......@@ -388,9 +412,11 @@ class BaseApp:
else:
self.last = self.get_last_id(self.conf_name)
self.tgl_awal = self.last.as_datetime()
self.count = self.get_count()
self.start_time = time()
while True:
self.count = self.get_count()
if not self.count:
return
self.start_time = time()
found = self.update_from_date()
if not found:
break
......
nama,nilai,keterangan
log file last id,0,Field log_file.id terakhir yang diproses
import sys
import os
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from hashlib import sha256
from time import sleep
from multiprocessing import Process
import tailer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import transaction
from zope.sqlalchemy import register
from ..log_models import LogFile
from .tools import BacaFile
my_registry = dict()
PATTERN = r'^([\d]*)-([\d]*)-([\d]*) '\
r'([\d]*):([\d]*):([\d]*)\.([\d]*) (.*) '\
r'\[RECEIVED RAW BUFFER\] : (.*)'
REGEX = re.compile(PATTERN)
def set_log_file(path):
t = os.path.split(path)
if t[-1] == 'H2H.log':
if my_registry.get('last_log_file') == path:
return
my_registry['log_file'] = path
class Pantau(FileSystemEventHandler):
def on_created(self, event): # Override
print(f'Terbentuk {event.src_path}')
set_log_file(event.src_path)
def on_modified(self, event): # Override
print(f'Berubah {event.src_path}')
set_log_file(event.src_path)
def save_log(line):
line = line.rstrip()
match = REGEX.search(line)
if not match:
return
print([line])
db_session = my_registry['db_session']
line_id = sha256(line.encode('utf-8')).hexdigest()
row = LogFile(line_id=line_id, line=line)
try:
with transaction.manager:
db_session.add(row)
except IntegrityError:
return
def read_log(log_file):
with BacaFile(log_file) as f:
while True:
line = f.readline()
if not line:
break
save_log(line.decode('utf-8'))
with open(log_file) as f:
for line in tailer.follow(f):
save_log(line)
def main(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('conf')
option = pars.parse_args(argv)
conf = ConfigParser()
conf.read(option.conf)
engine = create_engine(conf.get('main', 'db_url'))
factory = sessionmaker(bind=engine)
my_registry['db_session'] = db_session = factory()
register(db_session)
log_dir = conf.get('main', 'log_dir')
handler = Pantau()
observer = Observer()
observer.schedule(handler, path=log_dir, recursive=True)
observer.start()
try:
while True:
if 'log_file' in my_registry:
if 'read_log' in my_registry:
thread = my_registry['read_log']
print('Stop read file ...')
thread.terminate()
del my_registry['read_log']
log_file = my_registry['log_file']
del my_registry['log_file']
my_registry['last_log_file'] = log_file
thread = Process(target=read_log, args=(log_file,))
my_registry['read_log'] = thread
print('Start read file ...')
thread.start()
sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
import sys
import re
from argparse import ArgumentParser
from time import time
from ISO8583.ISOErrors import BitNotSet
from sqlalchemy import func
from sqlalchemy.ext.declarative import declarative_base
from zope.sqlalchemy import register
from opensipkd.waktu import create_datetime
from opensipkd.string import FixLength
from ..models import Common
from ..log_models import LogFile
from .common import (
BaseApp,
my_registry,
append_csv,
create_session,
InvalidSource,
)
PATTERN = r'^([\d]*)-([\d]*)-([\d]*) ([\d]*):([\d]*):([\d]*)\.([\d]*) '\
r'(.*) \[RECEIVED RAW BUFFER\] : (.*)'
REGEX = re.compile(PATTERN)
Base = declarative_base()
def get_parser():
pars = ArgumentParser()
pars.add_argument('conf')
pars.add_argument('--update-from-id', type=int)
pars.add_argument('--debug-sql', action='store_true')
return pars
def get_option(argv):
pars = get_parser()
return pars.parse_args(argv)
class App(BaseApp):
conf_name = 'log file last id'
def __init__(self, argv):
super().__init__(argv)
if not self.pid:
return
class Log(self.models.Log, Common):
pass
self.report_orm = Log
register(self.prod_session)
def get_option(self, argv): # Override
return get_option(argv)
def get_session_for_save(self): # Override
return self.prod_session
def get_filter_query(self, q):
return q.filter(LogFile.id > self.last_id)
def get_count(self): # Override
q = self.prod_session.query(func.count())
q = self.get_filter_query(q)
return q.scalar()
def get_payment_query(self): # Override
q = self.prod_session.query(LogFile)
q = self.get_filter_query(q)
return q.order_by(LogFile.id)
def get_prefix_log(self): # Override
return f'ID {self.log_id}'
def create_data(self, log_file): # Override
self.log_id = log_file.id
match = REGEX.search(log_file.line)
if not match:
raise InvalidSource(f'ID {log_file.id} pola tidak dipahami')
year, month, day, hour, minute, sec, msec, _, raw = \
match.groups()
waktu = create_datetime(
int(year), int(month), int(day), int(hour),
int(minute), int(sec), int(msec)*1000)
iso = self.service.Doc()
iso.setIsoContent(raw[4:].encode('utf-8'))
try:
if iso.getBit(3) != self.service.PAYMENT_CODE:
raise InvalidSource(f'ID {log_file.id} bukan payment')
except BitNotSet:
raise InvalidSource(f'ID {log_file.id} bit 3 tidak ada')
d = dict(mti=iso.getMTI(), created=waktu)
for bit in iso.get_bit_definition():
try:
value = iso.getBit(bit)
except BitNotSet:
continue
field = 'bit_{}'.format(str(bit).zfill(3))
d[field] = value
profile = FixLength(self.service.INVOICE_PROFILE)
profile.set_raw(iso.getBit(62))
d['bit_062_data'] = profile.to_dict()
return d
def get_last_time(self): # Override
return str(self.last_pay.id)
def run_payment(self): # Override
self.last_pay = None
self.offset = 0
if self.option.update_from_id is None:
self.last = self.get_last_id(self.conf_name)
self.last_id = self.last.as_int()
else:
self.last = None
self.last_id = self.option.update_from_id - 1
self.count = self.get_count()
self.start_time = time()
while True:
found = self.update_from_date()
if not found:
break
if self.last_pay and self.last:
self.update_last()
def main(argv=sys.argv[1:]):
app = App(argv)
app.run()
import sys
from configparser import ConfigParser
from sqlalchemy import create_engine
from opensipkd.views.models import Conf
from ..log_models import Base
from .common import (
my_registry,
append_csv,
create_session,
get_module_object,
)
def main(argv=sys.argv[1:]):
conf_file = argv[0]
conf = ConfigParser()
conf.read(conf_file)
db_url = conf.get('main', 'report_db_url')
my_registry['db_session'] = db_session = create_session(db_url, True)
Conf.metadata.create_all(db_session.bind)
append_csv(Conf, 'conf_log2iso.csv', ['nama'])
db_session.commit()
models = get_module_object(conf.get('main', 'models'))
class Log(Base, models.LogMixin):
pass
db_url = conf.get('main', 'db_url')
engine = create_engine(db_url, echo=True)
Base.metadata.create_all(engine)
......@@ -4,6 +4,10 @@ from datetime import (
time,
)
from decimal import Decimal
from mmap import (
mmap,
ACCESS_READ,
)
#################
......@@ -88,3 +92,42 @@ def update(source: dict, target: dict):
log_msg.append('{f} {t} to be {s}'.format(
f=field, t=[log_target_value], s=[log_source_value]))
return target_update, log_msg
class BacaFile:
def __init__(self, filename, cari=''):
if cari:
self.is_found = False
else:
self.is_found = True
is_first = True
self.file = open(filename, 'rb')
s = mmap(self.file.fileno(), 0, access=ACCESS_READ)
while cari:
pos = s.find(cari.encode('utf-8'))
if pos == -1: # Tidak ketemu
return
if pos == 0: # Ketemu di baris ke-1
self.is_found = True
break
if not is_first:
self.file.seek(pos)
self.file.readline()
self.is_found = True
break
cari = '\n' + cari # Cari di baris berikutnya
self.file.seek(0)
is_first = False
def close(self):
self.file.close()
def readline(self):
if self.is_found:
return self.file.readline()
def __enter__(self):
return self.file
def __exit__(self, type, value, traceback):
self.close()
[main]
models = sismiop.models.default
db_url = postgresql://user:pass@localhost/db
pbb_db_url = postgresql://user:pass@localhost/db
report_db_url = postgresql://user:pass@localhost/db
pid_file = /home/sugiana/tmp/pbb-json-report.pid
log_file = /home/sugiana/log/pbb-json-report.log
kd_kanwil = 22
kd_kantor = 14
kd_tp = 47
......@@ -36,12 +36,13 @@ setup(
'console_scripts': [
'payment_report_init_db = payment_report.scripts.common:init_db',
'pbb_report = payment_report.scripts.pbb:main',
'pbb_json_report = payment_report.scripts.pbb_json:main',
'bphtb_report = payment_report.scripts.bphtb:main',
'bphtb_json_report = payment_report.scripts.bphtb_json:main',
'pad_report = payment_report.scripts.pad:main',
'pad_json_report = payment_report.scripts.pad_json:main',
'webr_report = payment_report.scripts.webr:main',
'pbb_log = payment_report.scripts.pbb_log:main',
'log2iso_init_db = payment_report.scripts.log2iso_init:main',
'log2iso = payment_report.scripts.log2iso:main',
'log2db = payment_report.scripts.log2db:main',
],
}
)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!