initialize_db.py 3.86 KB
import os
import sys
import csv
import subprocess
import transaction
from getpass import getpass
from translationstring import (
    TranslationString,
    Translator,
    ugettext_policy,
    )
from translationstring.tests.translations import Translations
from sqlalchemy import engine_from_config
from ziggurat_foundations.models.services.user import UserService
from pyramid.paster import (
    get_appsettings,
    setup_logging,
    )
from ..models import (
    DBSession,
    Base,
    )
from ..models.ziggurat import (
    Group,
    GroupPermission,
    UserGroup,
    User,
    )


class Penerjemah:
    def __init__(self, domain='initialize_db'):
        self.domain = domain

    def configure(self, settings):
        locale_ids = [settings['pyramid.default_locale_name']]
        here = os.path.abspath(os.path.dirname(__file__))
        localedir = os.path.join(here, '..', 'locale')
        try:
            self.translations = Translations.load(
                localedir, locale_ids, self.domain)
        except TypeError:
            self.translations = None
        self.translator = self.translations and Translator(
            self.translations, ugettext_policy)

    def terjemahkan(self, msg, mapping=None):
        ts = TranslationString(msg, mapping=mapping)
        if self.translator:
            return self.translator(ts)
        return ts.interpolate()


obj_penerjemah = Penerjemah()


def _(msg, mapping=None):
    return obj_penerjemah.terjemahkan(msg, mapping) 


def usage(argv):
    cmd = os.path.basename(argv[0])
    print('usage: %s <config_uri>\n'
          '(example: "%s development.ini")' % (cmd, cmd))
    sys.exit(1)


def read_file(filename):
    f = open(filename)
    s = f.read()
    f.close()
    return s


def alembic_run(ini_file, url):
    bin_path = os.path.split(sys.executable)[0]
    alembic_bin = os.path.join(bin_path, 'alembic') 
    command = (alembic_bin, 'upgrade', 'head')    
    s = read_file(ini_file)
    s = s.replace('{db_url}', url)
    f = open('alembic.ini', 'w')
    f.write(s)
    f.close()
    subprocess.call(command)   
    os.remove('alembic.ini')


def get_file(filename):
    base_dir = os.path.split(__file__)[0]
    fullpath = os.path.join(base_dir, 'data', filename)
    return open(fullpath)


def ask_password(name):
    data = dict(name=name) 
    msg1 = _('Enter new password for ${name}: ', mapping=data)
    msg2 = _('Retype new password for ${name}: ', mapping=data)
    while True:
        pass1 = getpass(msg1)
        if not pass1:
            continue
        pass2 = getpass(msg2)
        if pass1 == pass2:
            return pass1
        print(_('Sorry, passwords do not match'))


def restore_csv(table, filename):
    q = DBSession.query(table)
    if q.first():
        return
    with get_file(filename) as f: 
        reader = csv.DictReader(f)
        for cf in reader:
            row = table()
            for fieldname in cf:
                val = cf[fieldname]
                if not val: 
                    continue
                setattr(row, fieldname, val)
            DBSession.add(row)
    return True


def main(argv=sys.argv):
    if len(argv) != 2:
        usage(argv)
    config_uri = argv[1]
    setup_logging(config_uri)
    settings = get_appsettings(config_uri)
    obj_penerjemah.configure(settings)
    engine = engine_from_config(settings, 'sqlalchemy.')
    Base.metadata.bind = engine
    Base.metadata.create_all()
    alembic_run('alembic.ini.tpl', settings['sqlalchemy.url'])
    with transaction.manager:
        if restore_csv(User, 'users.csv'):
            DBSession.flush()
            q = DBSession.query(User).filter_by(id=1)
            user = q.first()
            password = ask_password(user.user_name)
            UserService.set_password(user, password)
            UserService.regenerate_security_code(user)
        restore_csv(Group, 'groups.csv')
        restore_csv(UserGroup, 'users_groups.csv')