# log2db.py 3.62 KB
import sys
import os
import re
from argparse import ArgumentParser
from configparser import ConfigParser
from hashlib import sha256
from time import sleep
from multiprocessing import Process
import tailer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import (
    IntegrityError,
    OperationalError,
    )
from ..log_models import LogFile
from .tools import BacaFile


# Module-wide mutable state shared between the file-system event handler and
# the main loop.  Keys used in this file: 'log_file', 'last_log_file' and
# 'read_log'.
my_registry = {}

# A matching line looks like
#   <digits>-<digits>-<digits> <digits>:<digits>:<digits>.<digits> <text>
#   [SENT RAW BUFFER] : <payload>
# (all on one line); each digit run, the free text and the payload are
# captured as separate groups.
PATTERN = (
    r'^([\d]*)-([\d]*)-([\d]*) '
    r'([\d]*):([\d]*):([\d]*)\.([\d]*) (.*) '
    r'\[SENT RAW BUFFER\] : (.*)'
)
REGEX = re.compile(PATTERN)


def set_log_file(path):
    """Queue *path* for reading when it names an ``H2H.log`` file.

    Nothing happens when the final path component is not ``H2H.log`` or when
    *path* equals the ``'last_log_file'`` entry of ``my_registry``.  Otherwise
    *path* is stored under the ``'log_file'`` key of ``my_registry``.
    """
    if os.path.basename(path) != 'H2H.log':
        return
    already_selected = my_registry.get('last_log_file') == path
    if already_selected:
        return
    my_registry['log_file'] = path


class Pantau(FileSystemEventHandler):
    """Event handler that prints each event and offers its path for reading.

    Both creation and modification events print a message containing the
    event's source path and pass that path to ``set_log_file``.
    """

    def on_created(self, event):  # Override
        """Report a creation event and pass its path to ``set_log_file``."""
        created_path = event.src_path
        print(f'Terbentuk {created_path}')
        set_log_file(created_path)

    def on_modified(self, event):  # Override
        """Report a modification event and pass its path on likewise."""
        modified_path = event.src_path
        print(f'Berubah {modified_path}')
        set_log_file(modified_path)


def read_log(log_file, db_url, tail_mode=True):
    """Store the matching lines of *log_file* in the database at *db_url*.

    The file is first read from the beginning through ``BacaFile``; every
    line matching ``REGEX`` is saved as a ``LogFile`` row whose ``line_id`` is
    the SHA-256 hex digest of the (right-stripped) line.  When *tail_mode* is
    true the file is then followed with ``tailer.follow`` and new lines are
    saved as they arrive, so the call does not return on its own.

    Lines that are not valid UTF-8 are skipped in both phases.  The database
    session and engine are released when the function exits, whether it
    returns normally or an exception propagates.

    Args:
        log_file: Path of the log file to read.
        db_url: Database URL handed to ``sqlalchemy.create_engine``.
        tail_mode: Keep following the file after the initial pass.
    """
    def save_log(line):
        """Insert *line* when it matches ``REGEX``.

        An ``IntegrityError`` drops the line (presumably ``line_id`` is
        unique, so this is a line that was already stored -- confirm against
        the ``LogFile`` model).  An ``OperationalError`` is retried every
        10 seconds until the insert succeeds.
        """
        line = line.rstrip()
        match = REGEX.search(line)
        if not match:
            return
        print([line])
        line_id = sha256(line.encode('utf-8')).hexdigest()
        while True:
            # A fresh row object per attempt: the previous one was discarded
            # by the rollback.
            row = LogFile(line_id=line_id, line=line)
            try:
                db_session.add(row)
                db_session.flush()
                db_session.commit()
                return
            except IntegrityError:
                db_session.rollback()
                return
            except OperationalError as e:
                db_session.rollback()
                print(e)
                print('Tunggu, mungkin sedang maintenance ...')
                sleep(10)

    engine = create_engine(db_url)
    factory = sessionmaker(bind=engine)
    db_session = factory()
    try:
        # Phase 1: replay what is already in the file.
        with BacaFile(log_file) as f:
            while True:
                line = f.readline()
                if not line:
                    break
                try:
                    save_log(line.decode('utf-8'))
                except UnicodeDecodeError:
                    continue
        # Phase 2: follow the file for new lines.
        if tail_mode:
            # Decode as UTF-8 like phase 1 instead of the locale default.
            # 'surrogateescape' keeps the iterator alive on invalid bytes;
            # such lines then fail to re-encode inside save_log and are
            # skipped, mirroring the UnicodeDecodeError handling above.
            with open(log_file, encoding='utf-8',
                      errors='surrogateescape') as f:
                for line in tailer.follow(f):
                    try:
                        save_log(line)
                    except UnicodeEncodeError:
                        continue
    finally:
        # Release the session and the engine's pooled connections.
        db_session.close()
        engine.dispose()


def main(argv=None):
    """Command-line entry point.

    Reads ``db_url`` (and, in watch mode, ``log_dir``) from the ``[main]``
    section of the configuration file given as the positional ``conf``
    argument.

    * With ``--log-file`` the given file is read once (no tailing) in this
      process and the function returns.
    * Otherwise ``log_dir`` is watched recursively.  Whenever the event
      handler queues a path under ``my_registry['log_file']``, the current
      reader process (if any) is stopped and a new one is started on that
      path.  The loop runs until interrupted with Ctrl-C.

    Args:
        argv: Argument list to parse.  ``None`` (the default) makes argparse
            use ``sys.argv[1:]`` at call time.
    """
    pars = ArgumentParser()
    pars.add_argument('conf')
    pars.add_argument('--log-file')
    option = pars.parse_args(argv)
    conf = ConfigParser()
    conf.read(option.conf)
    db_url = conf.get('main', 'db_url')
    if option.log_file:
        read_log(option.log_file, db_url, False)
        return
    log_dir = conf.get('main', 'log_dir')
    handler = Pantau()
    observer = Observer()
    observer.schedule(handler, path=log_dir, recursive=True)
    observer.start()

    def _stop_reader():
        """Terminate and reap the running reader process, if there is one."""
        reader = my_registry.pop('read_log', None)
        if reader is None:
            return
        print('Stop read file  ...')
        reader.terminate()
        # Reap the child so it does not linger as a zombie.
        reader.join()

    try:
        while True:
            # pop() takes the queued path in one step, so a path queued by
            # the observer thread in the meantime is not deleted unseen.
            log_file = my_registry.pop('log_file', None)
            if log_file is not None:
                _stop_reader()
                my_registry['last_log_file'] = log_file
                reader = Process(target=read_log, args=(log_file, db_url))
                my_registry['read_log'] = reader
                print('Start read file ...')
                reader.start()
            sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always shut down cleanly: a reader left running would keep this
        # process from exiting, and the observer thread must be stopped
        # even when the loop ends with an unexpected error.
        _stop_reader()
        observer.stop()
        observer.join()