# __init__.py 4.66 KB
import sys
import re
from argparse import ArgumentParser
from configparser import ConfigParser
import select
from hashlib import md5
from time import sleep
from subprocess import (
    Popen,
    PIPE,
    )
from mmap import (
    mmap,
    ACCESS_READ,
    )
from sqlalchemy import engine_from_config
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from opensipkd.waktu import create_datetime
from .models import (
    Base,
    Log,
    )


# A log entry starts with a timestamp such as "2020-01-31 23:59:58,123 ";
# the seven groups are year, month, day, hour, minute, second and the
# digits after the comma.
FIRST_LINE_PATTERN = (
    r'^([\d]*)-([\d]*)-([\d]*) '
    r'([\d]*):([\d]*):([\d]*),([\d]*) '
)
FIRST_LINE_REGEX = re.compile(FIRST_LINE_PATTERN)

# Substrings of IntegrityError messages that save() treats as harmless;
# main() fills this list from the configuration file.
abaikan_error = []
# Process-wide shared objects; main() stores the DB session under
# the 'db_session' key.
global_vars = {}


class BacaFile:
    """Binary file reader that can start at the first line beginning with a text.

    When ``cari`` is empty (or ``None``) reading starts at the top of the
    file.  Otherwise the file is searched for the first line that *starts*
    with ``cari``; reading then begins at that line.  If no such line exists
    ``is_found`` stays ``False`` and ``readline()`` returns ``None``.

    Attributes:
        is_found: ``True`` when reading is possible (no search requested, or
            the searched line was located).
        file: The underlying file object, opened in ``'rb'`` mode.
    """

    def __init__(self, filename, cari=''):
        """Open ``filename`` and position it according to ``cari``.

        Args:
            filename: Path of the file to read.
            cari: Text a line must start with to become the first line
                returned; falsy means "read from the beginning".
        """
        self.is_found = not cari
        self.file = open(filename, 'rb')
        if not cari:
            # No search requested: nothing to map, so empty files are fine.
            return
        try:
            start = self._find_line_start(cari.encode('utf-8'))
        except Exception:
            # Do not leak the handle when the search itself fails.
            self.file.close()
            raise
        if start is not None:
            self.file.seek(start)
            self.is_found = True

    def _find_line_start(self, needle):
        """Return the offset of the first line starting with ``needle``, or None."""
        # mmap() refuses to map an empty file, so measure the size first.
        self.file.seek(0, 2)
        size = self.file.tell()
        self.file.seek(0)
        if not size:
            return None
        with mmap(self.file.fileno(), 0, access=ACCESS_READ) as data:
            if data[:len(needle)] == needle:
                # Found on the very first line.
                return 0
            # Any later line start is preceded by a newline character.
            pos = data.find(b'\n' + needle)
        if pos == -1:
            return None
        # Skip the newline itself so reading starts at the matching line.
        return pos + 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying file."""
        self.file.close()

    def readline(self):
        """Return the next line as bytes (``b''`` at end of file).

        Returns ``None`` when the searched text was not found.
        """
        if self.is_found:
            return self.file.readline()


def save(lines, line_id, waktu):
    """Store one log entry as a ``Log`` row and commit it.

    Args:
        lines: Text lines of the entry; they are joined with newlines.
        line_id: Value stored in ``Log.line_id``.
        waktu: Value stored in ``Log.tgl``.

    Raises:
        IntegrityError: When the commit fails and the error message contains
            none of the substrings listed in ``abaikan_error``.
    """
    s = '\n'.join(lines)
    print([s])
    row = Log(line=s, line_id=line_id, tgl=waktu)
    db_session = global_vars['db_session']
    db_session.add(row)
    try:
        # commit() flushes pending rows itself, no separate flush() needed.
        db_session.commit()
    except IntegrityError as e:
        # Always roll back so the shared session stays usable, whether the
        # error is ignored here or handled by a caller.
        db_session.rollback()
        s_err = str(e)
        if any(error in s_err for error in abaikan_error):
            print('  sudah ada')
            return
        raise


def read_once(log_file, start_from):
    """Read ``log_file`` to its end once and save every log entry found.

    An entry begins at a line matching ``FIRST_LINE_REGEX`` and includes the
    following non-matching lines.  Lines that appear before the first
    matching line are discarded.

    Args:
        log_file: Path of the log file.
        start_from: Text that the first line to process must start with, or
            a falsy value to process the whole file.  If the text is not
            found nothing is saved.
    """
    lines = []
    line_id = None
    waktu = None
    f = BacaFile(log_file, start_from)
    try:
        while True:
            line = f.readline()
            if not line:
                # b'' means end of file, None means start_from was not found.
                # Stop in both cases, even if no entry was ever started;
                # otherwise the loop would never terminate.
                if line_id:
                    save(lines, line_id, waktu)
                break
            s = line.rstrip().decode('utf-8')
            match = FIRST_LINE_REGEX.search(s)
            if match:
                if line_id:
                    # A new entry starts, so the previous one is complete.
                    save(lines, line_id, waktu)
                del lines[:]
                # NOTE(review): the id hashes the raw line including its line
                # ending, whereas read_forever() hashes the stripped line, so
                # the two modes yield different ids for the same entry.
                # Left unchanged to stay compatible with ids already stored.
                line_id = md5(line).hexdigest()
                year, month, day, hour, minute, sec, msec = match.groups()
                # Last argument is presumably microseconds -- TODO confirm
                # against opensipkd.waktu.create_datetime.
                waktu = create_datetime(
                        int(year), int(month), int(day), int(hour),
                        int(minute), int(sec), int(msec)*1000)
            lines.append(s)
    finally:
        f.close()


def read_forever(log_file):
    """Follow ``log_file`` with ``tail -f`` and save entries as they arrive.

    An entry begins at a line matching ``FIRST_LINE_REGEX`` and includes the
    following non-matching lines.  An entry is saved once the first line of
    the next entry has been read.  The function returns only when ``tail``
    terminates; the entry still pending at that point is saved as well.

    Args:
        log_file: Path of the log file to follow.
    """
    lines = []
    line_id = None
    waktu = None
    # stderr is inherited rather than piped: an unread pipe could fill up and
    # block tail, and its error messages would be lost.
    proc = Popen(['tail', '-f', log_file], stdout=PIPE)
    try:
        # A blocking readline() needs no polling or sleeping.  It also sees
        # lines already sitting in the reader's buffer, which polling the
        # file descriptor cannot, and returns b'' once tail has exited.
        for raw in iter(proc.stdout.readline, b''):
            line = raw.rstrip().decode('utf-8')
            match = FIRST_LINE_REGEX.search(line)
            if not match:
                lines.append(line)
                continue
            if line_id:
                # A new entry starts, so the previous one is complete.
                save(lines, line_id, waktu)
            # NOTE(review): hashes the stripped line, while read_once()
            # hashes the raw line including its line ending.
            line_id = md5(line.encode('utf-8')).hexdigest()
            lines = [line]
            year, month, day, hour, minute, sec, msec = match.groups()
            # Last argument is presumably microseconds -- TODO confirm
            # against opensipkd.waktu.create_datetime.
            waktu = create_datetime(
                    int(year), int(month), int(day), int(hour),
                    int(minute), int(sec), int(msec)*1000)
        if line_id:
            # tail ended: do not lose the entry collected so far.
            save(lines, line_id, waktu)
    finally:
        # Do not leave an orphaned tail process behind on errors or Ctrl-C.
        if proc.poll() is None:
            proc.terminate()
        proc.stdout.close()
        proc.wait()


def get_option(argv):
    """Parse the command-line arguments in ``argv``.

    Recognised arguments: positional ``conf`` plus the optional
    ``--log-file``, ``--run-and-stop`` (flag) and ``--start-from``.

    Returns:
        The ``argparse`` namespace with attributes ``conf``, ``log_file``,
        ``run_and_stop`` and ``start_from``.
    """
    argument_specs = (
        (('conf',), {}),
        (('--log-file',), {'help': 'log file'}),
        (('--run-and-stop',), {'action': 'store_true'}),
        (('--start-from',), {}),
    )
    parser = ArgumentParser()
    for names, settings in argument_specs:
        parser.add_argument(*names, **settings)
    return parser.parse_args(argv)


def main(argv=None):
    """Entry point: load the configuration and import log entries.

    Reads the ``[main]`` section of the configuration file: the
    ``abaikan_error`` option (one ignorable error substring per line) and the
    ``db_``-prefixed options used to build the SQLAlchemy engine.  Then runs
    ``read_once()`` when ``--run-and-stop`` is given, else ``read_forever()``.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` as it is at call time.

    Raises:
        SystemExit: When ``--log-file`` was not supplied.
    """
    if argv is None:
        # Resolved at call time; a default of sys.argv[1:] in the signature
        # would be frozen when the module is imported.
        argv = sys.argv[1:]
    option = get_option(argv)
    if not option.log_file:
        # The parser cannot mark it required because init() shares it.
        raise SystemExit('--log-file is required')
    conf = ConfigParser()
    conf.read(option.conf)
    for line in conf.get('main', 'abaikan_error').splitlines():
        s = line.strip()
        if s:
            abaikan_error.append(s)
    engine = engine_from_config(conf['main'], 'db_')
    factory = sessionmaker(bind=engine)
    db_session = factory()
    global_vars['db_session'] = db_session
    try:
        if option.run_and_stop:
            read_once(option.log_file, option.start_from)
        else:
            read_forever(option.log_file)
    finally:
        # Release the database connection on normal exit and on errors.
        db_session.close()


def init(argv=None):
    """Entry point: create the database tables declared on ``Base``.

    Builds the SQLAlchemy engine from the ``db_``-prefixed options in the
    ``[main]`` section of the configuration file, turns on SQL echoing and
    creates all tables registered in ``Base.metadata``.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` as it is at call time.
    """
    if argv is None:
        # Resolved at call time; a default of sys.argv[1:] in the signature
        # would be frozen when the module is imported.
        argv = sys.argv[1:]
    option = get_option(argv)
    conf = ConfigParser()
    conf.read(option.conf)
    engine = engine_from_config(conf['main'], 'db_')
    engine.echo = True  # show the generated DDL statements
    Base.metadata.create_all(engine)