log2db.py
3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import sys
import os
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from hashlib import sha256
from time import sleep
from multiprocessing import Process
import tailer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import transaction
from zope.sqlalchemy import register
from ..log_models import LogFile
from .tools import BacaFile
my_registry = dict()
PATTERN = r'^([\d]*)-([\d]*)-([\d]*) '\
r'([\d]*):([\d]*):([\d]*)\.([\d]*) (.*) '\
r'\[RECEIVED RAW BUFFER\] : (.*)'
REGEX = re.compile(PATTERN)
def set_log_file(path):
t = os.path.split(path)
if t[-1] == 'H2H.log':
if my_registry.get('last_log_file') == path:
return
my_registry['log_file'] = path
class Pantau(FileSystemEventHandler):
def on_created(self, event): # Override
print(f'Terbentuk {event.src_path}')
set_log_file(event.src_path)
def on_modified(self, event): # Override
print(f'Berubah {event.src_path}')
set_log_file(event.src_path)
def save_log(line):
line = line.rstrip()
match = REGEX.search(line)
if not match:
return
print([line])
db_session = my_registry['db_session']
line_id = sha256(line.encode('utf-8')).hexdigest()
row = LogFile(line_id=line_id, line=line)
try:
with transaction.manager:
db_session.add(row)
except IntegrityError:
return
def read_log(log_file):
with BacaFile(log_file) as f:
while True:
line = f.readline()
if not line:
break
save_log(line.decode('utf-8'))
with open(log_file) as f:
for line in tailer.follow(f):
save_log(line)
def main(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('conf')
option = pars.parse_args(argv)
conf = ConfigParser()
conf.read(option.conf)
engine = create_engine(conf.get('main', 'db_url'))
factory = sessionmaker(bind=engine)
my_registry['db_session'] = db_session = factory()
register(db_session)
log_dir = conf.get('main', 'log_dir')
handler = Pantau()
observer = Observer()
observer.schedule(handler, path=log_dir, recursive=True)
observer.start()
try:
while True:
if 'log_file' in my_registry:
if 'read_log' in my_registry:
thread = my_registry['read_log']
print('Stop read file ...')
thread.terminate()
del my_registry['read_log']
log_file = my_registry['log_file']
del my_registry['log_file']
my_registry['last_log_file'] = log_file
thread = Process(target=read_log, args=(log_file,))
my_registry['read_log'] = thread
print('Start read file ...')
thread.start()
sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()