__init__.py 6.86 KB
import os
import sys
import re
from time import sleep
import logging
from string import ascii_lowercase
from subprocess import (
    Popen,
    PIPE,
    call,
    )
from configparser import ConfigParser
from argparse import ArgumentParser
from .pg_conf import Reader
from .file_rotate import FileRotate
from .logger import setup_logging


RE_COUNT_ROW = re.compile(r'(\d*) row')
conf = ConfigParser()
log = logging.getLogger()


def get_tmp_files(dir_name, db_name):
    r = []
    for ch1 in ascii_lowercase:
        for ch2 in ascii_lowercase:
            suffix = ch1 + ch2
            filename = '_'.join([db_name, suffix])
            filename = os.path.join(dir_name, filename)
            if not os.path.exists(filename):
                return r
            r.append(filename)
    return r


def run(command):
    log.info(command)
    if os.system(command) != 0:
        sys.exit()


def run_list(cmd):
    log.info(' '.join(cmd))
    if call(cmd) != 0:
        sys.exit()


def run_event(event):
    scripts = conf.get('main', event)
    for script in scripts.splitlines():
        script = script.strip()
        if not script:
            continue
        run(script)


def pg_service(todo):
    var = 'pg_' + todo
    cmd = conf.get('main', var)
    run(cmd)


def pg_stop():
    pg_service('stop')


def pg_start():
    pg_service('start')
    # Beri waktu database server untuk up
    sleep(5)


def pg_port(conf_file):
    with open(conf_file) as f:
        for line in f.readlines():
            s = line.strip()
            if not s or s[0] == '#':
                continue
            t = s.split('=')
            key = t[0].strip()
            value = t[1].split('#')[0].strip()
            if key == 'port':
                return value


def is_mirror(pg_conf):
    r = Reader(pg_conf)
    mirror_conf = os.path.join(r['data_directory'], 'recovery.conf')
    return os.path.exists(mirror_conf)


# Pastikan database server hanya bisa diakses oleh user postgres secara local
# Abaikan bila cluster adalah mirror
def pg_local_conf():
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        conf_dir = os.path.join(pg_conf_dir, version)
        for cluster in os.listdir(conf_dir):
            cluster_dir = os.path.join(conf_dir, cluster)
            pg_conf = os.path.join(cluster_dir, 'postgresql.conf')
            if is_mirror(pg_conf):
                continue
            pg_hba_file = os.path.join(cluster_dir, 'pg_hba.conf')
            pg_hba_bak_file = pg_hba_file + '.orig'
            if not os.path.exists(pg_hba_bak_file):
                os.rename(pg_hba_file, pg_hba_bak_file)
            version_float = float(version)
            if version_float <= 8.4:
                line = 'local all postgres ident'
            elif version_float > 8.4 and version_float < 9:
                line = 'local all postgres ident sameuser'
            else:
                line = 'local all postgres peer'
            with open(pg_hba_file, 'w') as f:
                f.write(line)


def pg_original_conf():
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        version_dir = os.path.join(pg_conf_dir, version)
        for cluster in os.listdir(version_dir):
            cluster_dir = os.path.join(version_dir, cluster)
            pg_hba_file = os.path.join(cluster_dir, 'pg_hba.conf')
            pg_hba_bak_file = pg_hba_file + '.orig'
            if not os.path.exists(pg_hba_bak_file):
                continue
            os.rename(pg_hba_bak_file, pg_hba_file)


def is_db_exists(version, port, db_name):
    psql_bin = pg_bin(version, 'psql')
    sql = f"SELECT 1 FROM pg_database WHERE datname='{db_name}'"
    cmd_psql = f'{psql_bin} -p {port} -c "{sql}"'
    cmd = ['su', '-', 'postgres', '-c', cmd_psql]
    p = Popen(cmd, stdout=PIPE)
    out, err = p.communicate()
    s = out.decode('utf-8')
    match = RE_COUNT_ROW.search(s)
    if match:
        count = match.group(1)
        return int(count)
    sys.exit()


def run_as_postgres(cmd):
    cmd = ['su', '-', 'postgres', '-c', cmd]
    run_list(cmd)


def chown_postgres(filename):
    cmd = ['chown', 'postgres.postgres', filename]
    run_list(cmd)


def vacuum(version, port, db_name):
    psql_bin = pg_bin(version, 'psql')
    sql = 'VACUUM VERBOSE ANALYZE'
    cmd = f'{psql_bin} -p {port} {db_name} -c "{sql}"'
    run_as_postgres(cmd)


def pg_bin(version, cmd):
    pg_lib_dir = conf.get('main', 'pg_lib_dir')
    return os.path.join(pg_lib_dir, version, 'bin', cmd)


def pg_dump(version, port, db_name, dir_name):
    backup_file = os.path.join(dir_name, db_name) + '.pg'
    fr = FileRotate(backup_file, conf.getint('main', 'database_rotate'))
    fr.run()
    pg_dump_bin = pg_bin(version, 'pg_dump')
    tmp_file = os.path.join(dir_name, db_name) + '_'
    cmd = f'{pg_dump_bin} -p {port} -Fc {db_name} | split -b 1G - {tmp_file}'
    run_as_postgres(cmd)
    tmp_files = get_tmp_files(dir_name, db_name)
    if not tmp_files:
        return
    files = ' '.join(tmp_files)
    run(f'cat {files} > {backup_file}')
    chown_postgres(backup_file)
    for filename in tmp_files:
        os.remove(filename)


# http://postgresql.1045698.n5.nabble.com/large-database-problems-with-pg-dump-and-pg-restore-td3236910.html
def pg_backup():
    base_backup_dir = conf.get('main', 'database_backup_dir')
    mkdir(base_backup_dir)
    db_list = conf.get('main', 'database').split()
    pg_conf_dir = conf.get('main', 'pg_conf_dir')
    for version in os.listdir(pg_conf_dir):
        version_conf_dir = os.path.join(pg_conf_dir, version)
        version_backup_dir = os.path.join(base_backup_dir, version)
        mkdir(version_backup_dir)
        for cluster in os.listdir(version_conf_dir):
            cluster_conf_dir = os.path.join(version_conf_dir, cluster)
            cluster_backup_dir = os.path.join(version_backup_dir, cluster)
            mkdir(cluster_backup_dir)
            pg_conf = os.path.join(cluster_conf_dir, 'postgresql.conf')
            port = pg_port(pg_conf)
            for db_name in db_list:
                if not is_db_exists(version, port, db_name):
                    continue
                if not is_mirror(pg_conf):
                    vacuum(version, port, db_name)
                pg_dump(version, port, db_name, cluster_backup_dir)


def mkdir(name):
    if not os.path.exists(name):
        os.mkdir(name)
        chown_postgres(name)


def get_option(argv):
    parser = ArgumentParser()
    parser.add_argument('config')
    parser.add_argument('--stop-daemons-and-exit', action='store_true')
    return parser.parse_args(argv)


def main(argv=sys.argv[1:]):
    option = get_option(argv)
    conf_file = option.config
    setup_logging(conf_file)
    conf.read(conf_file)
    run_event('before')
    if option.stop_daemons_and_exit:
        return
    pg_stop()
    pg_local_conf()
    pg_start()
    pg_backup()
    pg_stop()
    pg_original_conf()
    pg_start()
    run_event('after')