__init__.py 8.35 KB
import os
import sys
import re
from time import sleep
import logging
from string import ascii_lowercase
from subprocess import (
    Popen,
    PIPE,
    call,
    )
from configparser import ConfigParser
from argparse import ArgumentParser
from .pg_conf import Reader
from .file_rotate import FileRotate
from .logger import setup_logging


RE_COUNT_ROW = re.compile(r'(\d*) row')
conf = ConfigParser()
log = logging.getLogger()


def get_tmp_files(dir_name, db_name):
    r = []
    for ch1 in ascii_lowercase:
        for ch2 in ascii_lowercase:
            suffix = ch1 + ch2
            filename = '_'.join([db_name, suffix])
            filename = os.path.join(dir_name, filename)
            if not os.path.exists(filename):
                return r
            r.append(filename)
    return r


def run(command):
    log.info(command)
    if os.system(command) != 0:
        sys.exit()


def run_list(cmd):
    log.info(' '.join(cmd))
    if call(cmd) != 0:
        sys.exit()


def run_event(event):
    scripts = conf.get('main', event)
    for script in scripts.splitlines():
        script = script.strip()
        if not script:
            continue
        run(script)


def is_systemd():
    script = '/bin/systemctl'
    return os.path.exists(script)


def systemd_pg_service(todo):
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        conf_dir = os.path.join(pg_conf_dir, version)
        for cluster in os.listdir(conf_dir):
            cluster_dir = os.path.join(conf_dir, cluster)
            pg_conf = os.path.join(cluster_dir, 'postgresql.conf')
            if is_mirror(pg_conf):
                continue
            cmd = f'systemctl {todo} postgresql@{version}-{cluster}.service'
            run(cmd)


def old_pg_service(todo):
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        cmd = f'/etc/init.d/postgresql {todo} {version}'
        run(cmd)


def pg_service(todo):
    if is_systemd():
        systemd_pg_service(todo)
    else:
        old_pg_service(todo)


def pg_stop():
    pg_service('stop')


def pg_start():
    pg_service('start')
    # Beri waktu database server untuk up
    sleep(5)


def pg_port(conf_file):
    with open(conf_file) as f:
        for line in f.readlines():
            s = line.strip()
            if not s or s[0] == '#':
                continue
            t = s.split('=')
            key = t[0].strip()
            value = t[1].split('#')[0].strip()
            if key == 'port':
                return value


def is_mirror(pg_conf):
    r = Reader(pg_conf)
    if 'include_dir' in r:
        # Versi 12 - 15
        conf_dir = os.path.split(pg_conf)[0]
        mirror_conf = os.path.join(conf_dir, r['include_dir'], 'recovery.conf')
        if os.path.exists(mirror_conf):
            return True
    # Versi 9.1 - 11
    mirror_conf = os.path.join(r['data_directory'], 'recovery.conf')
    if os.path.exists(mirror_conf):
        return True
    if 'primary_conninfo' in r:
        return True
    port = r['port']
    # Versi 11 - 15
    cmd_psql = f'psql -p {port} -c "SHOW primary_conninfo"'
    cmd1 = ['su', '-', 'postgres', '-c', cmd_psql]
    cmd2 = ['grep', 'user']
    p1 = Popen(cmd1, stdout=PIPE)
    p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
    out, err = p2.communicate()
    if not err:
        s = out.decode('utf-8')
        if s:
            return True


def is_db_exists(version, port, db_name):
    psql_bin = pg_bin(version, 'psql')
    sql = f"SELECT 1 FROM pg_database WHERE datname='{db_name}'"
    cmd_psql = f'{psql_bin} -p {port} -c "{sql}"'
    cmd = ['su', '-', 'postgres', '-c', cmd_psql]
    p = Popen(cmd, stdout=PIPE)
    out, err = p.communicate()
    s = out.decode('utf-8')
    found = RE_COUNT_ROW.search(s)
    if found:
        count = found.group(1)
        return int(count)
    sys.exit()


# Pastikan database server hanya bisa diakses oleh user postgres secara local
# Abaikan bila cluster adalah mirror
def pg_local_conf():
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        conf_dir = os.path.join(pg_conf_dir, version)
        for cluster in os.listdir(conf_dir):
            cluster_dir = os.path.join(conf_dir, cluster)
            pg_conf = os.path.join(cluster_dir, 'postgresql.conf')
            if is_mirror(pg_conf):
                continue
            pg_hba_file = os.path.join(cluster_dir, 'pg_hba.conf')
            pg_hba_bak_file = pg_hba_file + '.orig'
            if not os.path.exists(pg_hba_bak_file):
                os.rename(pg_hba_file, pg_hba_bak_file)
            version_float = float(version)
            if version_float <= 8.4:
                line = 'local all postgres ident'
            elif version_float > 8.4 and version_float < 9:
                line = 'local all postgres ident sameuser'
            else:
                line = 'local all postgres peer'
            with open(pg_hba_file, 'w') as f:
                f.write(line)


def pg_original_conf():
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        version_dir = os.path.join(pg_conf_dir, version)
        for cluster in os.listdir(version_dir):
            cluster_dir = os.path.join(version_dir, cluster)
            pg_hba_file = os.path.join(cluster_dir, 'pg_hba.conf')
            pg_hba_bak_file = pg_hba_file + '.orig'
            if not os.path.exists(pg_hba_bak_file):
                continue
            os.rename(pg_hba_bak_file, pg_hba_file)


def run_as_postgres(cmd):
    cmd = ['su', '-', 'postgres', '-c', cmd]
    run_list(cmd)


def chown_postgres(filename):
    cmd = ['chown', 'postgres.postgres', filename]
    run_list(cmd)


def vacuum(version, port, db_name):
    psql_bin = pg_bin(version, 'psql')
    sql = 'VACUUM VERBOSE ANALYZE'
    cmd = f'{psql_bin} -p {port} {db_name} -c "{sql}"'
    run_as_postgres(cmd)


def pg_bin(version, cmd):
    pg_lib_dir = conf.get('main', 'pg_lib_dir')
    return os.path.join(pg_lib_dir, version, 'bin', cmd)


def pg_dump(version, port, db_name, dir_name):
    backup_file = os.path.join(dir_name, db_name) + '.pg'
    fr = FileRotate(backup_file, conf.getint('main', 'database_rotate'))
    fr.run()
    pg_dump_bin = pg_bin(version, 'pg_dump')
    tmp_file = os.path.join(dir_name, db_name) + '_'
    cmd = f'{pg_dump_bin} -p {port} -Fc {db_name} | split -b 1G - {tmp_file}'
    run_as_postgres(cmd)
    tmp_files = get_tmp_files(dir_name, db_name)
    if not tmp_files:
        return
    files = ' '.join(tmp_files)
    run(f'cat {files} > {backup_file}')
    chown_postgres(backup_file)
    for filename in tmp_files:
        os.remove(filename)


# http://postgresql.1045698.n5.nabble.com/large-database-problems-with-pg-dump-and-pg-restore-td3236910.html
def pg_backup():
    base_backup_dir = conf.get('main', 'database_backup_dir')
    mkdir(base_backup_dir)
    db_list = conf.get('main', 'database').split()
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        version_conf_dir = os.path.join(pg_conf_dir, version)
        version_backup_dir = os.path.join(base_backup_dir, version)
        mkdir(version_backup_dir)
        for cluster in os.listdir(version_conf_dir):
            cluster_conf_dir = os.path.join(version_conf_dir, cluster)
            cluster_backup_dir = os.path.join(version_backup_dir, cluster)
            mkdir(cluster_backup_dir)
            pg_conf = os.path.join(cluster_conf_dir, 'postgresql.conf')
            port = pg_port(pg_conf)
            for db_name in db_list:
                if not is_db_exists(version, port, db_name):
                    continue
                if not is_mirror(pg_conf):
                    vacuum(version, port, db_name)
                pg_dump(version, port, db_name, cluster_backup_dir)


def mkdir(name):
    if not os.path.exists(name):
        os.mkdir(name)
        chown_postgres(name)


def get_option(argv):
    parser = ArgumentParser()
    parser.add_argument('config')
    parser.add_argument('--stop-daemons-and-exit', action='store_true')
    return parser.parse_args(argv)


def main(argv=sys.argv[1:]):
    option = get_option(argv)
    conf_file = option.config
    setup_logging(conf_file)
    conf.read(conf_file)
    run_event('before')
    if option.stop_daemons_and_exit:
        return
    pg_stop()
    pg_local_conf()
    pg_start()
    pg_backup()
    pg_stop()
    pg_original_conf()
    pg_start()
    run_event('after')