log2db.py 3.09 KB
import sys
import os
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from hashlib import sha256
from time import sleep
from multiprocessing import Process
import tailer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import transaction
from zope.sqlalchemy import register
from ..log_models import LogFile
from .tools import BacaFile


my_registry = dict()

PATTERN = r'^([\d]*)-([\d]*)-([\d]*) '\
          r'([\d]*):([\d]*):([\d]*)\.([\d]*) (.*) '\
          r'\[RECEIVED RAW BUFFER\] : (.*)'
REGEX = re.compile(PATTERN)


def set_log_file(path):
    t = os.path.split(path)
    if t[-1] == 'H2H.log':
        if my_registry.get('last_log_file') == path:
            return
        my_registry['log_file'] = path


class Pantau(FileSystemEventHandler):
    def on_created(self, event):  # Override
        print(f'Terbentuk {event.src_path}')
        set_log_file(event.src_path)

    def on_modified(self, event):  # Override
        print(f'Berubah {event.src_path}')
        set_log_file(event.src_path)


def save_log(line):
    line = line.rstrip()
    match = REGEX.search(line)
    if not match:
        return
    print([line])
    db_session = my_registry['db_session']
    line_id = sha256(line.encode('utf-8')).hexdigest()
    row = LogFile(line_id=line_id, line=line)
    try:
        with transaction.manager:
            db_session.add(row)
    except IntegrityError:
        return


def read_log(log_file):
    with BacaFile(log_file) as f:
        while True:
            line = f.readline()
            if not line:
                break
            save_log(line.decode('utf-8'))
    with open(log_file) as f:
        for line in tailer.follow(f):
            save_log(line)


def main(argv=sys.argv[1:]):
    pars = ArgumentParser()
    pars.add_argument('conf')
    option = pars.parse_args(argv)
    conf = ConfigParser()
    conf.read(option.conf)
    engine = create_engine(conf.get('main', 'db_url'))
    factory = sessionmaker(bind=engine)
    my_registry['db_session'] = db_session = factory()
    register(db_session)
    log_dir = conf.get('main', 'log_dir')
    handler = Pantau()
    observer = Observer()
    observer.schedule(handler, path=log_dir, recursive=True)
    observer.start()
    try:
        while True:
            if 'log_file' in my_registry:
                if 'read_log' in my_registry:
                    thread = my_registry['read_log']
                    print('Stop read file  ...')
                    thread.terminate()
                    del my_registry['read_log']
                log_file = my_registry['log_file']
                del my_registry['log_file']
                my_registry['last_log_file'] = log_file
                thread = Process(target=read_log, args=(log_file,))
                my_registry['read_log'] = thread
                print('Start read file ...')
                thread.start()
            sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()