log2db.py
3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import sys
import os
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from hashlib import sha256
from time import sleep
from multiprocessing import Process
import tailer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import (
IntegrityError,
OperationalError,
)
from ..log_models import LogFile
from .tools import BacaFile
my_registry = dict()
PATTERN = r'^([\d]*)-([\d]*)-([\d]*) '\
r'([\d]*):([\d]*):([\d]*)\.([\d]*) (.*) '\
r'\[SENT RAW BUFFER\] : (.*)'
REGEX = re.compile(PATTERN)
def set_log_file(path):
t = os.path.split(path)
if t[-1] == 'H2H.log':
if my_registry.get('last_log_file') == path:
return
my_registry['log_file'] = path
class Pantau(FileSystemEventHandler):
def on_created(self, event): # Override
print(f'Terbentuk {event.src_path}')
set_log_file(event.src_path)
def on_modified(self, event): # Override
print(f'Berubah {event.src_path}')
set_log_file(event.src_path)
def read_log(log_file, db_url, tail_mode=True):
def save_log(line):
line = line.rstrip()
match = REGEX.search(line)
if not match:
return
print([line])
line_id = sha256(line.encode('utf-8')).hexdigest()
while True:
row = LogFile(line_id=line_id, line=line)
try:
db_session.add(row)
db_session.flush()
db_session.commit()
return
except IntegrityError:
db_session.rollback()
return
except OperationalError as e:
db_session.rollback()
print(e)
print('Tunggu, mungkin sedang maintenance ...')
sleep(10)
engine = create_engine(db_url)
factory = sessionmaker(bind=engine)
db_session = factory()
with BacaFile(log_file) as f:
while True:
line = f.readline()
if not line:
break
try:
save_log(line.decode('utf-8'))
except UnicodeDecodeError:
continue
if tail_mode:
with open(log_file) as f:
for line in tailer.follow(f):
save_log(line)
def main(argv=sys.argv[1:]):
pars = ArgumentParser()
pars.add_argument('conf')
pars.add_argument('--log-file')
option = pars.parse_args(argv)
conf = ConfigParser()
conf.read(option.conf)
db_url = conf.get('main', 'db_url')
if option.log_file:
read_log(option.log_file, db_url, False)
return
log_dir = conf.get('main', 'log_dir')
handler = Pantau()
observer = Observer()
observer.schedule(handler, path=log_dir, recursive=True)
observer.start()
try:
while True:
if 'log_file' in my_registry:
if 'read_log' in my_registry:
thread = my_registry['read_log']
print('Stop read file ...')
thread.terminate()
del my_registry['read_log']
log_file = my_registry['log_file']
del my_registry['log_file']
my_registry['last_log_file'] = log_file
thread = Process(target=read_log, args=(log_file, db_url))
my_registry['read_log'] = thread
print('Start read file ...')
thread.start()
sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()