# initialize_db.py (4.64 KB)
import os
import sys
import csv
import subprocess
import transaction
from getpass import getpass
from sqlalchemy import engine_from_config
from ziggurat_foundations.models.services.user import UserService
from pyramid.paster import (
    get_appsettings,
    setup_logging,
    )
from pyramid.i18n import (
    Localizer,
    TranslationStringFactory,
    Translations,
    )
from ..models import (
    DBSession,
    Base,
    )
from ..models.ziggurat import (
    Group,
    GroupPermission,
    UserGroup,
    User,
    )
from ..models.imgw import (
    Produk,
    Status,
    StatusAgent,
    Jalur,
    Agent,
    Modem,
    ModemPengirim,
    Pulsa,
    Antrian,
    Selesai,
    MSISDN,
    )


# Translation domain; also passed to Translations.load() by MyLocalizer.
domain = 'initialize_db'
# Factory producing translation strings that belong to this domain.
_ = TranslationStringFactory(domain)

# Module-level store: main() saves the application settings under the
# 'settings' key and MyLocalizer reads them back from there.
my_registry = dict()


class MyLocalizer:
    """Translate strings using the locale configured in the settings.

    The locale name is taken from ``my_registry['settings']`` and the
    message catalog for ``domain`` is loaded from the ``locale`` directory
    located one level above this module's directory.
    """

    def __init__(self):
        locale_name = my_registry['settings']['pyramid.default_locale_name']
        module_dir = os.path.abspath(os.path.dirname(__file__))
        catalog = Translations.load(
            os.path.join(module_dir, '..', 'locale'), [locale_name], domain)
        self.localizer = Localizer(locale_name, catalog)

    def translate(self, ts):
        """Return ``ts`` as translated by the wrapped localizer."""
        translated = self.localizer.translate(ts)
        return translated


def usage(argv):
    """Print the command-line usage message and exit with status 1.

    ``argv[0]`` supplies the program name shown in the message.
    """
    program = os.path.basename(argv[0])
    message = ('usage: %s <config_uri>\n'
               '(example: "%s development.ini")' % (program, program))
    print(message)
    sys.exit(1)


def read_file(filename):
    """Return the whole content of ``filename`` as a string.

    The file is opened in text mode with the default encoding. The context
    manager guarantees the handle is closed even if reading raises.
    """
    with open(filename) as f:
        return f.read()


def alembic_run(ini_file, url):
    """Run ``alembic upgrade head`` using a config generated from a template.

    The template ``ini_file`` is read, every ``{db_url}`` placeholder is
    replaced with ``url``, and the result is written to ``alembic.ini`` in
    the current working directory. The ``alembic`` executable located next
    to the running Python interpreter is then invoked.

    Returns the exit status of the alembic process.
    """
    bin_path = os.path.split(sys.executable)[0]
    alembic_bin = os.path.join(bin_path, 'alembic')
    command = (alembic_bin, 'upgrade', 'head')
    config = read_file(ini_file).replace('{db_url}', url)
    try:
        with open('alembic.ini', 'w') as f:
            f.write(config)
        return subprocess.call(command)
    finally:
        # The generated file embeds the database URL, so remove it even when
        # writing or launching alembic fails.
        if os.path.exists('alembic.ini'):
            os.remove('alembic.ini')


def get_file(filename):
    """Open ``filename`` from the ``data`` directory beside this module.

    Returns the open file object; closing it is left to the caller.
    """
    module_dir = os.path.dirname(__file__)
    return open(os.path.join(module_dir, 'data', filename))


def ask_password(name):
    """Prompt for a new password for ``name`` until it is confirmed.

    The password is requested twice without echo. An empty first entry
    simply repeats the first prompt; a mismatch prints a message and starts
    over. Returns the password once both entries are equal.
    """
    translate = MyLocalizer().translate
    mapping = dict(name=name)
    first_ts = _(
            'ask-password-1', default='Enter new password for ${name}: ',
            mapping=mapping)
    second_ts = _(
            'ask-password-2', default='Retype new password for ${name}: ',
            mapping=mapping)
    prompt = translate(first_ts)
    confirm_prompt = translate(second_ts)
    while True:
        entered = getpass(prompt)
        if entered:
            if entered == getpass(confirm_prompt):
                return entered
            print(translate(_('Sorry, passwords do not match')))


def restore_csv(table, filename):
    """Populate ``table`` from a CSV data file when the table is empty.

    If the table already holds at least one row nothing is loaded and
    ``None`` is returned. Otherwise one ``table`` instance is added to
    ``DBSession`` per CSV record, with each non-empty field set as an
    attribute named after its column, and ``True`` is returned.
    """
    if DBSession.query(table).first():
        return None
    with get_file(filename) as source:
        for record in csv.DictReader(source):
            row = table()
            for column, value in record.items():
                # Empty fields are left at the model's own default.
                if value:
                    setattr(row, column, value)
            DBSession.add(row)
    return True


def append_csv(table, filename, keys):
    """Add CSV records to ``table`` unless a matching row already exists.

    For every record in the CSV data file, the columns named in ``keys``
    form an equality filter against ``table``. Records that match an
    existing row are skipped; the others are added to ``DBSession`` with
    each non-empty field set as an attribute named after its column.
    """
    criteria = {}
    with get_file(filename) as source:
        for record in csv.DictReader(source):
            criteria.update((key, record[key]) for key in keys)
            existing = DBSession.query(table).filter_by(**criteria).first()
            if not existing:
                row = table()
                for column, value in record.items():
                    # Empty fields are left at the model's own default.
                    if value:
                        setattr(row, column, value)
                DBSession.add(row)


def create_schema(name):
    """Return the SQL statement that creates schema ``name`` if missing."""
    statement = 'CREATE SCHEMA IF NOT EXISTS {}'
    return statement.format(name)


def main(argv=sys.argv):
    """Create the database structures and load the initial CSV data.

    ``argv`` must hold exactly two items: the program name and the config
    URI; otherwise the usage message is printed and the process exits.
    The function creates the ``im`` schema, calls
    ``Base.metadata.create_all()``, runs the Alembic upgrade and then loads
    the CSV data files inside a single ``transaction.manager`` block.
    """
    if len(argv) != 2:
        usage(argv)
    config_uri = argv[1]
    setup_logging(config_uri)
    settings = get_appsettings(config_uri)
    # Make the settings reachable by MyLocalizer, which reads my_registry.
    my_registry['settings'] = settings
    engine = engine_from_config(settings, 'sqlalchemy.')
    Base.metadata.bind = engine
    # The schema is created before create_all() is called.
    engine.execute(create_schema('im'))
    Base.metadata.create_all()
    alembic_run('alembic.ini.tpl', settings['sqlalchemy.url'])
    with transaction.manager:
        # restore_csv() returns True only when the users table was empty and
        # has just been filled, so the password prompt runs only then.
        if restore_csv(User, 'users.csv'):
            # Flush the pending rows before querying for the user below.
            DBSession.flush()
            # NOTE(review): assumes users.csv provides a row with id 1;
            # `user` would be None otherwise -- confirm against the data file.
            q = DBSession.query(User).filter_by(id=1)
            user = q.first()
            password = ask_password(user.user_name)
            UserService.set_password(user, password)
        # append_csv() skips records whose key columns match an existing row.
        append_csv(Group, 'groups.csv', ['group_name'])
        restore_csv(UserGroup, 'users_groups.csv')
        append_csv(Produk, 'produk.csv', ['nama'])
        append_csv(MSISDN, 'msisdn.csv', ['awalan'])