Commit 10985a0c by Owo Sugiana

Tambah log2db

1 parent 1d565a88
test* test*
__pycache__ __pycache__
dist
*egg-info
0.5 2-3-2021
------------
- Tambah log2db yaitu menyimpan log file ke tabel log_file.
0.4 17-1-2021
-------------
- Jangan cekal replication
0.3 29-12-2020 0.3 29-12-2020
-------------- --------------
- Helper untuk memasang pencatatan perubahan isi tabel. - Helper untuk memasang pencatatan perubahan isi tabel.
......
...@@ -93,4 +93,54 @@ Tetapkanlah tabel yang akan dicatat perubahannya. Utamanya tabel terkait transak ...@@ -93,4 +93,54 @@ Tetapkanlah tabel yang akan dicatat perubahannya. Utamanya tabel terkait transak
$ ~/env/bin/log_table_trigger live.ini --schema=pbb --table=pembayaran_sppt $ ~/env/bin/log_table_trigger live.ini --schema=pbb --table=pembayaran_sppt
Log File to Database
--------------------
Script ini digunakan untuk menyimpan baris-baris pada *log file* menjadi sebuah
record pada tabel ``log_file``. Jika pada *log file* terdapat beberapa baris
yang sebenarnya sebuah satu-kesatuan seperti pada *Python exception* maka akan
disimpan ke dalam sebuah record saja.
Buatlah databasenya terlebih dahulu. Buatlah file konfigurasi ``log2db.ini``::
[main]
db_url = postgresql://user:pass@localhost:5432/db
abaikan_error =
nilai kunci ganda
duplicate key
Buat tabelnya::
$ ~/env/bin/log2db_init log2db.ini
Buat init script-nya di ``/etc/systemd/system/log2db.service``::
[Unit]
Description=Log file to database
After=postgresql.service
[Service]
Type=simple
User=webr
ExecStart=/home/webr/env/bin/log2db /home/webr/log2db.ini --log-file=/home/webr/log/pserve.log
Restart=on-abort
[Install]
WantedBy=multi-user.target
Aktifkan::
$ sudo systemctl enable log2db.service
$ sudo systemctl start log2db.service
Tabel ``log_file`` akan terisi jika ada penambahan baris setelah daemon
dihidupkan. Kalau ingin menyimpan baris-baris sebelumnya maka jalankan ini,
biarkan daemon itu tetap berjalan::
$ ~/env/bin/log2db /home/webr/log2db.ini --log-file=/home/webr/log/pserve.log --run-and-stop
Jangan khawatir untuk menjalankannya berkali-kali karena setiap barisnya
dianggap unik di tabel ``log_file``.
Semoga berhasil. Semoga berhasil.
[main]
db_url = postgresql://user:pass@localhost:5432/db
abaikan_error =
nilai kunci ganda
duplicate key
import sys
import re
from argparse import ArgumentParser
from configparser import ConfigParser
import select
from hashlib import md5
from time import sleep
from subprocess import (
Popen,
PIPE,
)
from mmap import (
mmap,
ACCESS_READ,
)
from sqlalchemy import engine_from_config
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from opensipkd.waktu import create_datetime
from .models import (
Base,
Log,
)
FIRST_LINE_PATTERN = \
r'^([\d]*)-([\d]*)-([\d]*) '\
r'([\d]*):([\d]*):([\d]*),([\d]*) '
FIRST_LINE_REGEX = re.compile(FIRST_LINE_PATTERN)
abaikan_error = []
global_vars = dict()
class BacaFile:
def __init__(self, filename, cari=''):
if cari:
self.is_found = False
else:
self.is_found = True
is_first = True
self.file = open(filename, 'rb')
s = mmap(self.file.fileno(), 0, access=ACCESS_READ)
while cari:
pos = s.find(cari.encode('utf-8'))
if pos == -1: # Tidak ketemu
return
if pos == 0: # Ketemu di baris ke-1
self.is_found = True
break
if not is_first:
self.file.seek(pos)
self.file.readline()
self.is_found = True
break
cari = '\n' + cari # Cari di baris berikutnya
self.file.seek(0)
is_first = False
def close(self):
self.file.close()
def readline(self):
if self.is_found:
return self.file.readline()
def save(lines, line_id, waktu):
s = '\n'.join(lines)
print([s])
row = Log(line=s, line_id=line_id, tgl=waktu)
db_session = global_vars['db_session']
db_session.add(row)
try:
db_session.flush()
db_session.commit()
except IntegrityError as e:
s_err = str(e)
for error in abaikan_error:
if s_err.find(error) > -1:
print(' sudah ada')
db_session.rollback()
return
raise(e)
def read_once(log_file, start_from):
lines = []
line_id = None
f = BacaFile(log_file, start_from)
while True:
line = f.readline()
if not line and line_id:
save(lines, line_id, waktu)
break
s = line.rstrip().decode('utf-8')
match = FIRST_LINE_REGEX.search(s)
if match:
if line_id:
save(lines, line_id, waktu)
del lines[:]
line_id = md5(line).hexdigest()
year, month, day, hour, minute, sec, msec = match.groups()
waktu = create_datetime(
int(year), int(month), int(day), int(hour),
int(minute), int(sec), int(msec)*1000)
lines.append(s)
f.close()
def read_forever(log_file):
lines = []
line_id = None
f = Popen(['tail', '-f', log_file], stdout=PIPE, stderr=PIPE)
p = select.poll()
p.register(f.stdout)
while True:
if p.poll(1):
line = f.stdout.readline()
line = line.rstrip()
line = line.decode('utf-8')
match = FIRST_LINE_REGEX.search(line)
if match:
if line_id:
save(lines, line_id, waktu)
line_id = md5(line.encode('utf-8')).hexdigest()
lines = [line]
year, month, day, hour, minute, sec, msec = match.groups()
waktu = create_datetime(
int(year), int(month), int(day), int(hour),
int(minute), int(sec), int(msec)*1000)
else:
lines.append(line)
sleep(0.1)
def get_option(argv):
pars = ArgumentParser()
pars.add_argument('conf')
pars.add_argument('--log-file', help='log file')
pars.add_argument('--run-and-stop', action='store_true')
pars.add_argument('--start-from')
return pars.parse_args(argv)
def main(argv=sys.argv[1:]):
option = get_option(argv)
conf = ConfigParser()
conf.read(option.conf)
for line in conf.get('main', 'abaikan_error').splitlines():
s = line.strip()
if s:
abaikan_error.append(s)
engine = engine_from_config(conf['main'], 'db_')
factory = sessionmaker(bind=engine)
global_vars['db_session'] = factory()
if option.run_and_stop:
read_once(option.log_file, option.start_from)
else:
read_forever(option.log_file)
def init(argv=sys.argv[1:]):
option = get_option(argv)
conf = ConfigParser()
conf.read(option.conf)
engine = engine_from_config(conf['main'], 'db_')
engine.echo = True
Base.metadata.create_all(engine)
from sqlalchemy import (
Column,
String,
DateTime,
Integer,
Text,
)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Log(Base):
__tablename__ = 'log_file'
id = Column(Integer, nullable=False, primary_key=True)
tgl = Column(DateTime(timezone=True), nullable=False)
line = Column(Text, nullable=False)
# line_id berisi md5 dari line
line_id = Column(String(32), nullable=False, unique=True)
...@@ -4,6 +4,7 @@ DECLARE ...@@ -4,6 +4,7 @@ DECLARE
v_one json; v_one json;
v_all json; v_all json;
v_field record; v_field record;
v_column text;
BEGIN BEGIN
FOR v_field IN FOR v_field IN
SELECT column_name SELECT column_name
...@@ -11,7 +12,8 @@ BEGIN ...@@ -11,7 +12,8 @@ BEGIN
WHERE table_schema = p_table_schema WHERE table_schema = p_table_schema
AND table_name = p_table_name AND table_name = p_table_name
LOOP LOOP
EXECUTE format('SELECT (($1).%I)::text', v_field.column_name) v_column = v_field.column_name;
EXECUTE format('SELECT (($1).%I)::text', v_column)
USING p_rec USING p_rec
INTO v_val; INTO v_val;
......
...@@ -27,6 +27,8 @@ def main(argv=sys.argv[1:]): ...@@ -27,6 +27,8 @@ def main(argv=sys.argv[1:]):
engine.echo = True engine.echo = True
table_name = option.table table_name = option.table
table_schema = option.schema table_schema = option.schema
if table_schema:
table_name = '.'.join([table_schema, table_name])
for operation in ('insert', 'update', 'delete'): for operation in ('insert', 'update', 'delete'):
trigger_name = f'log_table_{operation}' trigger_name = f'log_table_{operation}'
sql = f'DROP TRIGGER {trigger_name} ON {table_name}' sql = f'DROP TRIGGER {trigger_name} ON {table_name}'
......
...@@ -217,7 +217,7 @@ def mkdir(name): ...@@ -217,7 +217,7 @@ def mkdir(name):
def get_option(argv): def get_option(argv):
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument('config') parser.add_argument('config')
parser.add_argument('--stop-daemons-and-exit') parser.add_argument('--stop-daemons-and-exit', action='store_true')
return parser.parse_args(argv) return parser.parse_args(argv)
......
...@@ -5,6 +5,14 @@ from setuptools import ( ...@@ -5,6 +5,14 @@ from setuptools import (
) )
requires = [
'sqlalchemy',
'psycopg2-binary',
'opensipkd-hitung @ '
'git+https://git.opensipkd.com/sugiana/opensipkd-hitung.git',
]
here = os.path.abspath(os.path.dirname(__file__)) here = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(here, 'README.rst')) as f: with open(os.path.join(here, 'README.rst')) as f:
README = f.read() README = f.read()
...@@ -26,6 +34,7 @@ setup( ...@@ -26,6 +34,7 @@ setup(
author_email='sugiana@gmail.com', author_email='sugiana@gmail.com',
url='https://git.opensipkd.com/sugiana/maintenance', url='https://git.opensipkd.com/sugiana/maintenance',
keywords='postgresql', keywords='postgresql',
install_requires=requires,
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
zip_safe=False, zip_safe=False,
...@@ -34,6 +43,8 @@ setup( ...@@ -34,6 +43,8 @@ setup(
'maintenance = maintenance:main', 'maintenance = maintenance:main',
'log_table_init_db = log_table.init_db:main', 'log_table_init_db = log_table.init_db:main',
'log_table_trigger = log_table.set_trigger:main', 'log_table_trigger = log_table.set_trigger:main',
'log2db_init = log2db:init',
'log2db = log2db:main',
] ]
}, },
) )
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!