feat: implement CSV upload functionality with foreign key handling

1 parent c1f8457b
......@@ -139,8 +139,7 @@ def get_params(params, alternate=None):
get_params('devel', False)
"""
settings = get_settings()
result = settings and params in settings and settings[
params].strip() or None
result = settings.get(params, alternate).strip() or None
return result and result or alternate
......
from . import get_ext
import csv
from sqlalchemy import inspect, Table, select
from sqlalchemy.types import BOOLEAN
import transaction
from ziggurat_foundations.models.services.user import UserService
DBSession = None # to be initialized by application
Base = None # to be initialized by application
def get_file(fname):
"""Harus dibuat yang berupa parameter"""
# base_dir = os.path.split(__file__)[0]
# fullpath = os.path.join(base_dir, 'data', fname)
# return open(fullpath)
return open(fname, 'r', encoding='utf-8')
def get_fields(db_session, table):
insp = inspect(db_session.connection())
schema = hasattr(
table.__table__, "schema") and table.__table__.schema or "public"
columns_table = insp.get_columns(table.__tablename__, schema)
fields = {}
for c in columns_table:
fields[c["name"]] = c["type"]
return columns_table, fields
def get_foreigns(eng, cf):
foreigns = dict()
fmap = dict()
for fname in cf.keys():
if not fname:
continue
try:
t = fname.split('/')
except Exception as e:
# log.debug(fname, cf.keys())
raise e
fname_orig = t[0]
schema = "public"
if t[1:]:
t_array = t[1].split('.')
if len(t_array) == 2:
foreign_table = t_array[0]
foreign_field = t_array[1]
else:
schema = t_array[0]
foreign_table = t_array[1]
foreign_field = t_array[2]
foreign_table = Table(foreign_table, Base.metadata,
# autoload=True, # merubah v1.4 ke v.2
autoload_with=eng,
schema=schema)
foreign_field = getattr(foreign_table.c, foreign_field)
foreigns[fname] = (foreign_table, foreign_field)
fmap[fname] = fname_orig
return foreigns, fmap
def append_csv(table, filename, keys, get_file_func=get_file,
db_session=DBSession, **args):
eng = db_session.get_bind()
update_exist = args.get("update_exist")
callback = args.get("callback")
delimiter = args.get("delimiter")
ext = get_ext(filename).lower()
if not delimiter:
delimiter = ","
if ext.strip() == '.tsv':
delimiter = "\t"
columns_table, fields = get_fields(db_session, table)
with get_file_func(filename) as f:
reader = csv.DictReader(f, delimiter=delimiter)
filter_ = dict()
foreigns = dict()
is_first = True
fmap = dict()
user = False
for cf in reader: # column field
if is_first:
is_first = False
foreigns, fmap = get_foreigns(eng, cf)
password = None
data = dict()
for fname in cf:
if not fname:
continue
# Buka Tabel Foreign
if fname in foreigns:
foreign_table, foreign_field = foreigns[fname]
value = cf[fname]
if callback:
value = callback("mapping", table=foreign_table, field=foreign_field,
value=value)
# merubah v1.4 ke v.2
# sql = select([foreign_table]).where(foreign_field == value)
# q = Base.metadata.bind.execute(sql)
sql = select(foreign_table).where(foreign_field == value)
with eng.connect() as conn:
q = conn.execute(sql)
row = q.fetchone()
value = row and row.id or None
q.close()
# connection.close()
else:
value = cf[fname]
fname_orig = fmap[fname]
if not value and callback:
value = callback("value", data=cf, field=fname_orig)
data[fname_orig] = value
for key in keys:
if key not in data or not data[key]:
raise ValueError(f"Key Field '{key}' wajib ada")
filter_[key] = data[key]
q = db_session.query(table).filter_by(**filter_)
row = q.first()
if row:
if not update_exist:
continue
else:
row = table()
for fname in cf:
if not fname:
continue
fname_orig = fmap[fname]
val = data[fname_orig]
if not val:
continue
if fname_orig == "user_password":
user = True
password = val
else:
if fname_orig in fields and type(fields[fname_orig]) is BOOLEAN:
val = (val == 'true' or val ==
'1' or val == 1) and True or False
setattr(row, fname_orig, val)
# Penambahan checking field nullable false wajib ada datanya 2024-09-05
for c in columns_table:
if (not c["nullable"] and c["name"] not in data and c["name"] != "id") and c["default"] is None:
raise ValueError(
f"Table {str(table.__name__)} Field '{c['name']}' wajib ada {c['type']} ")
db_session.add(row)
db_session.flush()
if user and password:
print("Table: ", table.__name__, filter_)
row = db_session.query(table).filter_by(id=row.id).first()
UserService.set_password(row, password)
db_session.add(row)
db_session.flush()
user = False
transaction.commit() # diperlukan commit per record khususnya untuk yang internal link
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!