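"""Append rows from a CSV/TSV file into SQLAlchemy-mapped tables.

Column headers of the form ``column/table.field`` (or
``column/schema.table.field``) are resolved against the referenced table so
that human-readable values in the file can be replaced by the referenced
row's id before insertion.
"""
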
from . import get_ext
import csv
from sqlalchemy import inspect, Table, select
from sqlalchemy.types import BOOLEAN
import transaction
from ziggurat_foundations.models.services.user import UserService


DBSession = None  # to be initialized by application
Base = None  # to be initialized by application


def get_file(fname):
    """Harus dibuat yang berupa parameter"""

    # base_dir = os.path.split(__file__)[0]
    # fullpath = os.path.join(base_dir, 'data', fname)
    # return open(fullpath)
    return open(fname, 'r', encoding='utf-8')

def get_fields(db_session, table):
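    """Return column metadata for ``table`` as reported by the database.

    Returns ``(columns_table, fields)``: the raw column descriptions from the
    SQLAlchemy inspector and a mapping of column name to column type.
    """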
    insp = inspect(db_session.connection())
    schema = getattr(table.__table__, "schema", None) or "public"
    columns_table = insp.get_columns(table.__tablename__, schema)

    fields = {}
    for c in columns_table:
        fields[c["name"]] = c["type"]

    return columns_table, fields


def get_foreigns(eng, cf):
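    """Resolve foreign-key lookups declared in CSV header names.

    A header of the form ``column/table.field`` (or
    ``column/schema.table.field``) means the value in that column should be
    looked up in ``table.field`` and replaced by the matching row's id; for
    example, a hypothetical header ``owner_id/public.users.email`` would match
    users by e-mail address.

    Returns ``(foreigns, fmap)``: ``foreigns`` maps each such header to a
    ``(Table, Column)`` pair and ``fmap`` maps every header to its plain
    column name.
    """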
    foreigns = dict()
    fmap = dict()
    for fname in cf.keys():
        if not fname:
            continue
        t = fname.split('/')

        fname_orig = t[0]
        schema = "public"
        if t[1:]:
            t_array = t[1].split('.')
            if len(t_array) == 2:
                foreign_table = t_array[0]
                foreign_field = t_array[1]
            else:
                schema = t_array[0]
                foreign_table = t_array[1]
                foreign_field = t_array[2]

            # autoload=True was replaced by autoload_with when migrating
            # from SQLAlchemy 1.4 to 2.0
            foreign_table = Table(foreign_table, Base.metadata,
                                  autoload_with=eng,
                                  schema=schema)
            foreign_field = getattr(foreign_table.c, foreign_field)
            foreigns[fname] = (foreign_table, foreign_field)

        fmap[fname] = fname_orig
    return foreigns, fmap


def append_csv(table, filename, keys, get_file_func=get_file,
               db_session=None, **args):
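    """Append rows from a CSV/TSV file into ``table``.

    ``keys`` lists the columns used to look up existing rows; an existing row
    is skipped unless ``update_exist`` is passed.  Recognised keyword
    arguments are ``update_exist``, ``callback`` (a hook used to map foreign
    values and to fill empty fields) and ``delimiter`` (defaults to "," for
    .csv and to a tab for .tsv files).
    """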
    db_session = db_session or DBSession  # DBSession is assigned by the application after import
    eng = db_session.get_bind()
    update_exist = args.get("update_exist")
    callback = args.get("callback")
    delimiter = args.get("delimiter")
    ext = get_ext(filename).lower()
    if not delimiter:
        delimiter = ","
        if ext.strip() == '.tsv':
            delimiter = "\t"

    columns_table, fields = get_fields(db_session, table)
    with get_file_func(filename) as f:
        reader = csv.DictReader(f, delimiter=delimiter)
        filter_ = dict()
        foreigns = dict()
        is_first = True
        fmap = dict()
        user = False
        for cf in reader:  # each row as a dict keyed by CSV header
            if is_first:
                is_first = False
                foreigns, fmap = get_foreigns(eng, cf)
            password = None
            data = dict()
            for fname in cf:
                if not fname:
                    continue
                # Look up the value in the referenced foreign table
                if fname in foreigns:
                    foreign_table, foreign_field = foreigns[fname]
                    value = cf[fname]
                    if callback:
                        value = callback("mapping", table=foreign_table, field=foreign_field,
                                         value=value)

                    # SQLAlchemy 2.0 style; select([...]) and
                    # Base.metadata.bind.execute() were removed after 1.4
                    sql = select(foreign_table).where(foreign_field == value)
                    with eng.connect() as conn:
                        # fetch while the connection is still open; the result
                        # is unusable once the connection context exits
                        row = conn.execute(sql).fetchone()
                    value = row.id if row else None
                else:
                    value = cf[fname]

                fname_orig = fmap[fname]
                if not value and callback:
                    value = callback("value", data=cf, field=fname_orig)
                data[fname_orig] = value

            for key in keys:
                if key not in data or not data[key]:
                    raise ValueError(f"Key Field '{key}' wajib ada")

                filter_[key] = data[key]

            q = db_session.query(table).filter_by(**filter_)
            row = q.first()
            if row:
                if not update_exist:
                    continue
            else:
                row = table()
            for fname in cf:
                if not fname:
                    continue
                fname_orig = fmap[fname]
                val = data[fname_orig]
                if not val:
                    continue
                if fname_orig == "user_password":
                    user = True
                    password = val
                else:
                    if fname_orig in fields and type(fields[fname_orig]) is BOOLEAN:
                        val = val in ('true', '1', 1)
                    setattr(row, fname_orig, val)

            # Added 2024-09-05: non-nullable fields without a default must have data
            for c in columns_table:
                if (not c["nullable"] and c["name"] not in data and c["name"] != "id") and c["default"] is None:
                    raise ValueError(
                        f"Table {table.__name__}: field '{c['name']}' ({c['type']}) is required")

            db_session.add(row)
            db_session.flush()
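            # Rows that carried a ``user_password`` column get their password
            # set through ziggurat_foundations' UserService (which applies the
            # configured password hashing) once the row has an id.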

            if user and password:
                print("Table: ", table.__name__, filter_)
                row = db_session.query(table).filter_by(id=row.id).first()
                UserService.set_password(row, password)
                db_session.add(row)
                db_session.flush()
                user = False


            transaction.commit()  # a commit per record is needed, especially for internal links
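

# Usage sketch (illustrative only): ``myapp``, ``models``, ``Session`` and the
# file path below are hypothetical names, not part of this module.  The module
# globals DBSession and Base must be set by the application before use.
#
#   import upload
#   from myapp import models, Session
#
#   session = Session()
#   upload.DBSession = session
#   upload.Base = models.Base
#   upload.append_csv(models.User, "users.csv", keys=["email"],
#                     db_session=session, update_exist=True)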